From 1d8afd8c7c63cf88fdd69768e2633c76449a43f0 Mon Sep 17 00:00:00 2001 From: Kovid Goyal Date: Sun, 9 Nov 2014 17:16:25 +0530 Subject: [PATCH] Restore pre-refactor save to disk performance by running copy and metadata update in parallel --- src/calibre/gui2/actions/save_to_disk.py | 2 +- src/calibre/gui2/save.py | 135 +++++++++++++---------- src/calibre/library/save_to_disk.py | 32 ++---- src/calibre/utils/ipc/pool.py | 2 +- 4 files changed, 91 insertions(+), 80 deletions(-) diff --git a/src/calibre/gui2/actions/save_to_disk.py b/src/calibre/gui2/actions/save_to_disk.py index 83ced9f158..f568fa121a 100644 --- a/src/calibre/gui2/actions/save_to_disk.py +++ b/src/calibre/gui2/actions/save_to_disk.py @@ -113,7 +113,7 @@ class SaveToDiskAction(InterfaceAction): if save_cover is not None: opts.save_cover = save_cover book_ids = set(map(self.gui.library_view.model().id, rows)) - Saver(book_ids, self.gui.current_db, opts, path, parent=self.gui, spare_server=self.gui.spare_server) + Saver(book_ids, self.gui.current_db, opts, path, parent=self.gui) else: paths = self.gui.current_view().model().paths(rows) self.gui.device_manager.save_books( diff --git a/src/calibre/gui2/save.py b/src/calibre/gui2/save.py index 8201f1cbfc..61dd191362 100644 --- a/src/calibre/gui2/save.py +++ b/src/calibre/gui2/save.py @@ -23,15 +23,14 @@ from calibre.ptempfile import PersistentTemporaryDirectory from calibre.gui2 import error_dialog, warning_dialog, gprefs, open_local_file from calibre.gui2.dialogs.progress import ProgressDialog from calibre.utils.formatter_functions import load_user_template_functions -from calibre.utils.ipc.job import ParallelJob -from calibre.utils.ipc.server import Server +from calibre.utils.ipc.pool import Pool, Failure from calibre.library.save_to_disk import sanitize_args, get_path_components, find_plugboard, plugboard_save_to_disk_value BookId = namedtuple('BookId', 'title authors') def ensure_unique_components(data): # {{{ cmap = {} - for book_id, (mi, components) in data.iteritems(): + for book_id, (mi, components, fmts) in data.iteritems(): c = tuple(components) if c in cmap: cmap[c].add(book_id) @@ -73,7 +72,7 @@ class Saver(QObject): do_one_signal = pyqtSignal() - def __init__(self, book_ids, db, opts, root, parent=None, spare_server=None): + def __init__(self, book_ids, db, opts, root, parent=None): QObject.__init__(self, parent) if parent is not None: setattr(parent, 'no_gc_%s' % id(self), self) @@ -82,7 +81,6 @@ class Saver(QObject): self.template_functions = self.db.pref('user_template_functions', []) load_user_template_functions('', self.template_functions) self.collected_data = {} - self.metadata_data = {} self.errors = defaultdict(list) self._book_id_data = {} self.all_book_ids = frozenset(book_ids) @@ -90,9 +88,9 @@ class Saver(QObject): self.do_one_signal.connect(self.tick, type=Qt.QueuedConnection) self.do_one = self.do_one_collect self.ids_to_collect = iter(self.all_book_ids) - self.plugboard_cache = {} + self.plugboards_cache = {} self.tdir = PersistentTemporaryDirectory('_save_to_disk') - self.server = spare_server + self.pool = None self.pd.show() self.root, self.opts, self.path_length = sanitize_args(root, opts) @@ -113,9 +111,9 @@ class Saver(QObject): p = self.parent() if p is not None: setattr(p, 'no_gc_%s' % id(self), None) - if self.server is not None: - self.server.close() - self.jobs = self.server = self.metadata_data = self.plugboard_cache = self.plugboards = self.template_functions = self.collected_data = self.all_book_ids = self.pd = self.db = None # noqa + if self.pool is not None: + self.pool.shutdown() + self.jobs = self.pool = self.plugboards_cache = self.plugboards = self.template_functions = self.collected_data = self.all_book_ids = self.pd = self.db = None # noqa def book_id_data(self, book_id): ans = self._book_id_data.get(book_id) @@ -143,15 +141,20 @@ class Saver(QObject): mi = self.db.get_metadata(book_id) self._book_id_data[book_id] = BookId(mi.title, mi.authors) components = get_path_components(self.opts, mi, book_id, self.path_length) - self.collected_data[book_id] = (mi, components) + self.collected_data[book_id] = (mi, components, {fmt.lower() for fmt in self.db.formats(book_id)}) def collection_finished(self): self.do_one = self.do_one_write ensure_unique_components(self.collected_data) self.ids_to_write = iter(self.collected_data) - self.pd.title = _('Copying files from calibre library to disk...') + self.pd.title = _('Copying files and writing metadata...') if self.opts.update_metadata else _( + 'Copying files...') self.pd.max = len(self.collected_data) self.pd.value = 0 + if self.opts.update_metadata: + all_fmts = {fmt for data in self.collected_data.itervalues() for fmt in data[2]} + self.plugboards_cache = {fmt:find_plugboard(plugboard_save_to_disk_value, fmt, self.plugboards) for fmt in all_fmts} + self.pool = Pool(name='SaveToDisk') if self.pool is None else self.pool self.do_one_signal.emit() def do_one_write(self): @@ -160,18 +163,42 @@ class Saver(QObject): except StopIteration: self.writing_finished() return - self.pd.msg = self.book_id_data(book_id).title - self.pd.value += 1 + if not self.opts.update_metadata: + self.pd.msg = self.book_id_data(book_id).title + self.pd.value += 1 try: self.write_book(book_id, *self.collected_data[book_id]) except Exception: self.errors[book_id].append(('critical', traceback.format_exc())) + self.consume_results() self.do_one_signal.emit() - def write_book(self, book_id, mi, components): + def consume_results(self): + if self.pool is not None: + while True: + try: + worker_result = self.pool.results.get_nowait() + except Empty: + break + book_id = worker_result.id + if worker_result.is_terminal_failure: + error_dialog(self.pd, _('Critical failure'), _( + 'The update metadata worker process crashed while processing' + ' the book %s. Saving is aborted.') % self.book_id_data(book_id).title, show=True) + self.pd.canceled = True + return + result = worker_result.result + self.pd.value += 1 + self.pd.msg = self.book_id_data(book_id).title + if result.err is not None: + self.errors[book_id].append(('metadata', (None, result.err + '\n' + result.traceback))) + if result.value: + for fmt, tb in result.value: + self.errors[book_id].append(('metadata', (fmt, tb))) + + def write_book(self, book_id, mi, components, fmts): base_path = os.path.join(self.root, *components) base_dir = os.path.dirname(base_path) - fmts = {f.lower() for f in self.db.formats(book_id)} if self.opts.formats != 'all': asked_formats = {x.lower().strip() for x in self.opts.formats.split(',')} fmts = asked_formats.intersection(fmts) @@ -189,7 +216,7 @@ class Saver(QObject): raise if self.opts.update_metadata: - self.metadata_data[book_id] = d = {} + d = {} d['last_modified'] = mi.last_modified.isoformat() cdata = self.db.cover(book_id) @@ -230,8 +257,18 @@ class Saver(QObject): d['fmts'].append(fmtpath) except Exception: self.errors[book_id].append(('fmt', (fmt, traceback.format_exc()))) - if self.opts.update_metadata and not d['fmts']: - self.metadata_data.pop(book_id, None) + if self.opts.update_metadata: + if d['fmts']: + try: + self.pool(book_id, 'calibre.library.save_to_disk', 'update_serialized_metadata', d, self.plugboards_cache) + except Failure as err: + error_dialog(self.pd, _('Critical failure'), _( + 'Could not save books to disk, click "Show details" for more information'), + det_msg=unicode(err) + '\n' + unicode(err.details), show=True) + self.pd.canceled = True + else: + self.pd.value += 1 + self.pd.msg = self.book_id_data(book_id).title def write_fmt(self, book_id, fmt, base_path): fmtpath = base_path + os.extsep + fmt @@ -249,47 +286,26 @@ class Saver(QObject): def writing_finished(self): if not self.opts.update_metadata: - self.metadata_data = {} - if not self.metadata_data: self.updating_metadata_finished() - return - self.pd.title = _('Updating metadata...') - self.pd.max = len(self.metadata_data) - self.pd.value = 0 - - all_fmts = {path.rpartition(os.extsep)[-1] for d in self.metadata_data.itervalues() for path in d['fmts']} - plugboards_cache = {fmt:find_plugboard(plugboard_save_to_disk_value, fmt, self.plugboards) for fmt in all_fmts} - self.server = server = Server() if self.server is None else self.server - tasks = server.split(list(self.metadata_data)) - self.jobs = set() - for i, book_ids in enumerate(tasks): - data = {book_id:self.metadata_data[book_id] for j, book_id in book_ids} - job = ParallelJob('save_book', 'Save books (job %d of %d)' % (i+1, len(tasks)), lambda x, y:x, args=(data, plugboards_cache)) - self.jobs.add(job) - server.add_job(job) - self.do_one = self.do_one_update - self.do_one_signal.emit() + else: + self.do_one = self.do_one_update + self.do_one_signal.emit() def do_one_update(self): - running = False - for job in self.jobs: - if not job.is_finished: - running = True - job.update(consume_notifications=False) - while True: - try: - book_id = job.notifications.get_nowait()[0] - self.pd.value += 1 - self.pd.msg = self.book_id_data(book_id).title - except Empty: - break - if running: - self.do_one_signal.emit() + self.consume_results() + try: + self.pool.wait_for_tasks(0.1) + except Failure as err: + error_dialog(self.pd, _('Critical failure'), _( + 'Could not save books to disk, click "Show details" for more information'), + det_msg=unicode(err) + '\n' + unicode(err.details), show=True) + self.pd.canceled = True + except RuntimeError: + pass # tasks not completed else: - for job in self.jobs: - for book_id, fmt, tb in (job.result or ()): - self.errors[book_id].append(('metadata', (fmt, tb))) - self.updating_metadata_finished() + self.consume_results() + return self.updating_metadata_finished() + self.do_one_signal.emit() def updating_metadata_finished(self): if DEBUG: @@ -312,7 +328,7 @@ class Saver(QObject): types = {t for t, data in errors} title, authors = self.book_id_data(book_id).title, authors_to_string(self.book_id_data(book_id).authors[:1]) if report: - a('\n' + ('_'*80) + '\n') + a('\n' + ('_'*70) + '\n') if 'critical' in types: a(_('Failed to save: {0} by {1} to disk, with error:').format(title, authors)) for t, tb in errors: @@ -326,7 +342,10 @@ class Saver(QObject): a(_('Failed to save the {2} format of: {0} by {1} to disk, with error:').format(title, authors, fmt.upper())) a(indent(tb)), a('') for fmt, tb in errs['metadata']: - a(_('Failed to update the metadata in the {2} format of: {0} by {1} to disk, with error:').format(title, authors, fmt.upper())) + if fmt: + a(_('Failed to update the metadata in the {2} format of: {0} by {1}, with error:').format(title, authors, fmt.upper())) + else: + a(_('Failed to update the metadata in all formats of: {0} by {1}, with error:').format(title, authors)) a(indent(tb)), a('') return '\n'.join(report) diff --git a/src/calibre/library/save_to_disk.py b/src/calibre/library/save_to_disk.py index 90520b11c0..e7631ec1b8 100644 --- a/src/calibre/library/save_to_disk.py +++ b/src/calibre/library/save_to_disk.py @@ -431,7 +431,7 @@ def save_to_disk(db, ids, root, opts=None, callback=None): break return failures -def read_serialized_metadata(book_id, data): +def read_serialized_metadata(data): from calibre.ebooks.metadata.opf2 import OPF from calibre.utils.date import parse_date mi = OPF(data['opf'], try_to_guess_cover=False, populate_spine=False, basedir=os.path.dirname(data['opf'])).to_book_metadata() @@ -446,29 +446,21 @@ def read_serialized_metadata(book_id, data): cdata = f.read() return mi, cdata -def update_serialized_metadata(books, plugboard_cache, notification=lambda x,y:x): +def update_serialized_metadata(book, plugboard_cache): result = [] from calibre.customize.ui import apply_null_metadata with apply_null_metadata: - for book_id, data in books.iteritems(): - fmts = [fp.rpartition(os.extsep)[-1] for fp in data['fmts']] - try: - mi, cdata = read_serialized_metadata(book_id, data) - except Exception: - tb = traceback.format_exc() - for fmt in fmts: - result.append((book_id, fmt, tb)) - else: - def report_error(fmt, tb): - result.append((book_id, fmt, tb)) + fmts = [fp.rpartition(os.extsep)[-1] for fp in book['fmts']] + mi, cdata = read_serialized_metadata(book) + def report_error(fmt, tb): + result.append((fmt, tb)) - for fmt, fmtpath in zip(fmts, data['fmts']): - try: - with lopen(fmtpath, 'r+b') as stream: - update_metadata(mi, fmt, stream, (), cdata, error_report=report_error, plugboard_cache=plugboard_cache) - except Exception: - report_error(fmt, traceback.format_exc()) - notification(book_id) + for fmt, fmtpath in zip(fmts, book['fmts']): + try: + with lopen(fmtpath, 'r+b') as stream: + update_metadata(mi, fmt, stream, (), cdata, error_report=report_error, plugboard_cache=plugboard_cache) + except Exception: + report_error(fmt, traceback.format_exc()) return result diff --git a/src/calibre/utils/ipc/pool.py b/src/calibre/utils/ipc/pool.py index 04c27867fa..6e66d803bd 100644 --- a/src/calibre/utils/ipc/pool.py +++ b/src/calibre/utils/ipc/pool.py @@ -119,7 +119,7 @@ class Pool(Thread): return self.run_job(job) else: worker_result = event - self.busy_workers.pop(worker_result.worker) + self.busy_workers.pop(worker_result.worker, None) self.available_workers.append(worker_result.worker) self.tracker.task_done() if worker_result.is_terminal_failure: