Get pool size controls working

This commit is contained in:
Kovid Goyal 2022-04-29 07:44:49 +05:30
parent afcaac89c6
commit abe42f909b
No known key found for this signature in database
GPG Key ID: 06BC317B515ACE7C
5 changed files with 107 additions and 18 deletions

View File

@ -957,7 +957,7 @@ class DB:
@fts_num_of_workers.setter @fts_num_of_workers.setter
def fts_num_of_workers(self, num): def fts_num_of_workers(self, num):
if self.fts_enabled: if self.fts_enabled:
self.fts.num_of_workers = num self.fts.pool.num_of_workers = num
def get_next_fts_job(self): def get_next_fts_job(self):
return self.fts.get_next_fts_job() return self.fts.get_next_fts_job()

View File

@ -456,11 +456,9 @@ class Cache:
return num_to_scan, (self.backend.get('SELECT COUNT(*) FROM main.data')[0][0] or 0) return num_to_scan, (self.backend.get('SELECT COUNT(*) FROM main.data')[0][0] or 0)
@write_api @write_api
def enable_fts(self, enabled=True, start_pool=True, mark_all_dirty=False): def enable_fts(self, enabled=True, start_pool=True):
fts = self.backend.enable_fts(weakref.ref(self) if enabled else None) fts = self.backend.enable_fts(weakref.ref(self) if enabled else None)
if fts and start_pool: # used in the tests if fts and start_pool: # used in the tests
if mark_all_dirty:
fts.dirty_existing()
self.start_fts_pool() self.start_fts_pool()
if not fts and self.fts_queue_thread: if not fts and self.fts_queue_thread:
self.fts_job_queue.put(None) self.fts_job_queue.put(None)
@ -534,7 +532,7 @@ class Cache:
@api @api
def set_fts_num_of_workers(self, num=None): def set_fts_num_of_workers(self, num=None):
existing = self.backend.fts_num_of_workers existing = self.backend.fts_num_of_workers
if num is not None and num != existing: if num is not None:
self.backend.fts_num_of_workers = num self.backend.fts_num_of_workers = num
if num > existing: if num > existing:
self.queue_next_fts_job() self.queue_next_fts_job()

View File

@ -9,10 +9,11 @@ import sys
import traceback import traceback
from contextlib import suppress from contextlib import suppress
from queue import Queue from queue import Queue
from threading import Thread, Event from threading import Event, Thread
from time import monotonic from time import monotonic
from calibre import human_readable from calibre import detect_ncpus, human_readable
from calibre.utils.config import dynamic
from calibre.utils.ipc.simple_worker import start_pipe_worker from calibre.utils.ipc.simple_worker import start_pipe_worker
check_for_work = object() check_for_work = object()
@ -116,8 +117,13 @@ class Worker(Thread):
class Pool: class Pool:
MAX_WORKERS_PREF_NAME = 'fts_pool_max_workers'
def __init__(self, dbref): def __init__(self, dbref):
self.max_workers = 1 try:
self.max_workers = min(max(1, int(dynamic.get(self.MAX_WORKERS_PREF_NAME, 1))), detect_ncpus())
except Exception:
self.max_workers = 1
self.jobs_queue = Queue() self.jobs_queue = Queue()
self.supervise_queue = Queue() self.supervise_queue = Queue()
self.workers = [] self.workers = []
@ -142,8 +148,6 @@ class Pool:
def create_worker(self): def create_worker(self):
w = Worker(self.jobs_queue, self.supervise_queue) w = Worker(self.jobs_queue, self.supervise_queue)
w.start() w.start()
while not w.is_alive():
w.join(0.01)
return w return w
def shrink_workers(self): def shrink_workers(self):
@ -162,12 +166,14 @@ class Pool:
def num_of_workers(self, num): def num_of_workers(self, num):
self.initialize() self.initialize()
self.prune_dead_workers() self.prune_dead_workers()
num = max(1, num) num = min(max(1, num), detect_ncpus())
self.max_workers = num if num != self.max_workers:
if num > len(self.workers): self.max_workers = num
self.expand_workers() dynamic.set(self.MAX_WORKERS_PREF_NAME, num)
elif num < self.workers: if num > len(self.workers):
self.shrink_workers() self.expand_workers()
elif num < len(self.workers):
self.shrink_workers()
@property @property
def num_of_idle_workers(self): def num_of_idle_workers(self):

View File

@ -26,6 +26,9 @@ class FTSAPITest(BaseTest):
def setUp(self): def setUp(self):
super().setUp() super().setUp()
from calibre_extensions.sqlite_extension import set_ui_language from calibre_extensions.sqlite_extension import set_ui_language
from calibre.db.fts.pool import Pool
self.orig_pw_pref_name = Pool.MAX_WORKERS_PREF_NAME
Pool.MAX_WORKERS_PREF_NAME = 'test_fts_max_workers'
set_ui_language('en') set_ui_language('en')
self.libraries_to_close = [] self.libraries_to_close = []
@ -33,6 +36,8 @@ class FTSAPITest(BaseTest):
[c.close() for c in self.libraries_to_close] [c.close() for c in self.libraries_to_close]
super().tearDown() super().tearDown()
from calibre_extensions.sqlite_extension import set_ui_language from calibre_extensions.sqlite_extension import set_ui_language
from calibre.db.fts.pool import Pool
Pool.MAX_WORKERS_PREF_NAME = self.orig_pw_pref_name
set_ui_language('en') set_ui_language('en')
def new_library(self): def new_library(self):

View File

@ -3,38 +3,117 @@
# License: GPL v3 Copyright: 2022, Kovid Goyal <kovid at kovidgoyal.net> # License: GPL v3 Copyright: 2022, Kovid Goyal <kovid at kovidgoyal.net>
import os import os
from qt.core import QCheckBox, QLabel, QVBoxLayout, QWidget from qt.core import (
QCheckBox, QHBoxLayout, QLabel, QSpinBox, QTimer, QVBoxLayout, QWidget
)
from calibre import detect_ncpus
from calibre.db.fts.pool import Pool
from calibre.gui2.dialogs.confirm_delete import confirm from calibre.gui2.dialogs.confirm_delete import confirm
from calibre.gui2.fts.utils import get_db from calibre.gui2.fts.utils import get_db
from calibre.utils.config import dynamic
class IndexingProgress:
def __init__(self):
self.left = self.total = 0
@property
def complete(self):
return not self.left or not self.total
class ScanProgress(QWidget):
def __init__(self, parent):
super().__init__(parent)
self.l = l = QVBoxLayout(self)
l.setContentsMargins(0, 0, 0, 0)
self.status_label = la = QLabel('\xa0')
la.setWordWrap(True)
l.addWidget(la)
self.h = h = QHBoxLayout()
l.addLayout(h)
self.niwl = la = QLabel(_('Number of workers used for indexing:'))
h.addWidget(la)
self.num_of_workers = n = QSpinBox(self)
n.setMinimum(1)
n.setMaximum(detect_ncpus())
self.debounce_timer = t = QTimer(self)
t.setInterval(750)
t.timeout.connect(self.change_num_of_workers)
t.setSingleShot(True)
n.valueChanged.connect(self.schedule_change_num_of_workers)
try:
c = min(max(1, int(dynamic.get(Pool.MAX_WORKERS_PREF_NAME, 1))), n.maximum())
except Exception:
c = 1
n.setValue(c)
h.addWidget(n), h.addStretch(10)
self.wl = la = QLabel(_(
'Increasing the number of workers used for indexing will'
' speed up indexing at the cost of using more of the computer\'s resources.'
' Changes will take a few seconds to take effect.'
))
la.setWordWrap(True)
l.addWidget(la)
def schedule_change_num_of_workers(self):
self.debounce_timer.stop()
self.debounce_timer.start()
def change_num_of_workers(self):
get_db().set_fts_num_of_workers(self.num_of_workers.value())
def update(self, indexing_progress):
if indexing_progress.complete:
t = _('All book files indexed')
else:
done = indexing_progress.total - indexing_progress.left
t = _('{0} of {1} book files ({2:.0%}) have been indexed').format(
done, indexing_progress.total, done / indexing_progress.total)
self.status_label.setText(t)
class ScanStatus(QWidget): class ScanStatus(QWidget):
def __init__(self, parent=None): def __init__(self, parent=None):
super().__init__(parent) super().__init__(parent)
self.indexing_progress = IndexingProgress()
self.l = l = QVBoxLayout(self) self.l = l = QVBoxLayout(self)
l.setContentsMargins(0, 0, 0, 0) l.setContentsMargins(0, 0, 0, 0)
self.enable_fts = b = QCheckBox(self) self.enable_fts = b = QCheckBox(self)
b.setText(_('&Index books in this library to allow searching their full text')) b.setText(_('&Index books in this library to allow searching their full text'))
b.setChecked(self.db.is_fts_enabled())
l.addWidget(b) l.addWidget(b)
self.enable_msg = la = QLabel('<p>' + _( self.enable_msg = la = QLabel('<p>' + _(
'In order to search the full text of books, the text must first be <i>indexed</i>. Once enabled, indexing is done' 'In order to search the full text of books, the text must first be <i>indexed</i>. Once enabled, indexing is done'
' automatically, in the background, whenever new books are added to this calibre library.')) ' automatically, in the background, whenever new books are added to this calibre library.'))
la.setWordWrap(True) la.setWordWrap(True)
l.addWidget(la) l.addWidget(la)
self.scan_progress = sc = ScanProgress(self)
l.addWidget(sc)
l.addStretch(10) l.addStretch(10)
self.apply_fts_state() self.apply_fts_state()
self.enable_fts.toggled.connect(self.change_fts_state) self.enable_fts.toggled.connect(self.change_fts_state)
self.indexing_status_timer = t = QTimer(self)
t.timeout.connect(self.update_stats)
t.start(1000)
self.update_stats()
def update_stats(self):
self.indexing_progress.left, self.indexing_progress.total = self.db.fts_indexing_progress()
self.scan_progress.update(self.indexing_progress)
def change_fts_state(self): def change_fts_state(self):
if not self.enable_fts.isChecked() and not confirm(_( if not self.enable_fts.isChecked() and not confirm(_(
'Disabling indexing will mean that all books will have to be re-checked when re-enabling indexing. Are you sure?' 'Disabling indexing will mean that all books will have to be re-checked when re-enabling indexing. Are you sure?'
), 'disable-fts-indexing', self): ), 'disable-fts-indexing', self):
return return
self.db.enable_fts(enabled=self.enable_fts.isChecked(), mark_all_dirty=True) self.db.enable_fts(enabled=self.enable_fts.isChecked())
self.apply_fts_state() self.apply_fts_state()
def apply_fts_state(self): def apply_fts_state(self):
@ -44,6 +123,7 @@ class ScanStatus(QWidget):
f.setBold(not indexing_enabled) f.setBold(not indexing_enabled)
b.setFont(f) b.setFont(f)
self.enable_msg.setVisible(not indexing_enabled) self.enable_msg.setVisible(not indexing_enabled)
self.scan_progress.setVisible(indexing_enabled)
@property @property
def db(self): def db(self):