diff --git a/src/calibre/srv/loop.py b/src/calibre/srv/loop.py index 56006fb5ca..2b831fb968 100644 --- a/src/calibre/srv/loop.py +++ b/src/calibre/srv/loop.py @@ -15,12 +15,14 @@ from calibre.ptempfile import TemporaryDirectory from calibre.srv.opts import Options from calibre.srv.utils import ( socket_errors_socket_closed, socket_errors_nonblocking, HandleInterrupt, - socket_errors_eintr, start_cork, stop_cork, DESIRED_SEND_BUFFER_SIZE) + socket_errors_eintr, start_cork, stop_cork, DESIRED_SEND_BUFFER_SIZE, + create_sock_pair) from calibre.utils.socket_inheritance import set_socket_inherit from calibre.utils.logging import ThreadSafeLog from calibre.utils.monotonic import monotonic READ, WRITE, RDWR = 'READ', 'WRITE', 'RDWR' +WAKEUP, JOB_DONE = bytes(bytearray(xrange(2))) class ReadBuffer(object): # {{{ @@ -109,7 +111,7 @@ class ReadBuffer(object): # {{{ return ans # }}} -class Connection(object): +class Connection(object): # {{{ def __init__(self, socket, opts, ssl_context, tdir, addr): self.opts = opts @@ -244,6 +246,7 @@ class Connection(object): def handle_timeout(self): return False +# }}} class ServerLoop(object): @@ -282,6 +285,11 @@ class ServerLoop(object): set_socket_inherit(self.pre_activated_socket, False) self.bind_address = self.pre_activated_socket.getsockname() + self.create_control_connection() + + def create_control_connection(self): + self.control_in, self.control_out = create_sock_pair() + def __str__(self): return "%s(%r)" % (self.__class__.__name__, self.bind_address) __repr__ = __str__ @@ -399,7 +407,7 @@ class ServerLoop(object): writable = [] else: try: - readable, writable, _ = select.select([self.socket.fileno()] + read_needed, write_needed, [], self.opts.timeout) + readable, writable, _ = select.select([self.socket.fileno(), self.control_out.fileno()] + read_needed, write_needed, [], self.opts.timeout) except ValueError: # self.socket.fileno() == -1 self.ready = False self.log.error('Listening socket was unexpectedly terminated') @@ -442,28 +450,7 @@ class ServerLoop(object): self.close(s, conn) def wakeup(self): - # Touch our own socket to make select() return immediately. - sock = getattr(self, "socket", None) - if sock is not None: - try: - host, port = sock.getsockname()[:2] - except socket.error as e: - if e.errno not in socket_errors_socket_closed: - raise - else: - for res in socket.getaddrinfo(host, port, socket.AF_UNSPEC, - socket.SOCK_STREAM): - af, socktype, proto, canonname, sa = res - s = None - try: - s = socket.socket(af, socktype, proto) - s.settimeout(1.0) - s.connect((host, port)) - s.close() - except socket.error: - if s is not None: - s.close() - return sock + self.control_in.sendall(WAKEUP) def close(self, s, conn): self.connection_map.pop(s, None) @@ -471,6 +458,7 @@ class ServerLoop(object): def get_actions(self, readable, writable): listener = self.socket.fileno() + control = self.control_out.fileno() for s in readable: if s == listener: sock, addr = self.accept() @@ -480,6 +468,24 @@ class ServerLoop(object): self.connection_map[s] = conn = self.handler(sock, self.opts, self.ssl_context, self.tdir, addr) if self.ssl_context is not None: yield s, conn, RDWR + elif s == control: + try: + c = self.control_out.recv(1) + except socket.error: + if not self.ready: + return + self.log.error('Control socket raised an error, resetting') + self.create_control_connection() + continue + if c == JOB_DONE: + pass + elif c == WAKEUP: + pass + elif not c: + if not self.ready: + return + self.log.error('Control socket failed to recv(), resetting') + self.create_control_connection() else: yield s, self.connection_map[s], READ for s in writable: diff --git a/src/calibre/srv/utils.py b/src/calibre/srv/utils.py index 5685cc303b..f93cc3e3e9 100644 --- a/src/calibre/srv/utils.py +++ b/src/calibre/srv/utils.py @@ -140,7 +140,7 @@ def create_sock_pair(port=0): temp_srv_sock = socket.socket() set_socket_inherit(temp_srv_sock, False) temp_srv_sock.setblocking(False) - temp_srv_sock.bind(('localhost', port)) + temp_srv_sock.bind(('127.0.0.1', port)) port = temp_srv_sock.getsockname()[1] temp_srv_sock.listen(1) with closing(temp_srv_sock): @@ -148,13 +148,12 @@ def create_sock_pair(port=0): client_sock = socket.socket() client_sock.setblocking(False) set_socket_inherit(client_sock, False) - while True: - try: - client_sock.connect(('localhost', port)) - except socket.error as err: - # EWOULDBLOCK is not an error, as the socket is non-blocking - if err.errno not in socket_errors_nonblocking: - raise + try: + client_sock.connect(('127.0.0.1', port)) + except socket.error as err: + # EWOULDBLOCK is not an error, as the socket is non-blocking + if err.errno not in socket_errors_nonblocking: + raise # Use select to wait for connect() to succeed. timeout = 1