Queue FTS jobs in a separate thread

This allows FTS to be turned on or off without blocking the UI.
This commit is contained in:
Kovid Goyal 2022-04-27 16:12:18 +05:30
parent 2b23d75f13
commit 251fd7a92c
No known key found for this signature in database
GPG Key ID: 06BC317B515ACE7C
3 changed files with 73 additions and 32 deletions

View File

@ -19,6 +19,7 @@ from functools import partial, wraps
from io import DEFAULT_BUFFER_SIZE, BytesIO from io import DEFAULT_BUFFER_SIZE, BytesIO
from threading import Lock from threading import Lock
from time import time from time import time
from queue import Queue
from calibre import as_unicode, isbytestring from calibre import as_unicode, isbytestring
from calibre.constants import iswindows, preferred_encoding from calibre.constants import iswindows, preferred_encoding
@ -139,6 +140,7 @@ class Cache:
EventType = EventType EventType = EventType
def __init__(self, backend): def __init__(self, backend):
self.shutting_down = False
self.backend = backend self.backend = backend
self.event_dispatcher = EventDispatcher() self.event_dispatcher = EventDispatcher()
self.fields = {} self.fields = {}
@ -426,35 +428,48 @@ class Cache:
# FTS API {{{ # FTS API {{{
def initialize_fts(self): def initialize_fts(self):
self.fts_queue_thread = None
self.fts_job_queue = Queue()
self.backend.initialize_fts(weakref.ref(self)) self.backend.initialize_fts(weakref.ref(self))
self.queue_next_fts_job() self.queue_next_fts_job()
def enable_fts(self, enabled=True, start_pool=True): @read_api
fts = self.backend.enable_fts(weakref.ref(self) if enabled else None) def is_fts_enabled(self):
if start_pool: # used in the tests return self.backend.fts_enabled
self.queue_next_fts_job()
return fts
@write_api @write_api
def queue_next_fts_job(self): def enable_fts(self, enabled=True, start_pool=True):
fts = self.backend.enable_fts(weakref.ref(self) if enabled else None)
if fts and start_pool: # used in the tests
from threading import Thread
self.fts_queue_thread = Thread(name='FTSQueue', target=self.dispatch_fts_jobs, args=(self.fts_job_queue,), daemon=True)
self.fts_queue_thread.start()
fts.pool.initialize()
fts.pool.initialized.wait()
self.queue_next_fts_job()
if not fts and self.fts_queue_thread:
self.fts_job_queue.put(None)
self.fts_queue_thread = None
self.fts_job_queue = Queue()
return fts
def dispatch_fts_jobs(self, queue):
from .fts.text import is_fmt_ok from .fts.text import is_fmt_ok
if not self.backend.fts_enabled:
return def do_one():
while True: with self.read_lock:
book_id, fmt = self.backend.get_next_fts_job() if not self.backend.fts_enabled:
if book_id is None: return False
return book_id, fmt = self.backend.get_next_fts_job()
path = self.format_abspath(book_id, fmt) if book_id is None:
return False
path = self._format_abspath(book_id, fmt)
if not path or not is_fmt_ok(fmt): if not path or not is_fmt_ok(fmt):
self.backend.remove_dirty_fts(book_id, fmt) with self.write_lock:
continue self.backend.remove_dirty_fts(book_id, fmt)
try: return True
src = open(path, 'rb')
except OSError: with self.read_lock, open(path, 'rb') as src, PersistentTemporaryFile(suffix=f'.{fmt.lower()}') as pt:
self.backend.remove_dirty_fts(book_id, fmt)
traceback.print_exc()
continue
with PersistentTemporaryFile(suffix=f'.{fmt.lower()}') as pt, src:
sz = 0 sz = 0
h = hashlib.sha1() h = hashlib.sha1()
while True: while True:
@ -464,9 +479,29 @@ class Cache:
sz += len(chunk) sz += len(chunk)
h.update(chunk) h.update(chunk)
pt.write(chunk) pt.write(chunk)
if self.backend.queue_fts_job(book_id, fmt, pt.name, sz, h.hexdigest()): with self.write_lock:
if not self.backend.fts_has_idle_workers: self.backend.queue_fts_job(book_id, fmt, pt.name, sz, h.hexdigest())
break return self.backend.fts_has_idle_workers
while not self.shutting_down:
x = queue.get()
if x is None:
break
if not self.backend.fts_enabled:
break
has_more = True
while has_more and not self.shutting_down and self.backend.fts_enabled:
try:
has_more = do_one()
except Exception:
import traceback
traceback.print_exc()
@write_api
def queue_next_fts_job(self):
if not self.backend.fts_enabled:
return
self.fts_job_queue.put(True)
@write_api @write_api
def commit_fts_result(self, book_id, fmt, fmt_size, fmt_hash, text, err_msg): def commit_fts_result(self, book_id, fmt, fmt_size, fmt_hash, text, err_msg):
@ -2321,7 +2356,10 @@ class Cache:
@write_api @write_api
def close(self): def close(self):
self.shutting_down = True
self.event_dispatcher.close() self.event_dispatcher.close()
if self.fts_queue_thread is not None:
self.fts_job_queue.put(None)
from calibre.customize.ui import available_library_closed_plugins from calibre.customize.ui import available_library_closed_plugins
for plugin in available_library_closed_plugins(): for plugin in available_library_closed_plugins():
try: try:
@ -2330,6 +2368,9 @@ class Cache:
import traceback import traceback
traceback.print_exc() traceback.print_exc()
self.backend.shutdown_fts() self.backend.shutdown_fts()
if self.fts_queue_thread is not None:
self.fts_queue_thread.join()
self.fts_queue_thread = None
self.backend.close() self.backend.close()
@property @property

