Make shutting down the fts pool more robust

Fixes a deadlock and gives the worker threads a chance to clean exit
This commit is contained in:
Kovid Goyal 2022-04-29 22:33:54 +05:30
parent 89411a763e
commit 74b218a72a
No known key found for this signature in database
GPG Key ID: 06BC317B515ACE7C
2 changed files with 27 additions and 19 deletions

View File

@ -2379,27 +2379,31 @@ class Cache:
def __del__(self): def __del__(self):
self.close() self.close()
@write_api @api
def close(self): def close(self):
if hasattr(self, 'close_called'): with self.write_lock:
return if hasattr(self, 'close_called'):
self.close_called = True return
self.shutting_down = True self.close_called = True
self.event_dispatcher.close() self.shutting_down = True
if self.fts_queue_thread is not None: self.event_dispatcher.close()
self.fts_job_queue.put(None) if self.fts_queue_thread is not None:
from calibre.customize.ui import available_library_closed_plugins self.fts_job_queue.put(None)
for plugin in available_library_closed_plugins(): from calibre.customize.ui import available_library_closed_plugins
try: for plugin in available_library_closed_plugins():
plugin.run(self) try:
except Exception: plugin.run(self)
import traceback except Exception:
traceback.print_exc() import traceback
traceback.print_exc()
# the fts supervisor thread could be in the middle of committing a
# result to the db, so holding a lock here will cause a deadlock
self.backend.shutdown_fts() self.backend.shutdown_fts()
if self.fts_queue_thread is not None: if self.fts_queue_thread is not None:
self.fts_queue_thread.join() self.fts_queue_thread.join()
self.fts_queue_thread = None self.fts_queue_thread = None
self.backend.close() with self.write_lock:
self.backend.close()
@property @property
def is_closed(self): def is_closed(self):

View File

@ -129,6 +129,7 @@ class Pool:
self.workers = [] self.workers = []
self.initialized = Event() self.initialized = Event()
self.dbref = dbref self.dbref = dbref
self.keep_going = True
def initialize(self): def initialize(self):
if not self.initialized.is_set(): if not self.initialized.is_set():
@ -202,10 +203,13 @@ class Pool:
def shutdown(self): def shutdown(self):
if self.initialized.is_set(): if self.initialized.is_set():
self.supervise_queue.put(quit) self.keep_going = False
for i in range(2):
self.supervise_queue.put(quit)
for w in self.workers: for w in self.workers:
w.keep_going = False w.keep_going = False
self.jobs_queue.put(quit) for i in range(2*len(self.workers)):
self.jobs_queue.put(quit)
self.supervisor_thread.join() self.supervisor_thread.join()
for w in self.workers: for w in self.workers:
w.join() w.join()
@ -219,7 +223,7 @@ class Pool:
db.queue_next_fts_job() db.queue_next_fts_job()
def supervise(self): def supervise(self):
while True: while self.keep_going:
x = self.supervise_queue.get() x = self.supervise_queue.get()
try: try:
if x is check_for_work: if x is check_for_work: