From 3d19fb01bef4dbfe964178bb7c05c36925d2c0fe Mon Sep 17 00:00:00 2001 From: Kovid Goyal Date: Wed, 18 Sep 2024 21:35:47 +0530 Subject: [PATCH] Switch to threaded worker for book render Now that almost all code runs in C and releases the GIL, threaded workers perform much better. Cuts the time of first render for a 400K word book on my system by half, from 1.9 to 0.9 seconds. --- src/calibre/srv/render_book.py | 204 +++++++++------------------------ 1 file changed, 53 insertions(+), 151 deletions(-) diff --git a/src/calibre/srv/render_book.py b/src/calibre/srv/render_book.py index 286c9c136f..69eb4bb644 100644 --- a/src/calibre/srv/render_book.py +++ b/src/calibre/srv/render_book.py @@ -5,17 +5,15 @@ import json import os import sys -import time from collections import defaultdict +from concurrent.futures import ThreadPoolExecutor from datetime import datetime from functools import partial from itertools import count -from math import ceil from lxml.etree import Comment from calibre import detect_ncpus, force_unicode, prepare_string_for_xml -from calibre.constants import iswindows from calibre.customize.ui import plugin_for_input_format from calibre.ebooks.oeb.base import EPUB, OEB_DOCS, OEB_STYLES, OPF, SMIL, XHTML, XHTML_NS, XLINK, rewrite_links, urlunquote from calibre.ebooks.oeb.base import XPath as _XPath @@ -24,14 +22,10 @@ from calibre.ebooks.oeb.polish.container import Container as ContainerBase from calibre.ebooks.oeb.polish.cover import find_cover_image, find_cover_image_in_page, find_cover_page from calibre.ebooks.oeb.polish.toc import from_xpaths, get_landmarks, get_toc from calibre.ebooks.oeb.polish.utils import guess_type -from calibre.ptempfile import PersistentTemporaryDirectory from calibre.srv.metadata import encode_datetime -from calibre.srv.opts import grouper from calibre.utils.date import EPOCH -from calibre.utils.filenames import rmtree -from calibre.utils.ipc.simple_worker import start_pipe_worker from calibre.utils.logging import default_log -from 
calibre.utils.serialize import json_dumps, json_loads, msgpack_dumps, msgpack_loads +from calibre.utils.serialize import json_dumps, json_loads, msgpack_loads from calibre.utils.short_uuid import uuid4 from calibre_extensions.fast_css_transform import transform_properties from polyglot.binary import as_base64_unicode as encode_component @@ -501,102 +495,6 @@ def transform_html(container, name, virtualize_resources, link_uid, link_to_map, f.write(shtml) -class RenderManager: - - def __init__(self, max_workers): - self.max_workers = max_workers - - def launch_worker(self): - with open(os.path.join(self.tdir, f'{len(self.workers)}.json'), 'wb') as output: - error = open(os.path.join(self.tdir, f'{len(self.workers)}.error'), 'wb') - p = start_pipe_worker('from calibre.srv.render_book import worker_main; worker_main()', stdout=error, stderr=error) - p.output_path = output.name - p.error_path = error.name - self.workers.append(p) - - def __enter__(self): - self.workers = [] - self.tdir = PersistentTemporaryDirectory() - return self - - def __exit__(self, *a): - while self.workers: - p = self.workers.pop() - if p.poll() is not None: - continue - p.terminate() - if not iswindows and p.poll() is None: - time.sleep(0.02) - if p.poll() is None: - p.kill() - del self.workers - try: - rmtree(self.tdir) - except OSError: - time.sleep(0.1) - try: - rmtree(self.tdir) - except OSError: - pass - del self.tdir - - def launch_workers(self, names, in_process_container): - num_workers = min(detect_ncpus(), len(names)) - if self.max_workers: - num_workers = min(num_workers, self.max_workers) - if num_workers > 1: - if len(names) < 3 or sum(os.path.getsize(in_process_container.name_path_map[n]) for n in names) < 128 * 1024: - num_workers = 1 - if num_workers > 1: - num_other_workers = num_workers - 1 - while len(self.workers) < num_other_workers: - self.launch_worker() - return num_workers - - def __call__(self, names, args, in_process_container): - num_workers = len(self.workers) + 1 - 
if num_workers == 1: - return [process_book_files(names, *args, container=in_process_container)] - - group_sz = int(ceil(len(names) / num_workers)) - groups = tuple(grouper(group_sz, names)) - for group, worker in zip(groups[:-1], self.workers): - worker.stdin.write(as_bytes(msgpack_dumps((worker.output_path, group,) + args))) - worker.stdin.flush(), worker.stdin.close() - worker.job_sent = True - - for worker in self.workers: - if not hasattr(worker, 'job_sent'): - worker.stdin.write(b'_'), worker.stdin.flush(), worker.stdin.close() - - error = None - results = [process_book_files(groups[-1], *args, container=in_process_container)] - for worker in self.workers: - if not hasattr(worker, 'job_sent'): - worker.wait() - continue - if worker.wait() != 0: - with open(worker.error_path, 'rb') as f: - error = f.read().decode('utf-8', 'replace') - else: - with open(worker.output_path, 'rb') as f: - results.append(msgpack_loads(f.read())) - if error is not None: - raise Exception('Render worker failed with error:\n' + error) - return results - - -def worker_main(): - stdin = getattr(sys.stdin, 'buffer', sys.stdin) - raw = stdin.read() - if raw == b'_': - return - args = msgpack_loads(raw) - result = process_book_files(*args[1:]) - with open(args[0], 'wb') as f: - f.write(as_bytes(msgpack_dumps(result))) - - def virtualize_html(container, name, link_uid, link_to_map, virtualized_names): changed = set() @@ -634,7 +532,7 @@ def virtualize_html(container, name, link_uid, link_to_map, virtualized_names): __smil_file_names__ = '' -def process_book_files(names, container_dir, opfpath, virtualize_resources, link_uid, data_for_clone, container=None): +def process_book_files(names, container_dir, opfpath, virtualize_resources, link_uid, data_for_clone=None, container=None): if container is None: container = SimpleContainer(container_dir, opfpath, default_log, clone_data=data_for_clone) container.cloned = False @@ -664,9 +562,19 @@ def process_book_files(names, container_dir, opfpath, 
virtualize_resources, link return link_to_map, html_data, virtualized_names, smil_map +def calculate_number_of_workers(names, in_process_container, max_workers): + num_workers = min(detect_ncpus(), len(names)) + if max_workers: + num_workers = min(num_workers, max_workers) + if num_workers > 1: + if len(names) < 3 or sum(os.path.getsize(in_process_container.name_path_map[n]) for n in names) < 128 * 1024: + num_workers = 1 + return num_workers + + def process_exploded_book( - book_fmt, opfpath, input_fmt, tdir, render_manager, log=None, book_hash=None, save_bookmark_data=False, - book_metadata=None, virtualize_resources=True + book_fmt, opfpath, input_fmt, tdir, log=None, book_hash=None, save_bookmark_data=False, + book_metadata=None, virtualize_resources=True, max_workers=1 ): log = log or default_log container = SimpleContainer(tdir, opfpath, log) @@ -676,15 +584,8 @@ def process_exploded_book( def needs_work(mt): return mt in OEB_STYLES or mt in OEB_DOCS or mt in ('image/svg+xml', 'application/smil', 'application/smil+xml') - def work_priority(name): - # ensure workers with large files or stylesheets - # have the less names - size = os.path.getsize(container.name_path_map[name]), - is_html = container.mime_map.get(name) in OEB_DOCS - return (0 if is_html else 1), size - - if not is_comic: - render_manager.launch_workers(tuple(n for n, mt in iteritems(container.mime_map) if needs_work(mt)), container) + names_that_need_work = tuple(n for n, mt in iteritems(container.mime_map) if needs_work(mt)) + num_workers = calculate_number_of_workers(names_that_need_work, container, max_workers) bookmark_data = None if save_bookmark_data: @@ -741,15 +642,17 @@ def process_exploded_book( 'page_list_anchor_map': pagelist_anchor_map(page_list), } - names = sorted( - (n for n, mt in iteritems(container.mime_map) if needs_work(mt)), - key=work_priority) + results = [] + if num_workers < 2: + results.append(process_book_files(names_that_need_work, tdir, opfpath, 
virtualize_resources, book_render_data['link_uid'], container=container)) + else: + with ThreadPoolExecutor(max_workers=num_workers) as executor: + futures = tuple( + executor.submit(process_book_files, (name,), tdir, opfpath, virtualize_resources, book_render_data['link_uid'], container=container) + for name in names_that_need_work) + for future in futures: + results.append(future.result()) - results = render_manager( - names, ( - tdir, opfpath, virtualize_resources, book_render_data['link_uid'], container.data_for_clone() - ), container - ) ltm = book_render_data['link_to_map'] html_data = {} virtualized_names = set() @@ -899,33 +802,32 @@ def get_stored_annotations(container, bookmark_data): def render(pathtoebook, output_dir, book_hash=None, serialize_metadata=False, extract_annotations=False, virtualize_resources=True, max_workers=1): pathtoebook = os.path.abspath(pathtoebook) - with RenderManager(max_workers) as render_manager: - mi = None - if serialize_metadata: - from calibre.customize.ui import quick_metadata - from calibre.ebooks.metadata.meta import get_metadata - with open(pathtoebook, 'rb') as f, quick_metadata: - mi = get_metadata(f, os.path.splitext(pathtoebook)[1][1:].lower()) - book_fmt, opfpath, input_fmt = extract_book(pathtoebook, output_dir, log=default_log) - container, bookmark_data = process_exploded_book( - book_fmt, opfpath, input_fmt, output_dir, render_manager, - book_hash=book_hash, save_bookmark_data=extract_annotations, - book_metadata=mi, virtualize_resources=virtualize_resources - ) - if serialize_metadata: - from calibre.ebooks.metadata.book.serialize import metadata_as_dict - d = metadata_as_dict(mi) - d.pop('cover_data', None) - serialize_datetimes(d), serialize_datetimes(d.get('user_metadata', {})) - with open(os.path.join(output_dir, 'calibre-book-metadata.json'), 'wb') as f: - f.write(json_dumps(d)) - if extract_annotations: - annotations = None - if bookmark_data: - annotations = 
json_dumps(tuple(get_stored_annotations(container, bookmark_data))) - if annotations: - with open(os.path.join(output_dir, 'calibre-book-annotations.json'), 'wb') as f: - f.write(annotations) + mi = None + if serialize_metadata: + from calibre.customize.ui import quick_metadata + from calibre.ebooks.metadata.meta import get_metadata + with open(pathtoebook, 'rb') as f, quick_metadata: + mi = get_metadata(f, os.path.splitext(pathtoebook)[1][1:].lower()) + book_fmt, opfpath, input_fmt = extract_book(pathtoebook, output_dir, log=default_log) + container, bookmark_data = process_exploded_book( + book_fmt, opfpath, input_fmt, output_dir, max_workers=max_workers, + book_hash=book_hash, save_bookmark_data=extract_annotations, + book_metadata=mi, virtualize_resources=virtualize_resources + ) + if serialize_metadata: + from calibre.ebooks.metadata.book.serialize import metadata_as_dict + d = metadata_as_dict(mi) + d.pop('cover_data', None) + serialize_datetimes(d), serialize_datetimes(d.get('user_metadata', {})) + with open(os.path.join(output_dir, 'calibre-book-metadata.json'), 'wb') as f: + f.write(json_dumps(d)) + if extract_annotations: + annotations = None + if bookmark_data: + annotations = json_dumps(tuple(get_stored_annotations(container, bookmark_data))) + if annotations: + with open(os.path.join(output_dir, 'calibre-book-annotations.json'), 'wb') as f: + f.write(annotations) def render_for_viewer(path, out_dir, book_hash):