From c6efde3495b48f0dee3efb66e4397e861ef7e7e1 Mon Sep 17 00:00:00 2001 From: Kovid Goyal Date: Sun, 7 Aug 2022 12:59:39 +0530 Subject: [PATCH] calibredb: Implement wait for indexing against remote libraries as well --- src/calibre/db/cli/cmd_fts_index.py | 125 +++++++++++++--------------- src/calibre/db/utils.py | 2 + 2 files changed, 61 insertions(+), 66 deletions(-) diff --git a/src/calibre/db/cli/cmd_fts_index.py b/src/calibre/db/cli/cmd_fts_index.py index 0f39d03005..97b7d085fe 100644 --- a/src/calibre/db/cli/cmd_fts_index.py +++ b/src/calibre/db/cli/cmd_fts_index.py @@ -2,54 +2,27 @@ # License: GPLv3 Copyright: 2017, Kovid Goyal import sys -from functools import lru_cache + +from calibre.db.listeners import EventType version = 0 # change this if you change signature of implementation() -@lru_cache -def indexing_progress(): - from threading import Lock - from calibre.db.utils import IndexingProgress - ans = IndexingProgress() - ans.lock = Lock() - return ans - - -def update_indexing_progress(left, total): - ip = indexing_progress() - with ip.lock: - ip.update(left, total) - - -def reset_indexing_progress(): - ip = indexing_progress() - with ip.lock: - ip.reset() - - -def indexing_progress_time_left(): - ip = indexing_progress() - with ip.lock: - return ip.time_left - - def implementation(db, notify_changes, action, adata=None): if action == 'status': if db.is_fts_enabled(): - l, t = db.fts_indexing_progress() - return {'enabled': True, 'left': l, 'total': t} + l, t, r = db.fts_indexing_progress() + return {'enabled': True, 'left': l, 'total': t, 'rate': r} return {'enabled': False, 'left': -1, 'total': -1} if action == 'enable': if not db.is_fts_enabled(): db.enable_fts() - l, t = db.fts_indexing_progress() - return {'enabled': True, 'left': l, 'total': t} + l, t, r = db.fts_indexing_progress() + return {'enabled': True, 'left': l, 'total': t, 'rate': r} if action == 'disable': if db.is_fts_enabled(): - reset_indexing_progress() db.enable_fts(enabled=False) return @@ -64,8 +37,20 @@ def implementation(db, notify_changes, action, adata=None): db.reindex_fts_book(*item) else: db.reindex_fts() - l, t = db.fts_indexing_progress() - return {'enabled': True, 'left': l, 'total': t} + l, t, r = db.fts_indexing_progress() + return {'enabled': True, 'left': l, 'total': t, 'rate': r} + + if action == 'wait': + if not db.is_fts_enabled(): + a = Exception(_('Full text indexing is not enabled on this library')) + a.suppress_traceback = True + raise a + if 'measure_state' in adata: + db.fts_start_measuring_rate(measure=adata['measure_state']) + if adata.get('speed'): + db.set_fts_speed(slow=adata['speed'] == 'slow') + l, t, r = db.fts_indexing_progress() + return {'left': l, 'total': t, 'rate': r} def option_parser(get_parser, args): @@ -97,39 +82,58 @@ Control the Full text search indexing process. default='', choices=('fast', 'slow', ''), help=_('The speed of indexing. Use fast for fast indexing using all your computers resources' - ' and slow for less resource intensive indexing. Note that the speed is reset to slow on every invocation.') + ' and slow for less resource intensive indexing. Note that the speed is reset to slow after every invocation.') ) return parser +def run_job(dbctx, which, **data): + try: + return dbctx.run('fts_index', which, data) + except Exception as e: + if getattr(e, 'suppress_traceback', False): + raise SystemExit(str(e)) + raise + + +def show_progress(left, total, rate): + from calibre.db.utils import IndexingProgress + ip = IndexingProgress() + ip.update(left, total, rate) + print('\r\x1b[K' + _('{} of {} book files indexed, {}').format(total-left, total, ip.time_left), flush=True, end=' ...') + + +def remote_wait_for_completion(dbctx, indexing_speed): + import time + s = run_job(dbctx, 'wait', speed=indexing_speed, measure_state=True) + try: + while s['left'] > 0: + show_progress(s['left'], s['total'], s['rate']) + time.sleep(1) + s = run_job(dbctx, 'wait') + finally: + print() + run_job(dbctx, 'wait', speed='slow', measure_state=False) + + def local_wait_for_completion(db, indexing_speed): - from calibre.db.listeners import EventType from queue import Queue q = Queue() - def listen(event_type, library_id, event_data): + def notifier(event_type, library_id, event_data): if event_type is EventType.indexing_progress_changed: - update_indexing_progress(*event_data) q.put(event_data) - def show_progress(left, total): - print('\r\x1b[K' + _('{} of {} book files indexed, {}').format(total-left, total, indexing_progress_time_left()), flush=True, end=' ...') - - db.add_listener(listen) + db.add_listener(notifier) if indexing_speed: db.set_fts_speed(slow=indexing_speed == 'slow') - l, t = db.fts_indexing_progress() - if l < 1: - return - show_progress(l, t) - - while True: - l, t = q.get() - if l < 1: - print() - return - show_progress(l, t) + db.fts_start_measuring_rate() + l, t, r = db.fts_indexing_progress() + while l > 0: + show_progress(l, t, r) + l, t, r = q.get() + print() def main(opts, args, dbctx): @@ -137,17 +141,6 @@ def main(opts, args, dbctx): dbctx.option_parser.print_help() raise SystemExit(_('Error: You must specify the indexing action')) action = args[0] - adata = {} - - def run_job(dbctx, which, **kw): - data = adata.copy() - data.update(kw) - try: - return dbctx.run('fts_index', which, data) - except Exception as e: - if getattr(e, 'suppress_traceback', False): - raise SystemExit(str(e)) - raise if action == 'status': s = run_job(dbctx, 'status') @@ -213,7 +206,7 @@ def main(opts, args, dbctx): print(_('Waiting for FTS indexing to complete, press Ctrl-C to abort...')) try: if dbctx.is_remote: - raise NotImplementedError('TODO: Implement waiting for completion via polling') + remote_wait_for_completion(dbctx, opts.indexing_speed) else: local_wait_for_completion(dbctx.db.new_api, opts.indexing_speed) except KeyboardInterrupt: diff --git a/src/calibre/db/utils.py b/src/calibre/db/utils.py index 5463c17047..2ac8a0e0a8 100644 --- a/src/calibre/db/utils.py +++ b/src/calibre/db/utils.py @@ -488,6 +488,8 @@ class IndexingProgress: return _('calculating time left') try: seconds_left = self.left / self.indexing_rate + if seconds_left < 2: + return _('almost done') return _('~{} left').format(human_readable_interval(seconds_left)) except Exception: return _('calculating time left')