Renaming of many-(one,many) items

Kovid Goyal 2013-07-16 15:00:02 +05:30
parent 22f2aca3eb
commit 0c6d820f2b
4 changed files with 163 additions and 4 deletions

View File

@@ -1213,6 +1213,33 @@ class Cache(object):
            else:
                table.remove_books(book_ids, self.backend)

    @read_api
    def author_sort_strings_for_books(self, book_ids):
        val_map = {}
        for book_id in book_ids:
            authors = self._field_ids_for('authors', book_id)
            adata = self._author_data(authors)
            val_map[book_id] = tuple(adata[aid]['sort'] for aid in authors)
        return val_map

    @write_api
    def rename_items(self, field, item_id_to_new_name_map):
        try:
            func = self.fields[field].table.rename_item
        except AttributeError:
            raise ValueError('Cannot rename items for one-one fields: %s' % field)
        affected_books = set()
        for item_id, new_name in item_id_to_new_name_map.iteritems():
            affected_books.update(func(item_id, new_name, self.backend))
        if affected_books:
            if field == 'authors':
                self._set_field('author_sort', # also marks as dirty
                    {k:' & '.join(v) for k, v in self._author_sort_strings_for_books(affected_books).iteritems()})
                self._update_path(affected_books, mark_as_dirtied=False)
            else:
                self._mark_as_dirty(affected_books)
        return affected_books

    @write_api
    def remove_items(self, field, item_ids):
        ''' Delete all items in the specified field with the specified ids. Returns the set of affected book ids. '''
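
The intended call pattern for the new Cache.rename_items() API, sketched from the tests added further down in this commit; the `cache` object and the tag name/id below are illustrative, not part of the diff:

    # Illustrative sketch only: `cache` is a Cache instance (e.g. from init_cache() in the tests).
    # get_id_map() returns {item_id: name}; invert it to find the id of the item to rename.
    tag_id = {v: k for k, v in cache.get_id_map('tags').iteritems()}['Tag One']
    affected = cache.rename_items('tags', {tag_id: 'Tag 1'})
    # `affected` is the set of book ids whose metadata changed; renaming an author
    # additionally regenerates author_sort and updates the books' paths.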

View File

@@ -290,10 +290,7 @@ class LibraryDatabase(object):
    def authors_sort_strings(self, index, index_is_id=False):
        book_id = index if index_is_id else self.id(index)
        with self.new_api.read_lock:
            authors = self.new_api._field_ids_for('authors', book_id)
            adata = self.new_api._author_data(authors)
            return [adata[aid]['sort'] for aid in authors]
        return list(self.new_api.author_sort_strings_for_books((book_id,))[book_id])

    def author_sort_from_book(self, index, index_is_id=False):
        return ' & '.join(self.authors_sort_strings(index, index_is_id=index_is_id))

View File

@@ -222,6 +222,29 @@ class ManyToOneTable(Table):
        db.conn.executemany('DELETE FROM {0} WHERE id=?'.format(self.metadata['table']), item_ids)
        return affected_books

    def rename_item(self, item_id, new_name, db):
        rmap = {icu_lower(v):k for k, v in self.id_map.iteritems()}
        existing_item = rmap.get(icu_lower(new_name), None)
        table, col, lcol = self.metadata['table'], self.metadata['column'], self.metadata['link_column']
        affected_books = self.col_book_map.get(item_id, set())
        if existing_item is None or existing_item == item_id:
            # A simple rename will do the trick
            self.id_map[item_id] = new_name
            db.conn.execute('UPDATE {0} SET {1}=? WHERE id=?'.format(table, col), (new_name, item_id))
        else:
            # We have to replace
            self.id_map.pop(item_id, None)
            books = self.col_book_map.pop(item_id, set())
            for book_id in books:
                self.book_col_map[book_id] = existing_item
            self.col_book_map[existing_item].update(books)
            # For custom series this means that the series index can
            # potentially have duplicates/be incorrect, but there is no way to
            # handle that in this context.
            db.conn.execute('UPDATE {0} SET {1}=? WHERE {1}=?; DELETE FROM {2} WHERE id=?'.format(
                self.link_table, lcol, table), (existing_item, item_id, item_id))
        return affected_books

class ManyToManyTable(ManyToOneTable):

    '''
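
The core of ManyToOneTable.rename_item above is the case-insensitive reverse lookup that decides between a simple in-place rename and a merge into an already existing item. A minimal standalone sketch of that decision, using str.lower() as a stand-in for calibre's icu_lower:

    def rename_or_merge(id_map, item_id, new_name, lower=lambda s: s.lower()):
        # id_map maps item_id -> current name, as in Table.id_map
        rmap = {lower(name): iid for iid, name in id_map.items()}
        existing = rmap.get(lower(new_name), None)
        if existing is None or existing == item_id:
            return 'rename'  # brand-new name, or a case-only change of the same item
        return 'merge'       # new_name already belongs to another item

    # rename_or_merge({1: 'Tag One', 2: 'Tag Two'}, 1, 'tag one')  -> 'rename'
    # rename_or_merge({1: 'Tag One', 2: 'Tag Two'}, 1, 'Tag Two')  -> 'merge'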
@@ -283,6 +306,32 @@ class ManyToManyTable(ManyToOneTable):
        db.conn.executemany('DELETE FROM {0} WHERE id=?'.format(self.metadata['table']), item_ids)
        return affected_books

    def rename_item(self, item_id, new_name, db):
        rmap = {icu_lower(v):k for k, v in self.id_map.iteritems()}
        existing_item = rmap.get(icu_lower(new_name), None)
        table, col, lcol = self.metadata['table'], self.metadata['column'], self.metadata['link_column']
        affected_books = self.col_book_map.get(item_id, set())
        if existing_item is None or existing_item == item_id:
            # A simple rename will do the trick
            self.id_map[item_id] = new_name
            db.conn.execute('UPDATE {0} SET {1}=? WHERE id=?'.format(table, col), (new_name, item_id))
        else:
            # We have to replace
            self.id_map.pop(item_id, None)
            books = self.col_book_map.pop(item_id, set())
            # Replacing item_id with existing_item could cause the same id to
            # appear twice in the book list. Handle that by removing existing
            # item from the book list before replacing.
            for book_id in books:
                self.book_col_map[book_id] = tuple((existing_item if x == item_id else x) for x in self.book_col_map.get(book_id, ()) if x != existing_item)
            self.col_book_map[existing_item].update(books)
            db.conn.executemany('DELETE FROM {0} WHERE book=? AND {1}=?'.format(self.link_table, lcol), [
                (book_id, existing_item) for book_id in books])
            db.conn.execute('UPDATE {0} SET {1}=? WHERE {1}=?; DELETE FROM {2} WHERE id=?'.format(
                self.link_table, lcol, table), (existing_item, item_id, item_id))
        return affected_books

class AuthorsTable(ManyToManyTable):

    def read_id_maps(self, db):
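
The comment in ManyToManyTable.rename_item above notes that merging could leave the same item twice in a book's value tuple; the generator expression guards against that by dropping any pre-existing occurrence of the target item before substituting. A tiny worked example of that one line (values are illustrative):

    item_id, existing_item = 1, 2      # renaming item 1 into the already existing item 2
    old_values = (1, 2, 3)             # this book already carries both items 1 and 2
    new_values = tuple((existing_item if x == item_id else x)
                       for x in old_values if x != existing_item)
    # new_values == (2, 3): the old occurrence of 2 is removed first, then 1 becomes 2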
@@ -314,6 +363,17 @@ class AuthorsTable(ManyToManyTable):
            self.asort_map.pop(item_id, None)
        return clean

    def rename_item(self, item_id, new_name, db):
        ret = ManyToManyTable.rename_item(self, item_id, new_name, db)
        if item_id not in self.id_map:
            self.alink_map.pop(item_id, None)
            self.asort_map.pop(item_id, None)
        else:
            # Was a simple rename, update the author sort value
            self.set_sort_names({item_id:author_to_author_sort(new_name)}, db)
        return ret

    def remove_items(self, item_ids, db):
        raise ValueError('Direct removal of authors is not allowed')
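
For a plain rename (no merge), AuthorsTable.rename_item above regenerates the sort string via author_to_author_sort(); the tests below expect 'New Author' to come back as 'Author, New'. A rough approximation of that default behaviour (the real function also honours tweaks, name suffixes and the like):

    def approx_author_sort(name):
        # Simplified stand-in for calibre's author_to_author_sort()
        parts = name.split()
        if len(parts) < 2:
            return name
        return parts[-1] + ', ' + ' '.join(parts[:-1])

    # approx_author_sort('New Author') -> 'Author, New'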
@@ -377,6 +437,9 @@ class FormatsTable(ManyToManyTable):
    def remove_items(self, item_ids, db):
        raise NotImplementedError('Cannot delete a format directly')

    def rename_item(self, item_id, new_name, db):
        raise NotImplementedError('Cannot rename formats')

    def update_fmt(self, book_id, fmt, fname, size, db):
        fmts = list(self.book_col_map.get(book_id, []))
        try:
@@ -430,6 +493,9 @@ class IdentifiersTable(ManyToManyTable):
    def remove_items(self, item_ids, db):
        raise NotImplementedError('Direct deletion of identifiers is not implemented')

    def rename_item(self, item_id, new_name, db):
        raise NotImplementedError('Cannot rename identifiers')

    def all_identifier_types(self):
        return frozenset(k for k, v in self.col_book_map.iteritems() if v)

View File

@@ -474,3 +474,72 @@ class WritingTest(BaseTest):
            for bid in c.all_book_ids():
                self.assertIn(c.field_for('#series', bid), (None, 'My Series One'))
    # }}}

    def test_rename_items(self): # {{{
        ' Test renaming of many-(many,one) items '
        cl = self.cloned_library
        cache = self.init_cache(cl)
        # Check that renaming authors updates author sort and path
        a = {v:k for k, v in cache.get_id_map('authors').iteritems()}['Unknown']
        self.assertEqual(cache.rename_items('authors', {a:'New Author'}), {3})
        a = {v:k for k, v in cache.get_id_map('authors').iteritems()}['Author One']
        self.assertEqual(cache.rename_items('authors', {a:'Author Two'}), {1, 2})
        for c in (cache, self.init_cache(cl)):
            self.assertEqual(c.all_field_names('authors'), {'New Author', 'Author Two'})
            self.assertEqual(c.field_for('author_sort', 3), 'Author, New')
            self.assertIn('New Author/', c.field_for('path', 3))
            self.assertEqual(c.field_for('authors', 1), ('Author Two',))
            self.assertEqual(c.field_for('author_sort', 1), 'Two, Author')

        t = {v:k for k, v in cache.get_id_map('tags').iteritems()}['Tag One']
        # Test case change
        self.assertEqual(cache.rename_items('tags', {t:'tag one'}), {1, 2})
        for c in (cache, self.init_cache(cl)):
            self.assertEqual(c.all_field_names('tags'), {'tag one', 'Tag Two', 'News'})
            self.assertEqual(set(c.field_for('tags', 1)), {'tag one', 'News'})
            self.assertEqual(set(c.field_for('tags', 2)), {'tag one', 'Tag Two'})
        # Test new name
        self.assertEqual(cache.rename_items('tags', {t:'t1'}), {1,2})
        for c in (cache, self.init_cache(cl)):
            self.assertEqual(c.all_field_names('tags'), {'t1', 'Tag Two', 'News'})
            self.assertEqual(set(c.field_for('tags', 1)), {'t1', 'News'})
            self.assertEqual(set(c.field_for('tags', 2)), {'t1', 'Tag Two'})
        # Test rename to existing
        self.assertEqual(cache.rename_items('tags', {t:'Tag Two'}), {1,2})
        for c in (cache, self.init_cache(cl)):
            self.assertEqual(c.all_field_names('tags'), {'Tag Two', 'News'})
            self.assertEqual(set(c.field_for('tags', 1)), {'Tag Two', 'News'})
            self.assertEqual(set(c.field_for('tags', 2)), {'Tag Two'})
        # Test on a custom column
        t = {v:k for k, v in cache.get_id_map('#tags').iteritems()}['My Tag One']
        self.assertEqual(cache.rename_items('#tags', {t:'My Tag Two'}), {2})
        for c in (cache, self.init_cache(cl)):
            self.assertEqual(c.all_field_names('#tags'), {'My Tag Two'})
            self.assertEqual(set(c.field_for('#tags', 2)), {'My Tag Two'})

        # Test a Many-one field
        s = {v:k for k, v in cache.get_id_map('series').iteritems()}['A Series One']
        # Test case change
        self.assertEqual(cache.rename_items('series', {s:'a series one'}), {1, 2})
        for c in (cache, self.init_cache(cl)):
            self.assertEqual(c.all_field_names('series'), {'a series one'})
            self.assertEqual(c.field_for('series', 1), 'a series one')
            self.assertEqual(c.field_for('series_index', 1), 2.0)
        # Test new name
        self.assertEqual(cache.rename_items('series', {s:'series'}), {1, 2})
        for c in (cache, self.init_cache(cl)):
            self.assertEqual(c.all_field_names('series'), {'series'})
            self.assertEqual(c.field_for('series', 1), 'series')
            self.assertEqual(c.field_for('series', 2), 'series')
            self.assertEqual(c.field_for('series_index', 1), 2.0)

        s = {v:k for k, v in cache.get_id_map('#series').iteritems()}['My Series One']
        # Test custom column with rename to existing
        self.assertEqual(cache.rename_items('#series', {s:'My Series Two'}), {2})
        for c in (cache, self.init_cache(cl)):
            self.assertEqual(c.all_field_names('#series'), {'My Series Two'})
            self.assertEqual(c.field_for('#series', 2), 'My Series Two')
            self.assertEqual(c.field_for('#series_index', 1), 3.0)
            self.assertEqual(c.field_for('#series_index', 2), 1.0)
    # }}}