Use a dedicated socket pair to control the server

Allows for a faster wakeup() implementation and the server
can receive arbitrary control messages
This commit is contained in:
Kovid Goyal 2015-05-28 14:11:51 +05:30
parent 6764873483
commit 97d32ecdf6
2 changed files with 38 additions and 33 deletions

View File

@ -15,12 +15,14 @@ from calibre.ptempfile import TemporaryDirectory
from calibre.srv.opts import Options
from calibre.srv.utils import (
socket_errors_socket_closed, socket_errors_nonblocking, HandleInterrupt,
socket_errors_eintr, start_cork, stop_cork, DESIRED_SEND_BUFFER_SIZE)
socket_errors_eintr, start_cork, stop_cork, DESIRED_SEND_BUFFER_SIZE,
create_sock_pair)
from calibre.utils.socket_inheritance import set_socket_inherit
from calibre.utils.logging import ThreadSafeLog
from calibre.utils.monotonic import monotonic
READ, WRITE, RDWR = 'READ', 'WRITE', 'RDWR'
WAKEUP, JOB_DONE = bytes(bytearray(xrange(2)))
class ReadBuffer(object): # {{{
@ -109,7 +111,7 @@ class ReadBuffer(object): # {{{
return ans
# }}}
class Connection(object):
class Connection(object): # {{{
def __init__(self, socket, opts, ssl_context, tdir, addr):
self.opts = opts
@ -244,6 +246,7 @@ class Connection(object):
def handle_timeout(self):
return False
# }}}
class ServerLoop(object):
@ -282,6 +285,11 @@ class ServerLoop(object):
set_socket_inherit(self.pre_activated_socket, False)
self.bind_address = self.pre_activated_socket.getsockname()
self.create_control_connection()
def create_control_connection(self):
self.control_in, self.control_out = create_sock_pair()
def __str__(self):
return "%s(%r)" % (self.__class__.__name__, self.bind_address)
__repr__ = __str__
@ -399,7 +407,7 @@ class ServerLoop(object):
writable = []
else:
try:
readable, writable, _ = select.select([self.socket.fileno()] + read_needed, write_needed, [], self.opts.timeout)
readable, writable, _ = select.select([self.socket.fileno(), self.control_out.fileno()] + read_needed, write_needed, [], self.opts.timeout)
except ValueError: # self.socket.fileno() == -1
self.ready = False
self.log.error('Listening socket was unexpectedly terminated')
@ -442,28 +450,7 @@ class ServerLoop(object):
self.close(s, conn)
def wakeup(self):
# Touch our own socket to make select() return immediately.
sock = getattr(self, "socket", None)
if sock is not None:
try:
host, port = sock.getsockname()[:2]
except socket.error as e:
if e.errno not in socket_errors_socket_closed:
raise
else:
for res in socket.getaddrinfo(host, port, socket.AF_UNSPEC,
socket.SOCK_STREAM):
af, socktype, proto, canonname, sa = res
s = None
try:
s = socket.socket(af, socktype, proto)
s.settimeout(1.0)
s.connect((host, port))
s.close()
except socket.error:
if s is not None:
s.close()
return sock
self.control_in.sendall(WAKEUP)
def close(self, s, conn):
self.connection_map.pop(s, None)
@ -471,6 +458,7 @@ class ServerLoop(object):
def get_actions(self, readable, writable):
listener = self.socket.fileno()
control = self.control_out.fileno()
for s in readable:
if s == listener:
sock, addr = self.accept()
@ -480,6 +468,24 @@ class ServerLoop(object):
self.connection_map[s] = conn = self.handler(sock, self.opts, self.ssl_context, self.tdir, addr)
if self.ssl_context is not None:
yield s, conn, RDWR
elif s == control:
try:
c = self.control_out.recv(1)
except socket.error:
if not self.ready:
return
self.log.error('Control socket raised an error, resetting')
self.create_control_connection()
continue
if c == JOB_DONE:
pass
elif c == WAKEUP:
pass
elif not c:
if not self.ready:
return
self.log.error('Control socket failed to recv(), resetting')
self.create_control_connection()
else:
yield s, self.connection_map[s], READ
for s in writable:

View File

@ -140,7 +140,7 @@ def create_sock_pair(port=0):
temp_srv_sock = socket.socket()
set_socket_inherit(temp_srv_sock, False)
temp_srv_sock.setblocking(False)
temp_srv_sock.bind(('localhost', port))
temp_srv_sock.bind(('127.0.0.1', port))
port = temp_srv_sock.getsockname()[1]
temp_srv_sock.listen(1)
with closing(temp_srv_sock):
@ -148,9 +148,8 @@ def create_sock_pair(port=0):
client_sock = socket.socket()
client_sock.setblocking(False)
set_socket_inherit(client_sock, False)
while True:
try:
client_sock.connect(('localhost', port))
client_sock.connect(('127.0.0.1', port))
except socket.error as err:
# EWOULDBLOCK is not an error, as the socket is non-blocking
if err.errno not in socket_errors_nonblocking: