diff --git a/src/calibre/utils/ipc/pool.py b/src/calibre/utils/ipc/pool.py index dfc3b9bfe0..624081d58a 100644 --- a/src/calibre/utils/ipc/pool.py +++ b/src/calibre/utils/ipc/pool.py @@ -101,20 +101,9 @@ class Pool(Thread): jobs. Can raise the :class:`Failure` exception is data could not be sent to workers.''' with self.lock: - self.common_data = cPickle.dumps(data, -1) - if len(self.common_data) > MAX_SIZE: - self.cd_file = PersistentTemporaryFile('pool_common_data') - with self.cd_file as f: - f.write(self.common_data) - self.common_data = cPickle.dumps(File(f.name), -1) - for worker in self.available_workers: - try: - worker.set_common_data(self.common_data) - except Exception: - import traceback - self.terminal_failure = TerminalFailure('Worker process crashed while sending common data', traceback.format_exc()) - self.terminal_error() - raise Failure(self.terminal_failure) + if self.failed: + raise Failure(self.terminal_failure) + self.events.put(data) def start_worker(self): try: @@ -154,7 +143,7 @@ class Pool(Thread): if self.start_worker() is False: return False return self.run_job(job) - else: + elif isinstance(event, WorkerResult): worker_result = event self.busy_workers.pop(worker_result.worker, None) self.available_workers.append(worker_result.worker) @@ -164,6 +153,21 @@ class Pool(Thread): self.terminal_error() return False self.results.put(worker_result) + else: + self.common_data = cPickle.dumps(event, -1) + if len(self.common_data) > MAX_SIZE: + self.cd_file = PersistentTemporaryFile('pool_common_data') + with self.cd_file as f: + f.write(self.common_data) + self.common_data = cPickle.dumps(File(f.name), -1) + for worker in self.available_workers: + try: + worker.set_common_data(self.common_data) + except Exception: + import traceback + self.terminal_failure = TerminalFailure('Worker process crashed while sending common data', traceback.format_exc()) + self.terminal_error() + return False while self.pending_jobs and self.available_workers: if self.run_job(self.pending_jobs.pop()) is False: