From b95bcfbbe332a6826cce095be599fc2f2b7aee77 Mon Sep 17 00:00:00 2001 From: Kovid Goyal Date: Wed, 3 Jun 2015 18:27:45 +0530 Subject: [PATCH] Server loop: Add support for simple plugins whose lifecycle is tied to the loop's life cycle --- src/calibre/srv/loop.py | 11 ++++++++--- src/calibre/srv/pool.py | 34 ++++++++++++++++++++++++++++++++++ src/calibre/srv/tests/base.py | 3 ++- src/calibre/srv/tests/loop.py | 23 +++++++++++++++++++++++ 4 files changed, 67 insertions(+), 4 deletions(-) diff --git a/src/calibre/srv/loop.py b/src/calibre/srv/loop.py index 3f1f3d148e..c03ba90c89 100644 --- a/src/calibre/srv/loop.py +++ b/src/calibre/srv/loop.py @@ -6,7 +6,7 @@ from __future__ import (unicode_literals, division, absolute_import, __license__ = 'GPL v3' __copyright__ = '2015, Kovid Goyal ' -import ssl, socket, select, os, traceback +import ssl, socket, select, os, traceback, time from io import BytesIO from Queue import Empty, Full from functools import partial @@ -14,7 +14,7 @@ from functools import partial from calibre import as_unicode from calibre.ptempfile import TemporaryDirectory from calibre.srv.errors import JobQueueFull -from calibre.srv.pool import ThreadPool +from calibre.srv.pool import ThreadPool, PluginPool from calibre.srv.opts import Options from calibre.srv.utils import ( socket_errors_socket_closed, socket_errors_nonblocking, HandleInterrupt, @@ -275,6 +275,7 @@ class ServerLoop(object): self, handler, opts=None, + plugins=(), # A calibre logging object. If None, a default log that logs to # stdout is used log=None @@ -307,6 +308,7 @@ class ServerLoop(object): self.create_control_connection() self.pool = ThreadPool(self.log, self.job_completed, count=self.opts.worker_count) + self.plugin_pool = PluginPool(self, plugins) def create_control_connection(self): self.control_in, self.control_out = create_sock_pair() @@ -369,6 +371,7 @@ class ServerLoop(object): self.tdir = tdir self.ready = True self.log('calibre server listening on', ba) + self.plugin_pool.start() while self.ready: try: @@ -560,7 +563,9 @@ class ServerLoop(object): pass for s, conn in tuple(self.connection_map.iteritems()): self.close(s, conn) - self.pool.stop(self.opts.shutdown_timeout) + end = time.time() + self.opts.shutdown_timeout + for pool in (self.plugin_pool, self.pool): + pool.stop(max(0, end - time.time())) class EchoLine(Connection): # {{{ diff --git a/src/calibre/srv/pool.py b/src/calibre/srv/pool.py index b68a1b6deb..955993c9dd 100644 --- a/src/calibre/srv/pool.py +++ b/src/calibre/srv/pool.py @@ -81,3 +81,37 @@ class ThreadPool(object): @property def idle(self): return sum(int(not w.working) for w in self.workers) + +class PluginPool(object): + + def __init__(self, loop, plugins): + self.workers = [] + self.loop = loop + for plugin in plugins: + w = Thread(target=self.run_plugin, args=(plugin,), name=plugin.__class__.__name__) + w.daemon = True + w.plugin = plugin + self.workers.append(w) + + def run_plugin(self, plugin): + try: + plugin.start(self.loop) + except Exception: + self.loop.log.exception('Failed to start plugin: %s', plugin.__class__.__name__) + + def start(self): + for w in self.workers: + w.start() + + def stop(self, shutdown_timeout): + end = time.time() + shutdown_timeout + for w in self.workers: + if w.is_alive(): + w.plugin.stop() + for w in self.workers: + left = end - time.time() + if left > 0: + w.join(left) + else: + break + self.workers = [w for w in self.workers if w.is_alive()] diff --git a/src/calibre/srv/tests/base.py b/src/calibre/srv/tests/base.py index 2ea55cf60b..59528afce2 100644 --- a/src/calibre/srv/tests/base.py +++ b/src/calibre/srv/tests/base.py @@ -23,7 +23,7 @@ class TestServer(Thread): daemon = True - def __init__(self, handler, **kwargs): + def __init__(self, handler, plugins=(), **kwargs): Thread.__init__(self, name='ServerMain') from calibre.srv.opts import Options from calibre.srv.loop import ServerLoop @@ -34,6 +34,7 @@ class TestServer(Thread): self.loop = ServerLoop( create_http_handler(handler), opts=Options(**kwargs), + plugins=plugins, log=ServerLog(level=ServerLog.WARN), ) self.log = self.loop.log diff --git a/src/calibre/srv/tests/loop.py b/src/calibre/srv/tests/loop.py index b81d663fa1..5e884431f9 100644 --- a/src/calibre/srv/tests/loop.py +++ b/src/calibre/srv/tests/loop.py @@ -9,6 +9,7 @@ __copyright__ = '2015, Kovid Goyal ' import httplib, ssl, os, socket, time from unittest import skipIf from glob import glob +from threading import Event try: from calibre.utils.certgen import create_server_cert @@ -50,6 +51,28 @@ class LoopTest(BaseTest): self.assertLessEqual(log_size(), 100) self.ae(history(), {1,2,3,4,5}) + def test_plugins(self): + 'Test plugin semantics' + class Plugin(object): + def __init__(self): + self.running = Event() + self.event = Event() + self.port = None + def start(self, loop): + self.running.set() + self.port = loop.bound_address[1] + self.event.wait() + self.running.clear() + def stop(self): + self.event.set() + + plugin = Plugin() + with TestServer(lambda data:'xxx', plugins=(plugin,)) as server: + self.assertTrue(plugin.running.wait(0.2)) + self.ae(plugin.port, server.address[1]) + self.assertTrue(plugin.event.wait(0.2)) + self.assertFalse(plugin.running.is_set()) + def test_workers(self): ' Test worker semantics ' with TestServer(lambda data:(data.path[0] + data.read()), worker_count=3) as server: