From cb74720aa52564bf2862dce3303c02601e37d076 Mon Sep 17 00:00:00 2001 From: Kovid Goyal Date: Thu, 17 Feb 2022 22:41:10 +0530 Subject: [PATCH] More work on fts pool --- src/calibre/db/backend.py | 2 +- src/calibre/db/cache.py | 9 +++++-- src/calibre/db/fts/connect.py | 2 +- src/calibre/db/fts/pool.py | 45 +++++++++++++++++++++++++++------ src/calibre/db/tests/fts_api.py | 2 +- 5 files changed, 47 insertions(+), 13 deletions(-) diff --git a/src/calibre/db/backend.py b/src/calibre/db/backend.py index 1f72a55a2c..1b6b6568df 100644 --- a/src/calibre/db/backend.py +++ b/src/calibre/db/backend.py @@ -948,7 +948,7 @@ class DB: return self.fts.remove_dirty(book_id, fmt) def queue_fts_job(self, book_id, fmt, path, fmt_size, fmt_hash): - return self.fts.queue_fts_job(book_id, fmt, path, fmt_size, fmt_hash) + return self.fts.queue_job(book_id, fmt, path, fmt_size, fmt_hash) def commit_fts_result(self, book_id, fmt, fmt_size, fmt_hash, text): return self.fts.commit_result(book_id, fmt, fmt_size, fmt_hash, text) diff --git a/src/calibre/db/cache.py b/src/calibre/db/cache.py index e80e77d5c9..218c199714 100644 --- a/src/calibre/db/cache.py +++ b/src/calibre/db/cache.py @@ -427,9 +427,13 @@ class Cache: # FTS API {{{ def initialize_fts(self): self.backend.initialize_fts(weakref.ref(self)) + self.queue_next_fts_job() - def enable_fts(self, enabled=True): - return self.backend.enable_fts(weakref.ref(self) if enabled else None) + def enable_fts(self, enabled=True, start_pool=True): + fts = self.backend.enable_fts(weakref.ref(self) if enabled else None) + if start_pool: # used in the tests + self.queue_next_fts_job() + return fts @write_api def queue_next_fts_job(self): @@ -1618,6 +1622,7 @@ class Cache: try: stream = stream_or_path if hasattr(stream_or_path, 'read') else lopen(stream_or_path, 'rb') size, fname = self._do_add_format(book_id, fmt, stream, name) + self._queue_next_fts_job() finally: if needs_close: stream.close() diff --git a/src/calibre/db/fts/connect.py b/src/calibre/db/fts/connect.py index 3a5ccb8414..2041a01ca1 100644 --- a/src/calibre/db/fts/connect.py +++ b/src/calibre/db/fts/connect.py @@ -96,7 +96,7 @@ class FTS: break self.add_text(book_id, fmt, text, text_hash, fmt_size, fmt_hash) - def queue_fts_job(self, book_id, fmt, path, fmt_size, fmt_hash): + def queue_job(self, book_id, fmt, path, fmt_size, fmt_hash): conn = self.get_connection() fmt = fmt.upper() for x in conn.get('SELECT id FROM fts_db.books_text WHERE book=? AND fmt=? AND format_size=? AND format_hash=?', ( diff --git a/src/calibre/db/fts/pool.py b/src/calibre/db/fts/pool.py index 05d1004b9d..d10f13c0fa 100644 --- a/src/calibre/db/fts/pool.py +++ b/src/calibre/db/fts/pool.py @@ -64,6 +64,7 @@ class Worker(Thread): self.supervise_queue.put(res) except Exception: tb = traceback.format_exc() + traceback.print_exc() self.supervise_queue.put(Result(x, tb)) def run_job(self, job): @@ -122,21 +123,43 @@ class Pool: for w in self.workers: w.join() + def prune_dead_workers(self): + self.workers = [w for w in self.workers if w.is_alive()] + def expand_workers(self): + self.prune_dead_workers() while len(self.workers) < self.max_workers: - w = Worker(self.jobs_queue, self.supervise_queue) - self.workers.append(w) - w.start() + self.workers.append(self.create_worker()) + + def create_worker(self): + w = Worker(self.jobs_queue, self.supervise_queue) + w.start() + while not w.is_alive(): + w.join(0.01) + return w + + def shrink_workers(self): + self.prune_dead_workers() + extra = len(self.workers) - self.max_workers + while extra > 0: + self.jobs_queue.put(quit) + extra -= 1 + + # external API {{{ + def set_num_of_workers(self, num): + self.initialize() + self.prune_dead_workers() + num = max(1, num) + self.max_workers = num + if num > len(self.workers): + self.expand_workers() + elif num < self.workers: + self.shrink_workers() def check_for_work(self): self.initialize() self.supervise_queue.put(check_for_work) - def do_check_for_work(self): - db = self.dbref() - if db is not None: - db.queue_next_fts_job() - def add_job(self, book_id, fmt, path, fmt_size, fmt_hash): self.initialize() job = Job(book_id, fmt, path, fmt_size, fmt_hash) @@ -151,6 +174,12 @@ class Pool: db = self.dbref() if db is not None: db.commit_fts_result(result.book_id, result.fmt, result.fmt_size, result.fmt_hash, text) + # }}} + + def do_check_for_work(self): + db = self.dbref() + if db is not None: + db.queue_next_fts_job() def supervise(self): while True: diff --git a/src/calibre/db/tests/fts_api.py b/src/calibre/db/tests/fts_api.py index 3872bba4cf..8c6a916ea6 100644 --- a/src/calibre/db/tests/fts_api.py +++ b/src/calibre/db/tests/fts_api.py @@ -30,7 +30,7 @@ class FTSAPITest(BaseTest): def test_fts_triggers(self): cache = self.init_cache() - fts = cache.enable_fts() + fts = cache.enable_fts(start_pool=False) self.ae(fts.all_currently_dirty(), [(1, 'FMT1'), (1, 'FMT2'), (2, 'FMT1')]) fts.dirty_existing() self.ae(fts.all_currently_dirty(), [(1, 'FMT1'), (1, 'FMT2'), (2, 'FMT1')])