New framework for running I/O bound jobs in threads inthe calibre main process. Migrate email sending to the new framework.

This commit is contained in:
Kovid Goyal 2011-04-11 15:53:34 -06:00
parent 64dd32eaf5
commit 71967fd4ba
7 changed files with 410 additions and 203 deletions

View File

@ -1,7 +1,8 @@
<ui version="4.0" > <?xml version="1.0" encoding="UTF-8"?>
<ui version="4.0">
<class>Dialog</class> <class>Dialog</class>
<widget class="QDialog" name="Dialog" > <widget class="QDialog" name="Dialog">
<property name="geometry" > <property name="geometry">
<rect> <rect>
<x>0</x> <x>0</x>
<y>0</y> <y>0</y>
@ -9,38 +10,41 @@
<height>462</height> <height>462</height>
</rect> </rect>
</property> </property>
<property name="windowTitle" > <property name="windowTitle">
<string>Details of job</string> <string>Details of job</string>
</property> </property>
<property name="windowIcon" > <property name="windowIcon">
<iconset resource="../../../../resources/images.qrc" > <iconset resource="../../../../resources/images.qrc">
<normaloff>:/images/view.png</normaloff>:/images/view.png</iconset> <normaloff>:/images/view.png</normaloff>:/images/view.png</iconset>
</property> </property>
<layout class="QGridLayout" name="gridLayout" > <layout class="QGridLayout" name="gridLayout">
<item row="0" column="0" > <item row="0" column="0">
<widget class="QPlainTextEdit" name="log" > <widget class="QPlainTextEdit" name="log">
<property name="undoRedoEnabled" > <property name="undoRedoEnabled">
<bool>false</bool> <bool>false</bool>
</property> </property>
<property name="lineWrapMode" > <property name="lineWrapMode">
<enum>QPlainTextEdit::NoWrap</enum> <enum>QPlainTextEdit::NoWrap</enum>
</property> </property>
<property name="readOnly" > <property name="readOnly">
<bool>true</bool> <bool>true</bool>
</property> </property>
</widget> </widget>
</item> </item>
<item row="1" column="0" > <item row="2" column="0">
<widget class="QDialogButtonBox" name="buttonBox" > <widget class="QDialogButtonBox" name="buttonBox">
<property name="standardButtons" > <property name="standardButtons">
<set>QDialogButtonBox::Ok</set> <set>QDialogButtonBox::Ok</set>
</property> </property>
</widget> </widget>
</item> </item>
<item row="1" column="0">
<widget class="QTextBrowser" name="tb"/>
</item>
</layout> </layout>
</widget> </widget>
<resources> <resources>
<include location="../../../../resources/images.qrc" /> <include location="../../../../resources/images.qrc"/>
</resources> </resources>
<connections> <connections>
<connection> <connection>
@ -49,11 +53,11 @@
<receiver>Dialog</receiver> <receiver>Dialog</receiver>
<slot>accept()</slot> <slot>accept()</slot>
<hints> <hints>
<hint type="sourcelabel" > <hint type="sourcelabel">
<x>617</x> <x>617</x>
<y>442</y> <y>442</y>
</hint> </hint>
<hint type="destinationlabel" > <hint type="destinationlabel">
<x>206</x> <x>206</x>
<y>-5</y> <y>-5</y>
</hint> </hint>

View File

