Framework for jobs that run in transient worker processes

This commit is contained in:
Kovid Goyal 2016-02-26 17:43:29 +05:30
parent 83c93c8e09
commit 2aa8b189c3
6 changed files with 305 additions and 0 deletions

View File

@ -80,6 +80,7 @@ class Context(object):
log = None log = None
url_for = None url_for = None
jobs_manager = None
CATEGORY_CACHE_SIZE = 25 CATEGORY_CACHE_SIZE = 25
SEARCH_CACHE_SIZE = 100 SEARCH_CACHE_SIZE = 100
@ -92,6 +93,12 @@ class Context(object):
self.ignored_fields = frozenset(filter(None, (x.strip() for x in (opts.ignored_fields or '').split(',')))) self.ignored_fields = frozenset(filter(None, (x.strip() for x in (opts.ignored_fields or '').split(','))))
self.displayed_fields = frozenset(filter(None, (x.strip() for x in (opts.displayed_fields or '').split(',')))) self.displayed_fields = frozenset(filter(None, (x.strip() for x in (opts.displayed_fields or '').split(','))))
def start_job(self, name, module, func, args=(), kwargs=None):
return self.jobs_manager.start_job(name, module, func, args, kwargs)
def job_status(self, job_id):
return self.jobs_manager.job_status(job_id)
def is_field_displayable(self, field): def is_field_displayable(self, field):
if self.displayed_fields and field not in self.displayed_fields: if self.displayed_fields and field not in self.displayed_fields:
return False return False
@ -193,6 +200,9 @@ class Handler(object):
if self.auth_controller is not None: if self.auth_controller is not None:
self.auth_controller.log = log self.auth_controller.log = log
def set_jobs_manager(self, jobs_manager):
self.router.ctx.jobs_manager = jobs_manager
def close(self): def close(self):
self.router.ctx.library_broker.close() self.router.ctx.library_broker.close()

241
src/calibre/srv/jobs.py Normal file
View File

