More work on fts pool

This commit is contained in:
Kovid Goyal 2022-02-17 20:46:17 +05:30
parent 60e80e5684
commit 13f0f67ea7
No known key found for this signature in database
GPG Key ID: 06BC317B515ACE7C
5 changed files with 139 additions and 6 deletions

View File

@ -950,6 +950,14 @@ class DB:
def queue_fts_job(self, book_id, fmt, path, fmt_size, fmt_hash):
    """Hand a text-indexing job over to the FTS object stored on ``self.fts``.

    All arguments are forwarded unchanged, and whatever
    ``self.fts.queue_fts_job`` returns is returned to the caller.
    """
    fts = self.fts
    return fts.queue_fts_job(book_id, fmt, path, fmt_size, fmt_hash)
def commit_fts_result(self, book_id, fmt, fmt_size, fmt_hash, text):
    """Pass extracted text on to ``self.fts.commit_result``.

    The arguments are forwarded as-is and the callee's return value is
    returned unchanged.
    """
    fts = self.fts
    return fts.commit_result(book_id, fmt, fmt_size, fmt_hash, text)
def shutdown_fts(self):
    """Shut down the FTS object, if FTS is enabled, and drop the reference to it."""
    if not self.fts_enabled:
        return
    self.fts.shutdown()
    # Clear the reference only after the shutdown call has completed
    self.fts = None
def get_connection(self):
    """Return the connection object stored on ``self.conn``."""
    connection = self.conn
    return connection

View File

@ -457,6 +457,10 @@ class Cache:
if self.backend.queue_fts_job(book_id, fmt, path, sz, h.hexdigest()):
break
@write_api
def commit_fts_result(self, book_id, fmt, fmt_size, fmt_hash, text):
    """Forward extracted text to the backend's ``commit_fts_result``.

    The arguments are handed through unchanged and the backend's return
    value is returned to the caller.
    """
    backend = self.backend
    return backend.commit_fts_result(book_id, fmt, fmt_size, fmt_hash, text)
# }}}
# Cache Layer API {{{
@ -2285,6 +2289,7 @@ class Cache:
except Exception:
import traceback
traceback.print_exc()
self.backend.shutdown_fts()
self.backend.close()
@property

View File

@ -4,6 +4,7 @@
import builtins
import hashlib
import os
import sys
from contextlib import suppress
@ -85,6 +86,16 @@ class FTS:
return book_id, fmt
return None, None
def commit_result(self, book_id, fmt, fmt_size, fmt_hash, text):
    """Record the text extracted for one format of a book.

    For non-empty text the SHA-1 hex digest of its UTF-8 encoding is
    computed; empty text gets an empty hash. If ``books_text`` already has a
    row for this book, format and text hash, the text is replaced by an
    empty string before everything is handed to ``self.add_text``.
    """
    db_conn = self.get_connection()
    digest = ''
    if text:
        digest = hashlib.sha1(text.encode('utf-8')).hexdigest()
        matches = db_conn.get(
            'SELECT id FROM fts_db.books_text WHERE book=? AND fmt=? AND text_hash=?',
            (book_id, fmt, digest))
        # Only the existence of a matching row matters, not its contents
        if any(True for _ in matches):
            text = ''
    self.add_text(book_id, fmt, text, digest, fmt_size, fmt_hash)
def queue_fts_job(self, book_id, fmt, path, fmt_size, fmt_hash):
conn = self.get_connection()
fmt = fmt.upper()
@ -99,3 +110,6 @@ class FTS:
with suppress(OSError):
os.remove(path)
return False
def shutdown(self):
    """Shut down the pool object held in ``self.pool``."""
    pool = self.pool
    pool.shutdown()

View File

