Start work on async version of HTTP server

This commit is contained in:
Kovid Goyal 2015-05-23 19:11:23 +05:30
parent eb0aa151fb
commit d48b968cb9
2 changed files with 334 additions and 4 deletions

334
src/calibre/srv/async.py Normal file
View File

@ -0,0 +1,334 @@
#!/usr/bin/env python2
# vim:fileencoding=utf-8
from __future__ import (unicode_literals, division, absolute_import,
print_function)
__license__ = 'GPL v3'
__copyright__ = '2015, Kovid Goyal <kovid at kovidgoyal.net>'
import ssl, socket, select, os
from io import BytesIO
from calibre import as_unicode
from calibre.srv.opts import Options
from calibre.srv.utils import (
socket_errors_socket_closed, socket_errors_nonblocking)
from calibre.utils.socket_inheritance import set_socket_inherit
from calibre.utils.logging import ThreadSafeLog
from calibre.utils.monotonic import monotonic
READ, WRITE, RDWR = 'READ', 'WRITE', 'RDWR'
class Connection(object):
def __init__(self, socket, opts, ssl_context):
self.opts = opts
self.ssl_context = ssl_context
self.wait_for = READ
if self.ssl_context is not None:
self.ready = False
self.socket = self.ssl_context.wrap_socket(socket, server_side=True, do_handshake_on_connect=False)
self.set_state(RDWR, self.do_ssl_handshake)
else:
self.ready = True
self.socket = socket
self.connection_ready()
self.last_activity = monotonic()
def set_state(self, wait_for, func):
self.wait_for = wait_for
self.handle_event = func
def do_ssl_handshake(self, event):
try:
self._sslobj.do_handshake()
except ssl.SSLWantReadError:
self.set_state(READ, self.do_ssl_handshake)
except ssl.SSLWantWriteError:
self.set_state(WRITE, self.do_ssl_handshake)
self.ready = True
self.connection_ready()
def send(self, data):
try:
ret = self.socket.send(data)
self.last_activity = monotonic()
return ret
except socket.error as e:
if e.errno in socket_errors_nonblocking:
return 0
elif e.errno in socket_errors_socket_closed:
self.ready = False
return 0
raise
def recv(self, buffer_size):
try:
data = self.socket.recv(buffer_size)
self.last_activity = monotonic()
if not data:
# a closed connection is indicated by signaling
# a read condition, and having recv() return 0.
self.ready = False
return b''
return data
except socket.error as e:
if e.errno in socket_errors_socket_closed:
self.ready = False
return b''
def close(self):
self.ready = False
try:
self.socket.shutdown(socket.SHUT_WR)
self.socket.close()
except socket.error:
pass
def connection_ready(self):
raise NotImplementedError()
class ServerLoop(object):
def __init__(
self,
handler,
bind_address=('localhost', 8080),
opts=None,
# A calibre logging object. If None, a default log that logs to
# stdout is used
log=None
):
self.ready = False
self.handler = handler
self.opts = opts or Options()
self.log = log or ThreadSafeLog(level=ThreadSafeLog.DEBUG)
ba = tuple(bind_address)
if not ba[0]:
# AI_PASSIVE does not work with host of '' or None
ba = ('0.0.0.0', ba[1])
self.bind_address = ba
self.bound_address = None
self.ssl_context = None
if self.opts.ssl_certfile is not None and self.opts.ssl_keyfile is not None:
self.ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
self.ssl_context.load_cert_chain(certfile=self.opts.ssl_certfile, keyfile=self.opts.ssl_keyfile)
self.pre_activated_socket = None
if self.opts.allow_socket_preallocation:
from calibre.srv.pre_activated import pre_activated_socket
self.pre_activated_socket = pre_activated_socket()
if self.pre_activated_socket is not None:
set_socket_inherit(self.pre_activated_socket, False)
self.bind_address = self.pre_activated_socket.getsockname()
def __str__(self):
return "%s(%r)" % (self.__class__.__name__, self.bind_address)
__repr__ = __str__
def serve_forever(self):
""" Listen for incoming connections. """
if self.pre_activated_socket is None:
# AF_INET or AF_INET6 socket
# Get the correct address family for our host (allows IPv6
# addresses)
host, port = self.bind_address
try:
info = socket.getaddrinfo(
host, port, socket.AF_UNSPEC,
socket.SOCK_STREAM, 0, socket.AI_PASSIVE)
except socket.gaierror:
if ':' in host:
info = [(socket.AF_INET6, socket.SOCK_STREAM,
0, "", self.bind_address + (0, 0))]
else:
info = [(socket.AF_INET, socket.SOCK_STREAM,
0, "", self.bind_address)]
self.socket = None
msg = "No socket could be created"
for res in info:
af, socktype, proto, canonname, sa = res
try:
self.bind(af, socktype, proto)
except socket.error, serr:
msg = "%s -- (%s: %s)" % (msg, sa, serr)
if self.socket:
self.socket.close()
self.socket = None
continue
break
if not self.socket:
raise socket.error(msg)
else:
self.socket = self.pre_activated_socket
self.pre_activated_socket = None
self.setup_socket()
self.ready = True
self.connection_map = {}
self.socket.listen(5)
self.bound_address = ba = self.socket.getsockname()
if isinstance(ba, tuple):
ba = ':'.join(map(type(''), ba))
self.log('calibre server listening on', ba)
while True:
try:
self.tick()
except (KeyboardInterrupt, SystemExit):
self.shutdown()
break
except:
self.log.exception('Error in ServerLoop.tick')
def setup_socket(self):
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
if self.opts.no_delay and not isinstance(self.bind_address, basestring):
self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
# If listening on the IPV6 any address ('::' = IN6ADDR_ANY),
# activate dual-stack.
if (hasattr(socket, 'AF_INET6') and self.socket.family == socket.AF_INET6 and
self.bind_address[0] in ('::', '::0', '::0.0.0.0')):
try:
self.socket.setsockopt(
socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0)
except (AttributeError, socket.error):
# Apparently, the socket option is not available in
# this machine's TCP stack
pass
self.socket.setblocking(0)
def bind(self, family, atype, proto=0):
'''Create (or recreate) the actual socket object.'''
self.socket = socket.socket(family, atype, proto)
set_socket_inherit(self.socket, False)
self.setup_socket()
self.socket.bind(self.bind_address)
def tick(self):
now = monotonic()
for s, conn in tuple(self.connection_map.iteritems()):
if now - conn.last_activity > self.opts.timeout:
self.log.debug('Closing connection because of extended inactivity')
self.close(s, conn)
read_needed = [c.socket for c in self.connection_map.itervalues() if c.wait_for is READ or c.wait_for is RDWR]
write_needed = [c.socket for c in self.connection_map.itervalues() if c.wait_for is WRITE or c.wait_for is RDWR]
readable, writable = select.select([self.socket] + read_needed, write_needed, [], self.opts.timeout)[:2]
if not self.ready:
return
for s, conn, event in self.get_actions(readable, writable):
try:
conn.handle_event(event)
if not conn.ready:
self.close(s, conn)
except Exception as e:
if conn.ready:
self.log.exception('Unhandled exception, terminating connection')
else:
self.log.error('Error in SSL handshake, terminating connection: %s' % as_unicode(e))
self.close(s, conn)
def wakeup(self):
# Touch our own socket to make select() return immediately.
sock = getattr(self, "socket", None)
if sock is not None:
try:
host, port = sock.getsockname()[:2]
except socket.error as e:
if e.errno not in socket_errors_socket_closed:
raise
else:
for res in socket.getaddrinfo(host, port, socket.AF_UNSPEC,
socket.SOCK_STREAM):
af, socktype, proto, canonname, sa = res
s = None
try:
s = socket.socket(af, socktype, proto)
s.settimeout(1.0)
s.connect((host, port))
s.close()
except socket.error:
if s is not None:
s.close()
return sock
def close(self, s, conn):
self.connection_map.pop(s, None)
conn.close()
def get_actions(self, readable, writable):
for s in readable:
if s is self.socket:
s, addr = self.accept()
if s is not None:
self.connection_map[s] = conn = self.handler(s, self.opts, self.ssl_context)
if self.ssl_context is not None:
yield s, conn, RDWR
else:
yield s, self.connection_map[s], READ
for s in writable:
yield s, self.connection_map[s], WRITE
def accept(self):
try:
return self.socket.accept()
except socket.error:
return None, None
def stop(self):
self.ready = False
self.wakeup()
def shutdown(self):
try:
if getattr(self, 'socket', None):
self.socket.close()
self.socket = None
except socket.error:
pass
for s, conn in tuple(self.connection_map.iteritems()):
self.close(s, conn)
class EchoLine(Connection):
bye_after_echo = False
def connection_ready(self):
self.rbuf = BytesIO()
self.set_state(READ, self.read_line)
def read_line(self, event):
data = self.recv(1)
if data:
self.rbuf.write(data)
if b'\n' == data:
if self.rbuf.tell() < 3:
# Empty line
self.rbuf = BytesIO(b'bye' + self.rbuf.getvalue())
self.bye_after_echo = True
self.set_state(WRITE, self.echo)
self.rbuf.seek(0)
def echo(self, event):
pos = self.rbuf.tell()
self.rbuf.seek(0, os.SEEK_END)
left = self.rbuf.tell() - pos
self.rbuf.seek(pos)
sent = self.send(self.rbuf.read(512))
if sent == left:
self.rbuf = BytesIO()
self.set_state(READ, self.read_line)
if self.bye_after_echo:
self.ready = False
else:
self.rbuf.seek(pos + sent)
if __name__ == '__main__':
ServerLoop(EchoLine).serve_forever()

View File

@ -28,10 +28,6 @@ raw_options = (
'ssl_keyfile', None, 'ssl_keyfile', None,
None, None,
'Max. queued connections while waiting to accept',
'request_queue_size', 5,
None,
'Time (in seconds) after which an idle connection is closed', 'Time (in seconds) after which an idle connection is closed',
'timeout', 60.0, 'timeout', 60.0,
None, None,