diff --git a/src/calibre/utils/ipc/proxy.py b/src/calibre/utils/ipc/proxy.py index b60f9eb755..3b037a42a6 100644 --- a/src/calibre/utils/ipc/proxy.py +++ b/src/calibre/utils/ipc/proxy.py @@ -7,15 +7,82 @@ __license__ = 'GPL v3' __copyright__ = '2011, Kovid Goyal ' __docformat__ = 'restructuredtext en' -import os +import os, cPickle, struct from threading import Thread +from Queue import Queue, Empty from multiprocessing.connection import arbitrary_address, Listener +from functools import partial -from calibre.constants import iswindows +from calibre import as_unicode, prints +from calibre.constants import iswindows, DEBUG + +def _encode(msg): + raw = cPickle.dumps(msg, -1) + size = len(raw) + header = struct.pack('!Q', size) + return header + raw + +def _decode(raw): + sz = struct.calcsize('!Q') + if len(raw) < sz: + return 'invalid', None + header, = struct.unpack('!Q', raw[:sz]) + if len(raw) != sz + header or header == 0: + return 'invalid', None + return cPickle.loads(raw[sz:]) + + +class Writer(Thread): + + TIMEOUT = 60 #seconds + + def __init__(self, conn): + Thread.__init__(self) + self.daemon = True + self.dataq, self.resultq = Queue(), Queue() + self.conn = conn + self.start() + self.data_written = False + + def close(self): + self.dataq.put(None) + + def flush(self): + pass + + def write(self, raw_data): + self.dataq.put(raw_data) + + try: + ex = self.resultq.get(True, self.TIMEOUT) + except Empty: + raise IOError('Writing to socket timed out') + else: + if ex is not None: + raise IOError('Writing to socket failed with error: %s' % ex) + + def run(self): + while True: + x = self.dataq.get() + if x is None: + break + try: + self.data_written = True + self.conn.send_bytes(x) + except Exception as e: + self.resultq.put(as_unicode(e)) + else: + self.resultq.put(None) + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.close() class Server(Thread): - def __init__(self): + def __init__(self, dispatcher): Thread.__init__(self) self.daemon = True @@ -27,6 +94,13 @@ class Server(Thread): authkey=self.auth_key, backlog=4) self.keep_going = True + self.dispatcher = dispatcher + + @property + def connection_information(self): + if not self.is_alive(): + self.start() + return (self.address, self.auth_key) def stop(self): self.keep_going = False @@ -43,4 +117,49 @@ class Server(Thread): except: pass + def handle_client(self, conn): + t = Thread(target=partial(self._handle_client, conn)) + t.daemon = True + t.start() + + def _handle_client(self, conn): + while True: + try: + func_name, args, kwargs = conn.recv() + except EOFError: + try: + conn.close() + except: + pass + return + else: + try: + self.call_func(func_name, args, kwargs, conn) + except: + try: + conn.close() + except: + pass + prints('Proxy function: %s with args: %r and' + ' kwargs: %r failed') + if DEBUG: + import traceback + traceback.print_exc() + break + + def call_func(self, func_name, args, kwargs, conn): + with Writer(conn) as f: + try: + self.dispatcher(f, func_name, args, kwargs) + except Exception as e: + if not f.data_written: + import traceback + # Try to tell the client process what error happened + try: + conn.send_bytes(_encode(('failed', (unicode(e), + as_unicode(traceback.format_exc()))))) + except: + pass + raise +