More work on fts pool

Kovid Goyal 2022-02-17 22:41:10 +05:30
parent 13f0f67ea7
commit cb74720aa5
GPG Key ID: 06BC317B515ACE7C
5 changed files with 47 additions and 13 deletions

@@ -948,7 +948,7 @@ class DB:
         return self.fts.remove_dirty(book_id, fmt)
 
     def queue_fts_job(self, book_id, fmt, path, fmt_size, fmt_hash):
-        return self.fts.queue_fts_job(book_id, fmt, path, fmt_size, fmt_hash)
+        return self.fts.queue_job(book_id, fmt, path, fmt_size, fmt_hash)
 
     def commit_fts_result(self, book_id, fmt, fmt_size, fmt_hash, text):
         return self.fts.commit_result(book_id, fmt, fmt_size, fmt_hash, text)
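
Note: the DB hunk above is just a rename of the delegated call: DB.queue_fts_job() keeps its public name but now forwards to FTS.queue_job(). A minimal sketch of that facade-style delegation; apart from the two method names, everything below is illustrative rather than taken from the commit:

# Sketch: the DB facade keeps a stable public name while the backend
# method it delegates to is renamed to the shorter queue_job().
class FTS:
    def queue_job(self, book_id, fmt, path, fmt_size, fmt_hash):
        print(f'queued {book_id}/{fmt} from {path}')  # real code schedules indexing

class DB:
    def __init__(self):
        self.fts = FTS()

    def queue_fts_job(self, book_id, fmt, path, fmt_size, fmt_hash):
        # existing callers of DB.queue_fts_job() keep working unchanged
        return self.fts.queue_job(book_id, fmt, path, fmt_size, fmt_hash)

DB().queue_fts_job(1, 'EPUB', '/tmp/book.epub', 1024, 'deadbeef')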

@@ -427,9 +427,13 @@ class Cache:
     # FTS API {{{
     def initialize_fts(self):
         self.backend.initialize_fts(weakref.ref(self))
+        self.queue_next_fts_job()
 
-    def enable_fts(self, enabled=True):
-        return self.backend.enable_fts(weakref.ref(self) if enabled else None)
+    def enable_fts(self, enabled=True, start_pool=True):
+        fts = self.backend.enable_fts(weakref.ref(self) if enabled else None)
+        if start_pool:  # used in the tests
+            self.queue_next_fts_job()
+        return fts
 
     @write_api
     def queue_next_fts_job(self):
@@ -1618,6 +1622,7 @@ class Cache:
         try:
             stream = stream_or_path if hasattr(stream_or_path, 'read') else lopen(stream_or_path, 'rb')
             size, fname = self._do_add_format(book_id, fmt, stream, name)
+            self._queue_next_fts_job()
         finally:
             if needs_close:
                 stream.close()
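
Note: the Cache hunks wire the pool into normal library operations: initialize_fts() and add_format() now kick queue_next_fts_job() so indexing starts as soon as there is work, and enable_fts() gains a start_pool flag so the test suite can enable FTS without spawning worker threads. A self-contained toy sketch of that wiring (the Cache class below is illustrative, not calibre's):

class Cache:
    def __init__(self):
        self.fts_enabled = False
        self.fts_jobs_kicked = 0

    def queue_next_fts_job(self):
        # Stand-in for handing the next dirty format to the worker pool.
        self.fts_jobs_kicked += 1

    def enable_fts(self, enabled=True, start_pool=True):
        self.fts_enabled = enabled
        if start_pool:  # tests pass start_pool=False to stay single-threaded
            self.queue_next_fts_job()
        return self.fts_enabled

    def add_format(self, book_id, fmt, stream):
        # ... store the format, then make sure it gets indexed promptly ...
        self.queue_next_fts_job()

c = Cache()
c.enable_fts(start_pool=False)
assert c.fts_jobs_kicked == 0   # nothing kicked: deterministic for tests
c.enable_fts()
c.add_format(1, 'EPUB', None)
assert c.fts_jobs_kicked == 2   # enabling FTS and adding a format both kick the pool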

@@ -96,7 +96,7 @@ class FTS:
                 break
             self.add_text(book_id, fmt, text, text_hash, fmt_size, fmt_hash)
 
-    def queue_fts_job(self, book_id, fmt, path, fmt_size, fmt_hash):
+    def queue_job(self, book_id, fmt, path, fmt_size, fmt_hash):
         conn = self.get_connection()
         fmt = fmt.upper()
         for x in conn.get('SELECT id FROM fts_db.books_text WHERE book=? AND fmt=? AND format_size=? AND format_hash=?', (
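
Note: inside the FTS class the method itself is renamed to queue_job(); the visible body normalizes the format name to upper case and then looks up fts_db.books_text for an existing row with the same book, format, size and hash. One plausible purpose of that lookup, assumed here rather than shown in the hunk, is to avoid re-queueing text that is already indexed. A self-contained sqlite3 sketch of that check; the simplified schema and the skip semantics are assumptions, only the query's table and column names come from the diff:

import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE books_text (id INTEGER PRIMARY KEY, book INTEGER, fmt TEXT, format_size INTEGER, format_hash TEXT)')

def queue_job(book_id, fmt, path, fmt_size, fmt_hash):
    fmt = fmt.upper()
    # Assumed semantics: skip queueing if this exact format is already indexed.
    for _ in conn.execute(
            'SELECT id FROM books_text WHERE book=? AND fmt=? AND format_size=? AND format_hash=?',
            (book_id, fmt, fmt_size, fmt_hash)):
        return False
    print(f'queueing {book_id}/{fmt} from {path}')
    return True

conn.execute("INSERT INTO books_text (book, fmt, format_size, format_hash) VALUES (1, 'EPUB', 10, 'x')")
assert not queue_job(1, 'epub', '/tmp/a.epub', 10, 'x')   # identical text already present
assert queue_job(2, 'epub', '/tmp/b.epub', 10, 'x')       # new book, gets queued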

@@ -64,6 +64,7 @@ class Worker(Thread):
                 self.supervise_queue.put(res)
             except Exception:
                 tb = traceback.format_exc()
+                traceback.print_exc()
                 self.supervise_queue.put(Result(x, tb))
 
     def run_job(self, job):
@@ -122,21 +123,43 @@ class Pool:
         for w in self.workers:
             w.join()
 
+    def prune_dead_workers(self):
+        self.workers = [w for w in self.workers if w.is_alive()]
+
     def expand_workers(self):
+        self.prune_dead_workers()
         while len(self.workers) < self.max_workers:
-            w = Worker(self.jobs_queue, self.supervise_queue)
-            self.workers.append(w)
-            w.start()
+            self.workers.append(self.create_worker())
+
+    def create_worker(self):
+        w = Worker(self.jobs_queue, self.supervise_queue)
+        w.start()
+        while not w.is_alive():
+            w.join(0.01)
+        return w
+
+    def shrink_workers(self):
+        self.prune_dead_workers()
+        extra = len(self.workers) - self.max_workers
+        while extra > 0:
+            self.jobs_queue.put(quit)
+            extra -= 1
 
     # external API {{{
+    def set_num_of_workers(self, num):
+        self.initialize()
+        self.prune_dead_workers()
+        num = max(1, num)
+        self.max_workers = num
+        if num > len(self.workers):
+            self.expand_workers()
+        elif num < len(self.workers):
+            self.shrink_workers()
+
     def check_for_work(self):
         self.initialize()
         self.supervise_queue.put(check_for_work)
 
-    def do_check_for_work(self):
-        db = self.dbref()
-        if db is not None:
-            db.queue_next_fts_job()
-
     def add_job(self, book_id, fmt, path, fmt_size, fmt_hash):
         self.initialize()
         job = Job(book_id, fmt, path, fmt_size, fmt_hash)
@@ -151,6 +174,12 @@ class Pool:
         db = self.dbref()
         if db is not None:
             db.commit_fts_result(result.book_id, result.fmt, result.fmt_size, result.fmt_hash, text)
     # }}}
+
+    def do_check_for_work(self):
+        db = self.dbref()
+        if db is not None:
+            db.queue_next_fts_job()
+
     def supervise(self):
         while True:
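
Note: the bulk of the commit is elastic worker management in Pool: prune_dead_workers() drops threads that have died, expand_workers()/create_worker() grow the pool up to max_workers, shrink_workers() pushes quit sentinels onto the shared jobs queue so surplus workers exit on their own, and set_num_of_workers() ties these together. A self-contained sketch of the same resize-by-sentinel technique; method names mirror the diff where visible, while the Worker body and the callable jobs are simplified placeholders:

from queue import Queue
from threading import Thread

quit = object()  # sentinel: a worker that dequeues this exits its loop

class Worker(Thread):
    def __init__(self, jobs_queue):
        super().__init__(daemon=True)
        self.jobs_queue = jobs_queue

    def run(self):
        while True:
            x = self.jobs_queue.get()
            if x is quit:
                break
            x()  # placeholder: jobs here are plain callables

class Pool:
    def __init__(self, max_workers=2):
        self.max_workers = max_workers
        self.jobs_queue = Queue()
        self.workers = []

    def prune_dead_workers(self):
        self.workers = [w for w in self.workers if w.is_alive()]

    def create_worker(self):
        w = Worker(self.jobs_queue)
        w.start()
        return w

    def expand_workers(self):
        self.prune_dead_workers()
        while len(self.workers) < self.max_workers:
            self.workers.append(self.create_worker())

    def shrink_workers(self):
        self.prune_dead_workers()
        extra = len(self.workers) - self.max_workers
        while extra > 0:
            self.jobs_queue.put(quit)  # one sentinel per surplus worker
            extra -= 1

    def set_num_of_workers(self, num):
        self.prune_dead_workers()
        self.max_workers = max(1, num)
        if self.max_workers > len(self.workers):
            self.expand_workers()
        elif self.max_workers < len(self.workers):
            self.shrink_workers()

pool = Pool()
pool.set_num_of_workers(4)  # grows the pool to four live workers
pool.set_num_of_workers(1)  # queues three quit sentinels; extra workers drain away

Shrinking is cooperative: surplus threads only exit once they pick a sentinel off the queue, which is why prune_dead_workers() runs before every resize decision.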

@@ -30,7 +30,7 @@ class FTSAPITest(BaseTest):
     def test_fts_triggers(self):
         cache = self.init_cache()
-        fts = cache.enable_fts()
+        fts = cache.enable_fts(start_pool=False)
         self.ae(fts.all_currently_dirty(), [(1, 'FMT1'), (1, 'FMT2'), (2, 'FMT1')])
         fts.dirty_existing()
         self.ae(fts.all_currently_dirty(), [(1, 'FMT1'), (1, 'FMT2'), (2, 'FMT1')])
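
Note: the test change follows directly from the new start_pool flag: test_fts_triggers asserts on the exact contents of the dirty queue, so it must not start worker threads that would begin draining it. A minimal self-contained unittest sketch of that "keep the background machinery off for deterministic assertions" pattern (the Cache stub below is illustrative, not calibre's):

import unittest

class Cache:
    def __init__(self):
        self.dirty = [(1, 'FMT1'), (1, 'FMT2'), (2, 'FMT1')]
        self.pool_started = False

    def enable_fts(self, enabled=True, start_pool=True):
        if start_pool:
            self.pool_started = True  # would start threads that drain self.dirty
        return self

    def all_currently_dirty(self):
        return list(self.dirty)

class FTSAPITest(unittest.TestCase):
    def test_fts_triggers(self):
        cache = Cache()
        fts = cache.enable_fts(start_pool=False)
        self.assertFalse(cache.pool_started)
        # With no pool running, the dirty list is exactly what the triggers produced.
        self.assertEqual(fts.all_currently_dirty(), [(1, 'FMT1'), (1, 'FMT2'), (2, 'FMT1')])

if __name__ == '__main__':
    unittest.main()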