Server: Run the actual request handling code in threads so as not to block the servers event loop

This commit is contained in:
Kovid Goyal 2015-05-29 10:35:31 +05:30
parent 5b2049c3c8
commit ab81fe992b
6 changed files with 198 additions and 35 deletions

View File

@ -9,3 +9,6 @@ __copyright__ = '2015, Kovid Goyal <kovid at kovidgoyal.net>'
class HTTP404(Exception):
pass
class JobQueueFull(Exception):
pass

View File

@ -316,14 +316,45 @@ class HTTPConnection(HTTPRequest):
outheaders, self.response_protocol, self.static_cache, self.opts,
self.remote_addr, self.remote_port
)
try:
output = self.request_handler(data)
except HTTP404 as e:
return self.simple_response(httplib.NOT_FOUND, msg=e.message or '', close_after_response=False)
self.queue_job(self.run_request_handler, data)
def run_request_handler(self, data):
result = self.request_handler(data)
return data, result
def send_range_not_satisfiable(self, content_length):
buf = [
'%s %d %s' % (self.response_protocol, httplib.REQUESTED_RANGE_NOT_SATISFIABLE, httplib.responses[httplib.REQUESTED_RANGE_NOT_SATISFIABLE]),
"Date: " + http_date(),
"Content-Range: bytes */%d" % content_length,
]
self.response_ready(header_list_to_file(buf))
def send_not_modified(self, etag=None):
buf = [
'%s %d %s' % (self.response_protocol, httplib.NOT_MODIFIED, httplib.responses[httplib.NOT_MODIFIED]),
"Content-Length: 0",
"Date: " + http_date(),
]
if etag is not None:
buf.append('ETag: ' + etag)
self.response_ready(header_list_to_file(buf))
def report_busy(self):
self.simple_response(httplib.SERVICE_UNAVAILABLE)
def job_done(self, ok, result):
if not ok:
etype, e, tb = result
if isinstance(e, HTTP404):
return self.simple_response(httplib.NOT_FOUND, msg=e.message or '', close_after_response=False)
raise e, None, tb
data, output = result
output = self.finalize_output(output, data, self.method is HTTP1)
if output is None:
return
outheaders = data.outheaders
outheaders.set('Date', http_date(), replace_all=True)
outheaders.set('Server', 'calibre %s' % __version__, replace_all=True)
@ -348,24 +379,6 @@ class HTTPConnection(HTTPRequest):
buf.append('')
self.response_ready(BytesIO(b''.join((x + '\r\n').encode('ascii') for x in buf)), output=output)
def send_range_not_satisfiable(self, content_length):
buf = [
'%s %d %s' % (self.response_protocol, httplib.REQUESTED_RANGE_NOT_SATISFIABLE, httplib.responses[httplib.REQUESTED_RANGE_NOT_SATISFIABLE]),
"Date: " + http_date(),
"Content-Range: bytes */%d" % content_length,
]
self.response_ready(header_list_to_file(buf))
def send_not_modified(self, etag=None):
buf = [
'%s %d %s' % (self.response_protocol, httplib.NOT_MODIFIED, httplib.responses[httplib.NOT_MODIFIED]),
"Content-Length: 0",
"Date: " + http_date(),
]
if etag is not None:
buf.append('ETag: ' + etag)
self.response_ready(header_list_to_file(buf))
def response_ready(self, header_file, output=None):
self.response_started = True
self.optimize_for_sending_packet()

View File

