More work on fts jobs

This commit is contained in:
Kovid Goyal 2022-02-16 14:02:48 +05:30
parent 55c67d57e4
commit 164dbe9cfb
No known key found for this signature in database
GPG Key ID: 06BC317B515ACE7C
5 changed files with 80 additions and 1 deletions

View File

@ -1,6 +1,7 @@
CREATE TABLE fts_db.dirtied_formats ( id INTEGER PRIMARY KEY,
book INTEGER NOT NULL,
format TEXT NOT NULL COLLATE NOCASE,
in_progress INTEGER NOT NULL DEFAULT 0,
UNIQUE(book, format)
);

View File

@ -937,6 +937,13 @@ class DB:
self.fts.dirty_existing()
return self.fts
@property
def fts_enabled(self):
return getattr(self, 'fts', None) is not None
def get_next_fts_job(self):
return self.fts.get_next_fts_job()
def get_connection(self):
return self.conn

View File

@ -429,6 +429,15 @@ class Cache:
def enable_fts(self, enabled=True):
return self.backend.enable_fts(weakref.ref(self) if enabled else None)
@write_api
def get_next_fts_job(self):
if not self.backend.fts_enabled:
return
book_id, fmt = self.backend.get_next_fts_job()
if book_id is None:
return
# }}}
# Cache Layer API {{{

View File

@ -38,7 +38,7 @@ class FTS:
db = self.dbref()
if db is None:
raise RuntimeError('db has been garbage collected')
ans = db.backend.conn
ans = db.backend.get_connection()
if ans.fts_dbpath is None:
self.initialize(ans)
return ans

View File

@ -0,0 +1,62 @@
#!/usr/bin/env python
# vim:fileencoding=utf-8
# License: GPL v3 Copyright: 2022, Kovid Goyal <kovid at kovidgoyal.net>
from threading import Thread
from queue import Queue
check_for_work = object()
quit = object()
class Worker(Thread):
def __init__(self, jobs_queue, supervise_queue):
super().__init__(name='FTSWorker', daemon=True)
self.currently_working = False
self.jobs_queue = jobs_queue
self.supervise_queue = supervise_queue
def run(self):
while True:
x = self.jobs_queue.get()
if x is quit:
break
class Pool:
def __init__(self, dbref):
self.max_workers = 1
self.supervisor_thread = Thread(name='FTSSupervisor', daemon=True, target=self.supervise)
self.jobs_queue = Queue()
self.supervise_queue = Queue()
self.workers = []
self.initialized = False
self.dbref = dbref
def initialize(self):
if not self.initialized:
self.initialized = True
self.supervisor_thread.start()
self.expand_workers()
def expand_workers(self):
while len(self.workers) < self.max_workers:
w = Worker(self.jobs_queue, self.supervise_queue)
self.workers.append(w)
w.start()
def check_for_work(self):
self.initialize()
self.supervise_queue.put(check_for_work)
def supervise(self):
while True:
x = self.supervise_queue.get()
if x is check_for_work:
pass
elif x is quit:
break