diff --git a/src/calibre/gui2/tweak_book/completion/__init__.py b/src/calibre/gui2/tweak_book/completion/__init__.py new file mode 100644 index 0000000000..8b13789179 --- /dev/null +++ b/src/calibre/gui2/tweak_book/completion/__init__.py @@ -0,0 +1 @@ + diff --git a/src/calibre/gui2/tweak_book/completion/worker.py b/src/calibre/gui2/tweak_book/completion/worker.py new file mode 100644 index 0000000000..e05e8ebd8b --- /dev/null +++ b/src/calibre/gui2/tweak_book/completion/worker.py @@ -0,0 +1,102 @@ +#!/usr/bin/env python +# vim:fileencoding=utf-8 +from __future__ import (unicode_literals, division, absolute_import, + print_function) + +__license__ = 'GPL v3' +__copyright__ = '2014, Kovid Goyal ' + +import cPickle, os, sys +from threading import Thread, Event +from Queue import Queue +from contextlib import closing + +from calibre.constants import iswindows +from calibre.utils.ipc import eintr_retry_call + +class CompletionWorker(Thread): + + daemon = True + + def __init__(self, worker_entry_point='main'): + Thread.__init__(self) + self.worker_entry_point = worker_entry_point + self.start() + self.main_queue = Queue() + self.reap_thread = None + self.shutting_down = False + self.connected = Event() + + def launch_worker_process(self): + from calibre.utils.ipc.server import create_listener + from calibre.utils.ipc.simple_worker import start_pipe_worker + self.worker_process = p = start_pipe_worker( + 'from {0} import run_main, {1}; run_main({1})'.format(self.__class__.__module__, self.worker_entry_point), stdout=None) + auth_key = os.urandom(32) + address, self.listener = create_listener(auth_key) + eintr_retry_call(p.stdin.write, cPickle.dumps((address, auth_key), -1)) + p.stdin.flush(), p.stdin.close() + self.control_conn = eintr_retry_call(self.listener.accept) + self.data_conn = eintr_retry_call(self.listener.accept) + self.connected.set() + + def send(self, data, conn=None): + conn = conn or self.control_conn + try: + eintr_retry_call(conn.send, data) + except: + if not self.shutting_down: + raise + + def recv(self, conn=None): + conn = conn or self.control_conn + try: + return eintr_retry_call(conn.recv) + except: + if not self.shutting_down: + raise + + def wait_for_connection(self, timeout=None): + self.connected.wait(timeout) + + def run(self): + self.launch_worker_process() + while True: + obj = self.main_queue.get() + if obj is None: + break + + def shutdown(self): + self.shutting_down = True + self.main_queue.put(None) + p = self.worker_process + if p.returncode is None: + self.worker_process.terminate() + t = self.reap_thread = Thread(target=p.wait) + t.daemon = True + t.start() + + def join(self, timeout=0.2): + if self.reap_thread is not None: + self.reap_thread.join(timeout) + if not iswindows and self.worker_process.returncode is None: + self.worker_process.kill() + return self.worker_process.returncode + + +def run_main(func): + from multiprocessing.connection import Client + address, key = cPickle.loads(eintr_retry_call(sys.stdin.read)) + with closing(Client(address, authkey=key)) as control_conn, closing(Client(address, authkey=key)) as data_conn: + func(control_conn, data_conn) + +def test_main(control_conn, data_conn): + obj = control_conn.recv() + control_conn.send(obj) + +def test(): + w = CompletionWorker(worker_entry_point='test_main') + w.wait_for_connection() + w.send('Hello World!') + print (w.recv()) + w.shutdown(), w.join() diff --git a/src/calibre/utils/ipc/simple_worker.py b/src/calibre/utils/ipc/simple_worker.py index d06550cdce..0a0e0cbcaf 100644 --- a/src/calibre/utils/ipc/simple_worker.py +++ b/src/calibre/utils/ipc/simple_worker.py @@ -134,11 +134,12 @@ def create_worker(env, priority='normal', cwd=None, func='main'): w(cwd=cwd, priority=priority) return listener, w -def start_pipe_worker(command, env=None, priority='normal'): - import subprocess, atexit +def start_pipe_worker(command, env=None, priority='normal', **process_args): + import subprocess from functools import partial w = Worker(env or {}) args = {'stdout':subprocess.PIPE, 'stdin':subprocess.PIPE, 'env':w.env} + args.update(process_args) if iswindows: import win32process priority = { @@ -157,7 +158,6 @@ def start_pipe_worker(command, env=None, priority='normal'): args['close_fds'] = True p = subprocess.Popen([w.executable, '--pipe-worker', command], **args) - atexit.register(w.kill) return p def fork_job(mod_name, func_name, args=(), kwargs={}, timeout=300, # seconds diff --git a/src/calibre/utils/ipc/worker.py b/src/calibre/utils/ipc/worker.py index cc4a2a73fa..28371d8000 100644 --- a/src/calibre/utils/ipc/worker.py +++ b/src/calibre/utils/ipc/worker.py @@ -162,7 +162,7 @@ def main(): # Close open file descriptors inherited from parent # On Unix this is done by the subprocess module os.closerange(3, 256) - if isosx and 'CALIBRE_WORKER_ADDRESS' not in os.environ: + if isosx and 'CALIBRE_WORKER_ADDRESS' not in os.environ and '--pipe-worker' not in sys.argv: # On some OS X computers launchd apparently tries to # launch the last run process from the bundle # so launch the gui as usual