diff --git a/src/calibre/db/backend.py b/src/calibre/db/backend.py index 724871ea1b..1f72a55a2c 100644 --- a/src/calibre/db/backend.py +++ b/src/calibre/db/backend.py @@ -950,6 +950,14 @@ class DB: def queue_fts_job(self, book_id, fmt, path, fmt_size, fmt_hash): return self.fts.queue_fts_job(book_id, fmt, path, fmt_size, fmt_hash) + def commit_fts_result(self, book_id, fmt, fmt_size, fmt_hash, text): + return self.fts.commit_result(book_id, fmt, fmt_size, fmt_hash, text) + + def shutdown_fts(self): + if self.fts_enabled: + self.fts.shutdown() + self.fts = None + def get_connection(self): return self.conn diff --git a/src/calibre/db/cache.py b/src/calibre/db/cache.py index 3c277f0e4a..e80e77d5c9 100644 --- a/src/calibre/db/cache.py +++ b/src/calibre/db/cache.py @@ -457,6 +457,10 @@ class Cache: if self.backend.queue_fts_job(book_id, fmt, path, sz, h.hexdigest()): break + @write_api + def commit_fts_result(self, book_id, fmt, fmt_size, fmt_hash, text): + return self.backend.commit_fts_result(book_id, fmt, fmt_size, fmt_hash, text) + # }}} # Cache Layer API {{{ @@ -2285,6 +2289,7 @@ class Cache: except Exception: import traceback traceback.print_exc() + self.backend.shutdown_fts() self.backend.close() @property diff --git a/src/calibre/db/fts/connect.py b/src/calibre/db/fts/connect.py index e837f78cc7..3a5ccb8414 100644 --- a/src/calibre/db/fts/connect.py +++ b/src/calibre/db/fts/connect.py @@ -4,6 +4,7 @@ import builtins +import hashlib import os import sys from contextlib import suppress @@ -85,6 +86,16 @@ class FTS: return book_id, fmt return None, None + def commit_result(self, book_id, fmt, fmt_size, fmt_hash, text): + conn = self.get_connection() + text_hash = '' + if text: + text_hash = hashlib.sha1(text.encode('utf-8')).hexdigest() + for x in conn.get('SELECT id FROM fts_db.books_text WHERE book=? AND fmt=? AND text_hash=?', (book_id, fmt, text_hash)): + text = '' + break + self.add_text(book_id, fmt, text, text_hash, fmt_size, fmt_hash) + def queue_fts_job(self, book_id, fmt, path, fmt_size, fmt_hash): conn = self.get_connection() fmt = fmt.upper() @@ -99,3 +110,6 @@ class FTS: with suppress(OSError): os.remove(path) return False + + def shutdown(self): + self.pool.shutdown() diff --git a/src/calibre/db/fts/pool.py b/src/calibre/db/fts/pool.py index 02e0b94636..05d1004b9d 100644 --- a/src/calibre/db/fts/pool.py +++ b/src/calibre/db/fts/pool.py @@ -3,14 +3,47 @@ # License: GPL v3 Copyright: 2022, Kovid Goyal -from threading import Thread +import os, sys +import traceback, subprocess +from contextlib import suppress from queue import Queue +from threading import Thread +from calibre.utils.ipc.simple_worker import start_pipe_worker check_for_work = object() quit = object() +class Job: + + def __init__(self, book_id, fmt, path, fmt_size, fmt_hash): + self.book_id = book_id + self.fmt = fmt + self.fmt_size = fmt_size + self.fmt_hash = fmt_hash + self.path = path + + +class Result: + + def __init__(self, job, err_msg=''): + self.book_id = job.book_id + self.fmt = job.fmt + self.fmt_size = job.fmt_size + self.fmt_hash = job.fmt_hash + self.ok = not bool(err_msg) + if self.ok: + with open(job.path + '.txt', 'rb') as src: + try: + self.text = src.read().decode('utf-8', 'replace') + except Exception: + self.ok = False + self.text = traceback.format_exc() + else: + self.text = err_msg + + class Worker(Thread): def __init__(self, jobs_queue, supervise_queue): @@ -18,12 +51,47 @@ class Worker(Thread): self.currently_working = False self.jobs_queue = jobs_queue self.supervise_queue = supervise_queue + self.keep_going = True def run(self): - while True: + while self.keep_going: x = self.jobs_queue.get() if x is quit: break + try: + res = self.run_job(x) + if res is not None: + self.supervise_queue.put(res) + except Exception: + tb = traceback.format_exc() + self.supervise_queue.put(Result(x, tb)) + + def run_job(self, job): + txtpath = job.path + '.txt' + errpath = job.path + '.error' + try: + with open(errpath, 'wb') as error: + p = start_pipe_worker( + f'from calibre.db.fts.text import main; main({job.path!r})', + stdout=subprocess.DEVNULL, stderr=error, stdin=subprocess.DEVNULL, priority='low', + ) + while self.keep_going: + p.wait(0.1) + if p.returncode is None: + p.kill() + return + if os.path.exists(txtpath): + return Result(job) + with open(errpath, 'rb') as f: + err = f.read().decode('utf-8', 'replace') + return Result(job, err) + finally: + with suppress(OSError): + os.remove(job.path) + with suppress(OSError): + os.remove(txtpath) + with suppress(OSError): + os.remove(errpath) class Pool: @@ -43,6 +111,17 @@ class Pool: self.expand_workers() self.initialized = True + def shutdown(self): + if self.initialized: + self.initialized = False + self.supervise_queue.put(quit) + for w in self.workers: + w.keep_going = False + self.jobs_queue.put(quit) + self.supervisor_thread.join() + for w in self.workers: + w.join() + def expand_workers(self): while len(self.workers) < self.max_workers: w = Worker(self.jobs_queue, self.supervise_queue) @@ -53,15 +132,36 @@ class Pool: self.initialize() self.supervise_queue.put(check_for_work) + def do_check_for_work(self): + db = self.dbref() + if db is not None: + db.queue_next_fts_job() + def add_job(self, book_id, fmt, path, fmt_size, fmt_hash): self.initialize() job = Job(book_id, fmt, path, fmt_size, fmt_hash) self.jobs_queue.put(job) + def commit_result(self, result): + text = result.text + if not result.ok: + print(f'Failed to get text from book_id: {result.book_id} format: {result.fmt}', file=sys.stderr) + print(text, file=sys.stderr) + text = '' + db = self.dbref() + if db is not None: + db.commit_fts_result(result.book_id, result.fmt, result.fmt_size, result.fmt_hash, text) + def supervise(self): while True: x = self.supervise_queue.get() - if x is check_for_work: - pass - elif x is quit: - break + try: + if x is check_for_work: + self.do_check_for_work() + elif x is quit: + break + elif isinstance(x, Result): + self.commit_result(x) + self.do_check_for_work() + except Exception: + traceback.print_exc() diff --git a/src/calibre/db/fts/text.py b/src/calibre/db/fts/text.py index cc1215a337..4ee03b4aea 100644 --- a/src/calibre/db/fts/text.py +++ b/src/calibre/db/fts/text.py @@ -79,3 +79,9 @@ def extract_text(pathtoebook): texts.extend(to_text(container, name)) ans = '\n\n\n'.join(texts) return unicodedata.normalize('NFC', ans) + + +def main(pathtoebook): + text = extract_text(pathtoebook) + with open(pathtoebook + '.txt', 'wb') as f: + f.write(text.encode('utf-8'))