Refactor copy-to-library backend code to make it reusable

Also add tests for it
This commit is contained in:
Kovid Goyal 2019-01-27 19:55:52 +05:30
parent 4543fc14c9
commit 57a68ad841
No known key found for this signature in database
GPG Key ID: 06BC317B515ACE7C
3 changed files with 176 additions and 103 deletions

View File

@ -0,0 +1,111 @@
#!/usr/bin/env python2
# vim:fileencoding=utf-8
# License: GPL v3 Copyright: 2019, Kovid Goyal <kovid at kovidgoyal.net>
from __future__ import absolute_import, division, print_function, unicode_literals
from calibre.db.utils import find_identical_books
from calibre.utils.config import tweaks
from calibre.utils.date import now
from polyglot.builtins import iteritems
def automerge_book(automerge_action, book_id, mi, identical_book_list, newdb, format_map):
    '''
    Merge the formats in format_map into the existing duplicate records in
    newdb, according to automerge_action.

    :param automerge_action: one of 'overwrite', 'ignore' or 'new record'
    :param book_id: id of the source book (unused here, kept for symmetry with callers)
    :param mi: metadata of the source book
    :param identical_book_list: ids in newdb considered duplicates of the source book
    :param newdb: destination database (new API)
    :param format_map: mapping of upper-cased format name -> path on disk
    :return: the id of a newly created record when automerge_action is
             'new record' and at least one format collided, otherwise None
    '''
    overwrite = automerge_action == 'overwrite'
    fmts_already_present = set()
    for dup_id in identical_book_list:
        existing_fmts = newdb.formats(dup_id)
        if existing_fmts:
            fmts_already_present.update(f.upper() for f in existing_fmts)
        for fmt, path in iteritems(format_map):
            newdb.add_format(dup_id, fmt, path, replace=overwrite, run_hooks=False)
    if automerge_action == 'new record':
        incoming_fmts = {f.upper() for f in format_map}
        if incoming_fmts & fmts_already_present:
            # At least one incoming format collided with an existing one, so
            # create a new record and put the incoming formats into it. We
            # should arguably put only the duplicate formats, but no real harm
            # is done by having all formats.
            return newdb.add_books(
                [(mi, format_map)], add_duplicates=True,
                apply_import_tags=tweaks['add_new_book_tags_when_importing_books'],
                preserve_uuid=False, run_hooks=False)[0][0]
def postprocess_copy(book_id, new_book_id, new_authors, db, newdb, identical_books_data, duplicate_action):
    '''
    Carry over per-book data that add_books() does not: author sort/link
    values for authors that were new to newdb, conversion options, and the
    incremental duplicate-detection data.

    :param book_id: id of the book in the source database
    :param new_book_id: id of the copied book in newdb (no-op when falsy)
    :param new_authors: set of author names that did not exist in newdb
    :param db: source database (new API)
    :param newdb: destination database (new API)
    :param identical_books_data: data from data_for_find_identical_books() or None
    :param duplicate_action: the duplicate handling mode of the copy
    '''
    if not new_book_id:
        return
    if new_authors:
        sort_map, link_map = {}, {}
        for author, src_aid in iteritems(db.get_item_ids('authors', new_authors)):
            if src_aid is None:
                continue
            adata = db.author_data((src_aid,)).get(src_aid)
            if adata is None:
                continue
            # Map the source author id onto the id the author got in newdb
            dest_aid = newdb.get_item_id('authors', author)
            if dest_aid is None:
                continue
            sort_val = adata.get('sort')
            if sort_val:
                sort_map[dest_aid] = sort_val
            link_val = adata.get('link')
            if link_val:
                link_map[dest_aid] = link_val
        if sort_map:
            newdb.set_sort_for_authors(sort_map, update_books=False)
        if link_map:
            newdb.set_link_for_authors(link_map)
    co = db.conversion_options(book_id, 'PIPE')
    if co is not None:
        newdb.set_conversion_options(new_book_id, 'PIPE', co)
    # Keep the cached duplicate-detection data current so subsequent copies
    # in the same run see this book
    if identical_books_data is not None and duplicate_action != 'add':
        newdb.update_data_for_find_identical_books(new_book_id, identical_books_data)
def copy_one_book(
    book_id, src_db, dest_db, duplicate_action='add', automerge_action='overwrite',
    preserve_date=True, identical_books_data=None, preserve_uuid=False):
    '''
    Copy a single book from src_db to dest_db, optionally handling duplicates.

    :param book_id: id of the book in src_db
    :param src_db: source database (legacy or new API object with .new_api)
    :param dest_db: destination database (legacy or new API object with .new_api)
    :param duplicate_action: 'add' (always add), 'ignore' (skip duplicates) or
                             'add_formats_to_existing' (merge into duplicates)
    :param automerge_action: passed to automerge_book() when merging
    :param preserve_date: keep the original timestamp instead of now()
    :param identical_books_data: pre-computed data_for_find_identical_books()
                                 result, to amortize the cost over many copies
    :param preserve_uuid: keep the original book uuid in dest_db
    :return: dict with keys book_id, title, authors, author, action
             ('add'/'automerge'/'duplicate') and new_book_id (None when no
             record was created)
    '''
    db = src_db.new_api
    newdb = dest_db.new_api
    with db.safe_read_lock, newdb.write_lock:
        mi = db.get_metadata(book_id, get_cover=True, cover_as_data=True)
        if not preserve_date:
            mi.timestamp = now()
        format_map = {}
        for fmt in db.formats(book_id, verify_formats=False):
            path = db.format_abspath(book_id, fmt)
            if path:
                format_map[fmt.upper()] = path
        identical_book_list = set()
        new_authors = {k for k, v in iteritems(newdb.get_item_ids('authors', mi.authors)) if v is None}
        new_book_id = None
        return_data = {
            'book_id': book_id, 'title': mi.title, 'authors': mi.authors,
            'author': mi.format_field('authors')[1],
            'action': 'add', 'new_book_id': None
        }
        if duplicate_action != 'add':
            # Scanning for dupes can be slow on a large library so
            # only do it if the option is set
            if identical_books_data is None:
                # was: identical_books_data = identical_books_data = ... (duplicated assignment typo)
                identical_books_data = newdb.data_for_find_identical_books()
            identical_book_list = find_identical_books(mi, identical_books_data)
            if identical_book_list:  # books with same author and nearly same title exist in newdb
                if duplicate_action == 'add_formats_to_existing':
                    new_book_id = automerge_book(automerge_action, book_id, mi, identical_book_list, newdb, format_map)
                    return_data['action'] = 'automerge'
                    return_data['new_book_id'] = new_book_id
                    postprocess_copy(book_id, new_book_id, new_authors, db, newdb, identical_books_data, duplicate_action)
                else:
                    return_data['action'] = 'duplicate'
                return return_data
        new_book_id = newdb.add_books(
            [(mi, format_map)], add_duplicates=True,
            apply_import_tags=tweaks['add_new_book_tags_when_importing_books'],
            preserve_uuid=preserve_uuid, run_hooks=False)[0][0]
        postprocess_copy(book_id, new_book_id, new_authors, db, newdb, identical_books_data, duplicate_action)
        return_data['new_book_id'] = new_book_id
        return return_data

View File

@ -305,3 +305,49 @@ class AddRemoveTest(BaseTest):
self.assertEqual(len(old), len(new)) self.assertEqual(len(old), len(new))
self.assertNotIn(prefix, cache.fields['formats'].format_fname(1, 'FMT1')) self.assertNotIn(prefix, cache.fields['formats'].format_fname(1, 'FMT1'))
# }}} # }}}
def test_copy_to_library(self):  # {{{
    # End-to-end test of copy_one_book() between a library and its clone,
    # covering plain copies, date/uuid preservation and all duplicate modes.
    from calibre.db.copy_to_library import copy_one_book
    from calibre.ebooks.metadata import authors_to_string
    src_db = self.init_cache()
    dest_db = self.init_cache(self.cloned_library)

    def make_rdata(book_id=1, new_book_id=None, action='add'):
        # Build the return_data dict copy_one_book() is expected to produce
        # for book_id, reading title/authors from the source library.
        return {
            'title': src_db.field_for('title', book_id),
            'authors': list(src_db.field_for('authors', book_id)),
            'author': authors_to_string(src_db.field_for('authors', book_id)),
            'book_id': book_id, 'new_book_id': new_book_id, 'action': action
        }

    def compare_field(field, func=self.assertEqual):
        # Compare a metadata field between the source book and its copy,
        # using the rdata from the most recent copy_one_book() call.
        func(src_db.field_for(field, rdata['book_id']), dest_db.field_for(field, rdata['new_book_id']))

    # Default copy: timestamp preserved, uuid regenerated
    rdata = copy_one_book(1, src_db, dest_db)
    self.assertEqual(rdata, make_rdata(new_book_id=max(dest_db.all_book_ids())))
    compare_field('timestamp')
    compare_field('uuid', self.assertNotEqual)
    # preserve_date=False gives a fresh timestamp; preserve_uuid=True keeps the uuid
    rdata = copy_one_book(1, src_db, dest_db, preserve_date=False, preserve_uuid=True)
    self.assertEqual(rdata, make_rdata(new_book_id=max(dest_db.all_book_ids())))
    compare_field('timestamp', self.assertNotEqual)
    compare_field('uuid')
    # duplicate_action='ignore': duplicates are reported, nothing is added
    rdata = copy_one_book(1, src_db, dest_db, duplicate_action='ignore')
    self.assertIsNone(rdata['new_book_id'])
    self.assertEqual(rdata['action'], 'duplicate')
    # automerge with default action ('overwrite') replaces the format on all duplicates
    src_db.add_format(1, 'FMT1', BytesIO(b'replaced'), run_hooks=False)
    rdata = copy_one_book(1, src_db, dest_db, duplicate_action='add_formats_to_existing')
    self.assertEqual(rdata['action'], 'automerge')
    for new_book_id in (1, 4, 5):
        self.assertEqual(dest_db.format(new_book_id, 'FMT1'), b'replaced')
    # automerge_action='ignore': existing formats on duplicates are left untouched
    src_db.add_format(1, 'FMT1', BytesIO(b'second-round'), run_hooks=False)
    rdata = copy_one_book(1, src_db, dest_db, duplicate_action='add_formats_to_existing', automerge_action='ignore')
    self.assertEqual(rdata['action'], 'automerge')
    for new_book_id in (1, 4, 5):
        self.assertEqual(dest_db.format(new_book_id, 'FMT1'), b'replaced')
    # automerge_action='new record': colliding formats go into a new record,
    # duplicates keep their existing format data
    rdata = copy_one_book(1, src_db, dest_db, duplicate_action='add_formats_to_existing', automerge_action='new record')
    self.assertEqual(rdata['action'], 'automerge')
    for new_book_id in (1, 4, 5):
        self.assertEqual(dest_db.format(new_book_id, 'FMT1'), b'replaced')
    self.assertEqual(dest_db.format(rdata['new_book_id'], 'FMT1'), b'second-round')
    # }}}

View File

