From 251fd7a92cff0ea07943a1966482b1154fbe6837 Mon Sep 17 00:00:00 2001 From: Kovid Goyal Date: Wed, 27 Apr 2022 16:12:18 +0530 Subject: [PATCH] Queue FTS jobs in a separate thread This allows turning FTS on/off to not block the UI --- src/calibre/db/cache.py | 91 ++++++++++++++++++++++++--------- src/calibre/db/fts/pool.py | 12 ++--- src/calibre/db/tests/fts_api.py | 2 +- 3 files changed, 73 insertions(+), 32 deletions(-) diff --git a/src/calibre/db/cache.py b/src/calibre/db/cache.py index 6d7bba33ac..80dca02766 100644 --- a/src/calibre/db/cache.py +++ b/src/calibre/db/cache.py @@ -19,6 +19,7 @@ from functools import partial, wraps from io import DEFAULT_BUFFER_SIZE, BytesIO from threading import Lock from time import time +from queue import Queue from calibre import as_unicode, isbytestring from calibre.constants import iswindows, preferred_encoding @@ -139,6 +140,7 @@ class Cache: EventType = EventType def __init__(self, backend): + self.shutting_down = False self.backend = backend self.event_dispatcher = EventDispatcher() self.fields = {} @@ -426,35 +428,48 @@ class Cache: # FTS API {{{ def initialize_fts(self): + self.fts_queue_thread = None + self.fts_job_queue = Queue() self.backend.initialize_fts(weakref.ref(self)) self.queue_next_fts_job() - def enable_fts(self, enabled=True, start_pool=True): - fts = self.backend.enable_fts(weakref.ref(self) if enabled else None) - if start_pool: # used in the tests - self.queue_next_fts_job() - return fts + @read_api + def is_fts_enabled(self): + return self.backend.fts_enabled @write_api - def queue_next_fts_job(self): + def enable_fts(self, enabled=True, start_pool=True): + fts = self.backend.enable_fts(weakref.ref(self) if enabled else None) + if fts and start_pool: # used in the tests + from threading import Thread + self.fts_queue_thread = Thread(name='FTSQueue', target=self.dispatch_fts_jobs, args=(self.fts_job_queue,), daemon=True) + self.fts_queue_thread.start() + fts.pool.initialize() + fts.pool.initialized.wait() + self.queue_next_fts_job() + if not fts and self.fts_queue_thread: + self.fts_job_queue.put(None) + self.fts_queue_thread = None + self.fts_job_queue = Queue() + return fts + + def dispatch_fts_jobs(self, queue): from .fts.text import is_fmt_ok - if not self.backend.fts_enabled: - return - while True: - book_id, fmt = self.backend.get_next_fts_job() - if book_id is None: - return - path = self.format_abspath(book_id, fmt) + + def do_one(): + with self.read_lock: + if not self.backend.fts_enabled: + return False + book_id, fmt = self.backend.get_next_fts_job() + if book_id is None: + return False + path = self._format_abspath(book_id, fmt) if not path or not is_fmt_ok(fmt): - self.backend.remove_dirty_fts(book_id, fmt) - continue - try: - src = open(path, 'rb') - except OSError: - self.backend.remove_dirty_fts(book_id, fmt) - traceback.print_exc() - continue - with PersistentTemporaryFile(suffix=f'.{fmt.lower()}') as pt, src: + with self.write_lock: + self.backend.remove_dirty_fts(book_id, fmt) + return True + + with self.read_lock, open(path, 'rb') as src, PersistentTemporaryFile(suffix=f'.{fmt.lower()}') as pt: sz = 0 h = hashlib.sha1() while True: @@ -464,9 +479,29 @@ class Cache: sz += len(chunk) h.update(chunk) pt.write(chunk) - if self.backend.queue_fts_job(book_id, fmt, pt.name, sz, h.hexdigest()): - if not self.backend.fts_has_idle_workers: - break + with self.write_lock: + self.backend.queue_fts_job(book_id, fmt, pt.name, sz, h.hexdigest()) + return self.backend.fts_has_idle_workers + + while not self.shutting_down: + x = queue.get() + if x is None: + break + if not self.backend.fts_enabled: + break + has_more = True + while has_more and not self.shutting_down and self.backend.fts_enabled: + try: + has_more = do_one() + except Exception: + import traceback + traceback.print_exc() + + @write_api + def queue_next_fts_job(self): + if not self.backend.fts_enabled: + return + self.fts_job_queue.put(True) @write_api def commit_fts_result(self, book_id, fmt, fmt_size, fmt_hash, text, err_msg): @@ -2321,7 +2356,10 @@ class Cache: @write_api def close(self): + self.shutting_down = True self.event_dispatcher.close() + if self.fts_queue_thread is not None: + self.fts_job_queue.put(None) from calibre.customize.ui import available_library_closed_plugins for plugin in available_library_closed_plugins(): try: @@ -2330,6 +2368,9 @@ class Cache: import traceback traceback.print_exc() self.backend.shutdown_fts() + if self.fts_queue_thread is not None: + self.fts_queue_thread.join() + self.fts_queue_thread = None self.backend.close() @property diff --git a/src/calibre/db/fts/pool.py b/src/calibre/db/fts/pool.py index 7cf8458e9a..99d0095f14 100644 --- a/src/calibre/db/fts/pool.py +++ b/src/calibre/db/fts/pool.py @@ -9,7 +9,7 @@ import sys import traceback from contextlib import suppress from queue import Queue -from threading import Thread +from threading import Thread, Event from time import monotonic from calibre import human_readable @@ -121,15 +121,15 @@ class Pool: self.jobs_queue = Queue() self.supervise_queue = Queue() self.workers = [] - self.initialized = False + self.initialized = Event() self.dbref = dbref def initialize(self): - if not self.initialized: + if not self.initialized.is_set(): self.supervisor_thread = Thread(name='FTSSupervisor', daemon=True, target=self.supervise) self.supervisor_thread.start() self.expand_workers() - self.initialized = True + self.initialized.set() def prune_dead_workers(self): self.workers = [w for w in self.workers if w.is_alive()] @@ -195,8 +195,7 @@ class Pool: db.commit_fts_result(result.book_id, result.fmt, result.fmt_size, result.fmt_hash, text, err_msg) def shutdown(self): - if self.initialized: - self.initialized = False + if self.initialized.is_set(): self.supervise_queue.put(quit) for w in self.workers: w.keep_going = False @@ -205,6 +204,7 @@ class Pool: for w in self.workers: w.join() self.workers = [] + self.initialized.clear() # }}} def do_check_for_work(self): diff --git a/src/calibre/db/tests/fts_api.py b/src/calibre/db/tests/fts_api.py index 45b5817ff6..ece7e4d675 100644 --- a/src/calibre/db/tests/fts_api.py +++ b/src/calibre/db/tests/fts_api.py @@ -85,7 +85,7 @@ class FTSAPITest(BaseTest): check(id=2, book=1, format='TXT', searchable_text='a test text2') # check closing shuts down all workers cache.close() - self.assertFalse(fts.pool.initialized) + self.assertFalse(fts.pool.initialized.is_set()) # check enabling scans pre-exisintg cache = self.new_library()