From 71967fd4ba173c5ad9c82f60a166b881fe29809d Mon Sep 17 00:00:00 2001 From: Kovid Goyal Date: Mon, 11 Apr 2011 15:53:34 -0600 Subject: [PATCH] New framework for running I/O bound jobs in threads inthe calibre main process. Migrate email sending to the new framework. --- src/calibre/gui2/dialogs/job_view.ui | 40 ++-- src/calibre/gui2/email.py | 226 ++++++------------- src/calibre/gui2/jobs.py | 85 +++++-- src/calibre/gui2/metadata/bulk_download2.py | 11 + src/calibre/gui2/threaded_jobs.py | 238 ++++++++++++++++++++ src/calibre/gui2/ui.py | 3 +- src/calibre/utils/ipc/job.py | 10 +- 7 files changed, 410 insertions(+), 203 deletions(-) create mode 100644 src/calibre/gui2/metadata/bulk_download2.py create mode 100644 src/calibre/gui2/threaded_jobs.py diff --git a/src/calibre/gui2/dialogs/job_view.ui b/src/calibre/gui2/dialogs/job_view.ui index 8b54c23573..1e854c0f29 100644 --- a/src/calibre/gui2/dialogs/job_view.ui +++ b/src/calibre/gui2/dialogs/job_view.ui @@ -1,7 +1,8 @@ - + + Dialog - - + + 0 0 @@ -9,38 +10,41 @@ 462 - + Details of job - - + + :/images/view.png:/images/view.png - - - - + + + + false - + QPlainTextEdit::NoWrap - + true - - - + + + QDialogButtonBox::Ok + + + - + @@ -49,11 +53,11 @@ Dialog accept() - + 617 442 - + 206 -5 diff --git a/src/calibre/gui2/email.py b/src/calibre/gui2/email.py index c6d58fa340..c8adeb7d31 100644 --- a/src/calibre/gui2/email.py +++ b/src/calibre/gui2/email.py @@ -6,9 +6,7 @@ __license__ = 'GPL v3' __copyright__ = '2010, Kovid Goyal ' __docformat__ = 'restructuredtext en' -import os, socket, time, cStringIO -from threading import Thread -from Queue import Queue +import os, socket, time from binascii import unhexlify from functools import partial from itertools import repeat @@ -16,67 +14,20 @@ from itertools import repeat from calibre.utils.smtp import compose_mail, sendmail, extract_email_address, \ config as email_config from calibre.utils.filenames import ascii_filename -from calibre.utils.ipc.job import BaseJob -from calibre.ptempfile import PersistentTemporaryFile from calibre.customize.ui import available_input_formats, available_output_formats from calibre.ebooks.metadata import authors_to_string from calibre.constants import preferred_encoding from calibre.gui2 import config, Dispatcher, warning_dialog from calibre.library.save_to_disk import get_components from calibre.utils.config import tweaks +from calibre.gui2.threaded_jobs import ThreadedJob -class EmailJob(BaseJob): # {{{ - - def __init__(self, callback, description, attachment, aname, to, subject, text, job_manager): - BaseJob.__init__(self, description) - self.exception = None - self.job_manager = job_manager - self.email_args = (attachment, aname, to, subject, text) - self.email_sent_callback = callback - self.log_path = None - self._log_file = cStringIO.StringIO() - self._log_file.write(self.description.encode('utf-8') + '\n') - - @property - def log_file(self): - if self.log_path is not None: - return open(self.log_path, 'rb') - return cStringIO.StringIO(self._log_file.getvalue()) - - def start_work(self): - self.start_time = time.time() - self.job_manager.changed_queue.put(self) - - def job_done(self): - self.duration = time.time() - self.start_time - self.percent = 1 - # Dump log onto disk - lf = PersistentTemporaryFile('email_log') - lf.write(self._log_file.getvalue()) - lf.close() - self.log_path = lf.name - self._log_file.close() - self._log_file = None - - self.job_manager.changed_queue.put(self) - - def log_write(self, what): - self._log_file.write(what) - -# }}} - -class Emailer(Thread): # {{{ +class Sendmail(object): MAX_RETRIES = 1 - def __init__(self, job_manager): - Thread.__init__(self) - self.daemon = True - self.jobs = Queue() - self.job_manager = job_manager - self._run = True + def __init__(self): self.calculate_rate_limit() - self.last_send_time = time.time() - self.rate_limit def calculate_rate_limit(self): @@ -87,70 +38,28 @@ class Emailer(Thread): # {{{ 'gmail.com' in rh or 'live.com' in rh): self.rate_limit = tweaks['public_smtp_relay_delay'] - def stop(self): - self._run = False - self.jobs.put(None) + def __call__(self, attachment, aname, to, subject, text, log=None, + abort=None, notifications=None): - def run(self): - while self._run: + try_count = 0 + while try_count <= self.MAX_RETRIES: + if try_count > 0: + log('\nRetrying in %d seconds...\n' % + self.rate_limit) try: - job = self.jobs.get() + self.sendmail(attachment, aname, to, subject, text, log) + try_count = self.MAX_RETRIES + log('Email successfully sent') except: - break - if job is None or not self._run: - break - try_count = 0 - failed, exc = False, None - job.start_work() - if job.kill_on_start: - job.log_write('Aborted\n') - job.failed = failed - job.killed = True - job.job_done() - continue + if abort.is_set(): + return + if try_count == self.MAX_RETRIES: + raise + log.exception('\nSending failed...\n') - while try_count <= self.MAX_RETRIES: - failed = False - if try_count > 0: - job.log_write('\nRetrying in %d seconds...\n' % - self.rate_limit) - try: - self.sendmail(job) - break - except Exception as e: - if not self._run: - return - import traceback - failed = True - exc = e - job.log_write('\nSending failed...\n') - job.log_write(traceback.format_exc()) + try_count += 1 - try_count += 1 - - if not self._run: - break - - job.failed = failed - job.exception = exc - job.job_done() - try: - job.email_sent_callback(job) - except: - import traceback - traceback.print_exc() - - def send_mails(self, jobnames, callback, attachments, to_s, subjects, - texts, attachment_names): - for name, attachment, to, subject, text, aname in zip(jobnames, - attachments, to_s, subjects, texts, attachment_names): - description = _('Email %s to %s') % (name, to) - job = EmailJob(callback, description, attachment, aname, to, - subject, text, self.job_manager) - self.job_manager.add_job(job) - self.jobs.put(job) - - def sendmail(self, job): + def sendmail(self, attachment, aname, to, subject, text, log): while time.time() - self.last_send_time <= self.rate_limit: time.sleep(1) try: @@ -158,7 +67,6 @@ class Emailer(Thread): # {{{ from_ = opts.from_ if not from_: from_ = 'calibre ' - attachment, aname, to, subject, text = job.email_args msg = compose_mail(from_, to, text, subject, open(attachment, 'rb'), aname) efrom, eto = map(extract_email_address, (from_, to)) @@ -169,48 +77,56 @@ class Emailer(Thread): # {{{ username=opts.relay_username, password=unhexlify(opts.relay_password), port=opts.relay_port, encryption=opts.encryption, - debug_output=partial(print, file=job._log_file)) + debug_output=log.debug) finally: self.last_send_time = time.time() - def email_news(self, mi, remove, get_fmts, done): - opts = email_config().parse() - accounts = [(account, [x.strip().lower() for x in x[0].split(',')]) - for account, x in opts.accounts.items() if x[1]] - sent_mails = [] - for i, x in enumerate(accounts): - account, fmts = x - files = get_fmts(fmts) - files = [f for f in files if f is not None] - if not files: - continue - attachment = files[0] - to_s = [account] - subjects = [_('News:')+' '+mi.title] - texts = [ - _('Attached is the %s periodical downloaded by calibre.') - % (mi.title,) - ] - attachment_names = [ascii_filename(mi.title)+os.path.splitext(attachment)[1]] - attachments = [attachment] - jobnames = [mi.title] - do_remove = [] - if i == len(accounts) - 1: - do_remove = remove - self.send_mails(jobnames, - Dispatcher(partial(done, remove=do_remove)), - attachments, to_s, subjects, texts, attachment_names) - sent_mails.append(to_s[0]) - return sent_mails +gui_sendmail = Sendmail() -# }}} +def send_mails(jobnames, callback, attachments, to_s, subjects, + texts, attachment_names, job_manager): + for name, attachment, to, subject, text, aname in zip(jobnames, + attachments, to_s, subjects, texts, attachment_names): + description = _('Email %s to %s') % (name, to) + job = ThreadedJob('email', description, gui_sendmail, (attachment, aname, to, + subject, text), {}, callback, killable=False) + job_manager.run_threaded_job(job) + + +def email_news(mi, remove, get_fmts, done, job_manager): + opts = email_config().parse() + accounts = [(account, [x.strip().lower() for x in x[0].split(',')]) + for account, x in opts.accounts.items() if x[1]] + sent_mails = [] + for i, x in enumerate(accounts): + account, fmts = x + files = get_fmts(fmts) + files = [f for f in files if f is not None] + if not files: + continue + attachment = files[0] + to_s = [account] + subjects = [_('News:')+' '+mi.title] + texts = [ + _('Attached is the %s periodical downloaded by calibre.') + % (mi.title,) + ] + attachment_names = [ascii_filename(mi.title)+os.path.splitext(attachment)[1]] + attachments = [attachment] + jobnames = [mi.title] + do_remove = [] + if i == len(accounts) - 1: + do_remove = remove + send_mails(jobnames, + Dispatcher(partial(done, remove=do_remove)), + attachments, to_s, subjects, texts, attachment_names, + job_manager) + sent_mails.append(to_s[0]) + return sent_mails class EmailMixin(object): # {{{ - def __init__(self): - self.emailer = Emailer(self.job_manager) - def send_by_mail(self, to, fmts, delete_from_library, subject='', send_ids=None, do_auto_convert=True, specific_format=None): ids = [self.library_view.model().id(r) for r in self.library_view.selectionModel().selectedRows()] if send_ids is None else send_ids @@ -246,8 +162,7 @@ class EmailMixin(object): # {{{ components = get_components(subject, mi, id) if not components: components = [mi.title] - subject = os.path.join(*components) - subjects.append(subject) + subjects.append(os.path.join(*components)) a = authors_to_string(mi.authors if mi.authors else \ [_('Unknown')]) texts.append(_('Attached, you will find the e-book') + \ @@ -262,11 +177,10 @@ class EmailMixin(object): # {{{ to_s = list(repeat(to, len(attachments))) if attachments: - if not self.emailer.is_alive(): - self.emailer.start() - self.emailer.send_mails(jobnames, + send_mails(jobnames, Dispatcher(partial(self.email_sent, remove=remove)), - attachments, to_s, subjects, texts, attachment_names) + attachments, to_s, subjects, texts, attachment_names, + self.job_manager) self.status_bar.show_message(_('Sending email to')+' '+to, 3000) auto = [] @@ -334,10 +248,8 @@ class EmailMixin(object): # {{{ files, auto = self.library_view.model().\ get_preferred_formats_from_ids([id_], fmts) return files - if not self.emailer.is_alive(): - self.emailer.start() - sent_mails = self.emailer.email_news(mi, remove, - get_fmts, self.email_sent) + sent_mails = email_news(mi, remove, + get_fmts, self.email_sent, self.job_manager) if sent_mails: self.status_bar.show_message(_('Sent news to')+' '+\ ', '.join(sent_mails), 3000) diff --git a/src/calibre/gui2/jobs.py b/src/calibre/gui2/jobs.py index dbde030e81..34eef4406a 100644 --- a/src/calibre/gui2/jobs.py +++ b/src/calibre/gui2/jobs.py @@ -8,14 +8,13 @@ Job management. ''' import re - from Queue import Empty, Queue -from PyQt4.Qt import QAbstractTableModel, QVariant, QModelIndex, Qt, \ - QTimer, pyqtSignal, QIcon, QDialog, QAbstractItemDelegate, QApplication, \ - QSize, QStyleOptionProgressBarV2, QString, QStyle, QToolTip, QFrame, \ - QHBoxLayout, QVBoxLayout, QSizePolicy, QLabel, QCoreApplication, QAction, \ - QByteArray +from PyQt4.Qt import (QAbstractTableModel, QVariant, QModelIndex, Qt, + QTimer, pyqtSignal, QIcon, QDialog, QAbstractItemDelegate, QApplication, + QSize, QStyleOptionProgressBarV2, QString, QStyle, QToolTip, QFrame, + QHBoxLayout, QVBoxLayout, QSizePolicy, QLabel, QCoreApplication, QAction, + QByteArray) from calibre.utils.ipc.server import Server from calibre.utils.ipc.job import ParallelJob @@ -25,8 +24,9 @@ from calibre.gui2.dialogs.jobs_ui import Ui_JobsDialog from calibre import __appname__ from calibre.gui2.dialogs.job_view_ui import Ui_Dialog from calibre.gui2.progress_indicator import ProgressIndicator +from calibre.gui2.threaded_jobs import ThreadedJobServer, ThreadedJob -class JobManager(QAbstractTableModel): +class JobManager(QAbstractTableModel): # {{{ job_added = pyqtSignal(int) job_done = pyqtSignal(int) @@ -42,6 +42,7 @@ class JobManager(QAbstractTableModel): self.add_job = Dispatcher(self._add_job) self.server = Server(limit=int(config['worker_limit']/2.0), enforce_cpu_limit=config['enforce_cpu_limit']) + self.threaded_server = ThreadedJobServer() self.changed_queue = Queue() self.timer = QTimer(self) @@ -146,12 +147,21 @@ class JobManager(QAbstractTableModel): jobs.add(self.server.changed_jobs_queue.get_nowait()) except Empty: break + + # Update device jobs while True: try: jobs.add(self.changed_queue.get_nowait()) except Empty: break + # Update threaded jobs + while True: + try: + jobs.add(self.threaded_server.changed_jobs.get_nowait()) + except Empty: + break + if jobs: needs_reset = False for job in jobs: @@ -207,11 +217,22 @@ class JobManager(QAbstractTableModel): self.server.add_job(job) return job + def run_threaded_job(self, job): + self.add_job(job) + self.threaded_server.add_job(job) + def launch_gui_app(self, name, args=[], kwargs={}, description=''): job = ParallelJob(name, description, lambda x: x, args=args, kwargs=kwargs) self.server.run_job(job, gui=True, redirect_output=False) + def _kill_job(self, job): + if isinstance(job, ParallelJob): + self.server.kill_job(job) + elif isinstance(job, ThreadedJob): + self.threaded_server.kill_job(job) + else: + job.kill_on_start = True def kill_job(self, row, view): job = self.jobs[row] @@ -221,29 +242,29 @@ class JobManager(QAbstractTableModel): if job.duration is not None: return error_dialog(view, _('Cannot kill job'), _('Job has already run')).exec_() - if isinstance(job, ParallelJob): - self.server.kill_job(job) - else: - job.kill_on_start = True + if not getattr(job, 'killable', True): + return error_dialog(view, _('Cannot kill job'), + _('This job cannot be stopped'), show=True) + self._kill_job(job) def kill_all_jobs(self): for job in self.jobs: - if isinstance(job, DeviceJob) or job.duration is not None: + if (isinstance(job, DeviceJob) or job.duration is not None or + not getattr(job, 'killable', True)): continue - if isinstance(job, ParallelJob): - self.server.kill_job(job) - else: - job.kill_on_start = True + self._kill_job(job) def terminate_all_jobs(self): self.server.killall() for job in self.jobs: - if isinstance(job, DeviceJob) or job.duration is not None: + if (isinstance(job, DeviceJob) or job.duration is not None or + not getattr(job, 'killable', True)): continue if not isinstance(job, ParallelJob): - job.kill_on_start = True - + self._kill_job(job) +# }}} +# Jobs UI {{{ class ProgressBarDelegate(QAbstractItemDelegate): def sizeHint(self, option, index): @@ -269,6 +290,11 @@ class DetailView(QDialog, Ui_Dialog): self.setupUi(self) self.setWindowTitle(job.description) self.job = job + self.html_view = hasattr(job, 'html_details') + if self.html_view: + self.log.setVisible(False) + else: + self.tb.setVisible(False) self.next_pos = 0 self.update() self.timer = QTimer(self) @@ -277,12 +303,19 @@ class DetailView(QDialog, Ui_Dialog): def update(self): - f = self.job.log_file - f.seek(self.next_pos) - more = f.read() - self.next_pos = f.tell() - if more: - self.log.appendPlainText(more.decode('utf-8', 'replace')) + if self.html_view: + html = self.job.html_details + if len(html) > self.next_pos: + self.next_pos = len(html) + self.tb.setHtml( + '
%s
'%html) + else: + f = self.job.log_file + f.seek(self.next_pos) + more = f.read() + self.next_pos = f.tell() + if more: + self.log.appendPlainText(more.decode('utf-8', 'replace')) class JobsButton(QFrame): @@ -441,3 +474,5 @@ class JobsDialog(QDialog, Ui_JobsDialog): def hide(self, *args): self.save_state() return QDialog.hide(self, *args) +# }}} + diff --git a/src/calibre/gui2/metadata/bulk_download2.py b/src/calibre/gui2/metadata/bulk_download2.py new file mode 100644 index 0000000000..cc6da1e995 --- /dev/null +++ b/src/calibre/gui2/metadata/bulk_download2.py @@ -0,0 +1,11 @@ +#!/usr/bin/env python +# vim:fileencoding=UTF-8:ts=4:sw=4:sta:et:sts=4:ai +from __future__ import (unicode_literals, division, absolute_import, + print_function) + +__license__ = 'GPL v3' +__copyright__ = '2011, Kovid Goyal ' +__docformat__ = 'restructuredtext en' + + + diff --git a/src/calibre/gui2/threaded_jobs.py b/src/calibre/gui2/threaded_jobs.py new file mode 100644 index 0000000000..f29baf4134 --- /dev/null +++ b/src/calibre/gui2/threaded_jobs.py @@ -0,0 +1,238 @@ +#!/usr/bin/env python +# vim:fileencoding=UTF-8:ts=4:sw=4:sta:et:sts=4:ai +from __future__ import (unicode_literals, division, absolute_import, + print_function) + +__license__ = 'GPL v3' +__copyright__ = '2011, Kovid Goyal ' +__docformat__ = 'restructuredtext en' + +import os, time, tempfile, json +from threading import Thread, RLock, Event +from Queue import Queue + +from calibre.utils.ipc.job import BaseJob +from calibre.utils.logging import GUILog +from calibre.ptempfile import base_dir + +class ThreadedJob(BaseJob): + + def __init__(self, + type_, description, + + func, args, kwargs, + + callback, + + max_concurrent_count=1, + killable=True, + log=None): + ''' + A job that is run in its own thread in the calibre main process + + :param type_: The type of this job (a string). The type is used in + conjunction with max_concurrent_count to prevent too many jobs of the + same type from running + + :param description: A user viewable job description + + :func: The function that actually does the work. This function *must* + accept at least three keyword arguments: abort, log and notifications. abort is + An Event object. func should periodically check abort.is_set(0 and if + it is True, it should stop processing as soon as possible. notifications + is a Queue. func should put progress notifications into it in the form + of a tuple (frac, msg). frac is a number between 0 and 1 indicating + progress and msg is a string describing the progress. log is a Log + object which func should use for all debugging output. func should + raise an Exception to indicate failure. This exception is stored in + job.exception and can thus be used to pass arbitrary information to + callback. + + :param args,kwargs: These are passed to func when it is called + + :param callback: A callable that is called on completion of this job. + Note that it is not called if the user kills the job. Check job.failed + to see if the job succeeded or not. And use job.log to get the job log. + + :param killable: If False the GUI wont let the user kill this job + + :param log: Must be a subclass of GUILog or None. If None a default + GUILog is created. + ''' + BaseJob.__init__(self, description) + + self.type = type_ + self.max_concurrent_count = max_concurrent_count + self.killable = killable + self.callback = callback + self.abort = Event() + self.exception = None + + kwargs['notifications'] = self.notifications + kwargs['abort'] = self.abort + self.log = GUILog() if log is None else log + kwargs['log'] = self.log + + self.func, self.args, self.kwargs = func, args, kwargs + self.consolidated_log = None + + def start_work(self): + self.start_time = time.time() + self.log('Starting job:', self.description) + try: + self.result = self.func(*self.args, **self.kwargs) + except Exception as e: + self.exception = e + self.failed = True + self.log.exception('Job: "%s" failed with error:'%self.description) + self.log.debug('Called with args:', self.args, self.kwargs) + + self.duration = time.time() - self.start_time + try: + self.callback(self) + except: + pass + self._cleanup() + + def _cleanup(self): + + try: + self.consolidate_log() + except: + self.log.exception('Log consolidation failed') + + # No need to keep references to these around anymore + self.func = self.args = self.kwargs = self.notifications = None + + def kill(self): + if self.start_time is None: + self.start_time = time.time() + self.duration = 0.0001 + else: + self.duration = time.time() - self.start_time() + self.abort.set() + + self.log('Aborted job:', self.description) + self.killed = True + self.failed = True + self._cleanup() + + def consolidate_log(self): + logs = [self.log.html, self.log.plain_text] + bdir = base_dir() + log_dir = os.path.join(bdir, 'threaded_job_logs') + if not os.path.exists(log_dir): + os.makedirs(log_dir) + fd, path = tempfile.mkstemp(suffix='.json', prefix='log-', dir=log_dir) + with os.fdopen(fd, 'wb') as f: + f.write(json.dumps(logs, ensure_ascii=False, + indent=2).encode('utf-8')) + self.consolidated_log = path + self.log = None + + def read_consolidated_log(self): + with open(self.consolidated_log, 'rb') as f: + return json.loads(f.read().decode('utf-8')) + + @property + def details(self): + if self.consolidated_log is None: + return self.log.plain_text + return self.read_consolidated_log()[1] + + @property + def html_details(self): + if self.consolidated_log is None: + return self.log.html + return self.read_consolidated_log()[0] + +class ThreadedJobWorker(Thread): + + def __init__(self, job): + Thread.__init__(self) + self.daemon = True + self.job = job + + def run(self): + try: + self.job.start_work() + except: + import traceback + from calibre import prints + prints('Job had unhandled exception:', self.job.description) + traceback.print_exc() + +class ThreadedJobServer(Thread): + + def __init__(self): + Thread.__init__(self) + self.daemon = True + self.lock = RLock() + + self.queued_jobs = [] + self.running_jobs = set() + self.changed_jobs = Queue() + self.keep_going = True + + def close(self): + self.keep_going = False + + def add_job(self, job): + with self.lock: + self.queued_jobs.append(job) + + if not self.is_alive(): + self.start() + + def run(self): + while self.keep_going: + self.run_once() + time.sleep(0.1) + + def run_once(self): + with self.lock: + remove = set() + for worker in self.running_jobs: + if worker.is_alive(): + # Get progress notifications + if worker.job.consume_notifications(): + self.changed_jobs.put(worker.job) + else: + remove.add(worker) + self.changed_jobs.put(worker.job) + + for worker in remove: + self.running_jobs.remove(worker) + + jobs = self.get_startable_jobs() + for job in jobs: + w = ThreadedJobWorker(job) + w.start() + self.running_jobs.add(w) + self.changed_jobs.put(job) + self.queued_jobs.remove(job) + + def kill_job(self, job): + with self.lock: + if job in self.queued_jobs: + self.queued_jobs.remove(job) + elif job in self.running_jobs: + self.running_jobs.remove(job) + job.kill() + self.changed_jobs.put(job) + + def running_jobs_of_type(self, type_): + return len([w for w in self.running_jobs if w.job.type == type_]) + + def get_startable_jobs(self): + queued_types = [] + ans = [] + for job in self.queued_jobs: + num = self.running_jobs_of_type(job.type) + num += queued_types.count(job.type) + if num < job.max_concurrent_count: + queued_types.append(job.type) + ans.append(job) + return ans + + diff --git a/src/calibre/gui2/ui.py b/src/calibre/gui2/ui.py index 4d363c283a..e7853b9491 100644 --- a/src/calibre/gui2/ui.py +++ b/src/calibre/gui2/ui.py @@ -608,6 +608,7 @@ class Main(MainWindow, MainWindowMixin, DeviceMixin, EmailMixin, # {{{ self.update_checker.terminate() self.listener.close() self.job_manager.server.close() + self.job_manager.threaded_server.close() while self.spare_servers: self.spare_servers.pop().close() self.device_manager.keep_going = False @@ -616,8 +617,6 @@ class Main(MainWindow, MainWindowMixin, DeviceMixin, EmailMixin, # {{{ mb.stop() self.hide_windows() - if self.emailer.is_alive(): - self.emailer.stop() try: try: if self.content_server is not None: diff --git a/src/calibre/utils/ipc/job.py b/src/calibre/utils/ipc/job.py index 91db333791..f4b54aee95 100644 --- a/src/calibre/utils/ipc/job.py +++ b/src/calibre/utils/ipc/job.py @@ -75,12 +75,20 @@ class BaseJob(object): self._run_state = self.RUNNING self._status_text = _('Working...') - while consume_notifications: + if consume_notifications: + return self.consume_notifications() + return False + + def consume_notifications(self): + got_notification = False + while self.notifications is not None: try: self.percent, self._message = self.notifications.get_nowait() self.percent *= 100. + got_notification = True except Empty: break + return got_notification @property def status_text(self):