Switch to threaded worker for book render

Now that almost all of the code involved runs in C and releases the GIL,
threaded workers perform much better than the old pipe-based worker processes.
Cuts the time of the first render of a 400K word book on my system roughly in
half, from 1.9 to 0.9 seconds.
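
As a rough, self-contained illustration of that reasoning (not calibre code:
the standard-library zlib module stands in for calibre's C extensions, and the
payload size and worker count are arbitrary choices for the demo), C calls
that release the GIL speed up when fanned out over a ThreadPoolExecutor,
while pure-Python work would not:

    import time
    import zlib
    from concurrent.futures import ThreadPoolExecutor

    # Eight ~4 MB chunks of compressible data stand in for book files.
    payloads = [bytes(range(256)) * 16384] * 8

    start = time.monotonic()
    serial = [zlib.compress(p, 6) for p in payloads]
    print(f'serial:   {time.monotonic() - start:.2f}s')

    # zlib.compress drops the GIL while deflating, so threads run in parallel.
    start = time.monotonic()
    with ThreadPoolExecutor(max_workers=4) as executor:
        threaded = list(executor.map(lambda p: zlib.compress(p, 6), payloads))
    print(f'threaded: {time.monotonic() - start:.2f}s')
    assert serial == threaded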
Kovid Goyal 2024-09-18 21:35:47 +05:30
parent cb07b649f2
commit 3d19fb01be


@@ -5,17 +5,15 @@
import json
import os
import sys
import time
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from functools import partial
from itertools import count
from math import ceil
from lxml.etree import Comment
from calibre import detect_ncpus, force_unicode, prepare_string_for_xml
from calibre.constants import iswindows
from calibre.customize.ui import plugin_for_input_format
from calibre.ebooks.oeb.base import EPUB, OEB_DOCS, OEB_STYLES, OPF, SMIL, XHTML, XHTML_NS, XLINK, rewrite_links, urlunquote
from calibre.ebooks.oeb.base import XPath as _XPath
@@ -24,14 +22,10 @@ from calibre.ebooks.oeb.polish.container import Container as ContainerBase
from calibre.ebooks.oeb.polish.cover import find_cover_image, find_cover_image_in_page, find_cover_page
from calibre.ebooks.oeb.polish.toc import from_xpaths, get_landmarks, get_toc
from calibre.ebooks.oeb.polish.utils import guess_type
from calibre.ptempfile import PersistentTemporaryDirectory
from calibre.srv.metadata import encode_datetime
from calibre.srv.opts import grouper
from calibre.utils.date import EPOCH
from calibre.utils.filenames import rmtree
from calibre.utils.ipc.simple_worker import start_pipe_worker
from calibre.utils.logging import default_log
from calibre.utils.serialize import json_dumps, json_loads, msgpack_dumps, msgpack_loads
from calibre.utils.serialize import json_dumps, json_loads, msgpack_loads
from calibre.utils.short_uuid import uuid4
from calibre_extensions.fast_css_transform import transform_properties
from polyglot.binary import as_base64_unicode as encode_component
@@ -501,102 +495,6 @@ def transform_html(container, name, virtualize_resources, link_uid, link_to_map,
        f.write(shtml)


class RenderManager:

    def __init__(self, max_workers):
        self.max_workers = max_workers

    def launch_worker(self):
        with open(os.path.join(self.tdir, f'{len(self.workers)}.json'), 'wb') as output:
            error = open(os.path.join(self.tdir, f'{len(self.workers)}.error'), 'wb')
            p = start_pipe_worker('from calibre.srv.render_book import worker_main; worker_main()', stdout=error, stderr=error)
            p.output_path = output.name
            p.error_path = error.name
        self.workers.append(p)

    def __enter__(self):
        self.workers = []
        self.tdir = PersistentTemporaryDirectory()
        return self

    def __exit__(self, *a):
        while self.workers:
            p = self.workers.pop()
            if p.poll() is not None:
                continue
            p.terminate()
            if not iswindows and p.poll() is None:
                time.sleep(0.02)
                if p.poll() is None:
                    p.kill()
        del self.workers
        try:
            rmtree(self.tdir)
        except OSError:
            time.sleep(0.1)
            try:
                rmtree(self.tdir)
            except OSError:
                pass
        del self.tdir

    def launch_workers(self, names, in_process_container):
        num_workers = min(detect_ncpus(), len(names))
        if self.max_workers:
            num_workers = min(num_workers, self.max_workers)
        if num_workers > 1:
            if len(names) < 3 or sum(os.path.getsize(in_process_container.name_path_map[n]) for n in names) < 128 * 1024:
                num_workers = 1
        if num_workers > 1:
            num_other_workers = num_workers - 1
            while len(self.workers) < num_other_workers:
                self.launch_worker()
        return num_workers

    def __call__(self, names, args, in_process_container):
        num_workers = len(self.workers) + 1
        if num_workers == 1:
            return [process_book_files(names, *args, container=in_process_container)]

        group_sz = int(ceil(len(names) / num_workers))
        groups = tuple(grouper(group_sz, names))
        for group, worker in zip(groups[:-1], self.workers):
            worker.stdin.write(as_bytes(msgpack_dumps((worker.output_path, group,) + args)))
            worker.stdin.flush(), worker.stdin.close()
            worker.job_sent = True
        for worker in self.workers:
            if not hasattr(worker, 'job_sent'):
                worker.stdin.write(b'_'), worker.stdin.flush(), worker.stdin.close()

        error = None
        results = [process_book_files(groups[-1], *args, container=in_process_container)]
        for worker in self.workers:
            if not hasattr(worker, 'job_sent'):
                worker.wait()
                continue
            if worker.wait() != 0:
                with open(worker.error_path, 'rb') as f:
                    error = f.read().decode('utf-8', 'replace')
            else:
                with open(worker.output_path, 'rb') as f:
                    results.append(msgpack_loads(f.read()))
        if error is not None:
            raise Exception('Render worker failed with error:\n' + error)
        return results


def worker_main():
    stdin = getattr(sys.stdin, 'buffer', sys.stdin)
    raw = stdin.read()
    if raw == b'_':
        return
    args = msgpack_loads(raw)
    result = process_book_files(*args[1:])
    with open(args[0], 'wb') as f:
        f.write(as_bytes(msgpack_dumps(result)))

def virtualize_html(container, name, link_uid, link_to_map, virtualized_names):
    changed = set()
@@ -634,7 +532,7 @@ def virtualize_html(container, name, link_uid, link_to_map, virtualized_names):
__smil_file_names__ = ''
def process_book_files(names, container_dir, opfpath, virtualize_resources, link_uid, data_for_clone, container=None):
def process_book_files(names, container_dir, opfpath, virtualize_resources, link_uid, data_for_clone=None, container=None):
    if container is None:
        container = SimpleContainer(container_dir, opfpath, default_log, clone_data=data_for_clone)
        container.cloned = False
@@ -664,9 +562,19 @@ def process_book_files(names, container_dir, opfpath, virtualize_resources, link
    return link_to_map, html_data, virtualized_names, smil_map


def calculate_number_of_workers(names, in_process_container, max_workers):
    num_workers = min(detect_ncpus(), len(names))
    if max_workers:
        num_workers = min(num_workers, max_workers)
    if num_workers > 1:
        if len(names) < 3 or sum(os.path.getsize(in_process_container.name_path_map[n]) for n in names) < 128 * 1024:
            num_workers = 1
    return num_workers
def process_exploded_book(
    book_fmt, opfpath, input_fmt, tdir, render_manager, log=None, book_hash=None, save_bookmark_data=False,
    book_metadata=None, virtualize_resources=True
    book_fmt, opfpath, input_fmt, tdir, log=None, book_hash=None, save_bookmark_data=False,
    book_metadata=None, virtualize_resources=True, max_workers=1
):
    log = log or default_log
    container = SimpleContainer(tdir, opfpath, log)
@@ -676,15 +584,8 @@ def process_exploded_book(
    def needs_work(mt):
        return mt in OEB_STYLES or mt in OEB_DOCS or mt in ('image/svg+xml', 'application/smil', 'application/smil+xml')

    def work_priority(name):
        # ensure workers that get large files or stylesheets
        # are assigned fewer names
        size = os.path.getsize(container.name_path_map[name]),
        is_html = container.mime_map.get(name) in OEB_DOCS
        return (0 if is_html else 1), size

    if not is_comic:
        render_manager.launch_workers(tuple(n for n, mt in iteritems(container.mime_map) if needs_work(mt)), container)
        names_that_need_work = tuple(n for n, mt in iteritems(container.mime_map) if needs_work(mt))
        num_workers = calculate_number_of_workers(names_that_need_work, container, max_workers)

    bookmark_data = None
    if save_bookmark_data:
@@ -741,15 +642,17 @@ def process_exploded_book(
        'page_list_anchor_map': pagelist_anchor_map(page_list),
    }

    names = sorted(
        (n for n, mt in iteritems(container.mime_map) if needs_work(mt)),
        key=work_priority)
    results = []
    if num_workers < 2:
        results.append(process_book_files(names_that_need_work, tdir, opfpath, virtualize_resources, book_render_data['link_uid'], container=container))
    else:
        with ThreadPoolExecutor(max_workers=num_workers) as executor:
            futures = tuple(
                executor.submit(process_book_files, (name,), tdir, opfpath, virtualize_resources, book_render_data['link_uid'], container=container)
                for name in names_that_need_work)
            for future in futures:
                results.append(future.result())
    results = render_manager(
        names, (
            tdir, opfpath, virtualize_resources, book_render_data['link_uid'], container.data_for_clone()
        ), container
    )

    ltm = book_render_data['link_to_map']
    html_data = {}
    virtualized_names = set()
@@ -899,33 +802,32 @@ def get_stored_annotations(container, bookmark_data):
def render(pathtoebook, output_dir, book_hash=None, serialize_metadata=False, extract_annotations=False, virtualize_resources=True, max_workers=1):
    pathtoebook = os.path.abspath(pathtoebook)
    with RenderManager(max_workers) as render_manager:
        mi = None
        if serialize_metadata:
            from calibre.customize.ui import quick_metadata
            from calibre.ebooks.metadata.meta import get_metadata
            with open(pathtoebook, 'rb') as f, quick_metadata:
                mi = get_metadata(f, os.path.splitext(pathtoebook)[1][1:].lower())
        book_fmt, opfpath, input_fmt = extract_book(pathtoebook, output_dir, log=default_log)
        container, bookmark_data = process_exploded_book(
            book_fmt, opfpath, input_fmt, output_dir, render_manager,
            book_hash=book_hash, save_bookmark_data=extract_annotations,
            book_metadata=mi, virtualize_resources=virtualize_resources
        )
        if serialize_metadata:
            from calibre.ebooks.metadata.book.serialize import metadata_as_dict
            d = metadata_as_dict(mi)
            d.pop('cover_data', None)
            serialize_datetimes(d), serialize_datetimes(d.get('user_metadata', {}))
            with open(os.path.join(output_dir, 'calibre-book-metadata.json'), 'wb') as f:
                f.write(json_dumps(d))
        if extract_annotations:
            annotations = None
            if bookmark_data:
                annotations = json_dumps(tuple(get_stored_annotations(container, bookmark_data)))
            if annotations:
                with open(os.path.join(output_dir, 'calibre-book-annotations.json'), 'wb') as f:
                    f.write(annotations)
    mi = None
    if serialize_metadata:
        from calibre.customize.ui import quick_metadata
        from calibre.ebooks.metadata.meta import get_metadata
        with open(pathtoebook, 'rb') as f, quick_metadata:
            mi = get_metadata(f, os.path.splitext(pathtoebook)[1][1:].lower())
    book_fmt, opfpath, input_fmt = extract_book(pathtoebook, output_dir, log=default_log)
    container, bookmark_data = process_exploded_book(
        book_fmt, opfpath, input_fmt, output_dir, max_workers=max_workers,
        book_hash=book_hash, save_bookmark_data=extract_annotations,
        book_metadata=mi, virtualize_resources=virtualize_resources
    )
    if serialize_metadata:
        from calibre.ebooks.metadata.book.serialize import metadata_as_dict
        d = metadata_as_dict(mi)
        d.pop('cover_data', None)
        serialize_datetimes(d), serialize_datetimes(d.get('user_metadata', {}))
        with open(os.path.join(output_dir, 'calibre-book-metadata.json'), 'wb') as f:
            f.write(json_dumps(d))
    if extract_annotations:
        annotations = None
        if bookmark_data:
            annotations = json_dumps(tuple(get_stored_annotations(container, bookmark_data)))
        if annotations:
            with open(os.path.join(output_dir, 'calibre-book-annotations.json'), 'wb') as f:
                f.write(annotations)

def render_for_viewer(path, out_dir, book_hash):