@ -3,14 +3,47 @@
# License: GPL v3 Copyright: 2022, Kovid Goyal <kovid at kovidgoyal.net>
from threading import Thread
import os, sys
import traceback, subprocess
from contextlib import suppress
from queue import Queue
from threading import Thread
from calibre.utils.ipc.simple_worker import start_pipe_worker
# Sentinel put on the supervise queue to make the supervisor look for more work
check_for_work = object()
# Sentinel put on the jobs and supervise queues to tell the threads to stop
quit = object()
class Job:
    """Plain record describing one text-extraction job for a book format."""

    def __init__(self, book_id, fmt, path, fmt_size, fmt_hash):
        # Identity of the format the job is for
        self.book_id, self.fmt = book_id, fmt
        # Size and hash of the format file, as supplied by the caller
        self.fmt_size, self.fmt_hash = fmt_size, fmt_hash
        # Path of the file the worker operates on
        self.path = path
class Result:
    """Outcome of a text-extraction job.

    The identifying fields (``book_id``, ``fmt``, ``fmt_size``, ``fmt_hash``)
    are copied from *job*. ``ok`` tells whether text was obtained; ``text``
    holds the extracted text on success and an error description otherwise.
    """

    def __init__(self, job, err_msg=''):
        """Build the result for *job*.

        :param job: object providing ``book_id``, ``fmt``, ``fmt_size``,
            ``fmt_hash`` and ``path`` attributes.
        :param err_msg: non-empty when the job failed; it then becomes
            ``text`` and ``job.path + '.txt'`` is not read.
        """
        self.book_id = job.book_id
        self.fmt = job.fmt
        self.fmt_size = job.fmt_size
        self.fmt_hash = job.fmt_hash
        self.ok = not err_msg
        if not self.ok:
            self.text = err_msg
            return
        # open() is inside the try so that a missing or unreadable text file
        # yields a failed Result instead of raising out of the constructor.
        try:
            with open(job.path + '.txt', 'rb') as src:
                self.text = src.read().decode('utf-8', 'replace')
        except Exception:
            self.ok = False
            self.text = traceback.format_exc()
class Worker(Thread):
def __init__(self, jobs_queue, supervise_queue):
@ -18,12 +51,47 @@ class Worker(Thread):
self.currently_working = False
self.jobs_queue = jobs_queue
self.supervise_queue = supervise_queue
self.keep_going = True
def run(self):
    """Thread body: take jobs off ``self.jobs_queue`` until told to stop.

    The loop ends when ``self.keep_going`` is false or the ``quit`` sentinel
    is received. Each non-None value returned by ``self.run_job`` is put on
    ``self.supervise_queue``; if running the job raises, a failed ``Result``
    carrying the traceback is queued instead.
    """
    # A single loop only: with an extra enclosing ``while True:`` the
    # ``break`` below would leave just the inner loop and the thread would
    # never terminate.
    while self.keep_going:
        x = self.jobs_queue.get()
        if x is quit:
            break
        try:
            res = self.run_job(x)
            if res is not None:
                self.supervise_queue.put(res)
        except Exception:
            tb = traceback.format_exc()
            self.supervise_queue.put(Result(x, tb))
def run_job(self, job):
    """Extract the text for *job* in a separate low-priority process.

    The child's stderr is captured in ``job.path + '.error'``; the child is
    asked to run ``calibre.db.fts.text.main`` on ``job.path``.

    :return: ``Result(job)`` when ``job.path + '.txt'`` exists after the
        child exits, ``Result(job, err)`` with the captured stderr
        otherwise, or ``None`` when ``self.keep_going`` became false before
        the child finished (the child is then killed).

    ``job.path`` and the ``.txt`` / ``.error`` files are always removed
    before returning.
    """
    txtpath = job.path + '.txt'
    errpath = job.path + '.error'
    try:
        with open(errpath, 'wb') as error:
            # NOTE(review): the returned object is used like a
            # subprocess.Popen (wait/returncode/kill) - confirm.
            p = start_pipe_worker(
                f'from calibre.db.fts.text import main; main({job.path!r})',
                stdout=subprocess.DEVNULL, stderr=error, stdin=subprocess.DEVNULL, priority='low',
            )
        # Poll in short slices so a shutdown request is noticed promptly.
        # wait() raises TimeoutExpired while the child is still running;
        # that just means "poll again". Once wait() returns the child has
        # exited, so stop polling.
        while self.keep_going:
            try:
                p.wait(0.1)
            except subprocess.TimeoutExpired:
                continue
            break
        if p.returncode is None:
            # Asked to stop while the child was still running
            p.kill()
            # Reap the killed child before its files are removed below
            p.wait()
            return
        if os.path.exists(txtpath):
            return Result(job)
        with open(errpath, 'rb') as f:
            err = f.read().decode('utf-8', 'replace')
        return Result(job, err)
    finally:
        # Best-effort cleanup of the job's files
        for leftover in (job.path, txtpath, errpath):
            with suppress(OSError):
                os.remove(leftover)
