Re-organization

This commit is contained in:
Kovid Goyal 2014-11-10 20:22:24 +05:30
parent 7a6ce7b3d8
commit 66267446ac

View File

@ -80,6 +80,50 @@ class Pool(Thread):
self.start()
def set_common_data(self, data=None):
''' Set some data that will be passed to all subsequent jobs without
needing to be transmitted every time. You must call this method before
queueing any jobs, otherwise the behavior is undefined. You can call it
after all jobs are done, then it will be used for the new round of
jobs. Can raise the :class:`Failure` exception is data could not be
sent to workers.'''
with self.lock:
if self.failed:
raise Failure(self.terminal_failure)
self.events.put(data)
def __call__(self, job_id, module, func, *args, **kwargs):
'''
Schedule a job. The job will be run in a worker process, with the
result placed in self.results. If a terminal failure has occurred
previously, this method will raise the :class:`Failure` exception.
:param job_id: A unique id for the job. The result will have this id.
:param module: Either a fully qualified python module name or python
source code which will be executed as a module.
Source code is detected by the presence of newlines in module.
:param func: Name of the function from ``module`` that will be
executed. ``args`` and ``kwargs`` will be passed to the function.
'''
job = Job(job_id, module, func, args, kwargs)
with self.lock:
if self.failed:
raise Failure(self.terminal_failure)
self.tracker.put(None)
self.events.put(job)
def wait_for_tasks(self, timeout=None):
''' Wait for all queued jobs to be completed, if timeout is not None,
will raise a RuntimeError if jobs are not completed in the specified
time. Will raise a :class:`Failure` exception if a terminal failure has
occurred previously. '''
if self.failed:
raise Failure(self.terminal_failure)
if timeout is None:
self.tracker.join()
else:
join_with_timeout(self.tracker, timeout)
def create_worker(self):
from calibre.utils.ipc.simple_worker import start_pipe_worker
p = start_pipe_worker(
@ -93,18 +137,6 @@ class Pool(Thread):
w.set_common_data(self.common_data)
return w
def set_common_data(self, data=None):
''' Set some data that will be passed to all subsequent jobs without
needing to be transmitted every time. You must call this method before
queueing any jobs, otherwise the behavior is undefined. You can call it
after all jobs are done, then it will be used for the new round of
jobs. Can raise the :class:`Failure` exception is data could not be
sent to workers.'''
with self.lock:
if self.failed:
raise Failure(self.terminal_failure)
self.events.put(data)
def start_worker(self):
try:
self.available_workers.append(self.create_worker())
@ -188,38 +220,6 @@ class Pool(Thread):
def failed(self):
return self.terminal_failure is not None
def __call__(self, job_id, module, func, *args, **kwargs):
'''
Schedule a job. The job will be run in a worker process, with the
result placed in self.results. If a terminal failure has occurred
previously, this method will raise the :class:`Failure` exception.
:param job_id: A unique id for the job. The result will have this id.
:param module: Either a fully qualified python module name or python
source code which will be executed as a module.
Source code is detected by the presence of newlines in module.
:param func: Name of the function from ``module`` that will be
executed. ``args`` and ``kwargs`` will be passed to the function.
'''
job = Job(job_id, module, func, args, kwargs)
with self.lock:
if self.failed:
raise Failure(self.terminal_failure)
self.tracker.put(None)
self.events.put(job)
def wait_for_tasks(self, timeout=None):
''' Wait for all queued jobs to be completed, if timeout is not None,
will raise a RuntimeError if jobs are not completed in the specified
time. Will raise a :class:`Failure` exception if a terminal failure has
occurred previously. '''
if self.failed:
raise Failure(self.terminal_failure)
if timeout is None:
self.tracker.join()
else:
join_with_timeout(self.tracker, timeout)
def terminal_error(self):
for worker, job in self.busy_workers.iteritems():
self.results.put(WorkerResult(job.id, Result(None, None, None), True, worker))