Finish writing of tags and authors many-many fields

This commit is contained in:
Kovid Goyal 2013-03-03 22:02:52 +05:30
parent 2331bf29a2
commit df823bf79d
5 changed files with 104 additions and 15 deletions

View File

@ -217,6 +217,8 @@ class Cache(object):
field.series_field = self.fields[name[:-len('_index')]]
elif name == 'series_index':
field.series_field = self.fields['series']
elif name == 'authors':
field.author_sort_field = self.fields['author_sort']
@read_api
def field_for(self, name, book_id, default_value=None):

View File

@ -402,6 +402,13 @@ class AuthorsField(ManyToManyField):
def category_sort_value(self, item_id, book_ids, lang_map):
return self.table.asort_map[item_id]
def db_author_sort_for_book(self, book_id):
return self.author_sort_field.for_book(book_id)
def author_sort_for_book(self, book_id):
return ' & '.join(self.table.asort_map[k] for k in
self.table.book_col_map[book_id])
class FormatsField(ManyToManyField):
def for_book(self, book_id, default_value=None):

View File

@ -168,7 +168,7 @@ class AuthorsTable(ManyToManyTable):
self.asort_map = {}
for row in db.conn.execute(
'SELECT id, name, sort, link FROM authors'):
self.id_map[row[0]] = row[1]
self.id_map[row[0]] = self.unserialize(row[1])
self.asort_map[row[0]] = (row[2] if row[2] else
author_to_author_sort(row[1]))
self.alink_map[row[0]] = row[3]

View File

@ -203,14 +203,63 @@ class WritingTest(BaseTest):
# }}}
def test_many_many_basic(self): # {{{
'Test the different code paths for writing to a many-one field'
# Fields: identifiers, authors, tags, languages, #authors, #tags
'Test the different code paths for writing to a many-many field'
cl = self.cloned_library
cache = self.init_cache(cl)
ae, af, sf = self.assertEqual, self.assertFalse, cache.set_field
# Tags
ae(sf('#tags', {1:cache.field_for('tags', 1), 2:cache.field_for('tags', 2)}),
{1, 2})
for name in ('tags', '#tags'):
f = cache.fields[name]
af(sf(name, {1:('tag one', 'News')}, allow_case_change=False))
ae(sf(name, {1:'tag one, News'}), {1, 2})
ae(sf(name, {3:('tag two', 'sep,sep2')}), {2, 3})
ae(len(f.table.id_map), 4)
ae(sf(name, {1:None}), set([1]))
cache2 = self.init_cache(cl)
for c in (cache, cache2):
ae(c.field_for(name, 3), ('tag two', 'sep;sep2'))
ae(len(c.fields[name].table.id_map), 3)
ae(len(c.fields[name].table.id_map), 3)
ae(c.field_for(name, 1), ())
ae(c.field_for(name, 2), ('tag one', 'tag two'))
del cache2
# Authors
ae(sf('#authors', {k:cache.field_for('authors', k) for k in (1,2,3)}),
{1,2,3})
for name in ('authors', '#authors'):
f = cache.fields[name]
ae(len(f.table.id_map), 3)
af(cache.set_field(name, {3:None if name == 'authors' else 'Unknown'}))
ae(cache.set_field(name, {3:'Kovid Goyal & Divok Layog'}), set([3]))
ae(cache.set_field(name, {1:'', 2:'An, Author'}), {1,2})
cache2 = self.init_cache(cl)
for c in (cache, cache2):
ae(len(c.fields[name].table.id_map), 4 if name =='authors' else 3)
ae(c.field_for(name, 3), ('Kovid Goyal', 'Divok Layog'))
ae(c.field_for(name, 2), ('An, Author',))
ae(c.field_for(name, 1), ('Unknown',) if name=='authors' else ())
ae(c.field_for('author_sort', 1), 'Unknown')
ae(c.field_for('author_sort', 2), 'An, Author')
ae(c.field_for('author_sort', 3), 'Goyal, Kovid & Layog, Divok')
del cache2
ae(cache.set_field('authors', {1:'KoviD GoyaL'}), {1, 3})
ae(cache.field_for('author_sort', 1), 'GoyaL, KoviD')
ae(cache.field_for('author_sort', 3), 'GoyaL, KoviD & Layog, Divok')
# TODO: identifiers, languages
# }}}
def tests():
return unittest.TestLoader().loadTestsFromTestCase(WritingTest)
tl = unittest.TestLoader()
# return tl.loadTestsFromName('writing.WritingTest.test_many_many_basic')
return tl.loadTestsFromTestCase(WritingTest)
def run():
unittest.TextTestRunner(verbosity=2).run(tests())

View File

