This commit is contained in:
Kovid Goyal 2011-06-24 16:17:14 -06:00
parent c578fc05a5
commit f92aa96a4e

View File

@ -7,15 +7,82 @@ __license__ = 'GPL v3'
__copyright__ = '2011, Kovid Goyal <kovid@kovidgoyal.net>' __copyright__ = '2011, Kovid Goyal <kovid@kovidgoyal.net>'
__docformat__ = 'restructuredtext en' __docformat__ = 'restructuredtext en'
import os import os, cPickle, struct
from threading import Thread from threading import Thread
from Queue import Queue, Empty
from multiprocessing.connection import arbitrary_address, Listener from multiprocessing.connection import arbitrary_address, Listener
from functools import partial
from calibre.constants import iswindows from calibre import as_unicode, prints
from calibre.constants import iswindows, DEBUG
def _encode(msg):
raw = cPickle.dumps(msg, -1)
size = len(raw)
header = struct.pack('!Q', size)
return header + raw
def _decode(raw):
sz = struct.calcsize('!Q')
if len(raw) < sz:
return 'invalid', None
header, = struct.unpack('!Q', raw[:sz])
if len(raw) != sz + header or header == 0:
return 'invalid', None
return cPickle.loads(raw[sz:])
class Writer(Thread):
TIMEOUT = 60 #seconds
def __init__(self, conn):
Thread.__init__(self)
self.daemon = True
self.dataq, self.resultq = Queue(), Queue()
self.conn = conn
self.start()
self.data_written = False
def close(self):
self.dataq.put(None)
def flush(self):
pass
def write(self, raw_data):
self.dataq.put(raw_data)
try:
ex = self.resultq.get(True, self.TIMEOUT)
except Empty:
raise IOError('Writing to socket timed out')
else:
if ex is not None:
raise IOError('Writing to socket failed with error: %s' % ex)
def run(self):
while True:
x = self.dataq.get()
if x is None:
break
try:
self.data_written = True
self.conn.send_bytes(x)
except Exception as e:
self.resultq.put(as_unicode(e))
else:
self.resultq.put(None)
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
class Server(Thread): class Server(Thread):
def __init__(self): def __init__(self, dispatcher):
Thread.__init__(self) Thread.__init__(self)
self.daemon = True self.daemon = True
@ -27,6 +94,13 @@ class Server(Thread):
authkey=self.auth_key, backlog=4) authkey=self.auth_key, backlog=4)
self.keep_going = True self.keep_going = True
self.dispatcher = dispatcher
@property
def connection_information(self):
if not self.is_alive():
self.start()
return (self.address, self.auth_key)
def stop(self): def stop(self):
self.keep_going = False self.keep_going = False
@ -43,4 +117,49 @@ class Server(Thread):
except: except:
pass pass
def handle_client(self, conn):
t = Thread(target=partial(self._handle_client, conn))
t.daemon = True
t.start()
def _handle_client(self, conn):
while True:
try:
func_name, args, kwargs = conn.recv()
except EOFError:
try:
conn.close()
except:
pass
return
else:
try:
self.call_func(func_name, args, kwargs, conn)
except:
try:
conn.close()
except:
pass
prints('Proxy function: %s with args: %r and'
' kwargs: %r failed')
if DEBUG:
import traceback
traceback.print_exc()
break
def call_func(self, func_name, args, kwargs, conn):
with Writer(conn) as f:
try:
self.dispatcher(f, func_name, args, kwargs)
except Exception as e:
if not f.data_written:
import traceback
# Try to tell the client process what error happened
try:
conn.send_bytes(_encode(('failed', (unicode(e),
as_unicode(traceback.format_exc())))))
except:
pass
raise