More work on fts indexing

This commit is contained in:
Kovid Goyal 2022-02-16 20:55:45 +05:30
parent 164dbe9cfb
commit 60e80e5684
No known key found for this signature in database
GPG Key ID: 06BC317B515ACE7C
7 changed files with 85 additions and 13 deletions

View File

@ -1,7 +1,7 @@
CREATE TABLE fts_db.dirtied_formats ( id INTEGER PRIMARY KEY, CREATE TABLE fts_db.dirtied_formats ( id INTEGER PRIMARY KEY,
book INTEGER NOT NULL, book INTEGER NOT NULL,
format TEXT NOT NULL COLLATE NOCASE, format TEXT NOT NULL COLLATE NOCASE,
in_progress INTEGER NOT NULL DEFAULT 0, in_progress INTEGER NOT NULL DEFAULT FALSE,
UNIQUE(book, format) UNIQUE(book, format)
); );

View File

@ -944,6 +944,12 @@ class DB:
def get_next_fts_job(self): def get_next_fts_job(self):
return self.fts.get_next_fts_job() return self.fts.get_next_fts_job()
def remove_dirty_fts(self, book_id, fmt):
return self.fts.remove_dirty(book_id, fmt)
def queue_fts_job(self, book_id, fmt, path, fmt_size, fmt_hash):
return self.fts.queue_fts_job(book_id, fmt, path, fmt_size, fmt_hash)
def get_connection(self): def get_connection(self):
return self.conn return self.conn

View File

@ -5,6 +5,7 @@ __license__ = 'GPL v3'
__copyright__ = '2011, Kovid Goyal <kovid@kovidgoyal.net>' __copyright__ = '2011, Kovid Goyal <kovid@kovidgoyal.net>'
__docformat__ = 'restructuredtext en' __docformat__ = 'restructuredtext en'
import hashlib
import operator import operator
import os import os
import random import random
@ -15,7 +16,7 @@ import weakref
from collections import defaultdict from collections import defaultdict
from collections.abc import MutableSet, Set from collections.abc import MutableSet, Set
from functools import partial, wraps from functools import partial, wraps
from io import BytesIO from io import DEFAULT_BUFFER_SIZE, BytesIO
from threading import Lock from threading import Lock
from time import time from time import time
@ -431,12 +432,30 @@ class Cache:
return self.backend.enable_fts(weakref.ref(self) if enabled else None) return self.backend.enable_fts(weakref.ref(self) if enabled else None)
@write_api @write_api
def get_next_fts_job(self): def queue_next_fts_job(self):
from .fts.text import is_fmt_ok
if not self.backend.fts_enabled: if not self.backend.fts_enabled:
return return
book_id, fmt = self.backend.get_next_fts_job() while True:
if book_id is None: book_id, fmt = self.backend.get_next_fts_job()
return if book_id is None:
return
path = self.format_abspath(book_id, fmt)
if not path or not is_fmt_ok(fmt):
self.backend.remove_dirty_fts(book_id, fmt)
continue
with PersistentTemporaryFile(suffix=f'.{fmt.lower()}') as pt, open(path, 'rb') as src:
sz = 0
h = hashlib.sha1()
while True:
chunk = src.read(DEFAULT_BUFFER_SIZE)
if not chunk:
break
sz += len(chunk)
h.update(chunk)
pt.write(chunk)
if self.backend.queue_fts_job(book_id, fmt, path, sz, h.hexdigest()):
break
# }}} # }}}

View File

@ -6,9 +6,11 @@
import builtins import builtins
import os import os
import sys import sys
from contextlib import suppress
from calibre.utils.date import EPOCH, utcnow from calibre.utils.date import EPOCH, utcnow
from .pool import Pool
from .schema_upgrade import SchemaUpgrade from .schema_upgrade import SchemaUpgrade
# TODO: check that closing of db connection works # TODO: check that closing of db connection works
@ -26,6 +28,7 @@ class FTS:
def __init__(self, dbref): def __init__(self, dbref):
self.dbref = dbref self.dbref = dbref
self.pool = Pool(dbref)
def initialize(self, conn): def initialize(self, conn):
main_db_path = os.path.abspath(conn.db_filename('main')) main_db_path = os.path.abspath(conn.db_filename('main'))
@ -33,6 +36,7 @@ class FTS:
conn.execute(f'ATTACH DATABASE "{dbpath}" AS fts_db') conn.execute(f'ATTACH DATABASE "{dbpath}" AS fts_db')
SchemaUpgrade(conn) SchemaUpgrade(conn)
conn.fts_dbpath = dbpath conn.fts_dbpath = dbpath
conn.execute('UPDATE fts_db.dirtied_formats SET in_progress=FALSE WHERE in_progress=TRUE')
def get_connection(self): def get_connection(self):
db = self.dbref() db = self.dbref()
@ -58,6 +62,10 @@ class FTS:
conn = self.get_connection() conn = self.get_connection()
conn.execute('DELETE FROM fts_db.dirtied_formats') conn.execute('DELETE FROM fts_db.dirtied_formats')
def remove_dirty(self, book_id, fmt):
conn = self.get_connection()
conn.execute('DELETE FROM fts_db.dirtied_formats WHERE book=? AND format=?', (book_id, fmt.upper()))
def add_text(self, book_id, fmt, text, text_hash='', fmt_size=0, fmt_hash=''): def add_text(self, book_id, fmt, text, text_hash='', fmt_size=0, fmt_hash=''):
conn = self.get_connection() conn = self.get_connection()
ts = (utcnow() - EPOCH).total_seconds() ts = (utcnow() - EPOCH).total_seconds()
@ -69,4 +77,25 @@ class FTS:
'(?, ?, ?, ?, ?, ?, ?, ?)', ( '(?, ?, ?, ?, ?, ?, ?, ?)', (
book_id, ts, fmt, fmt_size, fmt_hash, text, len(text), text_hash)) book_id, ts, fmt, fmt_size, fmt_hash, text, len(text), text_hash))
else: else:
conn.execute('DELETE FROM fts_db.dirtied_formats WHERE book=? and format=?', (book_id, fmt)) conn.execute('DELETE FROM fts_db.dirtied_formats WHERE book=? AND format=?', (book_id, fmt))
def get_next_fts_job(self):
conn = self.get_connection()
for book_id, fmt in conn.get('SELECT book,format FROM fts_db.dirtied_formats WHERE in_progress=FALSE ORDER BY id'):
return book_id, fmt
return None, None
def queue_fts_job(self, book_id, fmt, path, fmt_size, fmt_hash):
conn = self.get_connection()
fmt = fmt.upper()
for x in conn.get('SELECT id FROM fts_db.books_text WHERE book=? AND fmt=? AND format_size=? AND format_hash=?', (
book_id, fmt, fmt_size, fmt_hash)):
break
else:
self.pool.add_job(book_id, fmt, path, fmt_size, fmt_hash)
conn.execute('UPDATE fts_db.dirtied_formats SET in_progress=TRUE WHERE book=? AND format=? LIMIT 1', (book_id, fmt))
return True
self.remove_dirty(book_id, fmt)
with suppress(OSError):
os.remove(path)
return False

View File

@ -30,7 +30,6 @@ class Pool:
def __init__(self, dbref): def __init__(self, dbref):
self.max_workers = 1 self.max_workers = 1
self.supervisor_thread = Thread(name='FTSSupervisor', daemon=True, target=self.supervise)
self.jobs_queue = Queue() self.jobs_queue = Queue()
self.supervise_queue = Queue() self.supervise_queue = Queue()
self.workers = [] self.workers = []
@ -39,9 +38,10 @@ class Pool:
def initialize(self): def initialize(self):
if not self.initialized: if not self.initialized:
self.initialized = True self.supervisor_thread = Thread(name='FTSSupervisor', daemon=True, target=self.supervise)
self.supervisor_thread.start() self.supervisor_thread.start()
self.expand_workers() self.expand_workers()
self.initialized = True
def expand_workers(self): def expand_workers(self):
while len(self.workers) < self.max_workers: while len(self.workers) < self.max_workers:
@ -53,6 +53,11 @@ class Pool:
self.initialize() self.initialize()
self.supervise_queue.put(check_for_work) self.supervise_queue.put(check_for_work)
def add_job(self, book_id, fmt, path, fmt_size, fmt_hash):
self.initialize()
job = Job(book_id, fmt, path, fmt_size, fmt_hash)
self.jobs_queue.put(job)
def supervise(self): def supervise(self):
while True: while True:
x = self.supervise_queue.get() x = self.supervise_queue.get()

View File

@ -51,15 +51,22 @@ def to_text(container, name):
yield from html_to_text(root) yield from html_to_text(root)
def is_fmt_ok(input_fmt):
input_fmt = input_fmt.upper()
input_plugin = plugin_for_input_format(input_fmt)
is_comic = bool(getattr(input_plugin, 'is_image_collection', False))
if not input_plugin or is_comic:
return False
return input_plugin
def extract_text(pathtoebook): def extract_text(pathtoebook):
input_fmt = pathtoebook.rpartition('.')[-1].upper() input_fmt = pathtoebook.rpartition('.')[-1].upper()
input_plugin = plugin_for_input_format(input_fmt)
ans = '' ans = ''
input_plugin = is_fmt_ok(input_fmt)
if not input_plugin: if not input_plugin:
return ans return ans
is_comic = bool(getattr(input_plugin, 'is_image_collection', False)) input_plugin = plugin_for_input_format(input_fmt)
if is_comic:
return ans
with TemporaryDirectory() as tdir: with TemporaryDirectory() as tdir:
texts = [] texts = []
book_fmt, opfpath, input_fmt = extract_book(pathtoebook, tdir, log=default_log) book_fmt, opfpath, input_fmt = extract_book(pathtoebook, tdir, log=default_log)

View File

@ -50,6 +50,12 @@ class FTSAPITest(BaseTest):
self.ae(fts.all_currently_dirty(), [(2, 'ADDED')]) self.ae(fts.all_currently_dirty(), [(2, 'ADDED')])
fts.add_text(2, 'ADDED', 'data2') fts.add_text(2, 'ADDED', 'data2')
self.ae(fts.all_currently_dirty(), []) self.ae(fts.all_currently_dirty(), [])
fts.dirty_existing()
j = fts.get_next_fts_job()
self.ae(j, (2, 'ADDED'))
self.ae(j, fts.get_next_fts_job())
fts.remove_dirty(*j)
self.assertNotEqual(j, fts.get_next_fts_job())
def test_fts_to_text(self): def test_fts_to_text(self):
from calibre.ebooks.oeb.polish.parsing import parse from calibre.ebooks.oeb.polish.parsing import parse