From a05b241b306fcb064de66324b155d7c005cf7479 Mon Sep 17 00:00:00 2001 From: Kovid Goyal Date: Tue, 5 Nov 2013 16:09:28 +0530 Subject: [PATCH] A simple worker to run function calls in a separate process --- src/calibre/utils/ipc/simple_worker.py | 55 ++++++++++++++++++++++++++ 1 file changed, 55 insertions(+) diff --git a/src/calibre/utils/ipc/simple_worker.py b/src/calibre/utils/ipc/simple_worker.py index 3f20799a86..6a269754b4 100644 --- a/src/calibre/utils/ipc/simple_worker.py +++ b/src/calibre/utils/ipc/simple_worker.py @@ -52,6 +52,31 @@ class ConnectedWorker(Thread): except BaseException: self.tb = traceback.format_exc() +class OffloadWorker(object): + + def __init__(self, listener, worker): + self.listener = listener + self.worker = worker + self.conn = None + + def __call__(self, module, func, *args, **kwargs): + if self.conn is None: + self.conn = eintr_retry_call(self.listener.accept) + eintr_retry_call(self.conn.send, (module, func, args, kwargs)) + return eintr_retry_call(self.conn.recv) + + def shutdown(self): + try: + eintr_retry_call(self.conn.send, None) + except: + import traceback + traceback.print_exc() + finally: + self.conn = None + t = Thread(target=self.worker.kill) + t.daemon=True + t.start() + def communicate(ans, worker, listener, args, timeout=300, heartbeat=None, abort=None): cw = ConnectedWorker(listener, args) @@ -175,6 +200,10 @@ def fork_job(mod_name, func_name, args=(), kwargs={}, timeout=300, # seconds ans['stdout_stderr'] = w.log_path return ans +def offload_worker(env={}, priority='normal', cwd=None): + listener, w = create_worker(env=env, priority=priority, cwd=cwd, func='offload') + return OffloadWorker(listener) + def compile_code(src): import re, io if not isinstance(src, unicode): @@ -221,5 +250,31 @@ def main(): # Maybe EINTR conn.send(res) +def offload(): + # The entry point for the offload worker process + address = cPickle.loads(unhexlify(os.environ['CALIBRE_WORKER_ADDRESS'])) + key = unhexlify(os.environ['CALIBRE_WORKER_KEY']) + func_cache = {} + with closing(Client(address, authkey=key)) as conn: + while True: + args = eintr_retry_call(conn.recv) + if args is None: + break + res = {'result':None, 'tb':None} + try: + mod, func, args, kwargs = args + f = func_cache.get((mod, func), None) + if f is None: + try: + m = importlib.import_module(mod) + except ImportError: + importlib.import_module('calibre.customize.ui') # Load plugins + m = importlib.import_module(mod) + func_cache[(mod, func)] = f = getattr(m, func) + res['result'] = f(*args, **kwargs) + except: + import traceback + res['tb'] = traceback.format_exc() + eintr_retry_call(conn.send, res)