Patch summary (free text ahead of the first "diff --git" header; patch tools skip it).
  * resources/fts_sqlite.sql: adds column in_progress (INTEGER NOT NULL DEFAULT 0)
    to fts_db.dirtied_formats.
  * src/calibre/db/backend.py: adds the DB.fts_enabled property (True when the
    instance has a non-None "fts" attribute) and DB.get_next_fts_job(), which
    forwards to self.fts.get_next_fts_job().
  * src/calibre/db/cache.py: adds Cache.get_next_fts_job() under @write_api. As
    written it unpacks (book_id, fmt) from the backend and returns None on every
    path; nothing is done with the job yet.
  * src/calibre/db/fts/connect.py: fetches the connection through
    db.backend.get_connection() instead of reading db.backend.conn directly.
  * src/calibre/db/fts/pool.py: new module with Worker/Pool thread scaffolding.
    Worker.run() and Pool.supervise() only react to the "quit" sentinel so far;
    "check_for_work" is received by the supervisor but is a no-op (pass).
  NOTE(review): indentation of context lines was reconstructed from syntax after
  the patch had been whitespace-collapsed; confirm against the target tree.

diff --git a/resources/fts_sqlite.sql b/resources/fts_sqlite.sql
index cfd503d654..f37b8a1ee7 100644
--- a/resources/fts_sqlite.sql
+++ b/resources/fts_sqlite.sql
@@ -1,6 +1,7 @@
 CREATE TABLE fts_db.dirtied_formats ( id INTEGER PRIMARY KEY,
     book INTEGER NOT NULL,
     format TEXT NOT NULL COLLATE NOCASE,
+    in_progress INTEGER NOT NULL DEFAULT 0,
     UNIQUE(book, format)
 );
 
diff --git a/src/calibre/db/backend.py b/src/calibre/db/backend.py
index dc3604b44e..ae0256c82f 100644
--- a/src/calibre/db/backend.py
+++ b/src/calibre/db/backend.py
@@ -937,6 +937,13 @@ class DB:
             self.fts.dirty_existing()
         return self.fts
 
+    @property
+    def fts_enabled(self):
+        return getattr(self, 'fts', None) is not None
+
+    def get_next_fts_job(self):
+        return self.fts.get_next_fts_job()
+
     def get_connection(self):
         return self.conn
 
diff --git a/src/calibre/db/cache.py b/src/calibre/db/cache.py
index 9d468e0c22..d08402e798 100644
--- a/src/calibre/db/cache.py
+++ b/src/calibre/db/cache.py
@@ -429,6 +429,15 @@ class Cache:
     def enable_fts(self, enabled=True):
         return self.backend.enable_fts(weakref.ref(self) if enabled else None)
 
+
+    @write_api
+    def get_next_fts_job(self):
+        if not self.backend.fts_enabled:
+            return
+        book_id, fmt = self.backend.get_next_fts_job()
+        if book_id is None:
+            return
+
     # }}}
 
     # Cache Layer API {{{
diff --git a/src/calibre/db/fts/connect.py b/src/calibre/db/fts/connect.py
index a7e61d9ad4..3d10a714e3 100644
--- a/src/calibre/db/fts/connect.py
+++ b/src/calibre/db/fts/connect.py
@@ -38,7 +38,7 @@ class FTS:
         db = self.dbref()
         if db is None:
             raise RuntimeError('db has been garbage collected')
-        ans = db.backend.conn
+        ans = db.backend.get_connection()
         if ans.fts_dbpath is None:
             self.initialize(ans)
         return ans
diff --git a/src/calibre/db/fts/pool.py b/src/calibre/db/fts/pool.py
new file mode 100644
index 0000000000..a2ca9b0913
--- /dev/null
+++ b/src/calibre/db/fts/pool.py
@@ -0,0 +1,62 @@
+#!/usr/bin/env python
+# vim:fileencoding=utf-8
+# License: GPL v3 Copyright: 2022, Kovid Goyal
+
+
+from threading import Thread
+from queue import Queue
+
+
+check_for_work = object()
+quit = object()
+
+
+class Worker(Thread):
+
+    def __init__(self, jobs_queue, supervise_queue):
+        super().__init__(name='FTSWorker', daemon=True)
+        self.currently_working = False
+        self.jobs_queue = jobs_queue
+        self.supervise_queue = supervise_queue
+
+    def run(self):
+        while True:
+            x = self.jobs_queue.get()
+            if x is quit:
+                break
+
+
+class Pool:
+
+    def __init__(self, dbref):
+        self.max_workers = 1
+        self.supervisor_thread = Thread(name='FTSSupervisor', daemon=True, target=self.supervise)
+        self.jobs_queue = Queue()
+        self.supervise_queue = Queue()
+        self.workers = []
+        self.initialized = False
+        self.dbref = dbref
+
+    def initialize(self):
+        if not self.initialized:
+            self.initialized = True
+            self.supervisor_thread.start()
+            self.expand_workers()
+
+    def expand_workers(self):
+        while len(self.workers) < self.max_workers:
+            w = Worker(self.jobs_queue, self.supervise_queue)
+            self.workers.append(w)
+            w.start()
+
+    def check_for_work(self):
+        self.initialize()
+        self.supervise_queue.put(check_for_work)
+
+    def supervise(self):
+        while True:
+            x = self.supervise_queue.get()
+            if x is check_for_work:
+                pass
+            elif x is quit:
+                break