class Pool:
@ -43,6 +111,17 @@ class Pool:
self.expand_workers()
self.initialized = True
def shutdown(self):
    """Stop the supervisor and worker threads and wait for them to finish.

    Does nothing unless ``self.initialized`` is true.
    """
    if not self.initialized:
        return
    self.initialized = False
    self.supervise_queue.put(quit)
    for worker in self.workers:
        worker.keep_going = False
        # One quit sentinel is queued per worker
        self.jobs_queue.put(quit)
    self.supervisor_thread.join()
    for worker in self.workers:
        worker.join()
def expand_workers(self):
while len(self.workers) < self.max_workers:
w = Worker(self.jobs_queue, self.supervise_queue)
@ -53,15 +132,36 @@ class Pool:
self.initialize()
self.supervise_queue.put(check_for_work)
def do_check_for_work(self):
    """Ask the database to queue its next FTS job.

    Nothing happens when ``self.dbref()`` returns ``None``.
    """
    db = self.dbref()
    if db is None:
        return
    db.queue_next_fts_job()
def add_job(self, book_id, fmt, path, fmt_size, fmt_hash):
    """Queue a new ``Job`` built from the arguments, initializing the pool first."""
    self.initialize()
    self.jobs_queue.put(Job(book_id, fmt, path, fmt_size, fmt_hash))
def commit_result(self, result):
    """Report a failed result on stderr and commit its text to the database.

    For a failed result the error text is printed to stderr and an empty
    string is committed in its place. Nothing is committed when
    ``self.dbref()`` returns ``None``.
    """
    if result.ok:
        text = result.text
    else:
        print(f'Failed to get text from book_id: {result.book_id} format: {result.fmt}', file=sys.stderr)
        print(result.text, file=sys.stderr)
        text = ''
    db = self.dbref()
    if db is None:
        return
    db.commit_fts_result(result.book_id, result.fmt, result.fmt_size, result.fmt_hash, text)
def supervise(self):
    """Supervisor loop: process items from ``self.supervise_queue`` until ``quit``.

    ``check_for_work`` triggers ``self.do_check_for_work()``; a ``Result`` is
    committed via ``self.commit_result`` and then followed by another check
    for work. Any other item is ignored. Exceptions raised while handling
    an item are printed and the loop continues.
    """
    while True:
        x = self.supervise_queue.get()
        # The sentinel is tested exactly once, outside the try, rather than
        # repeating the same dispatch both before and inside it.
        if x is quit:
            break
        try:
            if x is check_for_work:
                self.do_check_for_work()
            elif isinstance(x, Result):
                self.commit_result(x)
                self.do_check_for_work()
        except Exception:
            traceback.print_exc()

View File

@ -79,3 +79,9 @@ def extract_text(pathtoebook):
texts.extend(to_text(container, name))
ans = '\n\n\n'.join(texts)
return unicodedata.normalize('NFC', ans)
def main(pathtoebook):
    """Extract the text of *pathtoebook* and write it, UTF-8 encoded, to ``pathtoebook + '.txt'``."""
    extracted = extract_text(pathtoebook)
    destination = pathtoebook + '.txt'
    with open(destination, 'wb') as out:
        out.write(extracted.encode('utf-8'))