From 60e80e5684cb5a2c00c471a5e434f342ba19e0f1 Mon Sep 17 00:00:00 2001 From: Kovid Goyal Date: Wed, 16 Feb 2022 20:55:45 +0530 Subject: [PATCH] More work on fts indexing --- resources/fts_sqlite.sql | 2 +- src/calibre/db/backend.py | 6 ++++++ src/calibre/db/cache.py | 29 ++++++++++++++++++++++++----- src/calibre/db/fts/connect.py | 31 ++++++++++++++++++++++++++++++- src/calibre/db/fts/pool.py | 9 +++++++-- src/calibre/db/fts/text.py | 15 +++++++++++---- src/calibre/db/tests/fts_api.py | 6 ++++++ 7 files changed, 85 insertions(+), 13 deletions(-) diff --git a/resources/fts_sqlite.sql b/resources/fts_sqlite.sql index f37b8a1ee7..8ef19cfd3b 100644 --- a/resources/fts_sqlite.sql +++ b/resources/fts_sqlite.sql @@ -1,7 +1,7 @@ CREATE TABLE fts_db.dirtied_formats ( id INTEGER PRIMARY KEY, book INTEGER NOT NULL, format TEXT NOT NULL COLLATE NOCASE, - in_progress INTEGER NOT NULL DEFAULT 0, + in_progress INTEGER NOT NULL DEFAULT FALSE, UNIQUE(book, format) ); diff --git a/src/calibre/db/backend.py b/src/calibre/db/backend.py index ae0256c82f..724871ea1b 100644 --- a/src/calibre/db/backend.py +++ b/src/calibre/db/backend.py @@ -944,6 +944,12 @@ class DB: def get_next_fts_job(self): return self.fts.get_next_fts_job() + def remove_dirty_fts(self, book_id, fmt): + return self.fts.remove_dirty(book_id, fmt) + + def queue_fts_job(self, book_id, fmt, path, fmt_size, fmt_hash): + return self.fts.queue_fts_job(book_id, fmt, path, fmt_size, fmt_hash) + def get_connection(self): return self.conn diff --git a/src/calibre/db/cache.py b/src/calibre/db/cache.py index d08402e798..3c277f0e4a 100644 --- a/src/calibre/db/cache.py +++ b/src/calibre/db/cache.py @@ -5,6 +5,7 @@ __license__ = 'GPL v3' __copyright__ = '2011, Kovid Goyal ' __docformat__ = 'restructuredtext en' +import hashlib import operator import os import random @@ -15,7 +16,7 @@ import weakref from collections import defaultdict from collections.abc import MutableSet, Set from functools import partial, wraps -from io import BytesIO +from io import DEFAULT_BUFFER_SIZE, BytesIO from threading import Lock from time import time @@ -431,12 +432,30 @@ class Cache: return self.backend.enable_fts(weakref.ref(self) if enabled else None) @write_api - def get_next_fts_job(self): + def queue_next_fts_job(self): + from .fts.text import is_fmt_ok if not self.backend.fts_enabled: return - book_id, fmt = self.backend.get_next_fts_job() - if book_id is None: - return + while True: + book_id, fmt = self.backend.get_next_fts_job() + if book_id is None: + return + path = self.format_abspath(book_id, fmt) + if not path or not is_fmt_ok(fmt): + self.backend.remove_dirty_fts(book_id, fmt) + continue + with PersistentTemporaryFile(suffix=f'.{fmt.lower()}') as pt, open(path, 'rb') as src: + sz = 0 + h = hashlib.sha1() + while True: + chunk = src.read(DEFAULT_BUFFER_SIZE) + if not chunk: + break + sz += len(chunk) + h.update(chunk) + pt.write(chunk) + if self.backend.queue_fts_job(book_id, fmt, path, sz, h.hexdigest()): + break # }}} diff --git a/src/calibre/db/fts/connect.py b/src/calibre/db/fts/connect.py index 3d10a714e3..e837f78cc7 100644 --- a/src/calibre/db/fts/connect.py +++ b/src/calibre/db/fts/connect.py @@ -6,9 +6,11 @@ import builtins import os import sys +from contextlib import suppress from calibre.utils.date import EPOCH, utcnow +from .pool import Pool from .schema_upgrade import SchemaUpgrade # TODO: check that closing of db connection works @@ -26,6 +28,7 @@ class FTS: def __init__(self, dbref): self.dbref = dbref + self.pool = Pool(dbref) def initialize(self, conn): main_db_path = os.path.abspath(conn.db_filename('main')) @@ -33,6 +36,7 @@ class FTS: conn.execute(f'ATTACH DATABASE "{dbpath}" AS fts_db') SchemaUpgrade(conn) conn.fts_dbpath = dbpath + conn.execute('UPDATE fts_db.dirtied_formats SET in_progress=FALSE WHERE in_progress=TRUE') def get_connection(self): db = self.dbref() @@ -58,6 +62,10 @@ class FTS: conn = self.get_connection() conn.execute('DELETE FROM fts_db.dirtied_formats') + def remove_dirty(self, book_id, fmt): + conn = self.get_connection() + conn.execute('DELETE FROM fts_db.dirtied_formats WHERE book=? AND format=?', (book_id, fmt.upper())) + def add_text(self, book_id, fmt, text, text_hash='', fmt_size=0, fmt_hash=''): conn = self.get_connection() ts = (utcnow() - EPOCH).total_seconds() @@ -69,4 +77,25 @@ class FTS: '(?, ?, ?, ?, ?, ?, ?, ?)', ( book_id, ts, fmt, fmt_size, fmt_hash, text, len(text), text_hash)) else: - conn.execute('DELETE FROM fts_db.dirtied_formats WHERE book=? and format=?', (book_id, fmt)) + conn.execute('DELETE FROM fts_db.dirtied_formats WHERE book=? AND format=?', (book_id, fmt)) + + def get_next_fts_job(self): + conn = self.get_connection() + for book_id, fmt in conn.get('SELECT book,format FROM fts_db.dirtied_formats WHERE in_progress=FALSE ORDER BY id'): + return book_id, fmt + return None, None + + def queue_fts_job(self, book_id, fmt, path, fmt_size, fmt_hash): + conn = self.get_connection() + fmt = fmt.upper() + for x in conn.get('SELECT id FROM fts_db.books_text WHERE book=? AND fmt=? AND format_size=? AND format_hash=?', ( + book_id, fmt, fmt_size, fmt_hash)): + break + else: + self.pool.add_job(book_id, fmt, path, fmt_size, fmt_hash) + conn.execute('UPDATE fts_db.dirtied_formats SET in_progress=TRUE WHERE book=? AND format=? LIMIT 1', (book_id, fmt)) + return True + self.remove_dirty(book_id, fmt) + with suppress(OSError): + os.remove(path) + return False diff --git a/src/calibre/db/fts/pool.py b/src/calibre/db/fts/pool.py index a2ca9b0913..02e0b94636 100644 --- a/src/calibre/db/fts/pool.py +++ b/src/calibre/db/fts/pool.py @@ -30,7 +30,6 @@ class Pool: def __init__(self, dbref): self.max_workers = 1 - self.supervisor_thread = Thread(name='FTSSupervisor', daemon=True, target=self.supervise) self.jobs_queue = Queue() self.supervise_queue = Queue() self.workers = [] @@ -39,9 +38,10 @@ class Pool: def initialize(self): if not self.initialized: - self.initialized = True + self.supervisor_thread = Thread(name='FTSSupervisor', daemon=True, target=self.supervise) self.supervisor_thread.start() self.expand_workers() + self.initialized = True def expand_workers(self): while len(self.workers) < self.max_workers: @@ -53,6 +53,11 @@ class Pool: self.initialize() self.supervise_queue.put(check_for_work) + def add_job(self, book_id, fmt, path, fmt_size, fmt_hash): + self.initialize() + job = Job(book_id, fmt, path, fmt_size, fmt_hash) + self.jobs_queue.put(job) + def supervise(self): while True: x = self.supervise_queue.get() diff --git a/src/calibre/db/fts/text.py b/src/calibre/db/fts/text.py index 4c9ca75a71..cc1215a337 100644 --- a/src/calibre/db/fts/text.py +++ b/src/calibre/db/fts/text.py @@ -51,15 +51,22 @@ def to_text(container, name): yield from html_to_text(root) +def is_fmt_ok(input_fmt): + input_fmt = input_fmt.upper() + input_plugin = plugin_for_input_format(input_fmt) + is_comic = bool(getattr(input_plugin, 'is_image_collection', False)) + if not input_plugin or is_comic: + return False + return input_plugin + + def extract_text(pathtoebook): input_fmt = pathtoebook.rpartition('.')[-1].upper() - input_plugin = plugin_for_input_format(input_fmt) ans = '' + input_plugin = is_fmt_ok(input_fmt) if not input_plugin: return ans - is_comic = bool(getattr(input_plugin, 'is_image_collection', False)) - if is_comic: - return ans + input_plugin = plugin_for_input_format(input_fmt) with TemporaryDirectory() as tdir: texts = [] book_fmt, opfpath, input_fmt = extract_book(pathtoebook, tdir, log=default_log) diff --git a/src/calibre/db/tests/fts_api.py b/src/calibre/db/tests/fts_api.py index 7f73e40437..3872bba4cf 100644 --- a/src/calibre/db/tests/fts_api.py +++ b/src/calibre/db/tests/fts_api.py @@ -50,6 +50,12 @@ class FTSAPITest(BaseTest): self.ae(fts.all_currently_dirty(), [(2, 'ADDED')]) fts.add_text(2, 'ADDED', 'data2') self.ae(fts.all_currently_dirty(), []) + fts.dirty_existing() + j = fts.get_next_fts_job() + self.ae(j, (2, 'ADDED')) + self.ae(j, fts.get_next_fts_job()) + fts.remove_dirty(*j) + self.assertNotEqual(j, fts.get_next_fts_job()) def test_fts_to_text(self): from calibre.ebooks.oeb.polish.parsing import parse