diff --git a/src/calibre/srv/async.py b/src/calibre/srv/async.py new file mode 100644 index 0000000000..841775272d --- /dev/null +++ b/src/calibre/srv/async.py @@ -0,0 +1,334 @@ +#!/usr/bin/env python2 +# vim:fileencoding=utf-8 +from __future__ import (unicode_literals, division, absolute_import, + print_function) + +__license__ = 'GPL v3' +__copyright__ = '2015, Kovid Goyal ' + +import ssl, socket, select, os +from io import BytesIO + +from calibre import as_unicode +from calibre.srv.opts import Options +from calibre.srv.utils import ( + socket_errors_socket_closed, socket_errors_nonblocking) +from calibre.utils.socket_inheritance import set_socket_inherit +from calibre.utils.logging import ThreadSafeLog +from calibre.utils.monotonic import monotonic + +READ, WRITE, RDWR = 'READ', 'WRITE', 'RDWR' + +class Connection(object): + + def __init__(self, socket, opts, ssl_context): + self.opts = opts + self.ssl_context = ssl_context + self.wait_for = READ + if self.ssl_context is not None: + self.ready = False + self.socket = self.ssl_context.wrap_socket(socket, server_side=True, do_handshake_on_connect=False) + self.set_state(RDWR, self.do_ssl_handshake) + else: + self.ready = True + self.socket = socket + self.connection_ready() + self.last_activity = monotonic() + + def set_state(self, wait_for, func): + self.wait_for = wait_for + self.handle_event = func + + def do_ssl_handshake(self, event): + try: + self._sslobj.do_handshake() + except ssl.SSLWantReadError: + self.set_state(READ, self.do_ssl_handshake) + except ssl.SSLWantWriteError: + self.set_state(WRITE, self.do_ssl_handshake) + self.ready = True + self.connection_ready() + + def send(self, data): + try: + ret = self.socket.send(data) + self.last_activity = monotonic() + return ret + except socket.error as e: + if e.errno in socket_errors_nonblocking: + return 0 + elif e.errno in socket_errors_socket_closed: + self.ready = False + return 0 + raise + + def recv(self, buffer_size): + try: + data = self.socket.recv(buffer_size) + self.last_activity = monotonic() + if not data: + # a closed connection is indicated by signaling + # a read condition, and having recv() return 0. + self.ready = False + return b'' + return data + except socket.error as e: + if e.errno in socket_errors_socket_closed: + self.ready = False + return b'' + + def close(self): + self.ready = False + try: + self.socket.shutdown(socket.SHUT_WR) + self.socket.close() + except socket.error: + pass + + def connection_ready(self): + raise NotImplementedError() + +class ServerLoop(object): + + def __init__( + self, + handler, + bind_address=('localhost', 8080), + opts=None, + # A calibre logging object. If None, a default log that logs to + # stdout is used + log=None + ): + self.ready = False + self.handler = handler + self.opts = opts or Options() + self.log = log or ThreadSafeLog(level=ThreadSafeLog.DEBUG) + + ba = tuple(bind_address) + if not ba[0]: + # AI_PASSIVE does not work with host of '' or None + ba = ('0.0.0.0', ba[1]) + self.bind_address = ba + self.bound_address = None + + self.ssl_context = None + if self.opts.ssl_certfile is not None and self.opts.ssl_keyfile is not None: + self.ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) + self.ssl_context.load_cert_chain(certfile=self.opts.ssl_certfile, keyfile=self.opts.ssl_keyfile) + + self.pre_activated_socket = None + if self.opts.allow_socket_preallocation: + from calibre.srv.pre_activated import pre_activated_socket + self.pre_activated_socket = pre_activated_socket() + if self.pre_activated_socket is not None: + set_socket_inherit(self.pre_activated_socket, False) + self.bind_address = self.pre_activated_socket.getsockname() + + def __str__(self): + return "%s(%r)" % (self.__class__.__name__, self.bind_address) + __repr__ = __str__ + + def serve_forever(self): + """ Listen for incoming connections. """ + + if self.pre_activated_socket is None: + # AF_INET or AF_INET6 socket + # Get the correct address family for our host (allows IPv6 + # addresses) + host, port = self.bind_address + try: + info = socket.getaddrinfo( + host, port, socket.AF_UNSPEC, + socket.SOCK_STREAM, 0, socket.AI_PASSIVE) + except socket.gaierror: + if ':' in host: + info = [(socket.AF_INET6, socket.SOCK_STREAM, + 0, "", self.bind_address + (0, 0))] + else: + info = [(socket.AF_INET, socket.SOCK_STREAM, + 0, "", self.bind_address)] + + self.socket = None + msg = "No socket could be created" + for res in info: + af, socktype, proto, canonname, sa = res + try: + self.bind(af, socktype, proto) + except socket.error, serr: + msg = "%s -- (%s: %s)" % (msg, sa, serr) + if self.socket: + self.socket.close() + self.socket = None + continue + break + if not self.socket: + raise socket.error(msg) + else: + self.socket = self.pre_activated_socket + self.pre_activated_socket = None + self.setup_socket() + + self.ready = True + self.connection_map = {} + self.socket.listen(5) + self.bound_address = ba = self.socket.getsockname() + if isinstance(ba, tuple): + ba = ':'.join(map(type(''), ba)) + self.log('calibre server listening on', ba) + + while True: + try: + self.tick() + except (KeyboardInterrupt, SystemExit): + self.shutdown() + break + except: + self.log.exception('Error in ServerLoop.tick') + + def setup_socket(self): + self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + if self.opts.no_delay and not isinstance(self.bind_address, basestring): + self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + + # If listening on the IPV6 any address ('::' = IN6ADDR_ANY), + # activate dual-stack. + if (hasattr(socket, 'AF_INET6') and self.socket.family == socket.AF_INET6 and + self.bind_address[0] in ('::', '::0', '::0.0.0.0')): + try: + self.socket.setsockopt( + socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0) + except (AttributeError, socket.error): + # Apparently, the socket option is not available in + # this machine's TCP stack + pass + self.socket.setblocking(0) + + def bind(self, family, atype, proto=0): + '''Create (or recreate) the actual socket object.''' + self.socket = socket.socket(family, atype, proto) + set_socket_inherit(self.socket, False) + self.setup_socket() + self.socket.bind(self.bind_address) + + def tick(self): + now = monotonic() + for s, conn in tuple(self.connection_map.iteritems()): + if now - conn.last_activity > self.opts.timeout: + self.log.debug('Closing connection because of extended inactivity') + self.close(s, conn) + + read_needed = [c.socket for c in self.connection_map.itervalues() if c.wait_for is READ or c.wait_for is RDWR] + write_needed = [c.socket for c in self.connection_map.itervalues() if c.wait_for is WRITE or c.wait_for is RDWR] + readable, writable = select.select([self.socket] + read_needed, write_needed, [], self.opts.timeout)[:2] + if not self.ready: + return + + for s, conn, event in self.get_actions(readable, writable): + try: + conn.handle_event(event) + if not conn.ready: + self.close(s, conn) + except Exception as e: + if conn.ready: + self.log.exception('Unhandled exception, terminating connection') + else: + self.log.error('Error in SSL handshake, terminating connection: %s' % as_unicode(e)) + self.close(s, conn) + + def wakeup(self): + # Touch our own socket to make select() return immediately. + sock = getattr(self, "socket", None) + if sock is not None: + try: + host, port = sock.getsockname()[:2] + except socket.error as e: + if e.errno not in socket_errors_socket_closed: + raise + else: + for res in socket.getaddrinfo(host, port, socket.AF_UNSPEC, + socket.SOCK_STREAM): + af, socktype, proto, canonname, sa = res + s = None + try: + s = socket.socket(af, socktype, proto) + s.settimeout(1.0) + s.connect((host, port)) + s.close() + except socket.error: + if s is not None: + s.close() + return sock + + def close(self, s, conn): + self.connection_map.pop(s, None) + conn.close() + + def get_actions(self, readable, writable): + for s in readable: + if s is self.socket: + s, addr = self.accept() + if s is not None: + self.connection_map[s] = conn = self.handler(s, self.opts, self.ssl_context) + if self.ssl_context is not None: + yield s, conn, RDWR + else: + yield s, self.connection_map[s], READ + for s in writable: + yield s, self.connection_map[s], WRITE + + def accept(self): + try: + return self.socket.accept() + except socket.error: + return None, None + + def stop(self): + self.ready = False + self.wakeup() + + def shutdown(self): + try: + if getattr(self, 'socket', None): + self.socket.close() + self.socket = None + except socket.error: + pass + for s, conn in tuple(self.connection_map.iteritems()): + self.close(s, conn) + +class EchoLine(Connection): + + bye_after_echo = False + + def connection_ready(self): + self.rbuf = BytesIO() + self.set_state(READ, self.read_line) + + def read_line(self, event): + data = self.recv(1) + if data: + self.rbuf.write(data) + if b'\n' == data: + if self.rbuf.tell() < 3: + # Empty line + self.rbuf = BytesIO(b'bye' + self.rbuf.getvalue()) + self.bye_after_echo = True + self.set_state(WRITE, self.echo) + self.rbuf.seek(0) + + def echo(self, event): + pos = self.rbuf.tell() + self.rbuf.seek(0, os.SEEK_END) + left = self.rbuf.tell() - pos + self.rbuf.seek(pos) + sent = self.send(self.rbuf.read(512)) + if sent == left: + self.rbuf = BytesIO() + self.set_state(READ, self.read_line) + if self.bye_after_echo: + self.ready = False + else: + self.rbuf.seek(pos + sent) + +if __name__ == '__main__': + ServerLoop(EchoLine).serve_forever() diff --git a/src/calibre/srv/opts.py b/src/calibre/srv/opts.py index 8734e04764..482564f2cd 100644 --- a/src/calibre/srv/opts.py +++ b/src/calibre/srv/opts.py @@ -28,10 +28,6 @@ raw_options = ( 'ssl_keyfile', None, None, - 'Max. queued connections while waiting to accept', - 'request_queue_size', 5, - None, - 'Time (in seconds) after which an idle connection is closed', 'timeout', 60.0, None,