Allow the simple worker process to be launched first and run later

This commit is contained in:
Kovid Goyal 2019-10-20 18:38:48 +05:30
parent c0fa79d58b
commit 0822f1b6fa
No known key found for this signature in database
GPG Key ID: 06BC317B515ACE7C

View File

@ -169,7 +169,42 @@ def start_pipe_worker(command, env=None, priority='normal', **process_args):
return p return p
def fork_job(mod_name, func_name, args=(), kwargs={}, timeout=300, # seconds def two_part_fork_job(env=None, priority='normal', cwd=None):
env = env or {}
listener, w = create_worker(env, priority, cwd)
def run_job(
mod_name, func_name, args=(), kwargs=None, timeout=300, # seconds
no_output=False, heartbeat=None, abort=None, module_is_source_code=False
):
ans = {'result':None, 'stdout_stderr':None}
kwargs = kwargs or {}
try:
communicate(ans, w, listener, (mod_name, func_name, args, kwargs,
module_is_source_code), timeout=timeout, heartbeat=heartbeat,
abort=abort)
except WorkerError as e:
if not no_output:
e.log_path = w.log_path
raise
finally:
t = Thread(target=w.kill)
t.daemon=True
t.start()
if no_output:
try:
os.remove(w.log_path)
except:
pass
if not no_output:
ans['stdout_stderr'] = w.log_path
return ans
run_job.worker = w
return run_job
def fork_job(mod_name, func_name, args=(), kwargs=None, timeout=300, # seconds
cwd=None, priority='normal', env={}, no_output=False, heartbeat=None, cwd=None, priority='normal', env={}, no_output=False, heartbeat=None,
abort=None, module_is_source_code=False): abort=None, module_is_source_code=False):
''' '''
@ -220,29 +255,11 @@ def fork_job(mod_name, func_name, args=(), kwargs={}, timeout=300, # seconds
path to a file that contains the stdout and stderr of the worker process. path to a file that contains the stdout and stderr of the worker process.
If you set no_output=True, then this will not be present. If you set no_output=True, then this will not be present.
''' '''
return two_part_fork_job(env, priority, cwd)(
ans = {'result':None, 'stdout_stderr':None} mod_name, func_name, args=args, kwargs=kwargs, timeout=timeout,
listener, w = create_worker(env, priority, cwd) no_output=no_output, heartbeat=heartbeat, abort=abort,
try: module_is_source_code=module_is_source_code
communicate(ans, w, listener, (mod_name, func_name, args, kwargs, )
module_is_source_code), timeout=timeout, heartbeat=heartbeat,
abort=abort)
except WorkerError as e:
if not no_output:
e.log_path = w.log_path
raise
finally:
t = Thread(target=w.kill)
t.daemon=True
t.start()
if no_output:
try:
os.remove(w.log_path)
except:
pass
if not no_output:
ans['stdout_stderr'] = w.log_path
return ans
def offload_worker(env={}, priority='normal', cwd=None): def offload_worker(env={}, priority='normal', cwd=None):