From a09214d507387d71c1f114118745f568d7793ac7 Mon Sep 17 00:00:00 2001 From: Kovid Goyal Date: Tue, 5 Nov 2013 15:40:21 +0530 Subject: [PATCH] Refactor simple worker creation --- src/calibre/utils/ipc/simple_worker.py | 46 ++++++++++++++------------ 1 file changed, 25 insertions(+), 21 deletions(-) diff --git a/src/calibre/utils/ipc/simple_worker.py b/src/calibre/utils/ipc/simple_worker.py index a8253a40e3..3f20799a86 100644 --- a/src/calibre/utils/ipc/simple_worker.py +++ b/src/calibre/utils/ipc/simple_worker.py @@ -18,6 +18,7 @@ from calibre.utils.ipc import eintr_retry_call from calibre.utils.ipc.launch import Worker class WorkerError(Exception): + def __init__(self, msg, orig_tb=''): Exception.__init__(self, msg) self.orig_tb = orig_tb @@ -83,7 +84,27 @@ def communicate(ans, worker, listener, args, timeout=300, heartbeat=None, raise WorkerError('Worker failed', cw.res['tb']) ans['result'] = cw.res['result'] -def fork_job(mod_name, func_name, args=(), kwargs={}, timeout=300, # seconds +def create_worker(env, priority='normal', cwd=None, func='main'): + address = arbitrary_address('AF_PIPE' if iswindows else 'AF_UNIX') + if iswindows and address[1] == ':': + address = address[2:] + auth_key = os.urandom(32) + listener = Listener(address=address, authkey=auth_key) + + env = dict(env) + env.update({ + 'CALIBRE_WORKER_ADDRESS' : + hexlify(cPickle.dumps(listener.address, -1)), + 'CALIBRE_WORKER_KEY' : hexlify(auth_key), + 'CALIBRE_SIMPLE_WORKER': + 'calibre.utils.ipc.simple_worker:%s' % func, + }) + + w = Worker(env) + w(cwd=cwd, priority=priority) + return listener, w + +def fork_job(mod_name, func_name, args=(), kwargs={}, timeout=300, # seconds cwd=None, priority='normal', env={}, no_output=False, heartbeat=None, abort=None, module_is_source_code=False): ''' @@ -136,24 +157,7 @@ def fork_job(mod_name, func_name, args=(), kwargs={}, timeout=300, # seconds ''' ans = {'result':None, 'stdout_stderr':None} - - address = arbitrary_address('AF_PIPE' if iswindows else 'AF_UNIX') - if iswindows and address[1] == ':': - address = address[2:] - auth_key = os.urandom(32) - listener = Listener(address=address, authkey=auth_key) - - env = dict(env) - env.update({ - 'CALIBRE_WORKER_ADDRESS' : - hexlify(cPickle.dumps(listener.address, -1)), - 'CALIBRE_WORKER_KEY' : hexlify(auth_key), - 'CALIBRE_SIMPLE_WORKER': - 'calibre.utils.ipc.simple_worker:main', - }) - - w = Worker(env) - w(cwd=cwd, priority=priority) + listener, w = create_worker(env, priority, cwd) try: communicate(ans, w, listener, (mod_name, func_name, args, kwargs, module_is_source_code), timeout=timeout, heartbeat=heartbeat, @@ -197,14 +201,14 @@ def main(): try: mod, func, args, kwargs, module_is_source_code = args if module_is_source_code: - importlib.import_module('calibre.customize.ui') # Load plugins + importlib.import_module('calibre.customize.ui') # Load plugins mod = compile_code(mod) func = mod[func] else: try: mod = importlib.import_module(mod) except ImportError: - importlib.import_module('calibre.customize.ui') # Load plugins + importlib.import_module('calibre.customize.ui') # Load plugins mod = importlib.import_module(mod) func = getattr(mod, func) res = {'result':func(*args, **kwargs)}