Use a weak reference to the db object in the fts queue thread

This commit is contained in:
Kovid Goyal 2022-04-27 19:47:16 +05:30
parent f65a05cc8f
commit 568e931dff
No known key found for this signature in database
GPG Key ID: 06BC317B515ACE7C

View File

@ -442,7 +442,7 @@ class Cache:
fts = self.backend.enable_fts(weakref.ref(self) if enabled else None) fts = self.backend.enable_fts(weakref.ref(self) if enabled else None)
if fts and start_pool: # used in the tests if fts and start_pool: # used in the tests
from threading import Thread from threading import Thread
self.fts_queue_thread = Thread(name='FTSQueue', target=self.dispatch_fts_jobs, args=(self.fts_job_queue,), daemon=True) self.fts_queue_thread = Thread(name='FTSQueue', target=Cache.dispatch_fts_jobs, args=(self.fts_job_queue, weakref.ref(self)), daemon=True)
self.fts_queue_thread.start() self.fts_queue_thread.start()
fts.pool.initialize() fts.pool.initialize()
fts.pool.initialized.wait() fts.pool.initialized.wait()
@ -453,10 +453,14 @@ class Cache:
self.fts_job_queue = Queue() self.fts_job_queue = Queue()
return fts return fts
def dispatch_fts_jobs(self, queue): @staticmethod
def dispatch_fts_jobs(queue, dbref):
from .fts.text import is_fmt_ok from .fts.text import is_fmt_ok
def do_one(): def do_one():
self = dbref()
if self is None:
return False
with self.read_lock: with self.read_lock:
if not self.backend.fts_enabled: if not self.backend.fts_enabled:
return False return False
@ -483,12 +487,10 @@ class Cache:
self.backend.queue_fts_job(book_id, fmt, pt.name, sz, h.hexdigest()) self.backend.queue_fts_job(book_id, fmt, pt.name, sz, h.hexdigest())
return self.backend.fts_has_idle_workers return self.backend.fts_has_idle_workers
while not self.shutting_down: def loop_while_more_available():
x = queue.get() self = dbref()
if x is None: if not self or not self.backend.fts_enabled:
break return
if not self.backend.fts_enabled:
break
has_more = True has_more = True
while has_more and not self.shutting_down and self.backend.fts_enabled: while has_more and not self.shutting_down and self.backend.fts_enabled:
try: try:
@ -498,6 +500,12 @@ class Cache:
import traceback import traceback
traceback.print_exc() traceback.print_exc()
while not getattr(dbref(), 'shutting_down', True):
x = queue.get()
if x is None:
break
loop_while_more_available()
@write_api @write_api
def queue_next_fts_job(self): def queue_next_fts_job(self):
if not self.backend.fts_enabled: if not self.backend.fts_enabled: