From ab81fe992bde09981ea600133b61ce3f46b0013c Mon Sep 17 00:00:00 2001 From: Kovid Goyal Date: Fri, 29 May 2015 10:35:31 +0530 Subject: [PATCH] Server: Run the actual request handling code in threads so as not to block the servers event loop --- src/calibre/srv/errors.py | 3 ++ src/calibre/srv/http_response.py | 57 +++++++++++++--------- src/calibre/srv/loop.py | 78 +++++++++++++++++++++++++----- src/calibre/srv/opts.py | 4 ++ src/calibre/srv/pool.py | 83 ++++++++++++++++++++++++++++++++ src/calibre/srv/tests/loop.py | 8 +++ 6 files changed, 198 insertions(+), 35 deletions(-) create mode 100644 src/calibre/srv/pool.py diff --git a/src/calibre/srv/errors.py b/src/calibre/srv/errors.py index b0073c3d8d..2a120a316a 100644 --- a/src/calibre/srv/errors.py +++ b/src/calibre/srv/errors.py @@ -9,3 +9,6 @@ __copyright__ = '2015, Kovid Goyal ' class HTTP404(Exception): pass + +class JobQueueFull(Exception): + pass diff --git a/src/calibre/srv/http_response.py b/src/calibre/srv/http_response.py index 87bce58150..256ccc8d5e 100644 --- a/src/calibre/srv/http_response.py +++ b/src/calibre/srv/http_response.py @@ -316,14 +316,45 @@ class HTTPConnection(HTTPRequest): outheaders, self.response_protocol, self.static_cache, self.opts, self.remote_addr, self.remote_port ) - try: - output = self.request_handler(data) - except HTTP404 as e: - return self.simple_response(httplib.NOT_FOUND, msg=e.message or '', close_after_response=False) + self.queue_job(self.run_request_handler, data) + def run_request_handler(self, data): + result = self.request_handler(data) + return data, result + + def send_range_not_satisfiable(self, content_length): + buf = [ + '%s %d %s' % (self.response_protocol, httplib.REQUESTED_RANGE_NOT_SATISFIABLE, httplib.responses[httplib.REQUESTED_RANGE_NOT_SATISFIABLE]), + "Date: " + http_date(), + "Content-Range: bytes */%d" % content_length, + ] + self.response_ready(header_list_to_file(buf)) + + def send_not_modified(self, etag=None): + buf = [ + '%s %d %s' % 
(self.response_protocol, httplib.NOT_MODIFIED, httplib.responses[httplib.NOT_MODIFIED]), + "Content-Length: 0", + "Date: " + http_date(), + ] + if etag is not None: + buf.append('ETag: ' + etag) + self.response_ready(header_list_to_file(buf)) + + def report_busy(self): + self.simple_response(httplib.SERVICE_UNAVAILABLE) + + def job_done(self, ok, result): + if not ok: + etype, e, tb = result + if isinstance(e, HTTP404): + return self.simple_response(httplib.NOT_FOUND, msg=e.message or '', close_after_response=False) + raise e, None, tb + + data, output = result output = self.finalize_output(output, data, self.method is HTTP1) if output is None: return + outheaders = data.outheaders outheaders.set('Date', http_date(), replace_all=True) outheaders.set('Server', 'calibre %s' % __version__, replace_all=True) @@ -348,24 +379,6 @@ class HTTPConnection(HTTPRequest): buf.append('') self.response_ready(BytesIO(b''.join((x + '\r\n').encode('ascii') for x in buf)), output=output) - def send_range_not_satisfiable(self, content_length): - buf = [ - '%s %d %s' % (self.response_protocol, httplib.REQUESTED_RANGE_NOT_SATISFIABLE, httplib.responses[httplib.REQUESTED_RANGE_NOT_SATISFIABLE]), - "Date: " + http_date(), - "Content-Range: bytes */%d" % content_length, - ] - self.response_ready(header_list_to_file(buf)) - - def send_not_modified(self, etag=None): - buf = [ - '%s %d %s' % (self.response_protocol, httplib.NOT_MODIFIED, httplib.responses[httplib.NOT_MODIFIED]), - "Content-Length: 0", - "Date: " + http_date(), - ] - if etag is not None: - buf.append('ETag: ' + etag) - self.response_ready(header_list_to_file(buf)) - def response_ready(self, header_file, output=None): self.response_started = True self.optimize_for_sending_packet() diff --git a/src/calibre/srv/loop.py b/src/calibre/srv/loop.py index 2b831fb968..6cf48acb75 100644 --- a/src/calibre/srv/loop.py +++ b/src/calibre/srv/loop.py @@ -8,10 +8,13 @@ __copyright__ = '2015, Kovid Goyal ' import ssl, socket, select, os, 
traceback from io import BytesIO +from Queue import Empty, Full from functools import partial from calibre import as_unicode from calibre.ptempfile import TemporaryDirectory +from calibre.srv.errors import JobQueueFull +from calibre.srv.pool import ThreadPool from calibre.srv.opts import Options from calibre.srv.utils import ( socket_errors_socket_closed, socket_errors_nonblocking, HandleInterrupt, @@ -21,7 +24,7 @@ from calibre.utils.socket_inheritance import set_socket_inherit from calibre.utils.logging import ThreadSafeLog from calibre.utils.monotonic import monotonic -READ, WRITE, RDWR = 'READ', 'WRITE', 'RDWR' +READ, WRITE, RDWR, WAIT = 'READ', 'WRITE', 'RDWR', 'WAIT' WAKEUP, JOB_DONE = bytes(bytearray(xrange(2))) class ReadBuffer(object): # {{{ @@ -113,8 +116,8 @@ class ReadBuffer(object): # {{{ class Connection(object): # {{{ - def __init__(self, socket, opts, ssl_context, tdir, addr): - self.opts = opts + def __init__(self, socket, opts, ssl_context, tdir, addr, pool): + self.opts, self.pool = opts, pool try: self.remote_addr = addr[0] self.remote_port = addr[1] @@ -234,6 +237,21 @@ class Connection(object): # {{{ except socket.error: pass + def queue_job(self, func, *args): + if args: + func = partial(func, *args) + try: + self.pool.put_nowait(self.socket.fileno(), func) + except Full: + raise JobQueueFull() + self.set_state(WAIT, self._job_done) + + def _job_done(self, event): + self.job_done(*event) + + def job_done(self, ok, result): + raise NotImplementedError() + @property def state_description(self): return '' @@ -241,6 +259,9 @@ class Connection(object): # {{{ def report_unhandled_exception(self, e, formatted_traceback): pass + def report_busy(self): + pass + def connection_ready(self): raise NotImplementedError() @@ -286,6 +307,7 @@ class ServerLoop(object): self.bind_address = self.pre_activated_socket.getsockname() self.create_control_connection() + self.pool = ThreadPool(self.log, self.job_completed, count=self.opts.worker_count) def 
create_control_connection(self): self.control_in, self.control_out = create_sock_pair() @@ -343,6 +365,7 @@ class ServerLoop(object): self.bound_address = ba = self.socket.getsockname() if isinstance(ba, tuple): ba = ':'.join(map(type(''), ba)) + self.pool.start() with TemporaryDirectory(prefix='srv-') as tdir: self.tdir = tdir self.ready = True @@ -351,13 +374,14 @@ class ServerLoop(object): while self.ready: try: self.tick() - except (KeyboardInterrupt, SystemExit) as e: + except SystemExit: self.shutdown() - if isinstance(e, SystemExit): - raise + raise + except KeyboardInterrupt: break except: self.log.exception('Error in ServerLoop.tick') + self.shutdown() def setup_socket(self): self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) @@ -388,14 +412,17 @@ class ServerLoop(object): read_needed, write_needed, readable, remove = [], [], [], [] for s, conn in self.connection_map.iteritems(): if now - conn.last_activity > self.opts.timeout: - if not conn.handle_timeout(): + if conn.handle_timeout(): + conn.last_activity = now + else: remove.append((s, conn)) continue - if conn.wait_for is READ: + wf = conn.wait_for + if wf is READ: (readable if conn.read_buffer.has_data else read_needed).append(s) - elif conn.wait_for is WRITE: + elif wf is WRITE: write_needed.append(s) - else: + elif wf is RDWR: write_needed.append(s) (readable if conn.read_buffer.has_data else read_needed).append(s) @@ -434,10 +461,20 @@ class ServerLoop(object): conn.handle_event(event) if not conn.ready: self.close(s, conn) + except JobQueueFull: + self.log.exception('Server busy handling request: %s' % conn.state_description) + if conn.ready: + if conn.response_started: + self.close(s, conn) + else: + try: + conn.report_busy() + except Exception: + self.close(s, conn) except Exception as e: ignore.add(s) + self.log.exception('Unhandled exception in state: %s' % conn.state_description) if conn.ready: - self.log.exception('Unhandled exception in state: %s' % conn.state_description) if 
conn.response_started: self.close(s, conn) else: @@ -452,6 +489,19 @@ class ServerLoop(object): def wakeup(self): self.control_in.sendall(WAKEUP) + def job_completed(self): + self.control_in.sendall(JOB_DONE) + + def dispatch_job_results(self): + while True: + try: + s, ok, result = self.pool.get_nowait() + except Empty: + break + conn = self.connection_map.get(s) + if conn is not None: + yield s, conn, (ok, result) + def close(self, s, conn): self.connection_map.pop(s, None) conn.close() @@ -465,7 +515,7 @@ class ServerLoop(object): if sock is not None: s = sock.fileno() if s > -1: - self.connection_map[s] = conn = self.handler(sock, self.opts, self.ssl_context, self.tdir, addr) + self.connection_map[s] = conn = self.handler(sock, self.opts, self.ssl_context, self.tdir, addr, self.pool) if self.ssl_context is not None: yield s, conn, RDWR elif s == control: @@ -478,7 +528,8 @@ class ServerLoop(object): self.create_control_connection() continue if c == JOB_DONE: - pass + for s, conn, event in self.dispatch_job_results(): + yield s, conn, event elif c == WAKEUP: pass elif not c: @@ -510,6 +561,7 @@ class ServerLoop(object): pass for s, conn in tuple(self.connection_map.iteritems()): self.close(s, conn) + self.pool.stop(self.opts.shutdown_timeout) class EchoLine(Connection): # {{{ diff --git a/src/calibre/srv/opts.py b/src/calibre/srv/opts.py index 7b545caf26..7e17f7a9a4 100644 --- a/src/calibre/srv/opts.py +++ b/src/calibre/srv/opts.py @@ -52,6 +52,10 @@ raw_options = ( 'compress_min_size', 1024, None, + 'Number of worker threads to use to process requests', + 'worker_count', 10, + None, + 'Use zero copy file transfers for increased performance', 'use_sendfile', True, 'This will use zero-copy in-kernel transfers when sending files over the network,' diff --git a/src/calibre/srv/pool.py b/src/calibre/srv/pool.py new file mode 100644 index 0000000000..b68a1b6deb --- /dev/null +++ b/src/calibre/srv/pool.py @@ -0,0 +1,83 @@ +#!/usr/bin/env python2 +# 
vim:fileencoding=utf-8 +from __future__ import (unicode_literals, division, absolute_import, + print_function) + +__license__ = 'GPL v3' +__copyright__ = '2015, Kovid Goyal ' + +import sys, time +from Queue import Queue, Full +from threading import Thread + +class Worker(Thread): + + daemon = True + + def __init__(self, log, notify_server, num, request_queue, result_queue): + self.request_queue, self.result_queue = request_queue, result_queue + self.notify_server = notify_server + self.log = log + self.working = False + Thread.__init__(self, name='ServerWorker%d' % num) + + def run(self): + while True: + x = self.request_queue.get() + if x is None: + break + job_id, func = x + self.working = True + try: + result = func() + except Exception: + self.handle_error(job_id) # must be a separate function to avoid reference cycles with sys.exc_info() + else: + self.result_queue.put((job_id, True, result)) + finally: + self.working = False + try: + self.notify_server() + except Exception: + self.log.exception('ServerWorker failed to notify server on job completion') + + def handle_error(self, job_id): + self.result_queue.put((job_id, False, sys.exc_info())) + +class ThreadPool(object): + + def __init__(self, log, notify_server, count=10, queue_size=1000): + self.request_queue, self.result_queue = Queue(queue_size), Queue(queue_size) + self.workers = [Worker(log, notify_server, i, self.request_queue, self.result_queue) for i in xrange(count)] + + def start(self): + for w in self.workers: + w.start() + + def put_nowait(self, job_id, func): + self.request_queue.put_nowait((job_id, func)) + + def get_nowait(self): + return self.result_queue.get_nowait() + + def stop(self, shutdown_timeout): + end = time.time() + shutdown_timeout + for w in self.workers: + try: + self.request_queue.put_nowait(None) + except Full: + break + for w in self.workers: + now = time.time() + if now >= end: + break + w.join(end - now) + self.workers = [w for w in self.workers if w.is_alive()] + + 
@property + def busy(self): + return sum(int(w.working) for w in self.workers) + + @property + def idle(self): + return sum(int(not w.working) for w in self.workers) diff --git a/src/calibre/srv/tests/loop.py b/src/calibre/srv/tests/loop.py index 28cec0d9c1..b11ac2611e 100644 --- a/src/calibre/srv/tests/loop.py +++ b/src/calibre/srv/tests/loop.py @@ -21,6 +21,14 @@ from calibre.ptempfile import TemporaryDirectory class LoopTest(BaseTest): + def test_workers(self): + ' Test worker semantics ' + with TestServer(lambda data:(data.path[0] + data.read()), worker_count=3) as server: + self.ae(3, sum(int(w.is_alive()) for w in server.loop.pool.workers)) + server.loop.stop() + server.join() + self.ae(0, sum(int(w.is_alive()) for w in server.loop.pool.workers)) + @skipIf(create_server_cert is None, 'certgen module not available') def test_ssl(self): 'Test serving over SSL'