allow aborting the the simple worker and document fork_job()

This commit is contained in:
Kovid Goyal 2012-03-01 11:35:00 +05:30
parent 390327f4da
commit 8dae598304

View File

@ -62,11 +62,13 @@ class ConnectedWorker(Thread):
except: except:
self.tb = traceback.format_exc() self.tb = traceback.format_exc()
def communicate(ans, worker, listener, args, timeout=300, heartbeat=None): def communicate(ans, worker, listener, args, timeout=300, heartbeat=None,
abort=None):
cw = ConnectedWorker(listener, args) cw = ConnectedWorker(listener, args)
cw.start() cw.start()
st = time.time() st = time.time()
check_heartbeat = callable(heartbeat) check_heartbeat = callable(heartbeat)
while worker.is_alive and cw.is_alive(): while worker.is_alive and cw.is_alive():
cw.join(0.01) cw.join(0.01)
delta = time.time() - st delta = time.time() - st
@ -75,6 +77,10 @@ def communicate(ans, worker, listener, args, timeout=300, heartbeat=None):
hung = not heartbeat() if check_heartbeat else delta > timeout hung = not heartbeat() if check_heartbeat else delta > timeout
if hung: if hung:
raise WorkerError('Worker appears to have hung') raise WorkerError('Worker appears to have hung')
if abort is not None and abort.is_set():
# The worker process will be killed by fork_job, after we return
return
if not cw.accepted: if not cw.accepted:
if not cw.tb: if not cw.tb:
raise WorkerError('Failed to connect to worker process') raise WorkerError('Failed to connect to worker process')
@ -87,7 +93,50 @@ def communicate(ans, worker, listener, args, timeout=300, heartbeat=None):
ans['result'] = cw.res['result'] ans['result'] = cw.res['result']
def fork_job(mod_name, func_name, args=(), kwargs={}, timeout=300, # seconds def fork_job(mod_name, func_name, args=(), kwargs={}, timeout=300, # seconds
cwd=None, priority='normal', env={}, no_output=False, heartbeat=None): cwd=None, priority='normal', env={}, no_output=False, heartbeat=None,
abort=None):
'''
Run a job in a worker process. A job is simply a function that will be
called with the supplied arguments, in the worker process.
The result of the function will be returned.
If an error occurs a WorkerError is raised.
:param mod_name: Module to import in the worker process
:param func_name: Function to call in the worker process from the imported
module
:param args: Positional arguments to pass to the function
:param kwargs: Keyword arguments to pass to the function
:param timeout: The time in seconds to wait for the worker process to
complete. If it takes longer a WorkerError is raised and the process is
killed.
:param cwd: The working directory for the worker process.
:param priority: The process priority for the worker process
:param env: Extra environment variables to set for the worker process
:param no_output: If True, the stdout and stderr of the worker process are
discarded
:param heartbeat: If not None, it is used to check if the worker has hung,
instead of a simple timeout. It must be a callable that takes no
arguments and returns True or False. The worker will be assumed to have
hung if this function returns False. At that point, the process will be
killed and a WorkerError will be raised.
:param abort: If not None, it must be an Event. As soon as abort.is_set()
returns True, the worker process is killed. No error is raised.
:return: A dictionary with the keys result and stdout_stderr. result is the
return value of the function (it must be picklable). stdout_stderr is the
path to a file that contains the stdout and stderr of the worker process.
If you set no_output=True, then this will not be present.
'''
ans = {'result':None, 'stdout_stderr':None} ans = {'result':None, 'stdout_stderr':None}
@ -110,7 +159,7 @@ def fork_job(mod_name, func_name, args=(), kwargs={}, timeout=300, # seconds
w(cwd=cwd, priority=priority) w(cwd=cwd, priority=priority)
try: try:
communicate(ans, w, listener, (mod_name, func_name, args, kwargs), communicate(ans, w, listener, (mod_name, func_name, args, kwargs),
timeout=timeout, heartbeat=heartbeat) timeout=timeout, heartbeat=heartbeat, abort=abort)
finally: finally:
t = Thread(target=w.kill) t = Thread(target=w.kill)
t.daemon=True t.daemon=True