@ -18,15 +18,14 @@ from PyQt5.Qt import (
from calibre import as_unicode from calibre import as_unicode
from calibre.constants import isosx from calibre.constants import isosx
from calibre.db.utils import find_identical_books
from calibre.gui2.actions import InterfaceAction from calibre.gui2.actions import InterfaceAction
from calibre.gui2 import (error_dialog, Dispatcher, warning_dialog, gprefs, from calibre.gui2 import (error_dialog, Dispatcher, warning_dialog, gprefs,
info_dialog, choose_dir) info_dialog, choose_dir)
from calibre.gui2.dialogs.progress import ProgressDialog from calibre.gui2.dialogs.progress import ProgressDialog
from calibre.gui2.widgets2 import Dialog from calibre.gui2.widgets2 import Dialog
from calibre.utils.config import prefs, tweaks from calibre.utils.config import prefs
from calibre.utils.date import now
from calibre.utils.icu import sort_key, numeric_sort_key from calibre.utils.icu import sort_key, numeric_sort_key
from calibre.db.copy_to_library import copy_one_book
def ask_about_cc_mismatch(gui, db, newdb, missing_cols, incompatible_cols): # {{{ def ask_about_cc_mismatch(gui, db, newdb, missing_cols, incompatible_cols): # {{{
@ -140,17 +139,11 @@ class Worker(Thread): # {{{
self.done() self.done()
def add_formats(self, id_, paths, newdb, replace=True):
for path in paths:
fmt = os.path.splitext(path)[-1].replace('.', '').upper()
with lopen(path, 'rb') as f:
newdb.add_format(id_, fmt, f, index_is_id=True,
notify=False, replace=replace)
def doit(self): def doit(self):
from calibre.gui2.ui import get_gui from calibre.gui2.ui import get_gui
library_broker = get_gui().library_broker library_broker = get_gui().library_broker
newdb = library_broker.get_library(self.loc) newdb = library_broker.get_library(self.loc)
self.find_identical_books_data = None
try: try:
if self.check_for_duplicates: if self.check_for_duplicates:
self.find_identical_books_data = newdb.new_api.data_for_find_identical_books() self.find_identical_books_data = newdb.new_api.data_for_find_identical_books()
@ -171,102 +164,25 @@ class Worker(Thread): # {{{
self.failed_books[x] = (err, as_unicode(traceback.format_exc())) self.failed_books[x] = (err, as_unicode(traceback.format_exc()))
def do_one(self, num, book_id, newdb): def do_one(self, num, book_id, newdb):
mi = self.db.get_metadata(book_id, index_is_id=True, get_cover=True, cover_as_data=True) duplicate_action = 'add'
if not gprefs['preserve_date_on_ctl']: if self.check_for_duplicates:
mi.timestamp = now() duplicate_action = 'add_formats_to_existing' if prefs['add_formats_to_existing'] else 'ignore'
self.progress(num, mi.title) rdata = copy_one_book(
fmts = self.db.formats(book_id, index_is_id=True) book_id, self.db, newdb,
if not fmts: preserve_date=gprefs['preserve_date_on_ctl'],
fmts = [] duplicate_action=duplicate_action, automerge_action=gprefs['automerge'],
else: identical_books_data=self.find_identical_books_data,
fmts = fmts.split(',') preserve_uuid=self.delete_after
identical_book_list = set() )
paths = [] self.progress(num, rdata['title'])
for fmt in fmts: if rdata['action'] == 'automerge':
p = self.db.format(book_id, fmt, index_is_id=True, self.auto_merged_ids[book_id] = _('%(title)s by %(author)s') % dict(title=rdata['title'], author=rdata['author'])
as_path=True) elif rdata['action'] == 'duplicate':
if p: self.duplicate_ids[book_id] = (rdata['title'], rdata['authors'])
paths.append(p)
try:
if self.check_for_duplicates:
# Scanning for dupes can be slow on a large library so
# only do it if the option is set
identical_book_list = find_identical_books(mi, self.find_identical_books_data)
if identical_book_list: # books with same author and nearly same title exist in newdb
if prefs['add_formats_to_existing']:
self.automerge_book(book_id, mi, identical_book_list, paths, newdb)
else: # Report duplicates for later processing
self.duplicate_ids[book_id] = (mi.title, mi.authors)
return
new_authors = {k for k, v in newdb.new_api.get_item_ids('authors', mi.authors).iteritems() if v is None}
new_book_id = newdb.import_book(mi, paths, notify=False, import_hooks=False,
apply_import_tags=tweaks['add_new_book_tags_when_importing_books'],
preserve_uuid=self.delete_after)
if new_authors:
author_id_map = self.db.new_api.get_item_ids('authors', new_authors)
sort_map, link_map = {}, {}
for author, aid in author_id_map.iteritems():
if aid is not None:
adata = self.db.new_api.author_data((aid,)).get(aid)
if adata is not None:
aid = newdb.new_api.get_item_id('authors', author)
if aid is not None:
asv = adata.get('sort')
if asv:
sort_map[aid] = asv
alv = adata.get('link')
if alv:
link_map[aid] = alv
if sort_map:
newdb.new_api.set_sort_for_authors(sort_map, update_books=False)
if link_map:
newdb.new_api.set_link_for_authors(link_map)
co = self.db.conversion_options(book_id, 'PIPE')
if co is not None:
newdb.set_conversion_options(new_book_id, 'PIPE', co)
if self.check_for_duplicates:
newdb.new_api.update_data_for_find_identical_books(new_book_id, self.find_identical_books_data)
self.processed.add(book_id)
finally:
for path in paths:
try:
os.remove(path)
except:
pass
def automerge_book(self, book_id, mi, identical_book_list, paths, newdb):
self.auto_merged_ids[book_id] = _('%(title)s by %(author)s') % dict(title=mi.title, author=mi.format_field('authors')[1])
seen_fmts = set()
self.processed.add(book_id) self.processed.add(book_id)
for identical_book in identical_book_list:
ib_fmts = newdb.formats(identical_book, index_is_id=True)
if ib_fmts:
seen_fmts |= set(ib_fmts.split(','))
replace = gprefs['automerge'] == 'overwrite'
self.add_formats(identical_book, paths, newdb,
replace=replace)
if gprefs['automerge'] == 'new record':
incoming_fmts = \
{os.path.splitext(path)[-1].replace('.',
'').upper() for path in paths}
if incoming_fmts.intersection(seen_fmts):
# There was at least one duplicate format
# so create a new record and put the
# incoming formats into it
# We should arguably put only the duplicate
# formats, but no real harm is done by having
# all formats
newdb.import_book(mi, paths, notify=False, import_hooks=False,
apply_import_tags=tweaks['add_new_book_tags_when_importing_books'],
preserve_uuid=False)
# }}} # }}}
class ChooseLibrary(Dialog): # {{{ class ChooseLibrary(Dialog): # {{{
def __init__(self, parent, locations): def __init__(self, parent, locations):