Make set_common_data asynchronous

This commit is contained in:
Kovid Goyal 2014-11-10 20:18:48 +05:30
parent f8c7d1ae07
commit 7a6ce7b3d8

View File

@ -101,20 +101,9 @@ class Pool(Thread):
jobs. Can raise the :class:`Failure` exception is data could not be
sent to workers.'''
with self.lock:
self.common_data = cPickle.dumps(data, -1)
if len(self.common_data) > MAX_SIZE:
self.cd_file = PersistentTemporaryFile('pool_common_data')
with self.cd_file as f:
f.write(self.common_data)
self.common_data = cPickle.dumps(File(f.name), -1)
for worker in self.available_workers:
try:
worker.set_common_data(self.common_data)
except Exception:
import traceback
self.terminal_failure = TerminalFailure('Worker process crashed while sending common data', traceback.format_exc())
self.terminal_error()
raise Failure(self.terminal_failure)
if self.failed:
raise Failure(self.terminal_failure)
self.events.put(data)
def start_worker(self):
try:
@ -154,7 +143,7 @@ class Pool(Thread):
if self.start_worker() is False:
return False
return self.run_job(job)
else:
elif isinstance(event, WorkerResult):
worker_result = event
self.busy_workers.pop(worker_result.worker, None)
self.available_workers.append(worker_result.worker)
@ -164,6 +153,21 @@ class Pool(Thread):
self.terminal_error()
return False
self.results.put(worker_result)
else:
self.common_data = cPickle.dumps(event, -1)
if len(self.common_data) > MAX_SIZE:
self.cd_file = PersistentTemporaryFile('pool_common_data')
with self.cd_file as f:
f.write(self.common_data)
self.common_data = cPickle.dumps(File(f.name), -1)
for worker in self.available_workers:
try:
worker.set_common_data(self.common_data)
except Exception:
import traceback
self.terminal_failure = TerminalFailure('Worker process crashed while sending common data', traceback.format_exc())
self.terminal_error()
return False
while self.pending_jobs and self.available_workers:
if self.run_job(self.pending_jobs.pop()) is False: