diff --git a/src/calibre/utils/ipc/pool.py b/src/calibre/utils/ipc/pool.py index 624081d58a..35fa569f55 100644 --- a/src/calibre/utils/ipc/pool.py +++ b/src/calibre/utils/ipc/pool.py @@ -80,6 +80,50 @@ class Pool(Thread): self.start() + def set_common_data(self, data=None): + ''' Set some data that will be passed to all subsequent jobs without + needing to be transmitted every time. You must call this method before + queueing any jobs, otherwise the behavior is undefined. You can call it + after all jobs are done, then it will be used for the new round of + jobs. Can raise the :class:`Failure` exception is data could not be + sent to workers.''' + with self.lock: + if self.failed: + raise Failure(self.terminal_failure) + self.events.put(data) + + def __call__(self, job_id, module, func, *args, **kwargs): + ''' + Schedule a job. The job will be run in a worker process, with the + result placed in self.results. If a terminal failure has occurred + previously, this method will raise the :class:`Failure` exception. + + :param job_id: A unique id for the job. The result will have this id. + :param module: Either a fully qualified python module name or python + source code which will be executed as a module. + Source code is detected by the presence of newlines in module. + :param func: Name of the function from ``module`` that will be + executed. ``args`` and ``kwargs`` will be passed to the function. + ''' + job = Job(job_id, module, func, args, kwargs) + with self.lock: + if self.failed: + raise Failure(self.terminal_failure) + self.tracker.put(None) + self.events.put(job) + + def wait_for_tasks(self, timeout=None): + ''' Wait for all queued jobs to be completed, if timeout is not None, + will raise a RuntimeError if jobs are not completed in the specified + time. Will raise a :class:`Failure` exception if a terminal failure has + occurred previously. ''' + if self.failed: + raise Failure(self.terminal_failure) + if timeout is None: + self.tracker.join() + else: + join_with_timeout(self.tracker, timeout) + def create_worker(self): from calibre.utils.ipc.simple_worker import start_pipe_worker p = start_pipe_worker( @@ -93,18 +137,6 @@ class Pool(Thread): w.set_common_data(self.common_data) return w - def set_common_data(self, data=None): - ''' Set some data that will be passed to all subsequent jobs without - needing to be transmitted every time. You must call this method before - queueing any jobs, otherwise the behavior is undefined. You can call it - after all jobs are done, then it will be used for the new round of - jobs. Can raise the :class:`Failure` exception is data could not be - sent to workers.''' - with self.lock: - if self.failed: - raise Failure(self.terminal_failure) - self.events.put(data) - def start_worker(self): try: self.available_workers.append(self.create_worker()) @@ -188,38 +220,6 @@ class Pool(Thread): def failed(self): return self.terminal_failure is not None - def __call__(self, job_id, module, func, *args, **kwargs): - ''' - Schedule a job. The job will be run in a worker process, with the - result placed in self.results. If a terminal failure has occurred - previously, this method will raise the :class:`Failure` exception. - - :param job_id: A unique id for the job. The result will have this id. - :param module: Either a fully qualified python module name or python - source code which will be executed as a module. - Source code is detected by the presence of newlines in module. - :param func: Name of the function from ``module`` that will be - executed. ``args`` and ``kwargs`` will be passed to the function. - ''' - job = Job(job_id, module, func, args, kwargs) - with self.lock: - if self.failed: - raise Failure(self.terminal_failure) - self.tracker.put(None) - self.events.put(job) - - def wait_for_tasks(self, timeout=None): - ''' Wait for all queued jobs to be completed, if timeout is not None, - will raise a RuntimeError if jobs are not completed in the specified - time. Will raise a :class:`Failure` exception if a terminal failure has - occurred previously. ''' - if self.failed: - raise Failure(self.terminal_failure) - if timeout is None: - self.tracker.join() - else: - join_with_timeout(self.tracker, timeout) - def terminal_error(self): for worker, job in self.busy_workers.iteritems(): self.results.put(WorkerResult(job.id, Result(None, None, None), True, worker))