Restore pre-refactor save-to-disk performance by running the copy and the metadata update in parallel

This commit is contained in:
Kovid Goyal 2014-11-09 17:16:25 +05:30
parent 27489f610c
commit 1d8afd8c7c
4 changed files with 91 additions and 80 deletions

View File

@ -113,7 +113,7 @@ class SaveToDiskAction(InterfaceAction):
if save_cover is not None: if save_cover is not None:
opts.save_cover = save_cover opts.save_cover = save_cover
book_ids = set(map(self.gui.library_view.model().id, rows)) book_ids = set(map(self.gui.library_view.model().id, rows))
Saver(book_ids, self.gui.current_db, opts, path, parent=self.gui, spare_server=self.gui.spare_server) Saver(book_ids, self.gui.current_db, opts, path, parent=self.gui)
else: else:
paths = self.gui.current_view().model().paths(rows) paths = self.gui.current_view().model().paths(rows)
self.gui.device_manager.save_books( self.gui.device_manager.save_books(

View File

@ -23,15 +23,14 @@ from calibre.ptempfile import PersistentTemporaryDirectory
from calibre.gui2 import error_dialog, warning_dialog, gprefs, open_local_file from calibre.gui2 import error_dialog, warning_dialog, gprefs, open_local_file
from calibre.gui2.dialogs.progress import ProgressDialog from calibre.gui2.dialogs.progress import ProgressDialog
from calibre.utils.formatter_functions import load_user_template_functions from calibre.utils.formatter_functions import load_user_template_functions
from calibre.utils.ipc.job import ParallelJob from calibre.utils.ipc.pool import Pool, Failure
from calibre.utils.ipc.server import Server
from calibre.library.save_to_disk import sanitize_args, get_path_components, find_plugboard, plugboard_save_to_disk_value from calibre.library.save_to_disk import sanitize_args, get_path_components, find_plugboard, plugboard_save_to_disk_value
BookId = namedtuple('BookId', 'title authors') BookId = namedtuple('BookId', 'title authors')
def ensure_unique_components(data): # {{{ def ensure_unique_components(data): # {{{
cmap = {} cmap = {}
for book_id, (mi, components) in data.iteritems(): for book_id, (mi, components, fmts) in data.iteritems():
c = tuple(components) c = tuple(components)
if c in cmap: if c in cmap:
cmap[c].add(book_id) cmap[c].add(book_id)
@ -73,7 +72,7 @@ class Saver(QObject):
do_one_signal = pyqtSignal() do_one_signal = pyqtSignal()
def __init__(self, book_ids, db, opts, root, parent=None, spare_server=None): def __init__(self, book_ids, db, opts, root, parent=None):
QObject.__init__(self, parent) QObject.__init__(self, parent)
if parent is not None: if parent is not None:
setattr(parent, 'no_gc_%s' % id(self), self) setattr(parent, 'no_gc_%s' % id(self), self)
@ -82,7 +81,6 @@ class Saver(QObject):
self.template_functions = self.db.pref('user_template_functions', []) self.template_functions = self.db.pref('user_template_functions', [])
load_user_template_functions('', self.template_functions) load_user_template_functions('', self.template_functions)
self.collected_data = {} self.collected_data = {}
self.metadata_data = {}
self.errors = defaultdict(list) self.errors = defaultdict(list)
self._book_id_data = {} self._book_id_data = {}
self.all_book_ids = frozenset(book_ids) self.all_book_ids = frozenset(book_ids)
@ -90,9 +88,9 @@ class Saver(QObject):
self.do_one_signal.connect(self.tick, type=Qt.QueuedConnection) self.do_one_signal.connect(self.tick, type=Qt.QueuedConnection)
self.do_one = self.do_one_collect self.do_one = self.do_one_collect
self.ids_to_collect = iter(self.all_book_ids) self.ids_to_collect = iter(self.all_book_ids)
self.plugboard_cache = {} self.plugboards_cache = {}
self.tdir = PersistentTemporaryDirectory('_save_to_disk') self.tdir = PersistentTemporaryDirectory('_save_to_disk')
self.server = spare_server self.pool = None
self.pd.show() self.pd.show()
self.root, self.opts, self.path_length = sanitize_args(root, opts) self.root, self.opts, self.path_length = sanitize_args(root, opts)
@ -113,9 +111,9 @@ class Saver(QObject):
p = self.parent() p = self.parent()
if p is not None: if p is not None:
setattr(p, 'no_gc_%s' % id(self), None) setattr(p, 'no_gc_%s' % id(self), None)
if self.server is not None: if self.pool is not None:
self.server.close() self.pool.shutdown()
self.jobs = self.server = self.metadata_data = self.plugboard_cache = self.plugboards = self.template_functions = self.collected_data = self.all_book_ids = self.pd = self.db = None # noqa self.jobs = self.pool = self.plugboards_cache = self.plugboards = self.template_functions = self.collected_data = self.all_book_ids = self.pd = self.db = None # noqa
def book_id_data(self, book_id): def book_id_data(self, book_id):
ans = self._book_id_data.get(book_id) ans = self._book_id_data.get(book_id)
@ -143,15 +141,20 @@ class Saver(QObject):
mi = self.db.get_metadata(book_id) mi = self.db.get_metadata(book_id)
self._book_id_data[book_id] = BookId(mi.title, mi.authors) self._book_id_data[book_id] = BookId(mi.title, mi.authors)
components = get_path_components(self.opts, mi, book_id, self.path_length) components = get_path_components(self.opts, mi, book_id, self.path_length)
self.collected_data[book_id] = (mi, components) self.collected_data[book_id] = (mi, components, {fmt.lower() for fmt in self.db.formats(book_id)})
def collection_finished(self): def collection_finished(self):
self.do_one = self.do_one_write self.do_one = self.do_one_write
ensure_unique_components(self.collected_data) ensure_unique_components(self.collected_data)
self.ids_to_write = iter(self.collected_data) self.ids_to_write = iter(self.collected_data)
self.pd.title = _('Copying files from calibre library to disk...') self.pd.title = _('Copying files and writing metadata...') if self.opts.update_metadata else _(
'Copying files...')
self.pd.max = len(self.collected_data) self.pd.max = len(self.collected_data)
self.pd.value = 0 self.pd.value = 0
if self.opts.update_metadata:
all_fmts = {fmt for data in self.collected_data.itervalues() for fmt in data[2]}
self.plugboards_cache = {fmt:find_plugboard(plugboard_save_to_disk_value, fmt, self.plugboards) for fmt in all_fmts}
self.pool = Pool(name='SaveToDisk') if self.pool is None else self.pool
self.do_one_signal.emit() self.do_one_signal.emit()
def do_one_write(self): def do_one_write(self):
@ -160,18 +163,42 @@ class Saver(QObject):
except StopIteration: except StopIteration:
self.writing_finished() self.writing_finished()
return return
if not self.opts.update_metadata:
self.pd.msg = self.book_id_data(book_id).title self.pd.msg = self.book_id_data(book_id).title
self.pd.value += 1 self.pd.value += 1
try: try:
self.write_book(book_id, *self.collected_data[book_id]) self.write_book(book_id, *self.collected_data[book_id])
except Exception: except Exception:
self.errors[book_id].append(('critical', traceback.format_exc())) self.errors[book_id].append(('critical', traceback.format_exc()))
self.consume_results()
self.do_one_signal.emit() self.do_one_signal.emit()
def write_book(self, book_id, mi, components): def consume_results(self):
if self.pool is not None:
while True:
try:
worker_result = self.pool.results.get_nowait()
except Empty:
break
book_id = worker_result.id
if worker_result.is_terminal_failure:
error_dialog(self.pd, _('Critical failure'), _(
'The update metadata worker process crashed while processing'
' the book %s. Saving is aborted.') % self.book_id_data(book_id).title, show=True)
self.pd.canceled = True
return
result = worker_result.result
self.pd.value += 1
self.pd.msg = self.book_id_data(book_id).title
if result.err is not None:
self.errors[book_id].append(('metadata', (None, result.err + '\n' + result.traceback)))
if result.value:
for fmt, tb in result.value:
self.errors[book_id].append(('metadata', (fmt, tb)))
def write_book(self, book_id, mi, components, fmts):
base_path = os.path.join(self.root, *components) base_path = os.path.join(self.root, *components)
base_dir = os.path.dirname(base_path) base_dir = os.path.dirname(base_path)
fmts = {f.lower() for f in self.db.formats(book_id)}
if self.opts.formats != 'all': if self.opts.formats != 'all':
asked_formats = {x.lower().strip() for x in self.opts.formats.split(',')} asked_formats = {x.lower().strip() for x in self.opts.formats.split(',')}
fmts = asked_formats.intersection(fmts) fmts = asked_formats.intersection(fmts)
@ -189,7 +216,7 @@ class Saver(QObject):
raise raise
if self.opts.update_metadata: if self.opts.update_metadata:
self.metadata_data[book_id] = d = {} d = {}
d['last_modified'] = mi.last_modified.isoformat() d['last_modified'] = mi.last_modified.isoformat()
cdata = self.db.cover(book_id) cdata = self.db.cover(book_id)
@ -230,8 +257,18 @@ class Saver(QObject):
d['fmts'].append(fmtpath) d['fmts'].append(fmtpath)
except Exception: except Exception:
self.errors[book_id].append(('fmt', (fmt, traceback.format_exc()))) self.errors[book_id].append(('fmt', (fmt, traceback.format_exc())))
if self.opts.update_metadata and not d['fmts']: if self.opts.update_metadata:
self.metadata_data.pop(book_id, None) if d['fmts']:
try:
self.pool(book_id, 'calibre.library.save_to_disk', 'update_serialized_metadata', d, self.plugboards_cache)
except Failure as err:
error_dialog(self.pd, _('Critical failure'), _(
'Could not save books to disk, click "Show details" for more information'),
det_msg=unicode(err) + '\n' + unicode(err.details), show=True)
self.pd.canceled = True
else:
self.pd.value += 1
self.pd.msg = self.book_id_data(book_id).title
def write_fmt(self, book_id, fmt, base_path): def write_fmt(self, book_id, fmt, base_path):
fmtpath = base_path + os.extsep + fmt fmtpath = base_path + os.extsep + fmt
@ -249,47 +286,26 @@ class Saver(QObject):
def writing_finished(self): def writing_finished(self):
if not self.opts.update_metadata: if not self.opts.update_metadata:
self.metadata_data = {}
if not self.metadata_data:
self.updating_metadata_finished() self.updating_metadata_finished()
return else:
self.pd.title = _('Updating metadata...')
self.pd.max = len(self.metadata_data)
self.pd.value = 0
all_fmts = {path.rpartition(os.extsep)[-1] for d in self.metadata_data.itervalues() for path in d['fmts']}
plugboards_cache = {fmt:find_plugboard(plugboard_save_to_disk_value, fmt, self.plugboards) for fmt in all_fmts}
self.server = server = Server() if self.server is None else self.server
tasks = server.split(list(self.metadata_data))
self.jobs = set()
for i, book_ids in enumerate(tasks):
data = {book_id:self.metadata_data[book_id] for j, book_id in book_ids}
job = ParallelJob('save_book', 'Save books (job %d of %d)' % (i+1, len(tasks)), lambda x, y:x, args=(data, plugboards_cache))
self.jobs.add(job)
server.add_job(job)
self.do_one = self.do_one_update self.do_one = self.do_one_update
self.do_one_signal.emit() self.do_one_signal.emit()
def do_one_update(self): def do_one_update(self):
running = False self.consume_results()
for job in self.jobs:
if not job.is_finished:
running = True
job.update(consume_notifications=False)
while True:
try: try:
book_id = job.notifications.get_nowait()[0] self.pool.wait_for_tasks(0.1)
self.pd.value += 1 except Failure as err:
self.pd.msg = self.book_id_data(book_id).title error_dialog(self.pd, _('Critical failure'), _(
except Empty: 'Could not save books to disk, click "Show details" for more information'),
break det_msg=unicode(err) + '\n' + unicode(err.details), show=True)
if running: self.pd.canceled = True
self.do_one_signal.emit() except RuntimeError:
pass # tasks not completed
else: else:
for job in self.jobs: self.consume_results()
for book_id, fmt, tb in (job.result or ()): return self.updating_metadata_finished()
self.errors[book_id].append(('metadata', (fmt, tb))) self.do_one_signal.emit()
self.updating_metadata_finished()
def updating_metadata_finished(self): def updating_metadata_finished(self):
if DEBUG: if DEBUG:
@ -312,7 +328,7 @@ class Saver(QObject):
types = {t for t, data in errors} types = {t for t, data in errors}
title, authors = self.book_id_data(book_id).title, authors_to_string(self.book_id_data(book_id).authors[:1]) title, authors = self.book_id_data(book_id).title, authors_to_string(self.book_id_data(book_id).authors[:1])
if report: if report:
a('\n' + ('_'*80) + '\n') a('\n' + ('_'*70) + '\n')
if 'critical' in types: if 'critical' in types:
a(_('Failed to save: {0} by {1} to disk, with error:').format(title, authors)) a(_('Failed to save: {0} by {1} to disk, with error:').format(title, authors))
for t, tb in errors: for t, tb in errors:
@ -326,7 +342,10 @@ class Saver(QObject):
a(_('Failed to save the {2} format of: {0} by {1} to disk, with error:').format(title, authors, fmt.upper())) a(_('Failed to save the {2} format of: {0} by {1} to disk, with error:').format(title, authors, fmt.upper()))
a(indent(tb)), a('') a(indent(tb)), a('')
for fmt, tb in errs['metadata']: for fmt, tb in errs['metadata']:
a(_('Failed to update the metadata in the {2} format of: {0} by {1} to disk, with error:').format(title, authors, fmt.upper())) if fmt:
a(_('Failed to update the metadata in the {2} format of: {0} by {1}, with error:').format(title, authors, fmt.upper()))
else:
a(_('Failed to update the metadata in all formats of: {0} by {1}, with error:').format(title, authors))
a(indent(tb)), a('') a(indent(tb)), a('')
return '\n'.join(report) return '\n'.join(report)

View File

@ -431,7 +431,7 @@ def save_to_disk(db, ids, root, opts=None, callback=None):
break break
return failures return failures
def read_serialized_metadata(book_id, data): def read_serialized_metadata(data):
from calibre.ebooks.metadata.opf2 import OPF from calibre.ebooks.metadata.opf2 import OPF
from calibre.utils.date import parse_date from calibre.utils.date import parse_date
mi = OPF(data['opf'], try_to_guess_cover=False, populate_spine=False, basedir=os.path.dirname(data['opf'])).to_book_metadata() mi = OPF(data['opf'], try_to_guess_cover=False, populate_spine=False, basedir=os.path.dirname(data['opf'])).to_book_metadata()
@ -446,29 +446,21 @@ def read_serialized_metadata(book_id, data):
cdata = f.read() cdata = f.read()
return mi, cdata return mi, cdata
def update_serialized_metadata(books, plugboard_cache, notification=lambda x,y:x): def update_serialized_metadata(book, plugboard_cache):
result = [] result = []
from calibre.customize.ui import apply_null_metadata from calibre.customize.ui import apply_null_metadata
with apply_null_metadata: with apply_null_metadata:
for book_id, data in books.iteritems(): fmts = [fp.rpartition(os.extsep)[-1] for fp in book['fmts']]
fmts = [fp.rpartition(os.extsep)[-1] for fp in data['fmts']] mi, cdata = read_serialized_metadata(book)
try:
mi, cdata = read_serialized_metadata(book_id, data)
except Exception:
tb = traceback.format_exc()
for fmt in fmts:
result.append((book_id, fmt, tb))
else:
def report_error(fmt, tb): def report_error(fmt, tb):
result.append((book_id, fmt, tb)) result.append((fmt, tb))
for fmt, fmtpath in zip(fmts, data['fmts']): for fmt, fmtpath in zip(fmts, book['fmts']):
try: try:
with lopen(fmtpath, 'r+b') as stream: with lopen(fmtpath, 'r+b') as stream:
update_metadata(mi, fmt, stream, (), cdata, error_report=report_error, plugboard_cache=plugboard_cache) update_metadata(mi, fmt, stream, (), cdata, error_report=report_error, plugboard_cache=plugboard_cache)
except Exception: except Exception:
report_error(fmt, traceback.format_exc()) report_error(fmt, traceback.format_exc())
notification(book_id)
return result return result

View File

@ -119,7 +119,7 @@ class Pool(Thread):
return self.run_job(job) return self.run_job(job)
else: else:
worker_result = event worker_result = event
self.busy_workers.pop(worker_result.worker) self.busy_workers.pop(worker_result.worker, None)
self.available_workers.append(worker_result.worker) self.available_workers.append(worker_result.worker)
self.tracker.task_done() self.tracker.task_done()
if worker_result.is_terminal_failure: if worker_result.is_terminal_failure: