Server loop: Add support for simple plugins whose lifecycle is tied to the loop's life cycle

This commit is contained in:
Kovid Goyal 2015-06-03 18:27:45 +05:30
parent f2ad28b964
commit b95bcfbbe3
4 changed files with 67 additions and 4 deletions

View File

@ -6,7 +6,7 @@ from __future__ import (unicode_literals, division, absolute_import,
__license__ = 'GPL v3'
__copyright__ = '2015, Kovid Goyal <kovid at kovidgoyal.net>'
import ssl, socket, select, os, traceback
import ssl, socket, select, os, traceback, time
from io import BytesIO
from Queue import Empty, Full
from functools import partial
@ -14,7 +14,7 @@ from functools import partial
from calibre import as_unicode
from calibre.ptempfile import TemporaryDirectory
from calibre.srv.errors import JobQueueFull
from calibre.srv.pool import ThreadPool
from calibre.srv.pool import ThreadPool, PluginPool
from calibre.srv.opts import Options
from calibre.srv.utils import (
socket_errors_socket_closed, socket_errors_nonblocking, HandleInterrupt,
@ -275,6 +275,7 @@ class ServerLoop(object):
self,
handler,
opts=None,
plugins=(),
# A calibre logging object. If None, a default log that logs to
# stdout is used
log=None
@ -307,6 +308,7 @@ class ServerLoop(object):
self.create_control_connection()
self.pool = ThreadPool(self.log, self.job_completed, count=self.opts.worker_count)
self.plugin_pool = PluginPool(self, plugins)
def create_control_connection(self):
self.control_in, self.control_out = create_sock_pair()
@ -369,6 +371,7 @@ class ServerLoop(object):
self.tdir = tdir
self.ready = True
self.log('calibre server listening on', ba)
self.plugin_pool.start()
while self.ready:
try:
@ -560,7 +563,9 @@ class ServerLoop(object):
pass
for s, conn in tuple(self.connection_map.iteritems()):
self.close(s, conn)
self.pool.stop(self.opts.shutdown_timeout)
end = time.time() + self.opts.shutdown_timeout
for pool in (self.plugin_pool, self.pool):
pool.stop(max(0, end - time.time()))
class EchoLine(Connection): # {{{

View File

@ -81,3 +81,37 @@ class ThreadPool(object):
@property
def idle(self):
return sum(int(not w.working) for w in self.workers)
class PluginPool(object):
def __init__(self, loop, plugins):
self.workers = []
self.loop = loop
for plugin in plugins:
w = Thread(target=self.run_plugin, args=(plugin,), name=plugin.__class__.__name__)
w.daemon = True
w.plugin = plugin
self.workers.append(w)
def run_plugin(self, plugin):
try:
plugin.start(self.loop)
except Exception:
self.loop.log.exception('Failed to start plugin: %s', plugin.__class__.__name__)
def start(self):
for w in self.workers:
w.start()
def stop(self, shutdown_timeout):
end = time.time() + shutdown_timeout
for w in self.workers:
if w.is_alive():
w.plugin.stop()
for w in self.workers:
left = end - time.time()
if left > 0:
w.join(left)
else:
break
self.workers = [w for w in self.workers if w.is_alive()]

View File

@ -23,7 +23,7 @@ class TestServer(Thread):
daemon = True
def __init__(self, handler, **kwargs):
def __init__(self, handler, plugins=(), **kwargs):
Thread.__init__(self, name='ServerMain')
from calibre.srv.opts import Options
from calibre.srv.loop import ServerLoop
@ -34,6 +34,7 @@ class TestServer(Thread):
self.loop = ServerLoop(
create_http_handler(handler),
opts=Options(**kwargs),
plugins=plugins,
log=ServerLog(level=ServerLog.WARN),
)
self.log = self.loop.log

View File

@ -9,6 +9,7 @@ __copyright__ = '2015, Kovid Goyal <kovid at kovidgoyal.net>'
import httplib, ssl, os, socket, time
from unittest import skipIf
from glob import glob
from threading import Event
try:
from calibre.utils.certgen import create_server_cert
@ -50,6 +51,28 @@ class LoopTest(BaseTest):
self.assertLessEqual(log_size(), 100)
self.ae(history(), {1,2,3,4,5})
def test_plugins(self):
'Test plugin semantics'
class Plugin(object):
def __init__(self):
self.running = Event()
self.event = Event()
self.port = None
def start(self, loop):
self.running.set()
self.port = loop.bound_address[1]
self.event.wait()
self.running.clear()
def stop(self):
self.event.set()
plugin = Plugin()
with TestServer(lambda data:'xxx', plugins=(plugin,)) as server:
self.assertTrue(plugin.running.wait(0.2))
self.ae(plugin.port, server.address[1])
self.assertTrue(plugin.event.wait(0.2))
self.assertFalse(plugin.running.is_set())
def test_workers(self):
' Test worker semantics '
with TestServer(lambda data:(data.path[0] + data.read()), worker_count=3) as server: