A simple worker to run function calls in a separate process

This commit is contained in:
Kovid Goyal 2013-11-05 16:09:28 +05:30
parent a09214d507
commit a05b241b30

View File

@ -52,6 +52,31 @@ class ConnectedWorker(Thread):
except BaseException:
self.tb = traceback.format_exc()
class OffloadWorker(object):
def __init__(self, listener, worker):
self.listener = listener
self.worker = worker
self.conn = None
def __call__(self, module, func, *args, **kwargs):
if self.conn is None:
self.conn = eintr_retry_call(self.listener.accept)
eintr_retry_call(self.conn.send, (module, func, args, kwargs))
return eintr_retry_call(self.conn.recv)
def shutdown(self):
try:
eintr_retry_call(self.conn.send, None)
except:
import traceback
traceback.print_exc()
finally:
self.conn = None
t = Thread(target=self.worker.kill)
t.daemon=True
t.start()
def communicate(ans, worker, listener, args, timeout=300, heartbeat=None,
abort=None):
cw = ConnectedWorker(listener, args)
@ -175,6 +200,10 @@ def fork_job(mod_name, func_name, args=(), kwargs={}, timeout=300, # seconds
ans['stdout_stderr'] = w.log_path
return ans
def offload_worker(env={}, priority='normal', cwd=None):
listener, w = create_worker(env=env, priority=priority, cwd=cwd, func='offload')
return OffloadWorker(listener)
def compile_code(src):
import re, io
if not isinstance(src, unicode):
@ -221,5 +250,31 @@ def main():
# Maybe EINTR
conn.send(res)
def offload():
# The entry point for the offload worker process
address = cPickle.loads(unhexlify(os.environ['CALIBRE_WORKER_ADDRESS']))
key = unhexlify(os.environ['CALIBRE_WORKER_KEY'])
func_cache = {}
with closing(Client(address, authkey=key)) as conn:
while True:
args = eintr_retry_call(conn.recv)
if args is None:
break
res = {'result':None, 'tb':None}
try:
mod, func, args, kwargs = args
f = func_cache.get((mod, func), None)
if f is None:
try:
m = importlib.import_module(mod)
except ImportError:
importlib.import_module('calibre.customize.ui') # Load plugins
m = importlib.import_module(mod)
func_cache[(mod, func)] = f = getattr(m, func)
res['result'] = f(*args, **kwargs)
except:
import traceback
res['tb'] = traceback.format_exc()
eintr_retry_call(conn.send, res)