diff --git a/src/calibre/utils/ipc/simple_worker.py b/src/calibre/utils/ipc/simple_worker.py index 8454986360..4eec56994a 100644 --- a/src/calibre/utils/ipc/simple_worker.py +++ b/src/calibre/utils/ipc/simple_worker.py @@ -62,16 +62,18 @@ class ConnectedWorker(Thread): except: self.tb = traceback.format_exc() -def communicate(ans, worker, listener, args, timeout=300): +def communicate(ans, worker, listener, args, timeout=300, heartbeat=None): cw = ConnectedWorker(listener, args) cw.start() st = time.time() + check_heartbeat = callable(heartbeat) while worker.is_alive and cw.is_alive(): cw.join(0.01) delta = time.time() - st if not cw.accepted and delta > min(10, timeout): break - if delta > timeout: + hung = not heartbeat() if check_heartbeat else delta > timeout + if hung: raise WorkerError('Worker appears to have hung') if not cw.accepted: if not cw.tb: @@ -85,7 +87,7 @@ def communicate(ans, worker, listener, args, timeout=300): ans['result'] = cw.res['result'] def fork_job(mod_name, func_name, args=(), kwargs={}, timeout=300, # seconds - cwd=None, priority='normal', env={}, no_output=False): + cwd=None, priority='normal', env={}, no_output=False, heartbeat=None): ans = {'result':None, 'stdout_stderr':None} @@ -108,7 +110,7 @@ def fork_job(mod_name, func_name, args=(), kwargs={}, timeout=300, # seconds w(cwd=cwd, priority=priority) try: communicate(ans, w, listener, (mod_name, func_name, args, kwargs), - timeout=timeout) + timeout=timeout, heartbeat=heartbeat) finally: t = Thread(target=w.kill) t.daemon=True