diff --git a/src/calibre/db/backend.py b/src/calibre/db/backend.py index 4240333bda..865b65ffa9 100644 --- a/src/calibre/db/backend.py +++ b/src/calibre/db/backend.py @@ -973,8 +973,8 @@ class DB: def remove_dirty_fts(self, book_id, fmt): return self.fts.remove_dirty(book_id, fmt) - def queue_fts_job(self, book_id, fmt, path, fmt_size, fmt_hash): - return self.fts.queue_job(book_id, fmt, path, fmt_size, fmt_hash) + def queue_fts_job(self, book_id, fmt, path, fmt_size, fmt_hash, start_time): + return self.fts.queue_job(book_id, fmt, path, fmt_size, fmt_hash, start_time) def commit_fts_result(self, book_id, fmt, fmt_size, fmt_hash, text, err_msg): if self.fts is not None: diff --git a/src/calibre/db/cache.py b/src/calibre/db/cache.py index 14a8279b3e..6f665f83b4 100644 --- a/src/calibre/db/cache.py +++ b/src/calibre/db/cache.py @@ -19,9 +19,9 @@ from functools import partial, wraps from io import DEFAULT_BUFFER_SIZE, BytesIO from queue import Queue from threading import Lock -from time import sleep, time +from time import monotonic, sleep, time -from calibre import as_unicode, isbytestring, detect_ncpus +from calibre import as_unicode, detect_ncpus, isbytestring from calibre.constants import iswindows, preferred_encoding from calibre.customize.ui import ( run_plugins_on_import, run_plugins_on_postadd, run_plugins_on_postimport @@ -437,6 +437,8 @@ class Cache: # FTS API {{{ def initialize_fts(self): self.fts_queue_thread = None + self.fts_measuring_rate = None + self.fts_num_done_since_start = 0 self.fts_job_queue = Queue() self.fts_indexing_left = self.fts_indexing_total = 0 fts = self.backend.initialize_fts(weakref.ref(self)) @@ -445,7 +447,7 @@ class Cache: return fts def start_fts_pool(self): - from threading import Thread, Event + from threading import Event, Thread self.fts_dispatch_stop_event = Event() self.fts_queue_thread = Thread(name='FTSQueue', target=Cache.dispatch_fts_jobs, args=( self.fts_job_queue, self.fts_dispatch_stop_event, weakref.ref(self)), daemon=True) @@ -458,20 +460,32 @@ class Cache: def is_fts_enabled(self): return self.backend.fts_enabled - @api - def fts_indexing_progress(self): - return self.fts_indexing_left, self.fts_indexing_total + @write_api + def fts_start_measuring_rate(self, measure=True): + self.fts_measuring_rate = monotonic() if measure else None + self.fts_num_done_since_start = 0 - def _update_fts_indexing_numbers(self): + def _update_fts_indexing_numbers(self, job_time=None): # this is called when new formats are added and when a format is # indexed, but NOT when books or formats are deleted, so total may not # be up to date. nl = self.backend.fts.number_dirtied() nt = self.backend.get('SELECT COUNT(*) FROM main.data')[0][0] or 0 - if (self.fts_indexing_left, self.fts_indexing_total) != (nl, nt): + if not nl: + self._fts_start_measuring_rate(measure=False) + if job_time is not None and self.fts_measuring_rate is not None: + self.fts_num_done_since_start += 1 + if (self.fts_indexing_left, self.fts_indexing_total) != (nl, nt) or job_time is not None: self.fts_indexing_left = nl self.fts_indexing_total = nt - self.event_dispatcher(EventType.indexing_progress_changed, nl, nt) + self.event_dispatcher(EventType.indexing_progress_changed, *self._fts_indexing_progress()) + + @read_api + def fts_indexing_progress(self): + rate = None + if self.fts_measuring_rate is not None and self.fts_num_done_since_start > 4: + rate = self.fts_num_done_since_start / (monotonic() - self.fts_measuring_rate) + return self.fts_indexing_left, self.fts_indexing_total, rate @write_api def enable_fts(self, enabled=True, start_pool=True): @@ -498,6 +512,7 @@ class Cache: self = dbref() if self is None: return False + start_time = monotonic() with self.read_lock: if not self.backend.fts_enabled: return False @@ -522,9 +537,9 @@ class Cache: h.update(chunk) pt.write(chunk) with self.write_lock: - queued = self.backend.queue_fts_job(book_id, fmt, pt.name, sz, h.hexdigest()) + queued = self.backend.queue_fts_job(book_id, fmt, pt.name, sz, h.hexdigest(), start_time) if not queued: # means a dirtied book was removed from the dirty list because the text has not changed - self._update_fts_indexing_numbers() + self._update_fts_indexing_numbers(monotonic() - start_time) return self.backend.fts_has_idle_workers def loop_while_more_available(): @@ -555,9 +570,9 @@ class Cache: self._update_fts_indexing_numbers() @write_api - def commit_fts_result(self, book_id, fmt, fmt_size, fmt_hash, text, err_msg): + def commit_fts_result(self, book_id, fmt, fmt_size, fmt_hash, text, err_msg, start_time): ans = self.backend.commit_fts_result(book_id, fmt, fmt_size, fmt_hash, text, err_msg) - self._update_fts_indexing_numbers() + self._update_fts_indexing_numbers(monotonic() - start_time) return ans @write_api @@ -583,23 +598,29 @@ class Cache: self._queue_next_fts_job() return fts - @api - def set_fts_num_of_workers(self, num=None): + @write_api + def set_fts_num_of_workers(self, num): existing = self.backend.fts_num_of_workers - if num is not None: + if num != existing: self.backend.fts_num_of_workers = num if num > existing: - self.queue_next_fts_job() - return existing + self._queue_next_fts_job() + return True + return False - @api + @write_api def set_fts_speed(self, slow=True): + orig = self.fts_indexing_sleep_time if slow: self.fts_indexing_sleep_time = Cache.fts_indexing_sleep_time - self.set_fts_num_of_workers(1) + changed = self._set_fts_num_of_workers(1) else: self.fts_indexing_sleep_time = 0.1 - self.set_fts_num_of_workers(max(1, detect_ncpus())) + changed = self._set_fts_num_of_workers(max(1, detect_ncpus())) + changed = changed or orig != self.fts_indexing_sleep_time + if changed and self.fts_measuring_rate is not None: + self._fts_start_measuring_rate() + return changed @read_api def fts_search( @@ -967,7 +988,7 @@ class Cache: return ret = pt.name elif as_pixmap or as_image: - from qt.core import QPixmap, QImage + from qt.core import QImage, QPixmap ret = QImage() if as_image else QPixmap() with self.safe_read_lock: path = self._format_abspath(book_id, '__COVER_INTERNAL__') diff --git a/src/calibre/db/fts/connect.py b/src/calibre/db/fts/connect.py index 6e31fb30a3..90faebcc86 100644 --- a/src/calibre/db/fts/connect.py +++ b/src/calibre/db/fts/connect.py @@ -133,14 +133,14 @@ class FTS: break self.add_text(book_id, fmt, text, text_hash, fmt_size, fmt_hash, err_msg) - def queue_job(self, book_id, fmt, path, fmt_size, fmt_hash): + def queue_job(self, book_id, fmt, path, fmt_size, fmt_hash, start_time): conn = self.get_connection() fmt = fmt.upper() for x in conn.get('SELECT id FROM fts_db.books_text WHERE book=? AND format=? AND format_size=? AND format_hash=?', ( book_id, fmt, fmt_size, fmt_hash)): break else: - self.pool.add_job(book_id, fmt, path, fmt_size, fmt_hash) + self.pool.add_job(book_id, fmt, path, fmt_size, fmt_hash, start_time) conn.execute('UPDATE fts_db.dirtied_formats SET in_progress=TRUE WHERE book=? AND format=?', (book_id, fmt)) return True self.remove_dirty(book_id, fmt) diff --git a/src/calibre/db/fts/pool.py b/src/calibre/db/fts/pool.py index 995a55c265..34c3477c19 100644 --- a/src/calibre/db/fts/pool.py +++ b/src/calibre/db/fts/pool.py @@ -21,12 +21,13 @@ quit = object() class Job: - def __init__(self, book_id, fmt, path, fmt_size, fmt_hash): + def __init__(self, book_id, fmt, path, fmt_size, fmt_hash, start_time): self.book_id = book_id self.fmt = fmt self.fmt_size = fmt_size self.fmt_hash = fmt_hash self.path = path + self.start_time = start_time class Result: @@ -37,6 +38,7 @@ class Result: self.fmt_size = job.fmt_size self.fmt_hash = job.fmt_hash self.ok = not bool(err_msg) + self.start_time = job.start_time if self.ok: with open(job.path + '.txt', 'rb') as src: try: @@ -177,9 +179,9 @@ class Pool: self.initialize() self.supervise_queue.put(check_for_work) - def add_job(self, book_id, fmt, path, fmt_size, fmt_hash): + def add_job(self, book_id, fmt, path, fmt_size, fmt_hash, start_time): self.initialize() - job = Job(book_id, fmt, path, fmt_size, fmt_hash) + job = Job(book_id, fmt, path, fmt_size, fmt_hash, start_time) self.jobs_queue.put(job) def commit_result(self, result): @@ -192,7 +194,7 @@ class Pool: text = '' db = self.dbref() if db is not None: - db.commit_fts_result(result.book_id, result.fmt, result.fmt_size, result.fmt_hash, text, err_msg) + db.commit_fts_result(result.book_id, result.fmt, result.fmt_size, result.fmt_hash, text, err_msg, result.start_time) def shutdown(self): if self.initialized.is_set(): diff --git a/src/calibre/db/utils.py b/src/calibre/db/utils.py index a96856530f..5463c17047 100644 --- a/src/calibre/db/utils.py +++ b/src/calibre/db/utils.py @@ -4,7 +4,7 @@ __license__ = 'GPL v3' __copyright__ = '2013, Kovid Goyal ' -import os, errno, sys, re, time +import os, errno, sys, re from locale import localeconv from collections import OrderedDict, namedtuple from polyglot.builtins import iteritems, itervalues, string_or_bytes @@ -458,24 +458,15 @@ class IndexingProgress: self.reset() def __repr__(self): - return f'IndexingProgress(left={self.left}, total={self.total})' - - def clear_rate_information(self): - from collections import deque - self.done_events = deque() + return f'IndexingProgress(left={self.left}, total={self.total}, rate={self.indexing_rate})' def reset(self): self.left = self.total = -1 - self.clear_rate_information() + self.indexing_rate = None - def update(self, left, total): - changed = (left, total) != (self.left, self.total) - if changed: - done_num = self.left - left - if done_num > 0 and self.left > -1: # initial event will have self.left == -1 - self.done_events.append((done_num, time.monotonic())) - if len(self.done_events) > 50: - self.done_events.popleft() + def update(self, left, total, indexing_rate): + changed = (left, total, indexing_rate) != (self.left, self.total, self.indexing_rate) + self.indexing_rate = indexing_rate self.left, self.total = left, total return changed @@ -493,14 +484,10 @@ class IndexingProgress: return _('calculating time left') if self.left < 2: return _('almost done') - if len(self.done_events) < 5: + if self.indexing_rate is None: return _('calculating time left') try: - start_time = self.done_events[0][1] - end_time = self.done_events[-1][1] - num_done = sum(x[0] for x in self.done_events) - self.done_events[0][0] - rate = num_done / max(0.1, end_time - start_time) - seconds_left = self.left / rate + seconds_left = self.left / self.indexing_rate return _('~{} left').format(human_readable_interval(seconds_left)) except Exception: return _('calculating time left') diff --git a/src/calibre/gui2/fts/scan.py b/src/calibre/gui2/fts/scan.py index c53a504376..0ccfeb3d5f 100644 --- a/src/calibre/gui2/fts/scan.py +++ b/src/calibre/gui2/fts/scan.py @@ -70,7 +70,6 @@ class ScanProgress(QWidget): def change_speed(self): db = get_db() db.set_fts_speed(slow=not self.fast_button.isChecked()) - self.indexing_progress.clear_rate_information() def update(self, complete, left, total): if complete: @@ -130,14 +129,15 @@ class ScanStatus(QWidget): def gui_update_event(self, db, event_type, event_data): if event_type is EventType.indexing_progress_changed: - self.update_stats() + self.update_stats(event_data) def __call__(self, event_type, library_id, event_data): if event_type is EventType.indexing_progress_changed: - self.update_stats() + self.update_stats(event_data) - def update_stats(self): - changed = self.indexing_progress.update(*self.db.fts_indexing_progress()) + def update_stats(self, event_data=None): + event_data = event_data or self.db.fts_indexing_progress() + changed = self.indexing_progress.update(*event_data) if changed: self.indexing_progress_changed.emit(self.indexing_progress.complete, self.indexing_progress.left, self.indexing_progress.total) @@ -194,12 +194,12 @@ class ScanStatus(QWidget): self.apply_fts_state() def startup(self): - self.indexing_progress.clear_rate_information() + self.db.fts_start_measuring_rate(measure=True) def shutdown(self): self.scan_progress.slow_button.setChecked(True) self.reset_indexing_state_for_current_db() - self.indexing_progress.clear_rate_information() + self.db.fts_start_measuring_rate(measure=False) if __name__ == '__main__':