Track indexing rate in the Cache object itself

This commit is contained in:
Kovid Goyal 2022-08-07 12:48:14 +05:30
parent 87d51c1802
commit 119df51028
No known key found for this signature in database
GPG Key ID: 06BC317B515ACE7C
6 changed files with 68 additions and 58 deletions

View File

@ -973,8 +973,8 @@ class DB:
def remove_dirty_fts(self, book_id, fmt):
return self.fts.remove_dirty(book_id, fmt)
def queue_fts_job(self, book_id, fmt, path, fmt_size, fmt_hash):
return self.fts.queue_job(book_id, fmt, path, fmt_size, fmt_hash)
def queue_fts_job(self, book_id, fmt, path, fmt_size, fmt_hash, start_time):
return self.fts.queue_job(book_id, fmt, path, fmt_size, fmt_hash, start_time)
def commit_fts_result(self, book_id, fmt, fmt_size, fmt_hash, text, err_msg):
if self.fts is not None:

View File

@ -19,9 +19,9 @@ from functools import partial, wraps
from io import DEFAULT_BUFFER_SIZE, BytesIO
from queue import Queue
from threading import Lock
from time import sleep, time
from time import monotonic, sleep, time
from calibre import as_unicode, isbytestring, detect_ncpus
from calibre import as_unicode, detect_ncpus, isbytestring
from calibre.constants import iswindows, preferred_encoding
from calibre.customize.ui import (
run_plugins_on_import, run_plugins_on_postadd, run_plugins_on_postimport
@ -437,6 +437,8 @@ class Cache:
# FTS API {{{
def initialize_fts(self):
self.fts_queue_thread = None
self.fts_measuring_rate = None
self.fts_num_done_since_start = 0
self.fts_job_queue = Queue()
self.fts_indexing_left = self.fts_indexing_total = 0
fts = self.backend.initialize_fts(weakref.ref(self))
@ -445,7 +447,7 @@ class Cache:
return fts
def start_fts_pool(self):
from threading import Thread, Event
from threading import Event, Thread
self.fts_dispatch_stop_event = Event()
self.fts_queue_thread = Thread(name='FTSQueue', target=Cache.dispatch_fts_jobs, args=(
self.fts_job_queue, self.fts_dispatch_stop_event, weakref.ref(self)), daemon=True)
@ -458,20 +460,32 @@ class Cache:
def is_fts_enabled(self):
return self.backend.fts_enabled
@api
def fts_indexing_progress(self):
return self.fts_indexing_left, self.fts_indexing_total
@write_api
def fts_start_measuring_rate(self, measure=True):
self.fts_measuring_rate = monotonic() if measure else None
self.fts_num_done_since_start = 0
def _update_fts_indexing_numbers(self):
def _update_fts_indexing_numbers(self, job_time=None):
# this is called when new formats are added and when a format is
# indexed, but NOT when books or formats are deleted, so total may not
# be up to date.
nl = self.backend.fts.number_dirtied()
nt = self.backend.get('SELECT COUNT(*) FROM main.data')[0][0] or 0
if (self.fts_indexing_left, self.fts_indexing_total) != (nl, nt):
if not nl:
self._fts_start_measuring_rate(measure=False)
if job_time is not None and self.fts_measuring_rate is not None:
self.fts_num_done_since_start += 1
if (self.fts_indexing_left, self.fts_indexing_total) != (nl, nt) or job_time is not None:
self.fts_indexing_left = nl
self.fts_indexing_total = nt
self.event_dispatcher(EventType.indexing_progress_changed, nl, nt)
self.event_dispatcher(EventType.indexing_progress_changed, *self._fts_indexing_progress())
@read_api
def fts_indexing_progress(self):
rate = None
if self.fts_measuring_rate is not None and self.fts_num_done_since_start > 4:
rate = self.fts_num_done_since_start / (monotonic() - self.fts_measuring_rate)
return self.fts_indexing_left, self.fts_indexing_total, rate
@write_api
def enable_fts(self, enabled=True, start_pool=True):
@ -498,6 +512,7 @@ class Cache:
self = dbref()
if self is None:
return False
start_time = monotonic()
with self.read_lock:
if not self.backend.fts_enabled:
return False
@ -522,9 +537,9 @@ class Cache:
h.update(chunk)
pt.write(chunk)
with self.write_lock:
queued = self.backend.queue_fts_job(book_id, fmt, pt.name, sz, h.hexdigest())
queued = self.backend.queue_fts_job(book_id, fmt, pt.name, sz, h.hexdigest(), start_time)
if not queued: # means a dirtied book was removed from the dirty list because the text has not changed
self._update_fts_indexing_numbers()
self._update_fts_indexing_numbers(monotonic() - start_time)
return self.backend.fts_has_idle_workers
def loop_while_more_available():
@ -555,9 +570,9 @@ class Cache:
self._update_fts_indexing_numbers()
@write_api
def commit_fts_result(self, book_id, fmt, fmt_size, fmt_hash, text, err_msg):
def commit_fts_result(self, book_id, fmt, fmt_size, fmt_hash, text, err_msg, start_time):
ans = self.backend.commit_fts_result(book_id, fmt, fmt_size, fmt_hash, text, err_msg)
self._update_fts_indexing_numbers()
self._update_fts_indexing_numbers(monotonic() - start_time)
return ans
@write_api
@ -583,23 +598,29 @@ class Cache:
self._queue_next_fts_job()
return fts
@api
def set_fts_num_of_workers(self, num=None):
@write_api
def set_fts_num_of_workers(self, num):
existing = self.backend.fts_num_of_workers
if num is not None:
if num != existing:
self.backend.fts_num_of_workers = num
if num > existing:
self.queue_next_fts_job()
return existing
self._queue_next_fts_job()
return True
return False
@api
@write_api
def set_fts_speed(self, slow=True):
orig = self.fts_indexing_sleep_time
if slow:
self.fts_indexing_sleep_time = Cache.fts_indexing_sleep_time
self.set_fts_num_of_workers(1)
changed = self._set_fts_num_of_workers(1)
else:
self.fts_indexing_sleep_time = 0.1
self.set_fts_num_of_workers(max(1, detect_ncpus()))
changed = self._set_fts_num_of_workers(max(1, detect_ncpus()))
changed = changed or orig != self.fts_indexing_sleep_time
if changed and self.fts_measuring_rate is not None:
self._fts_start_measuring_rate()
return changed
@read_api
def fts_search(
@ -967,7 +988,7 @@ class Cache:
return
ret = pt.name
elif as_pixmap or as_image:
from qt.core import QPixmap, QImage
from qt.core import QImage, QPixmap
ret = QImage() if as_image else QPixmap()
with self.safe_read_lock:
path = self._format_abspath(book_id, '__COVER_INTERNAL__')

View File

@ -133,14 +133,14 @@ class FTS:
break
self.add_text(book_id, fmt, text, text_hash, fmt_size, fmt_hash, err_msg)
def queue_job(self, book_id, fmt, path, fmt_size, fmt_hash):
def queue_job(self, book_id, fmt, path, fmt_size, fmt_hash, start_time):
conn = self.get_connection()
fmt = fmt.upper()
for x in conn.get('SELECT id FROM fts_db.books_text WHERE book=? AND format=? AND format_size=? AND format_hash=?', (
book_id, fmt, fmt_size, fmt_hash)):
break
else:
self.pool.add_job(book_id, fmt, path, fmt_size, fmt_hash)
self.pool.add_job(book_id, fmt, path, fmt_size, fmt_hash, start_time)
conn.execute('UPDATE fts_db.dirtied_formats SET in_progress=TRUE WHERE book=? AND format=?', (book_id, fmt))
return True
self.remove_dirty(book_id, fmt)

View File

@ -21,12 +21,13 @@ quit = object()
class Job:
def __init__(self, book_id, fmt, path, fmt_size, fmt_hash):
def __init__(self, book_id, fmt, path, fmt_size, fmt_hash, start_time):
self.book_id = book_id
self.fmt = fmt
self.fmt_size = fmt_size
self.fmt_hash = fmt_hash
self.path = path
self.start_time = start_time
class Result:
@ -37,6 +38,7 @@ class Result:
self.fmt_size = job.fmt_size
self.fmt_hash = job.fmt_hash
self.ok = not bool(err_msg)
self.start_time = job.start_time
if self.ok:
with open(job.path + '.txt', 'rb') as src:
try:
@ -177,9 +179,9 @@ class Pool:
self.initialize()
self.supervise_queue.put(check_for_work)
def add_job(self, book_id, fmt, path, fmt_size, fmt_hash):
def add_job(self, book_id, fmt, path, fmt_size, fmt_hash, start_time):
self.initialize()
job = Job(book_id, fmt, path, fmt_size, fmt_hash)
job = Job(book_id, fmt, path, fmt_size, fmt_hash, start_time)
self.jobs_queue.put(job)
def commit_result(self, result):
@ -192,7 +194,7 @@ class Pool:
text = ''
db = self.dbref()
if db is not None:
db.commit_fts_result(result.book_id, result.fmt, result.fmt_size, result.fmt_hash, text, err_msg)
db.commit_fts_result(result.book_id, result.fmt, result.fmt_size, result.fmt_hash, text, err_msg, result.start_time)
def shutdown(self):
if self.initialized.is_set():

View File

@ -4,7 +4,7 @@
__license__ = 'GPL v3'
__copyright__ = '2013, Kovid Goyal <kovid at kovidgoyal.net>'
import os, errno, sys, re, time
import os, errno, sys, re
from locale import localeconv
from collections import OrderedDict, namedtuple
from polyglot.builtins import iteritems, itervalues, string_or_bytes
@ -458,24 +458,15 @@ class IndexingProgress:
self.reset()
def __repr__(self):
return f'IndexingProgress(left={self.left}, total={self.total})'
def clear_rate_information(self):
from collections import deque
self.done_events = deque()
return f'IndexingProgress(left={self.left}, total={self.total}, rate={self.indexing_rate})'
def reset(self):
self.left = self.total = -1
self.clear_rate_information()
self.indexing_rate = None
def update(self, left, total):
changed = (left, total) != (self.left, self.total)
if changed:
done_num = self.left - left
if done_num > 0 and self.left > -1: # initial event will have self.left == -1
self.done_events.append((done_num, time.monotonic()))
if len(self.done_events) > 50:
self.done_events.popleft()
def update(self, left, total, indexing_rate):
changed = (left, total, indexing_rate) != (self.left, self.total, self.indexing_rate)
self.indexing_rate = indexing_rate
self.left, self.total = left, total
return changed
@ -493,14 +484,10 @@ class IndexingProgress:
return _('calculating time left')
if self.left < 2:
return _('almost done')
if len(self.done_events) < 5:
if self.indexing_rate is None:
return _('calculating time left')
try:
start_time = self.done_events[0][1]
end_time = self.done_events[-1][1]
num_done = sum(x[0] for x in self.done_events) - self.done_events[0][0]
rate = num_done / max(0.1, end_time - start_time)
seconds_left = self.left / rate
seconds_left = self.left / self.indexing_rate
return _('~{} left').format(human_readable_interval(seconds_left))
except Exception:
return _('calculating time left')

View File

@ -70,7 +70,6 @@ class ScanProgress(QWidget):
def change_speed(self):
db = get_db()
db.set_fts_speed(slow=not self.fast_button.isChecked())
self.indexing_progress.clear_rate_information()
def update(self, complete, left, total):
if complete:
@ -130,14 +129,15 @@ class ScanStatus(QWidget):
def gui_update_event(self, db, event_type, event_data):
if event_type is EventType.indexing_progress_changed:
self.update_stats()
self.update_stats(event_data)
def __call__(self, event_type, library_id, event_data):
if event_type is EventType.indexing_progress_changed:
self.update_stats()
self.update_stats(event_data)
def update_stats(self):
changed = self.indexing_progress.update(*self.db.fts_indexing_progress())
def update_stats(self, event_data=None):
event_data = event_data or self.db.fts_indexing_progress()
changed = self.indexing_progress.update(*event_data)
if changed:
self.indexing_progress_changed.emit(self.indexing_progress.complete, self.indexing_progress.left, self.indexing_progress.total)
@ -194,12 +194,12 @@ class ScanStatus(QWidget):
self.apply_fts_state()
def startup(self):
self.indexing_progress.clear_rate_information()
self.db.fts_start_measuring_rate(measure=True)
def shutdown(self):
self.scan_progress.slow_button.setChecked(True)
self.reset_indexing_state_for_current_db()
self.indexing_progress.clear_rate_information()
self.db.fts_start_measuring_rate(measure=False)
if __name__ == '__main__':