@ -6,9 +6,7 @@ __license__ = 'GPL v3'
__copyright__ = '2010, Kovid Goyal <kovid@kovidgoyal.net>' __copyright__ = '2010, Kovid Goyal <kovid@kovidgoyal.net>'
__docformat__ = 'restructuredtext en' __docformat__ = 'restructuredtext en'
import os, socket, time, cStringIO import os, socket, time
from threading import Thread
from Queue import Queue
from binascii import unhexlify from binascii import unhexlify
from functools import partial from functools import partial
from itertools import repeat from itertools import repeat
@ -16,67 +14,20 @@ from itertools import repeat
from calibre.utils.smtp import compose_mail, sendmail, extract_email_address, \ from calibre.utils.smtp import compose_mail, sendmail, extract_email_address, \
config as email_config config as email_config
from calibre.utils.filenames import ascii_filename from calibre.utils.filenames import ascii_filename
from calibre.utils.ipc.job import BaseJob
from calibre.ptempfile import PersistentTemporaryFile
from calibre.customize.ui import available_input_formats, available_output_formats from calibre.customize.ui import available_input_formats, available_output_formats
from calibre.ebooks.metadata import authors_to_string from calibre.ebooks.metadata import authors_to_string
from calibre.constants import preferred_encoding from calibre.constants import preferred_encoding
from calibre.gui2 import config, Dispatcher, warning_dialog from calibre.gui2 import config, Dispatcher, warning_dialog
from calibre.library.save_to_disk import get_components from calibre.library.save_to_disk import get_components
from calibre.utils.config import tweaks from calibre.utils.config import tweaks
from calibre.gui2.threaded_jobs import ThreadedJob
class EmailJob(BaseJob): # {{{ class Sendmail(object):
def __init__(self, callback, description, attachment, aname, to, subject, text, job_manager):
BaseJob.__init__(self, description)
self.exception = None
self.job_manager = job_manager
self.email_args = (attachment, aname, to, subject, text)
self.email_sent_callback = callback
self.log_path = None
self._log_file = cStringIO.StringIO()
self._log_file.write(self.description.encode('utf-8') + '\n')
@property
def log_file(self):
if self.log_path is not None:
return open(self.log_path, 'rb')
return cStringIO.StringIO(self._log_file.getvalue())
def start_work(self):
self.start_time = time.time()
self.job_manager.changed_queue.put(self)
def job_done(self):
self.duration = time.time() - self.start_time
self.percent = 1
# Dump log onto disk
lf = PersistentTemporaryFile('email_log')
lf.write(self._log_file.getvalue())
lf.close()
self.log_path = lf.name
self._log_file.close()
self._log_file = None
self.job_manager.changed_queue.put(self)
def log_write(self, what):
self._log_file.write(what)
# }}}
class Emailer(Thread): # {{{
MAX_RETRIES = 1 MAX_RETRIES = 1
def __init__(self, job_manager): def __init__(self):
Thread.__init__(self)
self.daemon = True
self.jobs = Queue()
self.job_manager = job_manager
self._run = True
self.calculate_rate_limit() self.calculate_rate_limit()
self.last_send_time = time.time() - self.rate_limit self.last_send_time = time.time() - self.rate_limit
def calculate_rate_limit(self): def calculate_rate_limit(self):
@ -87,70 +38,28 @@ class Emailer(Thread): # {{{
'gmail.com' in rh or 'live.com' in rh): 'gmail.com' in rh or 'live.com' in rh):
self.rate_limit = tweaks['public_smtp_relay_delay'] self.rate_limit = tweaks['public_smtp_relay_delay']
def stop(self): def __call__(self, attachment, aname, to, subject, text, log=None,
self._run = False abort=None, notifications=None):
self.jobs.put(None)
def run(self): try_count = 0
while self._run: while try_count <= self.MAX_RETRIES:
if try_count > 0:
log('\nRetrying in %d seconds...\n' %
self.rate_limit)
try: try:
job = self.jobs.get() self.sendmail(attachment, aname, to, subject, text, log)
try_count = self.MAX_RETRIES
log('Email successfully sent')
except: except:
break if abort.is_set():
if job is None or not self._run: return
break if try_count == self.MAX_RETRIES:
try_count = 0 raise
failed, exc = False, None log.exception('\nSending failed...\n')
job.start_work()
if job.kill_on_start:
job.log_write('Aborted\n')
job.failed = failed
job.killed = True
job.job_done()
continue
while try_count <= self.MAX_RETRIES: try_count += 1
failed = False
if try_count > 0:
job.log_write('\nRetrying in %d seconds...\n' %
self.rate_limit)
try:
self.sendmail(job)
break
except Exception as e:
if not self._run:
return
import traceback
failed = True
exc = e
job.log_write('\nSending failed...\n')
job.log_write(traceback.format_exc())
try_count += 1 def sendmail(self, attachment, aname, to, subject, text, log):
if not self._run:
break
job.failed = failed
job.exception = exc
job.job_done()
try:
job.email_sent_callback(job)
except:
import traceback
traceback.print_exc()
def send_mails(self, jobnames, callback, attachments, to_s, subjects,
texts, attachment_names):
for name, attachment, to, subject, text, aname in zip(jobnames,
attachments, to_s, subjects, texts, attachment_names):
description = _('Email %s to %s') % (name, to)
job = EmailJob(callback, description, attachment, aname, to,
subject, text, self.job_manager)
self.job_manager.add_job(job)
self.jobs.put(job)
def sendmail(self, job):
while time.time() - self.last_send_time <= self.rate_limit: while time.time() - self.last_send_time <= self.rate_limit:
time.sleep(1) time.sleep(1)
try: try:
@ -158,7 +67,6 @@ class Emailer(Thread): # {{{
from_ = opts.from_ from_ = opts.from_
if not from_: if not from_:
from_ = 'calibre <calibre@'+socket.getfqdn()+'>' from_ = 'calibre <calibre@'+socket.getfqdn()+'>'
attachment, aname, to, subject, text = job.email_args
msg = compose_mail(from_, to, text, subject, open(attachment, 'rb'), msg = compose_mail(from_, to, text, subject, open(attachment, 'rb'),
aname) aname)
efrom, eto = map(extract_email_address, (from_, to)) efrom, eto = map(extract_email_address, (from_, to))
@ -169,48 +77,56 @@ class Emailer(Thread): # {{{
username=opts.relay_username, username=opts.relay_username,
password=unhexlify(opts.relay_password), port=opts.relay_port, password=unhexlify(opts.relay_password), port=opts.relay_port,
encryption=opts.encryption, encryption=opts.encryption,
debug_output=partial(print, file=job._log_file)) debug_output=log.debug)
finally: finally:
self.last_send_time = time.time() self.last_send_time = time.time()
def email_news(self, mi, remove, get_fmts, done): gui_sendmail = Sendmail()
opts = email_config().parse()
accounts = [(account, [x.strip().lower() for x in x[0].split(',')])
for account, x in opts.accounts.items() if x[1]]
sent_mails = []
for i, x in enumerate(accounts):
account, fmts = x
files = get_fmts(fmts)
files = [f for f in files if f is not None]
if not files:
continue
attachment = files[0]
to_s = [account]
subjects = [_('News:')+' '+mi.title]
texts = [
_('Attached is the %s periodical downloaded by calibre.')
% (mi.title,)
]
attachment_names = [ascii_filename(mi.title)+os.path.splitext(attachment)[1]]
attachments = [attachment]
jobnames = [mi.title]
do_remove = []
if i == len(accounts) - 1:
do_remove = remove
self.send_mails(jobnames,
Dispatcher(partial(done, remove=do_remove)),
attachments, to_s, subjects, texts, attachment_names)
sent_mails.append(to_s[0])
return sent_mails
# }}} def send_mails(jobnames, callback, attachments, to_s, subjects,
texts, attachment_names, job_manager):
for name, attachment, to, subject, text, aname in zip(jobnames,
attachments, to_s, subjects, texts, attachment_names):
description = _('Email %s to %s') % (name, to)
job = ThreadedJob('email', description, gui_sendmail, (attachment, aname, to,
subject, text), {}, callback, killable=False)
job_manager.run_threaded_job(job)
def email_news(mi, remove, get_fmts, done, job_manager):
opts = email_config().parse()
accounts = [(account, [x.strip().lower() for x in x[0].split(',')])
for account, x in opts.accounts.items() if x[1]]
sent_mails = []
for i, x in enumerate(accounts):
account, fmts = x
files = get_fmts(fmts)
files = [f for f in files if f is not None]
if not files:
continue
attachment = files[0]
to_s = [account]
subjects = [_('News:')+' '+mi.title]
texts = [
_('Attached is the %s periodical downloaded by calibre.')
% (mi.title,)
]
attachment_names = [ascii_filename(mi.title)+os.path.splitext(attachment)[1]]
attachments = [attachment]
jobnames = [mi.title]
do_remove = []
if i == len(accounts) - 1:
do_remove = remove
send_mails(jobnames,
Dispatcher(partial(done, remove=do_remove)),
attachments, to_s, subjects, texts, attachment_names,
job_manager)
sent_mails.append(to_s[0])
return sent_mails
class EmailMixin(object): # {{{ class EmailMixin(object): # {{{
def __init__(self):
self.emailer = Emailer(self.job_manager)
def send_by_mail(self, to, fmts, delete_from_library, subject='', send_ids=None, def send_by_mail(self, to, fmts, delete_from_library, subject='', send_ids=None,
do_auto_convert=True, specific_format=None): do_auto_convert=True, specific_format=None):
ids = [self.library_view.model().id(r) for r in self.library_view.selectionModel().selectedRows()] if send_ids is None else send_ids ids = [self.library_view.model().id(r) for r in self.library_view.selectionModel().selectedRows()] if send_ids is None else send_ids
@ -246,8 +162,7 @@ class EmailMixin(object): # {{{
components = get_components(subject, mi, id) components = get_components(subject, mi, id)
if not components: if not components:
components = [mi.title] components = [mi.title]
subject = os.path.join(*components) subjects.append(os.path.join(*components))
subjects.append(subject)
a = authors_to_string(mi.authors if mi.authors else \ a = authors_to_string(mi.authors if mi.authors else \
[_('Unknown')]) [_('Unknown')])
texts.append(_('Attached, you will find the e-book') + \ texts.append(_('Attached, you will find the e-book') + \
@ -262,11 +177,10 @@ class EmailMixin(object): # {{{
to_s = list(repeat(to, len(attachments))) to_s = list(repeat(to, len(attachments)))
if attachments: if attachments:
if not self.emailer.is_alive(): send_mails(jobnames,
self.emailer.start()
self.emailer.send_mails(jobnames,
Dispatcher(partial(self.email_sent, remove=remove)), Dispatcher(partial(self.email_sent, remove=remove)),
attachments, to_s, subjects, texts, attachment_names) attachments, to_s, subjects, texts, attachment_names,
self.job_manager)
self.status_bar.show_message(_('Sending email to')+' '+to, 3000) self.status_bar.show_message(_('Sending email to')+' '+to, 3000)
auto = [] auto = []
@ -334,10 +248,8 @@ class EmailMixin(object): # {{{
files, auto = self.library_view.model().\ files, auto = self.library_view.model().\
get_preferred_formats_from_ids([id_], fmts) get_preferred_formats_from_ids([id_], fmts)
return files return files
if not self.emailer.is_alive(): sent_mails = email_news(mi, remove,
self.emailer.start() get_fmts, self.email_sent, self.job_manager)
sent_mails = self.emailer.email_news(mi, remove,
get_fmts, self.email_sent)
if sent_mails: if sent_mails:
self.status_bar.show_message(_('Sent news to')+' '+\ self.status_bar.show_message(_('Sent news to')+' '+\
', '.join(sent_mails), 3000) ', '.join(sent_mails), 3000)

View File

@ -8,14 +8,13 @@ Job management.
''' '''
import re import re
from Queue import Empty, Queue from Queue import Empty, Queue
from PyQt4.Qt import QAbstractTableModel, QVariant, QModelIndex, Qt, \ from PyQt4.Qt import (QAbstractTableModel, QVariant, QModelIndex, Qt,
QTimer, pyqtSignal, QIcon, QDialog, QAbstractItemDelegate, QApplication, \ QTimer, pyqtSignal, QIcon, QDialog, QAbstractItemDelegate, QApplication,
QSize, QStyleOptionProgressBarV2, QString, QStyle, QToolTip, QFrame, \ QSize, QStyleOptionProgressBarV2, QString, QStyle, QToolTip, QFrame,
QHBoxLayout, QVBoxLayout, QSizePolicy, QLabel, QCoreApplication, QAction, \ QHBoxLayout, QVBoxLayout, QSizePolicy, QLabel, QCoreApplication, QAction,
QByteArray QByteArray)
from calibre.utils.ipc.server import Server from calibre.utils.ipc.server import Server
from calibre.utils.ipc.job import ParallelJob from calibre.utils.ipc.job import ParallelJob
@ -25,8 +24,9 @@ from calibre.gui2.dialogs.jobs_ui import Ui_JobsDialog
from calibre import __appname__ from calibre import __appname__
from calibre.gui2.dialogs.job_view_ui import Ui_Dialog from calibre.gui2.dialogs.job_view_ui import Ui_Dialog
from calibre.gui2.progress_indicator import ProgressIndicator from calibre.gui2.progress_indicator import ProgressIndicator
from calibre.gui2.threaded_jobs import ThreadedJobServer, ThreadedJob
class JobManager(QAbstractTableModel): class JobManager(QAbstractTableModel): # {{{
job_added = pyqtSignal(int) job_added = pyqtSignal(int)
job_done = pyqtSignal(int) job_done = pyqtSignal(int)
@ -42,6 +42,7 @@ class JobManager(QAbstractTableModel):
self.add_job = Dispatcher(self._add_job) self.add_job = Dispatcher(self._add_job)
self.server = Server(limit=int(config['worker_limit']/2.0), self.server = Server(limit=int(config['worker_limit']/2.0),
enforce_cpu_limit=config['enforce_cpu_limit']) enforce_cpu_limit=config['enforce_cpu_limit'])
self.threaded_server = ThreadedJobServer()
self.changed_queue = Queue() self.changed_queue = Queue()
self.timer = QTimer(self) self.timer = QTimer(self)
@ -146,12 +147,21 @@ class JobManager(QAbstractTableModel):
jobs.add(self.server.changed_jobs_queue.get_nowait()) jobs.add(self.server.changed_jobs_queue.get_nowait())
except Empty: except Empty:
break break
# Update device jobs
while True: while True:
try: try:
jobs.add(self.changed_queue.get_nowait()) jobs.add(self.changed_queue.get_nowait())
except Empty: except Empty:
break break
# Update threaded jobs
while True:
try:
jobs.add(self.threaded_server.changed_jobs.get_nowait())
except Empty:
break
if jobs: if jobs:
needs_reset = False needs_reset = False
for job in jobs: for job in jobs:
@ -207,11 +217,22 @@ class JobManager(QAbstractTableModel):
self.server.add_job(job) self.server.add_job(job)
return job return job
def run_threaded_job(self, job):
self.add_job(job)
self.threaded_server.add_job(job)
def launch_gui_app(self, name, args=[], kwargs={}, description=''): def launch_gui_app(self, name, args=[], kwargs={}, description=''):
job = ParallelJob(name, description, lambda x: x, job = ParallelJob(name, description, lambda x: x,
args=args, kwargs=kwargs) args=args, kwargs=kwargs)
self.server.run_job(job, gui=True, redirect_output=False) self.server.run_job(job, gui=True, redirect_output=False)
def _kill_job(self, job):
if isinstance(job, ParallelJob):
self.server.kill_job(job)
elif isinstance(job, ThreadedJob):
self.threaded_server.kill_job(job)
else:
job.kill_on_start = True
def kill_job(self, row, view): def kill_job(self, row, view):
job = self.jobs[row] job = self.jobs[row]
@ -221,29 +242,29 @@ class JobManager(QAbstractTableModel):
if job.duration is not None: if job.duration is not None:
return error_dialog(view, _('Cannot kill job'), return error_dialog(view, _('Cannot kill job'),
_('Job has already run')).exec_() _('Job has already run')).exec_()
if isinstance(job, ParallelJob): if not getattr(job, 'killable', True):
self.server.kill_job(job) return error_dialog(view, _('Cannot kill job'),
else: _('This job cannot be stopped'), show=True)
job.kill_on_start = True self._kill_job(job)
def kill_all_jobs(self): def kill_all_jobs(self):
for job in self.jobs: for job in self.jobs:
if isinstance(job, DeviceJob) or job.duration is not None: if (isinstance(job, DeviceJob) or job.duration is not None or
not getattr(job, 'killable', True)):
continue continue
if isinstance(job, ParallelJob): self._kill_job(job)
self.server.kill_job(job)
else:
job.kill_on_start = True
def terminate_all_jobs(self): def terminate_all_jobs(self):
self.server.killall() self.server.killall()
for job in self.jobs: for job in self.jobs:
if isinstance(job, DeviceJob) or job.duration is not None: if (isinstance(job, DeviceJob) or job.duration is not None or
not getattr(job, 'killable', True)):
continue continue
if not isinstance(job, ParallelJob): if not isinstance(job, ParallelJob):
job.kill_on_start = True self._kill_job(job)
# }}}
# Jobs UI {{{
class ProgressBarDelegate(QAbstractItemDelegate): class ProgressBarDelegate(QAbstractItemDelegate):
def sizeHint(self, option, index): def sizeHint(self, option, index):
@ -269,6 +290,11 @@ class DetailView(QDialog, Ui_Dialog):
self.setupUi(self) self.setupUi(self)
self.setWindowTitle(job.description) self.setWindowTitle(job.description)
self.job = job self.job = job
self.html_view = hasattr(job, 'html_details')
if self.html_view:
self.log.setVisible(False)
else:
self.tb.setVisible(False)
self.next_pos = 0 self.next_pos = 0
self.update() self.update()
self.timer = QTimer(self) self.timer = QTimer(self)
@ -277,12 +303,19 @@ class DetailView(QDialog, Ui_Dialog):
def update(self): def update(self):
f = self.job.log_file if self.html_view:
f.seek(self.next_pos) html = self.job.html_details
more = f.read() if len(html) > self.next_pos:
self.next_pos = f.tell() self.next_pos = len(html)
if more: self.tb.setHtml(
self.log.appendPlainText(more.decode('utf-8', 'replace')) '<pre style="font-family:monospace">%s</pre>'%html)
else:
f = self.job.log_file
f.seek(self.next_pos)
more = f.read()
self.next_pos = f.tell()
if more:
self.log.appendPlainText(more.decode('utf-8', 'replace'))
class JobsButton(QFrame): class JobsButton(QFrame):
@ -441,3 +474,5 @@ class JobsDialog(QDialog, Ui_JobsDialog):
def hide(self, *args): def hide(self, *args):
self.save_state() self.save_state()
return QDialog.hide(self, *args) return QDialog.hide(self, *args)
# }}}

View File

@ -0,0 +1,11 @@
#!/usr/bin/env python
# vim:fileencoding=UTF-8:ts=4:sw=4:sta:et:sts=4:ai
from __future__ import (unicode_literals, division, absolute_import,
print_function)
__license__ = 'GPL v3'
__copyright__ = '2011, Kovid Goyal <kovid@kovidgoyal.net>'
__docformat__ = 'restructuredtext en'

View File

@ -0,0 +1,238 @@
#!/usr/bin/env python
# vim:fileencoding=UTF-8:ts=4:sw=4:sta:et:sts=4:ai
from __future__ import (unicode_literals, division, absolute_import,
print_function)
__license__ = 'GPL v3'
__copyright__ = '2011, Kovid Goyal <kovid@kovidgoyal.net>'
__docformat__ = 'restructuredtext en'
import os, time, tempfile, json
from threading import Thread, RLock, Event
from Queue import Queue
from calibre.utils.ipc.job import BaseJob
from calibre.utils.logging import GUILog
from calibre.ptempfile import base_dir
class ThreadedJob(BaseJob):
def __init__(self,
type_, description,
func, args, kwargs,
callback,
max_concurrent_count=1,
killable=True,
log=None):
'''
A job that is run in its own thread in the calibre main process
:param type_: The type of this job (a string). The type is used in
conjunction with max_concurrent_count to prevent too many jobs of the
same type from running
:param description: A user viewable job description
:func: The function that actually does the work. This function *must*
accept at least three keyword arguments: abort, log and notifications. abort is
An Event object. func should periodically check abort.is_set(0 and if
it is True, it should stop processing as soon as possible. notifications
is a Queue. func should put progress notifications into it in the form
of a tuple (frac, msg). frac is a number between 0 and 1 indicating
progress and msg is a string describing the progress. log is a Log
object which func should use for all debugging output. func should
raise an Exception to indicate failure. This exception is stored in
job.exception and can thus be used to pass arbitrary information to
callback.
:param args,kwargs: These are passed to func when it is called
:param callback: A callable that is called on completion of this job.
Note that it is not called if the user kills the job. Check job.failed
to see if the job succeeded or not. And use job.log to get the job log.
:param killable: If False the GUI wont let the user kill this job
:param log: Must be a subclass of GUILog or None. If None a default
GUILog is created.
'''
BaseJob.__init__(self, description)
self.type = type_
self.max_concurrent_count = max_concurrent_count
self.killable = killable
self.callback = callback
self.abort = Event()
self.exception = None
kwargs['notifications'] = self.notifications
kwargs['abort'] = self.abort
self.log = GUILog() if log is None else log
kwargs['log'] = self.log
self.func, self.args, self.kwargs = func, args, kwargs
self.consolidated_log = None
def start_work(self):
self.start_time = time.time()
self.log('Starting job:', self.description)
try:
self.result = self.func(*self.args, **self.kwargs)
except Exception as e:
self.exception = e
self.failed = True
self.log.exception('Job: "%s" failed with error:'%self.description)
self.log.debug('Called with args:', self.args, self.kwargs)
self.duration = time.time() - self.start_time
try:
self.callback(self)
except:
pass
self._cleanup()
def _cleanup(self):
try:
self.consolidate_log()
except:
self.log.exception('Log consolidation failed')
# No need to keep references to these around anymore
self.func = self.args = self.kwargs = self.notifications = None
def kill(self):
if self.start_time is None:
self.start_time = time.time()
self.duration = 0.0001
else:
self.duration = time.time() - self.start_time()
self.abort.set()
self.log('Aborted job:', self.description)
self.killed = True
self.failed = True
self._cleanup()
def consolidate_log(self):
logs = [self.log.html, self.log.plain_text]
bdir = base_dir()
log_dir = os.path.join(bdir, 'threaded_job_logs')
if not os.path.exists(log_dir):
os.makedirs(log_dir)
fd, path = tempfile.mkstemp(suffix='.json', prefix='log-', dir=log_dir)
with os.fdopen(fd, 'wb') as f:
f.write(json.dumps(logs, ensure_ascii=False,
indent=2).encode('utf-8'))
self.consolidated_log = path
self.log = None
def read_consolidated_log(self):
with open(self.consolidated_log, 'rb') as f:
return json.loads(f.read().decode('utf-8'))
@property
def details(self):
if self.consolidated_log is None:
return self.log.plain_text
return self.read_consolidated_log()[1]
@property
def html_details(self):
if self.consolidated_log is None:
return self.log.html
return self.read_consolidated_log()[0]
class ThreadedJobWorker(Thread):
def __init__(self, job):
Thread.__init__(self)
self.daemon = True
self.job = job
def run(self):
try:
self.job.start_work()
except:
import traceback
from calibre import prints
prints('Job had unhandled exception:', self.job.description)
traceback.print_exc()
class ThreadedJobServer(Thread):
def __init__(self):
Thread.__init__(self)
self.daemon = True
self.lock = RLock()
self.queued_jobs = []
self.running_jobs = set()
self.changed_jobs = Queue()
self.keep_going = True
def close(self):
self.keep_going = False
def add_job(self, job):
with self.lock:
self.queued_jobs.append(job)
if not self.is_alive():
self.start()
def run(self):
while self.keep_going:
self.run_once()
time.sleep(0.1)
def run_once(self):
with self.lock:
remove = set()
for worker in self.running_jobs:
if worker.is_alive():
# Get progress notifications
if worker.job.consume_notifications():
self.changed_jobs.put(worker.job)
else:
remove.add(worker)
self.changed_jobs.put(worker.job)
for worker in remove:
self.running_jobs.remove(worker)
jobs = self.get_startable_jobs()
for job in jobs:
w = ThreadedJobWorker(job)
w.start()
self.running_jobs.add(w)
self.changed_jobs.put(job)
self.queued_jobs.remove(job)
def kill_job(self, job):
with self.lock:
if job in self.queued_jobs:
self.queued_jobs.remove(job)
elif job in self.running_jobs:
self.running_jobs.remove(job)
job.kill()
self.changed_jobs.put(job)
def running_jobs_of_type(self, type_):
return len([w for w in self.running_jobs if w.job.type == type_])
def get_startable_jobs(self):
queued_types = []
ans = []
for job in self.queued_jobs:
num = self.running_jobs_of_type(job.type)
num += queued_types.count(job.type)
if num < job.max_concurrent_count:
queued_types.append(job.type)
ans.append(job)
return ans

View File

@ -608,6 +608,7 @@ class Main(MainWindow, MainWindowMixin, DeviceMixin, EmailMixin, # {{{
self.update_checker.terminate() self.update_checker.terminate()
self.listener.close() self.listener.close()
self.job_manager.server.close() self.job_manager.server.close()
self.job_manager.threaded_server.close()
while self.spare_servers: while self.spare_servers:
self.spare_servers.pop().close() self.spare_servers.pop().close()
self.device_manager.keep_going = False self.device_manager.keep_going = False
@ -616,8 +617,6 @@ class Main(MainWindow, MainWindowMixin, DeviceMixin, EmailMixin, # {{{
mb.stop() mb.stop()
self.hide_windows() self.hide_windows()
if self.emailer.is_alive():
self.emailer.stop()
try: try:
try: try:
if self.content_server is not None: if self.content_server is not None:

View File

@ -75,12 +75,20 @@ class BaseJob(object):
self._run_state = self.RUNNING self._run_state = self.RUNNING
self._status_text = _('Working...') self._status_text = _('Working...')
while consume_notifications: if consume_notifications:
return self.consume_notifications()
return False
def consume_notifications(self):
got_notification = False
while self.notifications is not None:
try: try:
self.percent, self._message = self.notifications.get_nowait() self.percent, self._message = self.notifications.get_nowait()
self.percent *= 100. self.percent *= 100.
got_notification = True
except Empty: except Empty:
break break
return got_notification
@property @property
def status_text(self): def status_text(self):