diff --git a/src/calibre/srv/handler.py b/src/calibre/srv/handler.py index 595310c93f..b7630a148e 100644 --- a/src/calibre/srv/handler.py +++ b/src/calibre/srv/handler.py @@ -80,6 +80,7 @@ class Context(object): log = None url_for = None + jobs_manager = None CATEGORY_CACHE_SIZE = 25 SEARCH_CACHE_SIZE = 100 @@ -92,6 +93,12 @@ class Context(object): self.ignored_fields = frozenset(filter(None, (x.strip() for x in (opts.ignored_fields or '').split(',')))) self.displayed_fields = frozenset(filter(None, (x.strip() for x in (opts.displayed_fields or '').split(',')))) + def start_job(self, name, module, func, args=(), kwargs=None): + return self.jobs_manager.start_job(name, module, func, args, kwargs) + + def job_status(self, job_id): + return self.jobs_manager.job_status(job_id) + def is_field_displayable(self, field): if self.displayed_fields and field not in self.displayed_fields: return False @@ -193,6 +200,9 @@ class Handler(object): if self.auth_controller is not None: self.auth_controller.log = log + def set_jobs_manager(self, jobs_manager): + self.router.ctx.jobs_manager = jobs_manager + def close(self): self.router.ctx.library_broker.close() diff --git a/src/calibre/srv/jobs.py b/src/calibre/srv/jobs.py new file mode 100644 index 0000000000..996b1905ea --- /dev/null +++ b/src/calibre/srv/jobs.py @@ -0,0 +1,241 @@ +#!/usr/bin/env python2 +# vim:fileencoding=utf-8 +# License: GPLv3 Copyright: 2016, Kovid Goyal + +from __future__ import (unicode_literals, division, absolute_import, + print_function) +import os, time +from itertools import count +from collections import namedtuple, deque +from functools import partial +from threading import RLock, Thread, Event +from Queue import Queue, Empty + +from calibre import detect_ncpus, force_unicode +from calibre.utils.monotonic import monotonic +from calibre.utils.ipc.simple_worker import fork_job, WorkerError + +StartEvent = namedtuple('StartEvent', 'job_id name module function args kwargs') +DoneEvent = namedtuple('DoneEvent', 'job_id') + +class Job(Thread): + + daemon = True + + def __init__(self, start_event, events_queue): + Thread.__init__(self, name='JobsMonitor%s' % start_event.job_id) + self.abort_event = Event() + self.events_queue = events_queue + self.job_name = start_event.name + self.job_id = start_event.job_id + self.func = partial(fork_job, start_event.module, start_event.function, start_event.args, start_event.kwargs, abort=self.abort_event) + self.result = self.traceback = None + self.done = False + self.start_time = monotonic() + self.end_time = self.log_path = None + self.wait_for_end = Event() + self.start() + + def run(self): + func, self.func = self.func, None + try: + result = func() + except WorkerError as err: + import traceback + self.traceback = err.orig_tb or traceback.format_exc() + else: + self.result, self.log_path = result['result'], result['stdout_stderr'] + self.done = True + self.end_time = monotonic() + self.wait_for_end.set() + self.events_queue.put(DoneEvent(self.job_id)) + + @property + def was_aborted(self): + return self.done and self.result is None and self.abort_event.is_set() + + def remove_log(self): + lp, self.log_path = self.log_path, None + if lp: + try: + os.remove(lp) + except EnvironmentError: + pass + + def read_log(self): + ans = '' + if self.log_path is not None: + try: + with lopen(self.log_path, 'rb') as f: + ans = f.read() + except EnvironmentError: + pass + if isinstance(ans, bytes): + ans = force_unicode(ans, 'utf-8') + return ans + + +class JobsManager(object): + + def __init__(self, opts, log): + mj = opts.max_jobs + if mj < 1: + mj = detect_ncpus() + self.log = log + self.max_jobs = max(1, mj) + self.max_job_time = max(0, opts.max_job_time * 60) + self.lock = RLock() + self.jobs = {} + self.finished_jobs = {} + self.events = Queue() + self.job_id = count() + self.waiting_job_ids = set() + self.waiting_jobs = deque() + self.max_block = None + self.shutting_down = False + self.event_loop = None + + def start_job(self, name, module, func, args=(), kwargs=None): + with self.lock: + if self.shutting_down: + return None + if self.event_loop is None: + self.event_loop = t = Thread(name='JobsEventLoop', target=self.run) + t.daemon = True + t.start() + job_id = next(self.job_id) + self.events.put(StartEvent(job_id, name, module, func, args, kwargs or {})) + self.waiting_job_ids.add(job_id) + return job_id + + def job_status(self, job_id): + with self.lock: + if not self.shutting_down: + if job_id in self.finished_jobs: + job = self.finished_jobs[job_id] + return 'finished', job.result, job.traceback, job.was_aborted + if job_id in self.jobs: + return 'running', None, None, None + if job_id in self.waiting_job_ids: + return 'waiting', None, None, None + return None, None, None, None + + def abort_job(self, job_id): + job = self.jobs.get(job_id) + if job is not None: + job.abort_event.set() + + def wait_for_running_job(self, job_id, timeout=None): + job = self.jobs.get(job_id) + if job is not None: + job.wait_for_end.wait(timeout) + if not job.done: + return False + while job_id not in self.finished_jobs: + time.sleep(0.001) + return True + + def shutdown(self, timeout=5.0): + with self.lock: + self.shutting_down = True + for job in self.jobs.itervalues(): + job.abort_event.set() + self.events.put(False) + + def wait_for_shutdown(self, wait_till): + for job in self.jobs.itervalues(): + delta = wait_till - monotonic() + if delta > 0: + job.join(delta) + if self.event_loop is not None: + delta = wait_till - monotonic() + if delta > 0: + self.event_loop.join(delta) + + # Internal API {{{ + + def run(self): + while not self.shutting_down: + if self.max_block is None: + ev = self.events.get() + else: + try: + ev = self.events.get(block=True, timeout=self.max_block) + except Empty: + ev = None + if self.shutting_down: + break + if ev is None: + self.abort_hanging_jobs() + elif isinstance(ev, StartEvent): + self.waiting_jobs.append(ev) + self.start_waiting_jobs() + elif isinstance(ev, DoneEvent): + self.job_finished(ev.job_id) + elif ev is False: + break + + def start_waiting_jobs(self): + with self.lock: + while self.waiting_jobs and len(self.jobs) < self.max_jobs: + ev = self.waiting_jobs.popleft() + self.jobs[ev.job_id] = Job(ev, self.events) + self.waiting_job_ids.discard(ev.job_id) + self.update_max_block() + + def update_max_block(self): + with self.lock: + mb = None + now = monotonic() + for job in self.jobs.itervalues(): + if not job.done and not job.abort_event.is_set(): + delta = self.max_job_time - (now - job.start_time) + if delta <= 0: + self.max_block = 0 + return + if mb is None: + mb = delta + else: + mb = min(mb, delta) + self.max_block = mb + + def abort_hanging_jobs(self): + now = monotonic() + found = False + for job in self.jobs.itervalues(): + if not job.done and not job.abort_event.is_set(): + delta = self.max_job_time - (now - job.start_time) + if delta <= 0: + job.abort_event.set() + found = True + if found: + self.update_max_block() + + def job_finished(self, job_id): + with self.lock: + self.finished_jobs[job_id] = job = self.jobs.pop(job_id) + self.update_max_block() + self.prune_finished_jobs() + if job.traceback and not job.was_aborted: + logdata = job.read_log() + self.log.error('The job: %s failed:\n%s\n%s' % (job.name, logdata, job.traceback)) + job.remove_log() + self.start_waiting_jobs() + + def prune_finished_jobs(self): + with self.lock: + remove = [] + now = monotonic() + for job_id, job in self.finished_jobs.iteritems(): + if now - job.end_time > 3600: + remove.append(job_id) + for job_id in remove: + del self.finished_jobs[job_id] + # }}} + +def sleep_test(x): + time.sleep(x) + return x + +def error_test(): + raise Exception('a testing error') diff --git a/src/calibre/srv/loop.py b/src/calibre/srv/loop.py index 536ed1b1c8..ac30c38938 100644 --- a/src/calibre/srv/loop.py +++ b/src/calibre/srv/loop.py @@ -16,6 +16,7 @@ from calibre.ptempfile import TemporaryDirectory from calibre.srv.errors import JobQueueFull from calibre.srv.pool import ThreadPool, PluginPool from calibre.srv.opts import Options +from calibre.srv.jobs import JobsManager from calibre.srv.utils import ( socket_errors_socket_closed, socket_errors_nonblocking, HandleInterrupt, socket_errors_eintr, start_cork, stop_cork, DESIRED_SEND_BUFFER_SIZE, @@ -313,6 +314,7 @@ class ServerLoop(object): self.handler = handler self.opts = opts or Options() self.log = log or ThreadSafeLog(level=ThreadSafeLog.DEBUG) + self.jobs_manager = JobsManager(self.opts, self.log) self.access_log = access_log ba = (self.opts.listen_on, int(self.opts.port)) @@ -607,6 +609,7 @@ class ServerLoop(object): self.wakeup() def shutdown(self): + self.jobs_manager.shutdown() try: if getattr(self, 'socket', None): self.socket.close() @@ -620,6 +623,7 @@ class ServerLoop(object): pool.stop(wait_till) if pool.workers: self.log.warn('Failed to shutdown %d workers in %s cleanly' % (len(pool.workers), pool.__class__.__name__)) + self.jobs_manager.wait_for_shutdown(wait_till) class EchoLine(Connection): # {{{ diff --git a/src/calibre/srv/opts.py b/src/calibre/srv/opts.py index b2c5802523..0439c13c69 100644 --- a/src/calibre/srv/opts.py +++ b/src/calibre/srv/opts.py @@ -57,6 +57,18 @@ raw_options = ( 'worker_count', 10, None, + _('Maximum number of worker processes'), + 'max_jobs', 0, + _('Worker processes are launched as needed and used for large jobs such as preparing' + ' a book for viewing, adding books, converting, etc. By default, the max.' + ' number of such processes is based on the number of CPU cores. You can' + ' control it by this setting.'), + + _('Maximum time for worker processes'), + 'max_job_time', 60, + _('Maximum amount of time worker processes are allowed to run (in minutes). Set' + ' to zero for no limit.'), + _('The port on which to listen for connections'), 'port', 8080, None, diff --git a/src/calibre/srv/standalone.py b/src/calibre/srv/standalone.py index 408bd91852..4757be6066 100644 --- a/src/calibre/srv/standalone.py +++ b/src/calibre/srv/standalone.py @@ -69,6 +69,7 @@ class Server(object): plugins.append(BonJour()) self.loop = ServerLoop(create_http_handler(self.handler.dispatch), opts=opts, log=log, access_log=access_log, plugins=plugins) self.handler.set_log(self.loop.log) + self.handler.set_jobs_manager(self.loop.jobs_manager) self.serve_forever = self.loop.serve_forever self.stop = self.loop.stop _df = os.environ.get('CALIBRE_DEVELOP_FROM', None) diff --git a/src/calibre/srv/tests/loop.py b/src/calibre/srv/tests/loop.py index 820d0a4964..7585f27b51 100644 --- a/src/calibre/srv/tests/loop.py +++ b/src/calibre/srv/tests/loop.py @@ -7,6 +7,7 @@ __license__ = 'GPL v3' __copyright__ = '2015, Kovid Goyal ' import httplib, ssl, os, socket, time +from collections import namedtuple from unittest import skipIf from glob import glob from threading import Event @@ -222,3 +223,39 @@ class LoopTest(BaseTest): self.assertGreaterEqual(b, a) self.assertGreaterEqual(b - a, 0.09) self.assertLessEqual(b - a, 0.2) + + def test_jobs_manager(self): + 'Test the jobs manager' + from calibre.srv.jobs import JobsManager + O = namedtuple('O', 'max_jobs max_job_time') + class FakeLog(list): + def error(self, *args): + self.append(' '.join(args)) + jm = JobsManager(O(1, 5), FakeLog()) + job_id = jm.start_job('simple test', 'calibre.srv.jobs', 'sleep_test', args=(1.0,)) + job_id2 = jm.start_job('t2', 'calibre.srv.jobs', 'sleep_test', args=(3,)) + jid = jm.start_job('err test', 'calibre.srv.jobs', 'error_test') + status = jm.job_status(job_id)[0] + s = ('waiting', 'running') + self.assertIn(status, s) + status2 = jm.job_status(job_id2)[0] + self.assertEqual(status2, 'waiting') + while jm.job_status(job_id)[0] in s: + time.sleep(0.01) + status, result, tb, was_aborted = jm.job_status(job_id) + self.assertEqual(status, 'finished') + self.assertFalse(was_aborted) + self.assertFalse(tb) + self.assertEqual(result, 1.0) + status2 = jm.job_status(job_id2)[0] + time.sleep(0.01) + self.assertEqual(status2, 'running') + jm.abort_job(job_id2) + self.assertTrue(jm.wait_for_running_job(job_id2)) + status, result, tb, was_aborted = jm.job_status(job_id2) + self.assertTrue(was_aborted) + self.assertTrue(jm.wait_for_running_job(jid)) + status, result, tb, was_aborted = jm.job_status(jid) + self.assertTrue(tb), self.assertIn('a testing error', tb) + jm.start_job('simple test', 'calibre.srv.jobs', 'sleep_test', args=(1.0,)) + jm.shutdown(), jm.wait_for_shutdown(monotonic() + 1)