@ -8,10 +8,13 @@ __copyright__ = '2015, Kovid Goyal <kovid at kovidgoyal.net>'
import ssl, socket, select, os, traceback
from io import BytesIO
from Queue import Empty, Full
from functools import partial
from calibre import as_unicode
from calibre.ptempfile import TemporaryDirectory
from calibre.srv.errors import JobQueueFull
from calibre.srv.pool import ThreadPool
from calibre.srv.opts import Options
from calibre.srv.utils import (
socket_errors_socket_closed, socket_errors_nonblocking, HandleInterrupt,
@ -21,7 +24,7 @@ from calibre.utils.socket_inheritance import set_socket_inherit
from calibre.utils.logging import ThreadSafeLog
from calibre.utils.monotonic import monotonic
READ, WRITE, RDWR = 'READ', 'WRITE', 'RDWR'
READ, WRITE, RDWR, WAIT = 'READ', 'WRITE', 'RDWR', 'WAIT'
WAKEUP, JOB_DONE = bytes(bytearray(xrange(2)))
class ReadBuffer(object): # {{{
@ -113,8 +116,8 @@ class ReadBuffer(object): # {{{
class Connection(object): # {{{
def __init__(self, socket, opts, ssl_context, tdir, addr):
self.opts = opts
def __init__(self, socket, opts, ssl_context, tdir, addr, pool):
self.opts, self.pool = opts, pool
try:
self.remote_addr = addr[0]
self.remote_port = addr[1]
@ -234,6 +237,21 @@ class Connection(object): # {{{
except socket.error:
pass
def queue_job(self, func, *args):
if args:
func = partial(func, *args)
try:
self.pool.put_nowait(self.socket.fileno(), func)
except Full:
raise JobQueueFull()
self.set_state(WAIT, self._job_done)
def _job_done(self, event):
self.job_done(*event)
def job_done(self, ok, result):
raise NotImplementedError()
@property
def state_description(self):
return ''
@ -241,6 +259,9 @@ class Connection(object): # {{{
def report_unhandled_exception(self, e, formatted_traceback):
pass
def report_busy(self):
pass
def connection_ready(self):
raise NotImplementedError()
@ -286,6 +307,7 @@ class ServerLoop(object):
self.bind_address = self.pre_activated_socket.getsockname()
self.create_control_connection()
self.pool = ThreadPool(self.log, self.job_completed, count=self.opts.worker_count)
def create_control_connection(self):
self.control_in, self.control_out = create_sock_pair()
@ -343,6 +365,7 @@ class ServerLoop(object):
self.bound_address = ba = self.socket.getsockname()
if isinstance(ba, tuple):
ba = ':'.join(map(type(''), ba))
self.pool.start()
with TemporaryDirectory(prefix='srv-') as tdir:
self.tdir = tdir
self.ready = True
@ -351,13 +374,14 @@ class ServerLoop(object):
while self.ready:
try:
self.tick()
except (KeyboardInterrupt, SystemExit) as e:
except SystemExit:
self.shutdown()
if isinstance(e, SystemExit):
raise
raise
except KeyboardInterrupt:
break
except:
self.log.exception('Error in ServerLoop.tick')
self.shutdown()
def setup_socket(self):
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
@ -388,14 +412,17 @@ class ServerLoop(object):
read_needed, write_needed, readable, remove = [], [], [], []
for s, conn in self.connection_map.iteritems():
if now - conn.last_activity > self.opts.timeout:
if not conn.handle_timeout():
if conn.handle_timeout():
conn.last_activity = now
else:
remove.append((s, conn))
continue
if conn.wait_for is READ:
wf = conn.wait_for
if wf is READ:
(readable if conn.read_buffer.has_data else read_needed).append(s)
elif conn.wait_for is WRITE:
elif wf is WRITE:
write_needed.append(s)
else:
elif wf is RDWR:
write_needed.append(s)
(readable if conn.read_buffer.has_data else read_needed).append(s)
@ -434,10 +461,20 @@ class ServerLoop(object):
conn.handle_event(event)
if not conn.ready:
self.close(s, conn)
except JobQueueFull:
self.log.exception('Server busy handling request: ' % conn.state_description)
if conn.ready:
if conn.response_started:
self.close(s, conn)
else:
try:
conn.report_busy()
except Exception:
self.close(s, conn)
except Exception as e:
ignore.add(s)
self.log.exception('Unhandled exception in state: %s' % conn.state_description)
if conn.ready:
self.log.exception('Unhandled exception in state: %s' % conn.state_description)
if conn.response_started:
self.close(s, conn)
else:
@ -452,6 +489,19 @@ class ServerLoop(object):
def wakeup(self):
self.control_in.sendall(WAKEUP)
def job_completed(self):
self.control_in.sendall(JOB_DONE)
def dispatch_job_results(self):
while True:
try:
s, ok, result = self.pool.get_nowait()
except Empty:
break
conn = self.connection_map.get(s)
if conn is not None:
yield s, conn, (ok, result)
def close(self, s, conn):
self.connection_map.pop(s, None)
conn.close()
@ -465,7 +515,7 @@ class ServerLoop(object):
if sock is not None:
s = sock.fileno()
if s > -1:
self.connection_map[s] = conn = self.handler(sock, self.opts, self.ssl_context, self.tdir, addr)
self.connection_map[s] = conn = self.handler(sock, self.opts, self.ssl_context, self.tdir, addr, self.pool)
if self.ssl_context is not None:
yield s, conn, RDWR
elif s == control:
@ -478,7 +528,8 @@ class ServerLoop(object):
self.create_control_connection()
continue
if c == JOB_DONE:
pass
for s, conn, event in self.dispatch_job_results():
yield s, conn, event
elif c == WAKEUP:
pass
elif not c:
@ -510,6 +561,7 @@ class ServerLoop(object):
pass
for s, conn in tuple(self.connection_map.iteritems()):
self.close(s, conn)
self.pool.stop(self.opts.shutdown_timeout)
class EchoLine(Connection): # {{{

View File

@ -52,6 +52,10 @@ raw_options = (
'compress_min_size', 1024,
None,
'Number of worker threads to use to process requests',
'worker_count', 10,
None,
'Use zero copy file transfers for increased performance',
'use_sendfile', True,
'This will use zero-copy in-kernel transfers when sending files over the network,'

83
src/calibre/srv/pool.py Normal file
View File

@ -0,0 +1,83 @@
#!/usr/bin/env python2
# vim:fileencoding=utf-8
from __future__ import (unicode_literals, division, absolute_import,
print_function)
__license__ = 'GPL v3'
__copyright__ = '2015, Kovid Goyal <kovid at kovidgoyal.net>'
import sys, time
from Queue import Queue, Full
from threading import Thread
class Worker(Thread):
daemon = True
def __init__(self, log, notify_server, num, request_queue, result_queue):
self.request_queue, self.result_queue = request_queue, result_queue
self.notify_server = notify_server
self.log = log
self.working = False
Thread.__init__(self, name='ServerWorker%d' % num)
def run(self):
while True:
x = self.request_queue.get()
if x is None:
break
job_id, func = x
self.working = True
try:
result = func()
except Exception:
self.handle_error(job_id) # must be a separate function to avoid reference cycles with sys.exc_info()
else:
self.result_queue.put((job_id, True, result))
finally:
self.working = False
try:
self.notify_server()
except Exception:
self.log.exception('ServerWorker failed to notify server on job completion')
def handle_error(self, job_id):
self.result_queue.put((job_id, False, sys.exc_info()))
class ThreadPool(object):
def __init__(self, log, notify_server, count=10, queue_size=1000):
self.request_queue, self.result_queue = Queue(queue_size), Queue(queue_size)
self.workers = [Worker(log, notify_server, i, self.request_queue, self.result_queue) for i in xrange(count)]
def start(self):
for w in self.workers:
w.start()
def put_nowait(self, job_id, func):
self.request_queue.put_nowait((job_id, func))
def get_nowait(self):
return self.result_queue.get_nowait()
def stop(self, shutdown_timeout):
end = time.time() + shutdown_timeout
for w in self.workers:
try:
self.request_queue.put_nowait(None)
except Full:
break
for w in self.workers:
now = time.time()
if now >= end:
break
w.join(end - now)
self.workers = [w for w in self.workers if w.is_alive()]
@property
def busy(self):
return sum(int(w.working) for w in self.workers)
@property
def idle(self):
return sum(int(not w.working) for w in self.workers)

View File

@ -21,6 +21,14 @@ from calibre.ptempfile import TemporaryDirectory
class LoopTest(BaseTest):
def test_workers(self):
' Test worker semantics '
with TestServer(lambda data:(data.path[0] + data.read()), worker_count=3) as server:
self.ae(3, sum(int(w.is_alive()) for w in server.loop.pool.workers))
server.loop.stop()
server.join()
self.ae(0, sum(int(w.is_alive()) for w in server.loop.pool.workers))
@skipIf(create_server_cert is None, 'certgen module not available')
def test_ssl(self):
'Test serving over SSL'