mirror of
https://github.com/kovidgoyal/calibre.git
synced 2025-07-09 03:04:10 -04:00
Launch worker processes on demand
This commit is contained in:
parent
ccaa7143b5
commit
6b52f4ad89
@ -156,7 +156,9 @@ def _config():
|
|||||||
c.add_opt('plugin_search_history', default=[],
|
c.add_opt('plugin_search_history', default=[],
|
||||||
help='Search history for the recipe scheduler')
|
help='Search history for the recipe scheduler')
|
||||||
c.add_opt('worker_limit', default=6,
|
c.add_opt('worker_limit', default=6,
|
||||||
help=_('Maximum number of waiting worker processes'))
|
help=_(
|
||||||
|
'Maximum number of simultaneous conversion/news download jobs. '
|
||||||
|
'This number is twice the actual value for historical reasons.'))
|
||||||
c.add_opt('get_social_metadata', default=True,
|
c.add_opt('get_social_metadata', default=True,
|
||||||
help=_('Download social metadata (tags/rating/etc.)'))
|
help=_('Download social metadata (tags/rating/etc.)'))
|
||||||
c.add_opt('overwrite_author_title_metadata', default=True,
|
c.add_opt('overwrite_author_title_metadata', default=True,
|
||||||
|
@ -6,19 +6,27 @@ __copyright__ = '2010, Kovid Goyal <kovid@kovidgoyal.net>'
|
|||||||
__docformat__ = 'restructuredtext en'
|
__docformat__ = 'restructuredtext en'
|
||||||
|
|
||||||
|
|
||||||
from calibre.gui2.preferences import ConfigWidgetBase, test_widget
|
from calibre.gui2.preferences import ConfigWidgetBase, test_widget, Setting
|
||||||
from calibre.gui2.preferences.misc_ui import Ui_Form
|
from calibre.gui2.preferences.misc_ui import Ui_Form
|
||||||
from calibre.gui2 import error_dialog, config, open_local_file, info_dialog
|
from calibre.gui2 import error_dialog, config, open_local_file, info_dialog
|
||||||
from calibre.constants import isosx
|
from calibre.constants import isosx
|
||||||
|
|
||||||
# Check Integrity {{{
|
class WorkersSetting(Setting):
|
||||||
|
|
||||||
|
def set_gui_val(self, val):
|
||||||
|
val = val//2
|
||||||
|
Setting.set_gui_val(self, val)
|
||||||
|
|
||||||
|
def get_gui_val(self):
|
||||||
|
val = Setting.get_gui_val(self)
|
||||||
|
return val * 2
|
||||||
|
|
||||||
class ConfigWidget(ConfigWidgetBase, Ui_Form):
|
class ConfigWidget(ConfigWidgetBase, Ui_Form):
|
||||||
|
|
||||||
def genesis(self, gui):
|
def genesis(self, gui):
|
||||||
self.gui = gui
|
self.gui = gui
|
||||||
r = self.register
|
r = self.register
|
||||||
r('worker_limit', config, restart_required=True)
|
r('worker_limit', config, restart_required=True, setting=WorkersSetting)
|
||||||
r('enforce_cpu_limit', config, restart_required=True)
|
r('enforce_cpu_limit', config, restart_required=True)
|
||||||
self.device_detection_button.clicked.connect(self.debug_device_detection)
|
self.device_detection_button.clicked.connect(self.debug_device_detection)
|
||||||
self.button_open_config_dir.clicked.connect(self.open_config_dir)
|
self.button_open_config_dir.clicked.connect(self.open_config_dir)
|
||||||
|
@ -17,7 +17,7 @@
|
|||||||
<item row="0" column="0">
|
<item row="0" column="0">
|
||||||
<widget class="QLabel" name="label_5">
|
<widget class="QLabel" name="label_5">
|
||||||
<property name="text">
|
<property name="text">
|
||||||
<string>&Maximum number of waiting worker processes (needs restart):</string>
|
<string>Max. simultaneous conversion/news download jobs:</string>
|
||||||
</property>
|
</property>
|
||||||
<property name="buddy">
|
<property name="buddy">
|
||||||
<cstring>opt_worker_limit</cstring>
|
<cstring>opt_worker_limit</cstring>
|
||||||
@ -27,13 +27,7 @@
|
|||||||
<item row="0" column="1">
|
<item row="0" column="1">
|
||||||
<widget class="QSpinBox" name="opt_worker_limit">
|
<widget class="QSpinBox" name="opt_worker_limit">
|
||||||
<property name="minimum">
|
<property name="minimum">
|
||||||
<number>2</number>
|
<number>1</number>
|
||||||
</property>
|
|
||||||
<property name="maximum">
|
|
||||||
<number>10000</number>
|
|
||||||
</property>
|
|
||||||
<property name="singleStep">
|
|
||||||
<number>2</number>
|
|
||||||
</property>
|
</property>
|
||||||
</widget>
|
</widget>
|
||||||
</item>
|
</item>
|
||||||
|
@ -549,17 +549,6 @@ How do I run calibre from my USB stick?
|
|||||||
|
|
||||||
A portable version of calibre is available at: `portableapps.com <http://portableapps.com/node/20518>`_. However, this is usually out of date. You can also setup your own portable calibre install by following :ref:`these instructions <portablecalibre>`.
|
A portable version of calibre is available at: `portableapps.com <http://portableapps.com/node/20518>`_. However, this is usually out of date. You can also setup your own portable calibre install by following :ref:`these instructions <portablecalibre>`.
|
||||||
|
|
||||||
Why are there so many calibre-parallel processes on my system?
|
|
||||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
|
||||||
|
|
||||||
|app| maintains two separate worker process pools. One is used for adding books/saving to disk and the other for conversions. You can control the number of worker processes via :guilabel:`Preferences->Advanced->Miscellaneous`. So if you set it to 6 that means a maximum of 3 conversions will run simultaneously. And that is why you will see the number of worker processes changes by two when you use the up and down arrows. On windows, you can set the priority that these processes run with. This can be useful on older, single CPU machines, if you find them slowing down to a crawl when conversions are running.
|
|
||||||
|
|
||||||
In addition to this some conversion plugins run tasks in their own pool of processes, so for example if you bulk convert comics, each comic conversion will use three separate processes to render the images. The job manager knows this so it will run only a single comic conversion simultaneously.
|
|
||||||
|
|
||||||
And since I'm sure someone will ask: The reason adding/saving books are in separate processes is because of PDF. PDF processing libraries can crash on reading PDFs and I dont want the crash to take down all of calibre. Also when adding EPUB books, in order to extract the cover you have to sometimes render the HTML of the first page, which means that it either has to run in the GUI thread of the main process or in a separate process.
|
|
||||||
|
|
||||||
Finally, the reason calibre keep workers alive and idle instead of launching on demand is to workaround the slow startup time of python processes.
|
|
||||||
|
|
||||||
How do I run parts of |app| like news download and the content server on my own linux server?
|
How do I run parts of |app| like news download and the content server on my own linux server?
|
||||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||||
|
|
||||||
|
@ -17,7 +17,7 @@ from binascii import hexlify
|
|||||||
from calibre.utils.ipc.launch import Worker
|
from calibre.utils.ipc.launch import Worker
|
||||||
from calibre.utils.ipc.worker import PARALLEL_FUNCS
|
from calibre.utils.ipc.worker import PARALLEL_FUNCS
|
||||||
from calibre import detect_ncpus as cpu_count
|
from calibre import detect_ncpus as cpu_count
|
||||||
from calibre.constants import iswindows
|
from calibre.constants import iswindows, DEBUG
|
||||||
from calibre.ptempfile import base_dir
|
from calibre.ptempfile import base_dir
|
||||||
|
|
||||||
_counter = 0
|
_counter = 0
|
||||||
@ -106,14 +106,14 @@ class Server(Thread):
|
|||||||
self.add_jobs_queue, self.changed_jobs_queue = Queue(), Queue()
|
self.add_jobs_queue, self.changed_jobs_queue = Queue(), Queue()
|
||||||
self.kill_queue = Queue()
|
self.kill_queue = Queue()
|
||||||
self.waiting_jobs = []
|
self.waiting_jobs = []
|
||||||
self.pool, self.workers = deque(), deque()
|
self.workers = deque()
|
||||||
self.launched_worker_count = 0
|
self.launched_worker_count = 0
|
||||||
self._worker_launch_lock = RLock()
|
self._worker_launch_lock = RLock()
|
||||||
|
|
||||||
self.start()
|
self.start()
|
||||||
|
|
||||||
def launch_worker(self, gui=False, redirect_output=None):
|
def launch_worker(self, gui=False, redirect_output=None):
|
||||||
#start = time.time()
|
start = time.time()
|
||||||
with self._worker_launch_lock:
|
with self._worker_launch_lock:
|
||||||
self.launched_worker_count += 1
|
self.launched_worker_count += 1
|
||||||
id = self.launched_worker_count
|
id = self.launched_worker_count
|
||||||
@ -137,7 +137,8 @@ class Server(Thread):
|
|||||||
break
|
break
|
||||||
if isinstance(cw, basestring):
|
if isinstance(cw, basestring):
|
||||||
raise CriticalError('Failed to launch worker process:\n'+cw)
|
raise CriticalError('Failed to launch worker process:\n'+cw)
|
||||||
#print 'Launch took:', time.time() - start
|
if DEBUG:
|
||||||
|
print 'Worker Launch took:', time.time() - start
|
||||||
return cw
|
return cw
|
||||||
|
|
||||||
def do_launch(self, env, gui, redirect_output, rfile):
|
def do_launch(self, env, gui, redirect_output, rfile):
|
||||||
@ -206,13 +207,6 @@ class Server(Thread):
|
|||||||
job.duration = time.time() - job.start_time
|
job.duration = time.time() - job.start_time
|
||||||
self.changed_jobs_queue.put(job)
|
self.changed_jobs_queue.put(job)
|
||||||
|
|
||||||
# Start new workers
|
|
||||||
if len(self.pool) + len(self.workers) < self.pool_size:
|
|
||||||
try:
|
|
||||||
self.pool.append(self.launch_worker())
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
|
|
||||||
# Start waiting jobs
|
# Start waiting jobs
|
||||||
sj = self.suitable_waiting_job()
|
sj = self.suitable_waiting_job()
|
||||||
if sj is not None:
|
if sj is not None:
|
||||||
@ -224,7 +218,7 @@ class Server(Thread):
|
|||||||
job.killed = job.failed = True
|
job.killed = job.failed = True
|
||||||
job.result = None
|
job.result = None
|
||||||
else:
|
else:
|
||||||
worker = self.pool.pop()
|
worker = self.launch_worker()
|
||||||
worker.start_job(job)
|
worker.start_job(job)
|
||||||
self.workers.append(worker)
|
self.workers.append(worker)
|
||||||
job.log_path = worker.log_path
|
job.log_path = worker.log_path
|
||||||
@ -238,7 +232,7 @@ class Server(Thread):
|
|||||||
break
|
break
|
||||||
|
|
||||||
def suitable_waiting_job(self):
|
def suitable_waiting_job(self):
|
||||||
available_workers = len(self.pool)
|
available_workers = self.pool_size - len(self.workers)
|
||||||
for worker in self.workers:
|
for worker in self.workers:
|
||||||
job = worker.job
|
job = worker.job
|
||||||
if job.core_usage == -1:
|
if job.core_usage == -1:
|
||||||
@ -304,11 +298,6 @@ class Server(Thread):
|
|||||||
worker.kill()
|
worker.kill()
|
||||||
except:
|
except:
|
||||||
pass
|
pass
|
||||||
for worker in list(self.pool):
|
|
||||||
try:
|
|
||||||
worker.kill()
|
|
||||||
except:
|
|
||||||
pass
|
|
||||||
|
|
||||||
def __enter__(self):
|
def __enter__(self):
|
||||||
return self
|
return self
|
||||||
|
Loading…
x
Reference in New Issue
Block a user