add a generic mechanism to check if a simple worker is alive

This commit is contained in:
Kovid Goyal 2012-03-01 10:53:30 +05:30
parent 436f9ff752
commit 390327f4da

View File

@ -62,16 +62,18 @@ class ConnectedWorker(Thread):
except:
self.tb = traceback.format_exc()
def communicate(ans, worker, listener, args, timeout=300):
def communicate(ans, worker, listener, args, timeout=300, heartbeat=None):
cw = ConnectedWorker(listener, args)
cw.start()
st = time.time()
check_heartbeat = callable(heartbeat)
while worker.is_alive and cw.is_alive():
cw.join(0.01)
delta = time.time() - st
if not cw.accepted and delta > min(10, timeout):
break
if delta > timeout:
hung = not heartbeat() if check_heartbeat else delta > timeout
if hung:
raise WorkerError('Worker appears to have hung')
if not cw.accepted:
if not cw.tb:
@ -85,7 +87,7 @@ def communicate(ans, worker, listener, args, timeout=300):
ans['result'] = cw.res['result']
def fork_job(mod_name, func_name, args=(), kwargs={}, timeout=300, # seconds
cwd=None, priority='normal', env={}, no_output=False):
cwd=None, priority='normal', env={}, no_output=False, heartbeat=None):
ans = {'result':None, 'stdout_stderr':None}
@ -108,7 +110,7 @@ def fork_job(mod_name, func_name, args=(), kwargs={}, timeout=300, # seconds
w(cwd=cwd, priority=priority)
try:
communicate(ans, w, listener, (mod_name, func_name, args, kwargs),
timeout=timeout)
timeout=timeout, heartbeat=heartbeat)
finally:
t = Thread(target=w.kill)
t.daemon=True