@ -0,0 +1,241 @@
#!/usr/bin/env python2
# vim:fileencoding=utf-8
# License: GPLv3 Copyright: 2016, Kovid Goyal <kovid at kovidgoyal.net>
from __future__ import (unicode_literals, division, absolute_import,
print_function)
import os, time
from itertools import count
from collections import namedtuple, deque
from functools import partial
from threading import RLock, Thread, Event
from Queue import Queue, Empty
from calibre import detect_ncpus, force_unicode
from calibre.utils.monotonic import monotonic
from calibre.utils.ipc.simple_worker import fork_job, WorkerError
StartEvent = namedtuple('StartEvent', 'job_id name module function args kwargs')
DoneEvent = namedtuple('DoneEvent', 'job_id')
class Job(Thread):
daemon = True
def __init__(self, start_event, events_queue):
Thread.__init__(self, name='JobsMonitor%s' % start_event.job_id)
self.abort_event = Event()
self.events_queue = events_queue
self.job_name = start_event.name
self.job_id = start_event.job_id
self.func = partial(fork_job, start_event.module, start_event.function, start_event.args, start_event.kwargs, abort=self.abort_event)
self.result = self.traceback = None
self.done = False
self.start_time = monotonic()
self.end_time = self.log_path = None
self.wait_for_end = Event()
self.start()
def run(self):
func, self.func = self.func, None
try:
result = func()
except WorkerError as err:
import traceback
self.traceback = err.orig_tb or traceback.format_exc()
else:
self.result, self.log_path = result['result'], result['stdout_stderr']
self.done = True
self.end_time = monotonic()
self.wait_for_end.set()
self.events_queue.put(DoneEvent(self.job_id))
@property
def was_aborted(self):
return self.done and self.result is None and self.abort_event.is_set()
def remove_log(self):
lp, self.log_path = self.log_path, None
if lp:
try:
os.remove(lp)
except EnvironmentError:
pass
def read_log(self):
ans = ''
if self.log_path is not None:
try:
with lopen(self.log_path, 'rb') as f:
ans = f.read()
except EnvironmentError:
pass
if isinstance(ans, bytes):
ans = force_unicode(ans, 'utf-8')
return ans
class JobsManager(object):
def __init__(self, opts, log):
mj = opts.max_jobs
if mj < 1:
mj = detect_ncpus()
self.log = log
self.max_jobs = max(1, mj)
self.max_job_time = max(0, opts.max_job_time * 60)
self.lock = RLock()
self.jobs = {}
self.finished_jobs = {}
self.events = Queue()
self.job_id = count()
self.waiting_job_ids = set()
self.waiting_jobs = deque()
self.max_block = None
self.shutting_down = False
self.event_loop = None
def start_job(self, name, module, func, args=(), kwargs=None):
with self.lock:
if self.shutting_down:
return None
if self.event_loop is None:
self.event_loop = t = Thread(name='JobsEventLoop', target=self.run)
t.daemon = True
t.start()
job_id = next(self.job_id)
self.events.put(StartEvent(job_id, name, module, func, args, kwargs or {}))
self.waiting_job_ids.add(job_id)
return job_id
def job_status(self, job_id):
with self.lock:
if not self.shutting_down:
if job_id in self.finished_jobs:
job = self.finished_jobs[job_id]
return 'finished', job.result, job.traceback, job.was_aborted
if job_id in self.jobs:
return 'running', None, None, None
if job_id in self.waiting_job_ids:
return 'waiting', None, None, None
return None, None, None, None
def abort_job(self, job_id):
job = self.jobs.get(job_id)
if job is not None:
job.abort_event.set()
def wait_for_running_job(self, job_id, timeout=None):
job = self.jobs.get(job_id)
if job is not None:
job.wait_for_end.wait(timeout)
if not job.done:
return False
while job_id not in self.finished_jobs:
time.sleep(0.001)
return True
def shutdown(self, timeout=5.0):
with self.lock:
self.shutting_down = True
for job in self.jobs.itervalues():
job.abort_event.set()
self.events.put(False)
def wait_for_shutdown(self, wait_till):
for job in self.jobs.itervalues():
delta = wait_till - monotonic()
if delta > 0:
job.join(delta)
if self.event_loop is not None:
delta = wait_till - monotonic()
if delta > 0:
self.event_loop.join(delta)
# Internal API {{{
def run(self):
while not self.shutting_down:
if self.max_block is None:
ev = self.events.get()
else:
try:
ev = self.events.get(block=True, timeout=self.max_block)
except Empty:
ev = None
if self.shutting_down:
break
if ev is None:
self.abort_hanging_jobs()
elif isinstance(ev, StartEvent):
self.waiting_jobs.append(ev)
self.start_waiting_jobs()
elif isinstance(ev, DoneEvent):
self.job_finished(ev.job_id)
elif ev is False:
break
def start_waiting_jobs(self):
with self.lock:
while self.waiting_jobs and len(self.jobs) < self.max_jobs:
ev = self.waiting_jobs.popleft()
self.jobs[ev.job_id] = Job(ev, self.events)
self.waiting_job_ids.discard(ev.job_id)
self.update_max_block()
def update_max_block(self):
with self.lock:
mb = None
now = monotonic()
for job in self.jobs.itervalues():
if not job.done and not job.abort_event.is_set():
delta = self.max_job_time - (now - job.start_time)
if delta <= 0:
self.max_block = 0
return
if mb is None:
mb = delta
else:
mb = min(mb, delta)
self.max_block = mb
def abort_hanging_jobs(self):
now = monotonic()
found = False
for job in self.jobs.itervalues():
if not job.done and not job.abort_event.is_set():
delta = self.max_job_time - (now - job.start_time)
if delta <= 0:
job.abort_event.set()
found = True
if found:
self.update_max_block()
def job_finished(self, job_id):
with self.lock:
self.finished_jobs[job_id] = job = self.jobs.pop(job_id)
self.update_max_block()
self.prune_finished_jobs()
if job.traceback and not job.was_aborted:
logdata = job.read_log()
self.log.error('The job: %s failed:\n%s\n%s' % (job.name, logdata, job.traceback))
job.remove_log()
self.start_waiting_jobs()
def prune_finished_jobs(self):
with self.lock:
remove = []
now = monotonic()
for job_id, job in self.finished_jobs.iteritems():
if now - job.end_time > 3600:
remove.append(job_id)
for job_id in remove:
del self.finished_jobs[job_id]
# }}}
def sleep_test(x):
time.sleep(x)
return x
def error_test():
raise Exception('a testing error')

View File

