Start implementation of worker for completion

This commit is contained in:
Kovid Goyal 2014-10-19 08:40:49 +05:30
parent a145377220
commit bb4fe765b6
4 changed files with 107 additions and 4 deletions

View File

@ -0,0 +1 @@

View File

@ -0,0 +1,102 @@
#!/usr/bin/env python
# vim:fileencoding=utf-8
from __future__ import (unicode_literals, division, absolute_import,
print_function)
__license__ = 'GPL v3'
__copyright__ = '2014, Kovid Goyal <kovid at kovidgoyal.net>'
import cPickle, os, sys
from threading import Thread, Event
from Queue import Queue
from contextlib import closing
from calibre.constants import iswindows
from calibre.utils.ipc import eintr_retry_call
class CompletionWorker(Thread):
daemon = True
def __init__(self, worker_entry_point='main'):
Thread.__init__(self)
self.worker_entry_point = worker_entry_point
self.start()
self.main_queue = Queue()
self.reap_thread = None
self.shutting_down = False
self.connected = Event()
def launch_worker_process(self):
from calibre.utils.ipc.server import create_listener
from calibre.utils.ipc.simple_worker import start_pipe_worker
self.worker_process = p = start_pipe_worker(
'from {0} import run_main, {1}; run_main({1})'.format(self.__class__.__module__, self.worker_entry_point), stdout=None)
auth_key = os.urandom(32)
address, self.listener = create_listener(auth_key)
eintr_retry_call(p.stdin.write, cPickle.dumps((address, auth_key), -1))
p.stdin.flush(), p.stdin.close()
self.control_conn = eintr_retry_call(self.listener.accept)
self.data_conn = eintr_retry_call(self.listener.accept)
self.connected.set()
def send(self, data, conn=None):
conn = conn or self.control_conn
try:
eintr_retry_call(conn.send, data)
except:
if not self.shutting_down:
raise
def recv(self, conn=None):
conn = conn or self.control_conn
try:
return eintr_retry_call(conn.recv)
except:
if not self.shutting_down:
raise
def wait_for_connection(self, timeout=None):
self.connected.wait(timeout)
def run(self):
self.launch_worker_process()
while True:
obj = self.main_queue.get()
if obj is None:
break
def shutdown(self):
self.shutting_down = True
self.main_queue.put(None)
p = self.worker_process
if p.returncode is None:
self.worker_process.terminate()
t = self.reap_thread = Thread(target=p.wait)
t.daemon = True
t.start()
def join(self, timeout=0.2):
if self.reap_thread is not None:
self.reap_thread.join(timeout)
if not iswindows and self.worker_process.returncode is None:
self.worker_process.kill()
return self.worker_process.returncode
def run_main(func):
from multiprocessing.connection import Client
address, key = cPickle.loads(eintr_retry_call(sys.stdin.read))
with closing(Client(address, authkey=key)) as control_conn, closing(Client(address, authkey=key)) as data_conn:
func(control_conn, data_conn)
def test_main(control_conn, data_conn):
obj = control_conn.recv()
control_conn.send(obj)
def test():
w = CompletionWorker(worker_entry_point='test_main')
w.wait_for_connection()
w.send('Hello World!')
print (w.recv())
w.shutdown(), w.join()

View File

@ -134,11 +134,12 @@ def create_worker(env, priority='normal', cwd=None, func='main'):
w(cwd=cwd, priority=priority)
return listener, w
def start_pipe_worker(command, env=None, priority='normal'):
import subprocess, atexit
def start_pipe_worker(command, env=None, priority='normal', **process_args):
import subprocess
from functools import partial
w = Worker(env or {})
args = {'stdout':subprocess.PIPE, 'stdin':subprocess.PIPE, 'env':w.env}
args.update(process_args)
if iswindows:
import win32process
priority = {
@ -157,7 +158,6 @@ def start_pipe_worker(command, env=None, priority='normal'):
args['close_fds'] = True
p = subprocess.Popen([w.executable, '--pipe-worker', command], **args)
atexit.register(w.kill)
return p
def fork_job(mod_name, func_name, args=(), kwargs={}, timeout=300, # seconds

View File

@ -162,7 +162,7 @@ def main():
# Close open file descriptors inherited from parent
# On Unix this is done by the subprocess module
os.closerange(3, 256)
if isosx and 'CALIBRE_WORKER_ADDRESS' not in os.environ:
if isosx and 'CALIBRE_WORKER_ADDRESS' not in os.environ and '--pipe-worker' not in sys.argv:
# On some OS X computers launchd apparently tries to
# launch the last run process from the bundle
# so launch the gui as usual