From 0822f1b6fa8624f7e1aca2dca1ca9ae26132cae8 Mon Sep 17 00:00:00 2001 From: Kovid Goyal Date: Sun, 20 Oct 2019 18:38:48 +0530 Subject: [PATCH] Allow the simple worker process to be launched first and run later --- src/calibre/utils/ipc/simple_worker.py | 65 ++++++++++++++++---------- 1 file changed, 41 insertions(+), 24 deletions(-) diff --git a/src/calibre/utils/ipc/simple_worker.py b/src/calibre/utils/ipc/simple_worker.py index de54b104d2..1789485bcc 100644 --- a/src/calibre/utils/ipc/simple_worker.py +++ b/src/calibre/utils/ipc/simple_worker.py @@ -169,7 +169,42 @@ def start_pipe_worker(command, env=None, priority='normal', **process_args): return p -def fork_job(mod_name, func_name, args=(), kwargs={}, timeout=300, # seconds +def two_part_fork_job(env=None, priority='normal', cwd=None): + env = env or {} + listener, w = create_worker(env, priority, cwd) + + def run_job( + mod_name, func_name, args=(), kwargs=None, timeout=300, # seconds + no_output=False, heartbeat=None, abort=None, module_is_source_code=False + ): + ans = {'result':None, 'stdout_stderr':None} + kwargs = kwargs or {} + try: + communicate(ans, w, listener, (mod_name, func_name, args, kwargs, + module_is_source_code), timeout=timeout, heartbeat=heartbeat, + abort=abort) + except WorkerError as e: + if not no_output: + e.log_path = w.log_path + raise + finally: + t = Thread(target=w.kill) + t.daemon=True + t.start() + if no_output: + try: + os.remove(w.log_path) + except: + pass + if not no_output: + ans['stdout_stderr'] = w.log_path + return ans + run_job.worker = w + + return run_job + + +def fork_job(mod_name, func_name, args=(), kwargs=None, timeout=300, # seconds cwd=None, priority='normal', env={}, no_output=False, heartbeat=None, abort=None, module_is_source_code=False): ''' @@ -220,29 +255,11 @@ def fork_job(mod_name, func_name, args=(), kwargs={}, timeout=300, # seconds path to a file that contains the stdout and stderr of the worker process. If you set no_output=True, then this will not be present. ''' - - ans = {'result':None, 'stdout_stderr':None} - listener, w = create_worker(env, priority, cwd) - try: - communicate(ans, w, listener, (mod_name, func_name, args, kwargs, - module_is_source_code), timeout=timeout, heartbeat=heartbeat, - abort=abort) - except WorkerError as e: - if not no_output: - e.log_path = w.log_path - raise - finally: - t = Thread(target=w.kill) - t.daemon=True - t.start() - if no_output: - try: - os.remove(w.log_path) - except: - pass - if not no_output: - ans['stdout_stderr'] = w.log_path - return ans + return two_part_fork_job(env, priority, cwd)( + mod_name, func_name, args=args, kwargs=kwargs, timeout=timeout, + no_output=no_output, heartbeat=heartbeat, abort=abort, + module_is_source_code=module_is_source_code + ) def offload_worker(env={}, priority='normal', cwd=None):