View File

@ -9,7 +9,7 @@ import sys
import traceback import traceback
from contextlib import suppress from contextlib import suppress
from queue import Queue from queue import Queue
from threading import Thread from threading import Thread, Event
from time import monotonic from time import monotonic
from calibre import human_readable from calibre import human_readable
@ -121,15 +121,15 @@ class Pool:
self.jobs_queue = Queue() self.jobs_queue = Queue()
self.supervise_queue = Queue() self.supervise_queue = Queue()
self.workers = [] self.workers = []
self.initialized = False self.initialized = Event()
self.dbref = dbref self.dbref = dbref
def initialize(self): def initialize(self):
if not self.initialized: if not self.initialized.is_set():
self.supervisor_thread = Thread(name='FTSSupervisor', daemon=True, target=self.supervise) self.supervisor_thread = Thread(name='FTSSupervisor', daemon=True, target=self.supervise)
self.supervisor_thread.start() self.supervisor_thread.start()
self.expand_workers() self.expand_workers()
self.initialized = True self.initialized.set()
def prune_dead_workers(self): def prune_dead_workers(self):
self.workers = [w for w in self.workers if w.is_alive()] self.workers = [w for w in self.workers if w.is_alive()]
@ -195,8 +195,7 @@ class Pool:
db.commit_fts_result(result.book_id, result.fmt, result.fmt_size, result.fmt_hash, text, err_msg) db.commit_fts_result(result.book_id, result.fmt, result.fmt_size, result.fmt_hash, text, err_msg)
def shutdown(self): def shutdown(self):
if self.initialized: if self.initialized.is_set():
self.initialized = False
self.supervise_queue.put(quit) self.supervise_queue.put(quit)
for w in self.workers: for w in self.workers:
w.keep_going = False w.keep_going = False
@ -205,6 +204,7 @@ class Pool:
for w in self.workers: for w in self.workers:
w.join() w.join()
self.workers = [] self.workers = []
self.initialized.clear()
# }}} # }}}
def do_check_for_work(self): def do_check_for_work(self):

View File

@ -85,7 +85,7 @@ class FTSAPITest(BaseTest):
check(id=2, book=1, format='TXT', searchable_text='a test text2') check(id=2, book=1, format='TXT', searchable_text='a test text2')
# check closing shuts down all workers # check closing shuts down all workers
cache.close() cache.close()
self.assertFalse(fts.pool.initialized) self.assertFalse(fts.pool.initialized.is_set())
# check enabling scans pre-existing # check enabling scans pre-existing
cache = self.new_library() cache = self.new_library()