@ -16,6 +16,7 @@ from calibre.ptempfile import TemporaryDirectory
from calibre.srv.errors import JobQueueFull from calibre.srv.errors import JobQueueFull
from calibre.srv.pool import ThreadPool, PluginPool from calibre.srv.pool import ThreadPool, PluginPool
from calibre.srv.opts import Options from calibre.srv.opts import Options
from calibre.srv.jobs import JobsManager
from calibre.srv.utils import ( from calibre.srv.utils import (
socket_errors_socket_closed, socket_errors_nonblocking, HandleInterrupt, socket_errors_socket_closed, socket_errors_nonblocking, HandleInterrupt,
socket_errors_eintr, start_cork, stop_cork, DESIRED_SEND_BUFFER_SIZE, socket_errors_eintr, start_cork, stop_cork, DESIRED_SEND_BUFFER_SIZE,
@ -313,6 +314,7 @@ class ServerLoop(object):
self.handler = handler self.handler = handler
self.opts = opts or Options() self.opts = opts or Options()
self.log = log or ThreadSafeLog(level=ThreadSafeLog.DEBUG) self.log = log or ThreadSafeLog(level=ThreadSafeLog.DEBUG)
self.jobs_manager = JobsManager(self.opts, self.log)
self.access_log = access_log self.access_log = access_log
ba = (self.opts.listen_on, int(self.opts.port)) ba = (self.opts.listen_on, int(self.opts.port))
@ -607,6 +609,7 @@ class ServerLoop(object):
self.wakeup() self.wakeup()
def shutdown(self): def shutdown(self):
self.jobs_manager.shutdown()
try: try:
if getattr(self, 'socket', None): if getattr(self, 'socket', None):
self.socket.close() self.socket.close()
@ -620,6 +623,7 @@ class ServerLoop(object):
pool.stop(wait_till) pool.stop(wait_till)
if pool.workers: if pool.workers:
self.log.warn('Failed to shutdown %d workers in %s cleanly' % (len(pool.workers), pool.__class__.__name__)) self.log.warn('Failed to shutdown %d workers in %s cleanly' % (len(pool.workers), pool.__class__.__name__))
self.jobs_manager.wait_for_shutdown(wait_till)
class EchoLine(Connection): # {{{ class EchoLine(Connection): # {{{

View File

@ -57,6 +57,18 @@ raw_options = (
'worker_count', 10, 'worker_count', 10,
None, None,
_('Maximum number of worker processes'),
'max_jobs', 0,
_('Worker processes are launched as needed and used for large jobs such as preparing'
' a book for viewing, adding books, converting, etc. By default, the max.'
' number of such processes is based on the number of CPU cores. You can'
' control it by this setting.'),
_('Maximum time for worker processes'),
'max_job_time', 60,
_('Maximum amount of time worker processes are allowed to run (in minutes). Set'
' to zero for no limit.'),
_('The port on which to listen for connections'), _('The port on which to listen for connections'),
'port', 8080, 'port', 8080,
None, None,

View File

@ -69,6 +69,7 @@ class Server(object):
plugins.append(BonJour()) plugins.append(BonJour())
self.loop = ServerLoop(create_http_handler(self.handler.dispatch), opts=opts, log=log, access_log=access_log, plugins=plugins) self.loop = ServerLoop(create_http_handler(self.handler.dispatch), opts=opts, log=log, access_log=access_log, plugins=plugins)
self.handler.set_log(self.loop.log) self.handler.set_log(self.loop.log)
self.handler.set_jobs_manager(self.loop.jobs_manager)
self.serve_forever = self.loop.serve_forever self.serve_forever = self.loop.serve_forever
self.stop = self.loop.stop self.stop = self.loop.stop
_df = os.environ.get('CALIBRE_DEVELOP_FROM', None) _df = os.environ.get('CALIBRE_DEVELOP_FROM', None)

View File

@ -7,6 +7,7 @@ __license__ = 'GPL v3'
__copyright__ = '2015, Kovid Goyal <kovid at kovidgoyal.net>' __copyright__ = '2015, Kovid Goyal <kovid at kovidgoyal.net>'
import httplib, ssl, os, socket, time import httplib, ssl, os, socket, time
from collections import namedtuple
from unittest import skipIf from unittest import skipIf
from glob import glob from glob import glob
from threading import Event from threading import Event
@ -222,3 +223,39 @@ class LoopTest(BaseTest):
self.assertGreaterEqual(b, a) self.assertGreaterEqual(b, a)
self.assertGreaterEqual(b - a, 0.09) self.assertGreaterEqual(b - a, 0.09)
self.assertLessEqual(b - a, 0.2) self.assertLessEqual(b - a, 0.2)
def test_jobs_manager(self):
'Test the jobs manager'
from calibre.srv.jobs import JobsManager
O = namedtuple('O', 'max_jobs max_job_time')
class FakeLog(list):
def error(self, *args):
self.append(' '.join(args))
jm = JobsManager(O(1, 5), FakeLog())
job_id = jm.start_job('simple test', 'calibre.srv.jobs', 'sleep_test', args=(1.0,))
job_id2 = jm.start_job('t2', 'calibre.srv.jobs', 'sleep_test', args=(3,))
jid = jm.start_job('err test', 'calibre.srv.jobs', 'error_test')
status = jm.job_status(job_id)[0]
s = ('waiting', 'running')
self.assertIn(status, s)
status2 = jm.job_status(job_id2)[0]
self.assertEqual(status2, 'waiting')
while jm.job_status(job_id)[0] in s:
time.sleep(0.01)
status, result, tb, was_aborted = jm.job_status(job_id)
self.assertEqual(status, 'finished')
self.assertFalse(was_aborted)
self.assertFalse(tb)
self.assertEqual(result, 1.0)
status2 = jm.job_status(job_id2)[0]
time.sleep(0.01)
self.assertEqual(status2, 'running')
jm.abort_job(job_id2)
self.assertTrue(jm.wait_for_running_job(job_id2))
status, result, tb, was_aborted = jm.job_status(job_id2)
self.assertTrue(was_aborted)
self.assertTrue(jm.wait_for_running_job(jid))
status, result, tb, was_aborted = jm.job_status(jid)
self.assertTrue(tb), self.assertIn('a testing error', tb)
jm.start_job('simple test', 'calibre.srv.jobs', 'sleep_test', args=(1.0,))
jm.shutdown(), jm.wait_for_shutdown(monotonic() + 1)