@ -12,8 +12,11 @@ from functools import partial
from datetime import datetime
from calibre.constants import preferred_encoding, ispy3
from calibre.ebooks.metadata import author_to_author_sort
from calibre.utils.date import (parse_only_date, parse_date, UNDEFINED_DATE,
isoformat)
from calibre.utils.icu import strcmp
if ispy3:
unicode = str
@ -185,28 +188,42 @@ def safe_lower(x):
return x
def get_db_id(val, db, m, table, kmap, rid_map, allow_case_change,
case_changes, val_map, sql_val_map=lambda x:x):
case_changes, val_map, is_authors=False):
''' Get the db id for the value val. If val does not exist in the db it is
inserted into the db. '''
kval = kmap(val)
item_id = rid_map.get(kval, None)
if item_id is None:
if is_authors:
aus = author_to_author_sort(val)
db.conn.execute('INSERT INTO authors(name,sort) VALUES (?,?)',
(val.replace(',', '|'), aus))
else:
db.conn.execute('INSERT INTO %s(%s) VALUES (?)'%(
m['table'], m['column']), (sql_val_map(val),))
m['table'], m['column']), (val,))
item_id = rid_map[kval] = db.conn.last_insert_rowid()
table.id_map[item_id] = val
table.col_book_map[item_id] = set()
if is_authors:
table.asort_map[item_id] = aus
table.alink_map[item_id] = ''
elif allow_case_change and val != table.id_map[item_id]:
case_changes[item_id] = val
val_map[val] = item_id
def change_case(case_changes, dirtied, db, table, m, sql_val_map=lambda x:x):
def change_case(case_changes, dirtied, db, table, m, is_authors=False):
if is_authors:
vals = ((val.replace(',', '|'), item_id) for item_id, val in
case_changes.iteritems())
else:
vals = ((val, item_id) for item_id, val in case_changes.iteritems())
db.conn.executemany(
'UPDATE %s SET %s=? WHERE id=?'%(m['table'], m['column']),
((sql_val_map(val), item_id) for item_id, val in case_changes.iteritems()))
'UPDATE %s SET %s=? WHERE id=?'%(m['table'], m['column']), vals)
for item_id, val in case_changes.iteritems():
table.id_map[item_id] = val
dirtied.update(table.col_book_map[item_id])
if is_authors:
table.asort_map[item_id] = author_to_author_sort(val)
def many_one(book_id_val_map, db, field, allow_case_change, *args):
dirtied = set()
@ -288,17 +305,24 @@ def many_many(book_id_val_map, db, field, allow_case_change, *args):
# Map values to db ids, including any new values
kmap = safe_lower if dt == 'text' else lambda x:x
rid_map = {kmap(item):item_id for item_id, item in table.id_map.iteritems()}
sql_val_map = (lambda x:x.replace(',', '|')) if is_authors else lambda x:x
val_map = {}
case_changes = {}
for vals in book_id_val_map.itervalues():
for val in vals:
get_db_id(val, db, m, table, kmap, rid_map, allow_case_change,
case_changes, val_map, sql_val_map=sql_val_map)
case_changes, val_map, is_authors=is_authors)
if case_changes:
change_case(case_changes, dirtied, db, table, m,
sql_val_map=sql_val_map)
change_case(case_changes, dirtied, db, table, m, is_authors=is_authors)
if is_authors:
for item_id, val in case_changes.iteritems():
for book_id in table.col_book_map[item_id]:
current_sort = field.db_author_sort_for_book(book_id)
new_sort = field.author_sort_for_book(book_id)
if strcmp(current_sort, new_sort) == 0:
# The sort strings differ only by case, update the db
# sort
field.author_sort_field.writer.set_books({book_id:new_sort}, db)
book_id_item_id_map = {k:tuple(val_map[v] for v in vals)
for k, vals in book_id_val_map.iteritems()}
@ -338,6 +362,10 @@ def many_many(book_id_val_map, db, field, allow_case_change, *args):
((k,) for k in updated))
db.conn.executemany('INSERT INTO {0}(book,{1}) VALUES(?, ?)'.format(
table.link_table, m['link_column']), vals)
if is_authors:
aus_map = {book_id:field.author_sort_for_book(book_id) for book_id
in updated}
field.author_sort_field.writer.set_books(aus_map, db)
# Remove no longer used items
remove = {item_id for item_id in table.id_map if not
@ -348,6 +376,9 @@ def many_many(book_id_val_map, db, field, allow_case_change, *args):
for item_id in remove:
del table.id_map[item_id]
table.col_book_map.pop(item_id, None)
if is_authors:
table.asort_map.pop(item_id, None)
table.alink_map.pop(item_id, None)
return dirtied