diff --git a/src/calibre/utils/ipc/simple_worker.py b/src/calibre/utils/ipc/simple_worker.py index 5a89b91461..68d879f8a8 100644 --- a/src/calibre/utils/ipc/simple_worker.py +++ b/src/calibre/utils/ipc/simple_worker.py @@ -94,7 +94,7 @@ def communicate(ans, worker, listener, args, timeout=300, heartbeat=None, def fork_job(mod_name, func_name, args=(), kwargs={}, timeout=300, # seconds cwd=None, priority='normal', env={}, no_output=False, heartbeat=None, - abort=None): + abort=None, module_is_source_code=False): ''' Run a job in a worker process. A job is simply a function that will be called with the supplied arguments, in the worker process. @@ -133,6 +133,11 @@ def fork_job(mod_name, func_name, args=(), kwargs={}, timeout=300, # seconds :param abort: If not None, it must be an Event. As soon as abort.is_set() returns True, the worker process is killed. No error is raised. + :param module_is_source_code: If True, the ``mod`` is treated as python + source rather than a module name to import. The source is executed as a + module. Useful if you want to use fork_job from within a script to run some + dynamically generated python. + :return: A dictionary with the keys result and stdout_stderr. result is the return value of the function (it must be picklable). stdout_stderr is the path to a file that contains the stdout and stderr of the worker process. @@ -159,8 +164,9 @@ def fork_job(mod_name, func_name, args=(), kwargs={}, timeout=300, # seconds w = Worker(env) w(cwd=cwd, priority=priority) try: - communicate(ans, w, listener, (mod_name, func_name, args, kwargs), - timeout=timeout, heartbeat=heartbeat, abort=abort) + communicate(ans, w, listener, (mod_name, func_name, args, kwargs, + module_is_source_code), timeout=timeout, heartbeat=heartbeat, + abort=abort) finally: t = Thread(target=w.kill) t.daemon=True @@ -174,6 +180,23 @@ def fork_job(mod_name, func_name, args=(), kwargs={}, timeout=300, # seconds ans['stdout_stderr'] = w.log_path return ans +def compile_code(src): + import re, io + if not isinstance(src, unicode): + match = re.search(r'coding[:=]\s*([-\w.]+)', src[:200]) + enc = match.group(1) if match else 'utf-8' + src = src.decode(enc) + # Python complains if there is a coding declaration in a unicode string + src = re.sub(r'^#.*coding\s*[:=]\s*([-\w.]+)', '#', src, flags=re.MULTILINE) + # Translate newlines to \n + src = io.StringIO(src, newline=None).getvalue() + + namespace = { + 'time':time, 're':re, 'os':os, 'io':io, + } + exec src in namespace + return namespace + def main(): # The entry point for the simple worker process address = cPickle.loads(unhexlify(os.environ['CALIBRE_WORKER_ADDRESS'])) @@ -185,15 +208,18 @@ def main(): # Maybe EINTR args = conn.recv() try: - mod, func, args, kwargs = args - try: - mod = importlib.import_module(mod) - except ImportError: - # Load plugins incase fork_job() is being used in a plugin - import calibre.customize.ui as u - u - mod = importlib.import_module(mod) - func = getattr(mod, func) + mod, func, args, kwargs, module_is_source_code = args + if module_is_source_code: + importlib.import_module('calibre.customize.ui') # Load plugins + mod = compile_code(mod) + func = mod[func] + else: + try: + mod = importlib.import_module(mod) + except ImportError: + importlib.import_module('calibre.customize.ui') # Load plugins + mod = importlib.import_module(mod) + func = getattr(mod, func) res = {'result':func(*args, **kwargs)} except: res = {'tb': traceback.format_exc()}