systemd socket activation support for the new server

This commit is contained in:
Kovid Goyal 2015-05-16 18:16:45 +05:30
parent d503568706
commit d3ff58352d
2 changed files with 135 additions and 53 deletions

View File

@ -532,7 +532,7 @@ class ThreadPool(object):
# Forcibly shut down the socket.
c = worker.conn
if c and not c.socket_file.closed:
c.socket.shutdown(socket.SHUT_RD)
c.socket.shutdown(socket.SHUT_RDWR)
worker.join()
except (AssertionError,
# Ignore repeated Ctrl-C.
@ -571,6 +571,10 @@ class ServerLoop(object):
# number of connections will be dropped)
max_threads=500,
# Allow socket pre-allocation, for example, with systemd
# socket activation
allow_socket_preallocation=True,
# no_delay turns on TCP_NODELAY which decreases latency at the cost of
# worse overall performance when sending multiple small packets. It
# prevents the TCP stack from aggregating multiple small TCP packets.
@ -589,6 +593,7 @@ class ServerLoop(object):
if http_handler is None and nonhttp_handler is None:
raise ValueError('You must specify at least one protocol handler')
self.log = log or ThreadSafeLog(level=ThreadSafeLog.DEBUG)
self.allow_socket_preallocation = allow_socket_preallocation
self.no_delay = no_delay
self.request_queue_size = request_queue_size
self.timeout = timeout
@ -605,7 +610,15 @@ class ServerLoop(object):
self.ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
self.ssl_context.load_cert_chain(certfile=ssl_certfile, keyfile=ssl_keyfile)
self.ready = True
self.pre_activated_socket = None
if self.allow_socket_preallocation:
from calibre.srv.pre_activated import pre_activated_socket
self.pre_activated_socket = pre_activated_socket()
if self.pre_activated_socket is not None:
set_socket_inherit(self.pre_activated_socket, False)
self.bind_address = self.pre_activated_socket.getsockname()
self.ready = False
self.requests = ThreadPool(self, min_threads=min_threads, max_threads=max_threads)
def __str__(self):
@ -615,61 +628,69 @@ class ServerLoop(object):
def serve_forever(self):
""" Listen for incoming connections. """
# Select the appropriate socket
if isinstance(self.bind_address, basestring):
# AF_UNIX socket
if self.pre_activated_socket is None:
# Select the appropriate socket
if isinstance(self.bind_address, basestring):
# AF_UNIX socket
# So we can reuse the socket...
try:
os.unlink(self.bind_address)
except EnvironmentError:
pass
# So we can reuse the socket...
try:
os.unlink(self.bind_address)
except EnvironmentError:
pass
# So everyone can access the socket...
try:
os.chmod(self.bind_address, 0777)
except EnvironmentError:
pass
# So everyone can access the socket...
try:
os.chmod(self.bind_address, 0777)
except EnvironmentError:
pass
info = [
(socket.AF_UNIX, socket.SOCK_STREAM, 0, "", self.bind_address)]
info = [
(socket.AF_UNIX, socket.SOCK_STREAM, 0, "", self.bind_address)]
else:
# AF_INET or AF_INET6 socket
# Get the correct address family for our host (allows IPv6
# addresses)
host, port = self.bind_address
try:
info = socket.getaddrinfo(
host, port, socket.AF_UNSPEC,
socket.SOCK_STREAM, 0, socket.AI_PASSIVE)
except socket.gaierror:
if ':' in host:
info = [(socket.AF_INET6, socket.SOCK_STREAM,
0, "", self.bind_address + (0, 0))]
else:
info = [(socket.AF_INET, socket.SOCK_STREAM,
0, "", self.bind_address)]
self.socket = None
msg = "No socket could be created"
for res in info:
af, socktype, proto, canonname, sa = res
try:
self.bind(af, socktype, proto)
except socket.error, serr:
msg = "%s -- (%s: %s)" % (msg, sa, serr)
if self.socket:
self.socket.close()
self.socket = None
continue
break
if not self.socket:
raise socket.error(msg)
else:
# AF_INET or AF_INET6 socket
# Get the correct address family for our host (allows IPv6
# addresses)
host, port = self.bind_address
try:
info = socket.getaddrinfo(
host, port, socket.AF_UNSPEC,
socket.SOCK_STREAM, 0, socket.AI_PASSIVE)
except socket.gaierror:
if ':' in host:
info = [(socket.AF_INET6, socket.SOCK_STREAM,
0, "", self.bind_address + (0, 0))]
else:
info = [(socket.AF_INET, socket.SOCK_STREAM,
0, "", self.bind_address)]
self.socket = None
msg = "No socket could be created"
for res in info:
af, socktype, proto, canonname, sa = res
try:
self.bind(af, socktype, proto)
except socket.error, serr:
msg = "%s -- (%s: %s)" % (msg, sa, serr)
if self.socket:
self.socket.close()
self.socket = None
continue
break
if not self.socket:
raise socket.error(msg)
self.socket = self.pre_activated_socket
self.setup_socket()
self.ready = True
# Timeout so KeyboardInterrupt can be caught on Win32
self.socket.settimeout(1)
self.socket.listen(self.request_queue_size)
ba = self.bind_address
if isinstance(ba, tuple):
ba = ':'.join(map(type(''), ba))
self.log('calibre server listening on', ba)
# Create worker threads
self.requests.start()
@ -682,17 +703,14 @@ class ServerLoop(object):
except:
self.log.exception('Error in ServerLoop.tick')
def bind(self, family, atype, proto=0):
"""Create (or recreate) the actual socket object."""
self.socket = socket.socket(family, atype, proto)
set_socket_inherit(self.socket, False)
def setup_socket(self):
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
if self.no_delay and not isinstance(self.bind_address, basestring):
self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
# If listening on the IPV6 any address ('::' = IN6ADDR_ANY),
# activate dual-stack.
if (hasattr(socket, 'AF_INET6') and family == socket.AF_INET6 and
if (hasattr(socket, 'AF_INET6') and self.socket.family == socket.AF_INET6 and
self.bind_address[0] in ('::', '::0', '::0.0.0.0')):
try:
self.socket.setsockopt(
@ -702,6 +720,11 @@ class ServerLoop(object):
# this machine's TCP stack
pass
def bind(self, family, atype, proto=0):
"""Create (or recreate) the actual socket object."""
self.socket = socket.socket(family, atype, proto)
set_socket_inherit(self.socket, False)
self.setup_socket()
self.socket.bind(self.bind_address)
def tick(self):

View File

@ -0,0 +1,59 @@
#!/usr/bin/env python2
# vim:fileencoding=utf-8
from __future__ import (unicode_literals, division, absolute_import,
print_function)
__license__ = 'GPL v3'
__copyright__ = '2015, Kovid Goyal <kovid at kovidgoyal.net>'
# Support server pre-activation, such as with systemd's socket activation
import socket, errno
from calibre.constants import islinux
def pre_activated_socket():
return None
if islinux:
import ctypes
class SOCKADDR_NL(ctypes.Structure):
_fields_ = [("nl_family", ctypes.c_ushort),
("nl_pad", ctypes.c_ushort),
("nl_pid", ctypes.c_int),
("nl_groups", ctypes.c_int)]
def getsockfamily(fd):
addr = SOCKADDR_NL(0, 0, 0, 0)
sz = ctypes.c_int(ctypes.sizeof(addr))
if ctypes.CDLL(None, use_errno=True).getsockname(fd, ctypes.pointer(addr), ctypes.pointer(sz)) != 0:
raise EnvironmentError(errno.errcode[ctypes.get_errno()])
return addr.nl_family
try:
systemd = ctypes.CDLL(ctypes.util.find_library('systemd'))
except Exception:
pass
else:
del pre_activated_socket
def pre_activated_socket():
num = systemd.sd_listen_fds(1) # Remove systemd env vars so that child processes do not inherit them
if num > 1:
raise EnvironmentError('Too many file descriptors received from systemd')
if num != 1:
return None
fd = 3 # systemd starts activated sockets at 3
ret = systemd.sd_is_socket(fd, socket.AF_UNSPEC, socket.SOCK_STREAM, -1)
if ret == 0:
raise EnvironmentError('The systemd socket file descriptor is not valid')
if ret < 0:
raise EnvironmentError('Failed to check the systemd socket file descriptor for validity')
family = getsockfamily(fd)
return socket.fromfd(fd, family, socket.SOCK_STREAM)
if __name__ == '__main__':
# Run as:
# /usr/lib/systemd/systemd-activate -l 8081 calibre-debug pre_activated.py
# telnet localhost 8081
s = pre_activated_socket()
print (s, s.getsockname())