From d3ff58352dd19eb40dfad4ac385545d0decd2b6e Mon Sep 17 00:00:00 2001 From: Kovid Goyal Date: Sat, 16 May 2015 18:16:45 +0530 Subject: [PATCH] systemd socket activation support for the new server --- src/calibre/srv/loop.py | 129 ++++++++++++++++++------------- src/calibre/srv/pre_activated.py | 59 ++++++++++++++ 2 files changed, 135 insertions(+), 53 deletions(-) create mode 100644 src/calibre/srv/pre_activated.py diff --git a/src/calibre/srv/loop.py b/src/calibre/srv/loop.py index b794dd0ab7..aeafff8fb3 100644 --- a/src/calibre/srv/loop.py +++ b/src/calibre/srv/loop.py @@ -532,7 +532,7 @@ class ThreadPool(object): # Forcibly shut down the socket. c = worker.conn if c and not c.socket_file.closed: - c.socket.shutdown(socket.SHUT_RD) + c.socket.shutdown(socket.SHUT_RDWR) worker.join() except (AssertionError, # Ignore repeated Ctrl-C. @@ -571,6 +571,10 @@ class ServerLoop(object): # number of connections will be dropped) max_threads=500, + # Allow socket pre-allocation, for example, with systemd + # socket activation + allow_socket_preallocation=True, + # no_delay turns on TCP_NODELAY which decreases latency at the cost of # worse overall performance when sending multiple small packets. It # prevents the TCP stack from aggregating multiple small TCP packets. @@ -589,6 +593,7 @@ class ServerLoop(object): if http_handler is None and nonhttp_handler is None: raise ValueError('You must specify at least one protocol handler') self.log = log or ThreadSafeLog(level=ThreadSafeLog.DEBUG) + self.allow_socket_preallocation = allow_socket_preallocation self.no_delay = no_delay self.request_queue_size = request_queue_size self.timeout = timeout @@ -605,7 +610,15 @@ class ServerLoop(object): self.ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) self.ssl_context.load_cert_chain(certfile=ssl_certfile, keyfile=ssl_keyfile) - self.ready = True + self.pre_activated_socket = None + if self.allow_socket_preallocation: + from calibre.srv.pre_activated import pre_activated_socket + self.pre_activated_socket = pre_activated_socket() + if self.pre_activated_socket is not None: + set_socket_inherit(self.pre_activated_socket, False) + self.bind_address = self.pre_activated_socket.getsockname() + + self.ready = False self.requests = ThreadPool(self, min_threads=min_threads, max_threads=max_threads) def __str__(self): @@ -615,61 +628,69 @@ class ServerLoop(object): def serve_forever(self): """ Listen for incoming connections. """ - # Select the appropriate socket - if isinstance(self.bind_address, basestring): - # AF_UNIX socket + if self.pre_activated_socket is None: + # Select the appropriate socket + if isinstance(self.bind_address, basestring): + # AF_UNIX socket - # So we can reuse the socket... - try: - os.unlink(self.bind_address) - except EnvironmentError: - pass + # So we can reuse the socket... + try: + os.unlink(self.bind_address) + except EnvironmentError: + pass - # So everyone can access the socket... - try: - os.chmod(self.bind_address, 0777) - except EnvironmentError: - pass + # So everyone can access the socket... + try: + os.chmod(self.bind_address, 0777) + except EnvironmentError: + pass - info = [ - (socket.AF_UNIX, socket.SOCK_STREAM, 0, "", self.bind_address)] + info = [ + (socket.AF_UNIX, socket.SOCK_STREAM, 0, "", self.bind_address)] + else: + # AF_INET or AF_INET6 socket + # Get the correct address family for our host (allows IPv6 + # addresses) + host, port = self.bind_address + try: + info = socket.getaddrinfo( + host, port, socket.AF_UNSPEC, + socket.SOCK_STREAM, 0, socket.AI_PASSIVE) + except socket.gaierror: + if ':' in host: + info = [(socket.AF_INET6, socket.SOCK_STREAM, + 0, "", self.bind_address + (0, 0))] + else: + info = [(socket.AF_INET, socket.SOCK_STREAM, + 0, "", self.bind_address)] + + self.socket = None + msg = "No socket could be created" + for res in info: + af, socktype, proto, canonname, sa = res + try: + self.bind(af, socktype, proto) + except socket.error, serr: + msg = "%s -- (%s: %s)" % (msg, sa, serr) + if self.socket: + self.socket.close() + self.socket = None + continue + break + if not self.socket: + raise socket.error(msg) else: - # AF_INET or AF_INET6 socket - # Get the correct address family for our host (allows IPv6 - # addresses) - host, port = self.bind_address - try: - info = socket.getaddrinfo( - host, port, socket.AF_UNSPEC, - socket.SOCK_STREAM, 0, socket.AI_PASSIVE) - except socket.gaierror: - if ':' in host: - info = [(socket.AF_INET6, socket.SOCK_STREAM, - 0, "", self.bind_address + (0, 0))] - else: - info = [(socket.AF_INET, socket.SOCK_STREAM, - 0, "", self.bind_address)] - - self.socket = None - msg = "No socket could be created" - for res in info: - af, socktype, proto, canonname, sa = res - try: - self.bind(af, socktype, proto) - except socket.error, serr: - msg = "%s -- (%s: %s)" % (msg, sa, serr) - if self.socket: - self.socket.close() - self.socket = None - continue - break - if not self.socket: - raise socket.error(msg) + self.socket = self.pre_activated_socket + self.setup_socket() self.ready = True # Timeout so KeyboardInterrupt can be caught on Win32 self.socket.settimeout(1) self.socket.listen(self.request_queue_size) + ba = self.bind_address + if isinstance(ba, tuple): + ba = ':'.join(map(type(''), ba)) + self.log('calibre server listening on', ba) # Create worker threads self.requests.start() @@ -682,17 +703,14 @@ class ServerLoop(object): except: self.log.exception('Error in ServerLoop.tick') - def bind(self, family, atype, proto=0): - """Create (or recreate) the actual socket object.""" - self.socket = socket.socket(family, atype, proto) - set_socket_inherit(self.socket, False) + def setup_socket(self): self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) if self.no_delay and not isinstance(self.bind_address, basestring): self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) # If listening on the IPV6 any address ('::' = IN6ADDR_ANY), # activate dual-stack. - if (hasattr(socket, 'AF_INET6') and family == socket.AF_INET6 and + if (hasattr(socket, 'AF_INET6') and self.socket.family == socket.AF_INET6 and self.bind_address[0] in ('::', '::0', '::0.0.0.0')): try: self.socket.setsockopt( @@ -702,6 +720,11 @@ class ServerLoop(object): # this machine's TCP stack pass + def bind(self, family, atype, proto=0): + """Create (or recreate) the actual socket object.""" + self.socket = socket.socket(family, atype, proto) + set_socket_inherit(self.socket, False) + self.setup_socket() self.socket.bind(self.bind_address) def tick(self): diff --git a/src/calibre/srv/pre_activated.py b/src/calibre/srv/pre_activated.py new file mode 100644 index 0000000000..bea4b40f32 --- /dev/null +++ b/src/calibre/srv/pre_activated.py @@ -0,0 +1,59 @@ +#!/usr/bin/env python2 +# vim:fileencoding=utf-8 +from __future__ import (unicode_literals, division, absolute_import, + print_function) + +__license__ = 'GPL v3' +__copyright__ = '2015, Kovid Goyal ' + +# Support server pre-activation, such as with systemd's socket activation + +import socket, errno +from calibre.constants import islinux + +def pre_activated_socket(): + return None + +if islinux: + import ctypes + + class SOCKADDR_NL(ctypes.Structure): + _fields_ = [("nl_family", ctypes.c_ushort), + ("nl_pad", ctypes.c_ushort), + ("nl_pid", ctypes.c_int), + ("nl_groups", ctypes.c_int)] + + def getsockfamily(fd): + addr = SOCKADDR_NL(0, 0, 0, 0) + sz = ctypes.c_int(ctypes.sizeof(addr)) + if ctypes.CDLL(None, use_errno=True).getsockname(fd, ctypes.pointer(addr), ctypes.pointer(sz)) != 0: + raise EnvironmentError(errno.errcode[ctypes.get_errno()]) + return addr.nl_family + + try: + systemd = ctypes.CDLL(ctypes.util.find_library('systemd')) + except Exception: + pass + else: + del pre_activated_socket + def pre_activated_socket(): + num = systemd.sd_listen_fds(1) # Remove systemd env vars so that child processes do not inherit them + if num > 1: + raise EnvironmentError('Too many file descriptors received from systemd') + if num != 1: + return None + fd = 3 # systemd starts activated sockets at 3 + ret = systemd.sd_is_socket(fd, socket.AF_UNSPEC, socket.SOCK_STREAM, -1) + if ret == 0: + raise EnvironmentError('The systemd socket file descriptor is not valid') + if ret < 0: + raise EnvironmentError('Failed to check the systemd socket file descriptor for validity') + family = getsockfamily(fd) + return socket.fromfd(fd, family, socket.SOCK_STREAM) + +if __name__ == '__main__': + # Run as: + # /usr/lib/systemd/systemd-activate -l 8081 calibre-debug pre_activated.py + # telnet localhost 8081 + s = pre_activated_socket() + print (s, s.getsockname())