From 8dae598304375e71ac4b09f7f2d36692b35065ef Mon Sep 17 00:00:00 2001 From: Kovid Goyal Date: Thu, 1 Mar 2012 11:35:00 +0530 Subject: [PATCH] allow aborting the the simple worker and document fork_job() --- src/calibre/utils/ipc/simple_worker.py | 55 ++++++++++++++++++++++++-- 1 file changed, 52 insertions(+), 3 deletions(-) diff --git a/src/calibre/utils/ipc/simple_worker.py b/src/calibre/utils/ipc/simple_worker.py index 4eec56994a..c18d556aa3 100644 --- a/src/calibre/utils/ipc/simple_worker.py +++ b/src/calibre/utils/ipc/simple_worker.py @@ -62,11 +62,13 @@ class ConnectedWorker(Thread): except: self.tb = traceback.format_exc() -def communicate(ans, worker, listener, args, timeout=300, heartbeat=None): +def communicate(ans, worker, listener, args, timeout=300, heartbeat=None, + abort=None): cw = ConnectedWorker(listener, args) cw.start() st = time.time() check_heartbeat = callable(heartbeat) + while worker.is_alive and cw.is_alive(): cw.join(0.01) delta = time.time() - st @@ -75,6 +77,10 @@ def communicate(ans, worker, listener, args, timeout=300, heartbeat=None): hung = not heartbeat() if check_heartbeat else delta > timeout if hung: raise WorkerError('Worker appears to have hung') + if abort is not None and abort.is_set(): + # The worker process will be killed by fork_job, after we return + return + if not cw.accepted: if not cw.tb: raise WorkerError('Failed to connect to worker process') @@ -87,7 +93,50 @@ def communicate(ans, worker, listener, args, timeout=300, heartbeat=None): ans['result'] = cw.res['result'] def fork_job(mod_name, func_name, args=(), kwargs={}, timeout=300, # seconds - cwd=None, priority='normal', env={}, no_output=False, heartbeat=None): + cwd=None, priority='normal', env={}, no_output=False, heartbeat=None, + abort=None): + ''' + Run a job in a worker process. A job is simply a function that will be + called with the supplied arguments, in the worker process. + The result of the function will be returned. + If an error occurs a WorkerError is raised. + + :param mod_name: Module to import in the worker process + + :param func_name: Function to call in the worker process from the imported + module + + :param args: Positional arguments to pass to the function + + :param kwargs: Keyword arguments to pass to the function + + :param timeout: The time in seconds to wait for the worker process to + complete. If it takes longer a WorkerError is raised and the process is + killed. + + :param cwd: The working directory for the worker process. + + :param priority: The process priority for the worker process + + :param env: Extra environment variables to set for the worker process + + :param no_output: If True, the stdout and stderr of the worker process are + discarded + + :param heartbeat: If not None, it is used to check if the worker has hung, + instead of a simple timeout. It must be a callable that takes no + arguments and returns True or False. The worker will be assumed to have + hung if this function returns False. At that point, the process will be + killed and a WorkerError will be raised. + + :param abort: If not None, it must be an Event. As soon as abort.is_set() + returns True, the worker process is killed. No error is raised. + + :return: A dictionary with the keys result and stdout_stderr. result is the + return value of the function (it must be picklable). stdout_stderr is the + path to a file that contains the stdout and stderr of the worker process. + If you set no_output=True, then this will not be present. + ''' ans = {'result':None, 'stdout_stderr':None} @@ -110,7 +159,7 @@ def fork_job(mod_name, func_name, args=(), kwargs={}, timeout=300, # seconds w(cwd=cwd, priority=priority) try: communicate(ans, w, listener, (mod_name, func_name, args, kwargs), - timeout=timeout, heartbeat=heartbeat) + timeout=timeout, heartbeat=heartbeat, abort=abort) finally: t = Thread(target=w.kill) t.daemon=True