Refactor simple worker creation

This commit is contained in:
Kovid Goyal 2013-11-05 15:40:21 +05:30
parent 7a7cacc920
commit a09214d507

View File

@ -18,6 +18,7 @@ from calibre.utils.ipc import eintr_retry_call
from calibre.utils.ipc.launch import Worker
class WorkerError(Exception):
def __init__(self, msg, orig_tb=''):
Exception.__init__(self, msg)
self.orig_tb = orig_tb
@ -83,7 +84,27 @@ def communicate(ans, worker, listener, args, timeout=300, heartbeat=None,
raise WorkerError('Worker failed', cw.res['tb'])
ans['result'] = cw.res['result']
def fork_job(mod_name, func_name, args=(), kwargs={}, timeout=300, # seconds
def create_worker(env, priority='normal', cwd=None, func='main'):
address = arbitrary_address('AF_PIPE' if iswindows else 'AF_UNIX')
if iswindows and address[1] == ':':
address = address[2:]
auth_key = os.urandom(32)
listener = Listener(address=address, authkey=auth_key)
env = dict(env)
env.update({
'CALIBRE_WORKER_ADDRESS' :
hexlify(cPickle.dumps(listener.address, -1)),
'CALIBRE_WORKER_KEY' : hexlify(auth_key),
'CALIBRE_SIMPLE_WORKER':
'calibre.utils.ipc.simple_worker:%s' % func,
})
w = Worker(env)
w(cwd=cwd, priority=priority)
return listener, w
def fork_job(mod_name, func_name, args=(), kwargs={}, timeout=300, # seconds
cwd=None, priority='normal', env={}, no_output=False, heartbeat=None,
abort=None, module_is_source_code=False):
'''
@ -136,24 +157,7 @@ def fork_job(mod_name, func_name, args=(), kwargs={}, timeout=300, # seconds
'''
ans = {'result':None, 'stdout_stderr':None}
address = arbitrary_address('AF_PIPE' if iswindows else 'AF_UNIX')
if iswindows and address[1] == ':':
address = address[2:]
auth_key = os.urandom(32)
listener = Listener(address=address, authkey=auth_key)
env = dict(env)
env.update({
'CALIBRE_WORKER_ADDRESS' :
hexlify(cPickle.dumps(listener.address, -1)),
'CALIBRE_WORKER_KEY' : hexlify(auth_key),
'CALIBRE_SIMPLE_WORKER':
'calibre.utils.ipc.simple_worker:main',
})
w = Worker(env)
w(cwd=cwd, priority=priority)
listener, w = create_worker(env, priority, cwd)
try:
communicate(ans, w, listener, (mod_name, func_name, args, kwargs,
module_is_source_code), timeout=timeout, heartbeat=heartbeat,
@ -197,14 +201,14 @@ def main():
try:
mod, func, args, kwargs, module_is_source_code = args
if module_is_source_code:
importlib.import_module('calibre.customize.ui') # Load plugins
importlib.import_module('calibre.customize.ui') # Load plugins
mod = compile_code(mod)
func = mod[func]
else:
try:
mod = importlib.import_module(mod)
except ImportError:
importlib.import_module('calibre.customize.ui') # Load plugins
importlib.import_module('calibre.customize.ui') # Load plugins
mod = importlib.import_module(mod)
func = getattr(mod, func)
res = {'result':func(*args, **kwargs)}