From d61e8e057d68b10c1d7d453ecdc0258ef0fbe9fc Mon Sep 17 00:00:00 2001 From: Kovid Goyal Date: Wed, 30 Jan 2008 01:59:37 +0000 Subject: [PATCH] Improved the job dispatch system. Now after a job has completed, it remains in the list and you can view any output from it, by double clicking the list entry. --- src/libprs500/__init__.py | 20 + src/libprs500/gui2/dialogs/job_view.ui | 32 + src/libprs500/gui2/images.qrc | 2 + src/libprs500/gui2/images/exec.svg | 840 +++++++++++++++++++++++++ src/libprs500/gui2/images/ok.svg | 487 ++++++++++++++ src/libprs500/gui2/jobs.py | 362 ++++++----- src/libprs500/gui2/main.py | 6 +- src/libprs500/gui2/widgets.py | 14 +- 8 files changed, 1604 insertions(+), 159 deletions(-) create mode 100644 src/libprs500/gui2/dialogs/job_view.ui create mode 100644 src/libprs500/gui2/images/exec.svg create mode 100644 src/libprs500/gui2/images/ok.svg diff --git a/src/libprs500/__init__.py b/src/libprs500/__init__.py index 5117e537f1..06a3c5ae16 100644 --- a/src/libprs500/__init__.py +++ b/src/libprs500/__init__.py @@ -192,3 +192,23 @@ def sanitize_file_name(name): underscores. ''' return re.sub(r'\s', ' ', re.sub(r'["\'\|\~\:\?\\\/]|^-', '_', name.strip())) + +def detect_ncpus(): + """Detects the number of effective CPUs in the system""" + #for Linux, Unix and MacOS + if hasattr(os, "sysconf"): + if os.sysconf_names.has_key("SC_NPROCESSORS_ONLN"): + #Linux and Unix + ncpus = os.sysconf("SC_NPROCESSORS_ONLN") + if isinstance(ncpus, int) and ncpus > 0: + return ncpus + else: + #MacOS X + return int(os.popen2("sysctl -n hw.ncpu")[1].read()) + #for Windows + if os.environ.has_key("NUMBER_OF_PROCESSORS"): + ncpus = int(os.environ["NUMBER_OF_PROCESSORS"]); + if ncpus > 0: + return ncpus + #return the default value + return 1 diff --git a/src/libprs500/gui2/dialogs/job_view.ui b/src/libprs500/gui2/dialogs/job_view.ui new file mode 100644 index 0000000000..d20f67f892 --- /dev/null +++ b/src/libprs500/gui2/dialogs/job_view.ui @@ -0,0 +1,32 @@ + + Dialog + + + + 0 + 0 + 664 + 462 + + + + Details of job + + + :/images/view.svg + + + + + + QTextEdit::NoWrap + + + + + + + + + + diff --git a/src/libprs500/gui2/images.qrc b/src/libprs500/gui2/images.qrc index e81d7ed9ea..263ffde392 100644 --- a/src/libprs500/gui2/images.qrc +++ b/src/libprs500/gui2/images.qrc @@ -3,6 +3,8 @@ images/back.svg images/book.svg images/search.svg + images/exec.svg + images/ok.svg images/user_profile.svg images/chapters.svg images/clear_left.svg diff --git a/src/libprs500/gui2/images/exec.svg b/src/libprs500/gui2/images/exec.svg new file mode 100644 index 0000000000..285d96ed4d --- /dev/null +++ b/src/libprs500/gui2/images/exec.svg @@ -0,0 +1,840 @@ + + + + + + + image/svg+xml + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/src/libprs500/gui2/images/ok.svg b/src/libprs500/gui2/images/ok.svg new file mode 100644 index 0000000000..fbda89dc4d --- /dev/null +++ b/src/libprs500/gui2/images/ok.svg @@ -0,0 +1,487 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + image/svg+xml + + + + + + + + + + + + + + + + + + + + + + + diff --git a/src/libprs500/gui2/jobs.py b/src/libprs500/gui2/jobs.py index 4a3315a465..7b1b033aea 100644 --- a/src/libprs500/gui2/jobs.py +++ b/src/libprs500/gui2/jobs.py @@ -12,96 +12,117 @@ ## You should have received a copy of the GNU General Public License along ## with this program; if not, write to the Free Software Foundation, Inc., ## 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. -import traceback, logging +import traceback, logging, collections from PyQt4.QtCore import QAbstractTableModel, QMutex, QObject, SIGNAL, Qt, \ - QVariant, QThread, QModelIndex, QSettings -from PyQt4.QtGui import QIcon + QVariant, QThread, QSettings +from PyQt4.QtGui import QIcon, QDialog +from libprs500 import detect_ncpus from libprs500.gui2 import NONE from libprs500.parallel import Server +from libprs500.gui2.dialogs.job_view_ui import Ui_Dialog class JobException(Exception): pass class Job(QThread): ''' Class to run a function in a separate thread with optional mutex based locking.''' - def __init__(self, id, description, mutex, func, *args, **kwargs): + def __init__(self, id, description, slot, priority, func, *args, **kwargs): ''' @param id: Number. Id of this thread. @param description: String. Description of this job. - @param mutex: A QMutex or None. Is locked before function is run. + @param slot: The callable that should be called when the job is done. + @param priority: The priority with which this thread should be run @param func: A callable that should be executed in this thread. ''' QThread.__init__(self) self.id = id self.func = func - self.description = description if description else 'Device Job #' + str(self.id) + self.description = description if description else 'Job #' + str(self.id) self.args = args self.kwargs = kwargs - self.mutex = mutex + self.slot, self._priority = slot, priority self.result = None self.percent_done = 0 self.logger = logging.getLogger('Job #'+str(id)) self.logger.setLevel(logging.DEBUG) self.is_locked = False - self.log = None + self.log = self.exception = self.last_traceback = None + self.connect_done_signal() - def lock(self): - if self.mutex is not None: - self.is_locked = True - self.mutex.lock() - self.is_locked = False - - - def unlock(self): - if self.mutex is not None: - self.mutex.unlock() - self.is_locked = False + def start(self): + QThread.start(self, self._priority) def progress_update(self, val): self.percent_done = val self.emit(SIGNAL('status_update(int, int)'), self.id, int(val)) - + + def formatted_log(self): + if self.log is None: + return '' + return '

Log:

%s
'%self.log + + class DeviceJob(Job): - ''' - Jobs that involve communication with the device. Synchronous. - ''' + ''' Jobs that involve communication with the device. ''' def run(self): - self.lock() last_traceback, exception = None, None - try: - try: - self.result = self.func(self.progress_update, *self.args, **self.kwargs) - except Exception, err: - exception = err - last_traceback = traceback.format_exc() - finally: - self.unlock() - self.emit(SIGNAL('jobdone(PyQt_PyObject, PyQt_PyObject, PyQt_PyObject, PyQt_PyObject, PyQt_PyObject)'), - self.id, self.description, self.result, exception, last_traceback) -MPServer = None + try: + self.result = self.func(self.progress_update, *self.args, **self.kwargs) + except Exception, err: + exception = err + last_traceback = traceback.format_exc() + + self.exception, self.last_traceback = exception, last_traceback + + def formatted_error(self): + if self.exception is None: + return '' + ans = u'

%s: %s

'%(self.exception.__class__.__name__, self.exception) + ans += '

Traceback:

%s
'%self.last_traceback + return ans + + def notify(self): + self.emit(SIGNAL('jobdone(PyQt_PyObject, PyQt_PyObject, PyQt_PyObject, PyQt_PyObject, PyQt_PyObject)'), + self.id, self.description, self.result, self.exception, self.last_traceback) + + def connect_done_signal(self): + if self.slot is not None: + self.connect(self, SIGNAL('jobdone(PyQt_PyObject, PyQt_PyObject, PyQt_PyObject, PyQt_PyObject, PyQt_PyObject)'), + self.slot, Qt.QueuedConnection) + class ConversionJob(Job): - ''' Jobs that involve conversion of content. Synchronous. ''' + ''' Jobs that involve conversion of content.''' def run(self): - self.lock() last_traceback, exception = None, None try: - try: - self.result, exception, last_traceback, self.log = \ - MPServer.run(self.id, self.func, self.args, self.kwargs) - except Exception, err: - last_traceback = traceback.format_exc() - exception = (exception.__class__.__name__, unicode(str(err), 'utf8', 'replace')) - finally: - self.unlock() - self.emit(SIGNAL('jobdone(PyQt_PyObject, PyQt_PyObject, PyQt_PyObject, PyQt_PyObject, PyQt_PyObject, PyQt_PyObject)'), - self.id, self.description, self.result, exception, last_traceback, self.log) + self.result, exception, last_traceback, self.log = \ + self.server.run(self.id, self.func, self.args, self.kwargs) + except Exception, err: + last_traceback = traceback.format_exc() + exception = (exception.__class__.__name__, unicode(str(err), 'utf8', 'replace')) + self.last_traceback, self.exception = last_traceback, exception + + def notify(self): + self.emit(SIGNAL('jobdone(PyQt_PyObject, PyQt_PyObject, PyQt_PyObject, PyQt_PyObject, PyQt_PyObject, PyQt_PyObject)'), + self.id, self.description, self.result, self.exception, self.last_traceback, self.log) + + def connect_done_signal(self): + if self.slot is not None: + self.connect(self, SIGNAL('jobdone(PyQt_PyObject, PyQt_PyObject, PyQt_PyObject, PyQt_PyObject, PyQt_PyObject, PyQt_PyObject)'), + self.slot, Qt.QueuedConnection) + + def formatted_error(self): + if self.exception is None: + return '' + ans = u'

%s: %s

'%self.exception + ans += '

Traceback:

%s
'%self.last_traceback + return ans - class JobManager(QAbstractTableModel): PRIORITY = {'Idle' : QThread.IdlePriority, @@ -112,18 +133,21 @@ class JobManager(QAbstractTableModel): def __init__(self): QAbstractTableModel.__init__(self) - self.jobs = {} - self.next_id = 0 - self.job_create_lock = QMutex() - self.job_remove_lock = QMutex() - self.device_lock = QMutex() - self.conversion_lock = QMutex() - self.cleanup_lock = QMutex() - self.cleanup = {} - self.device_job_icon = QVariant(QIcon(':/images/reader.svg')) - self.job_icon = QVariant(QIcon(':/images/jobs.svg')) - global MPServer - MPServer = Server() + self.waiting_jobs = collections.deque() + self.running_jobs = collections.deque() + self.finished_jobs = collections.deque() + self.add_queue = collections.deque() + self.update_lock = QMutex() # Protects write access to the above dequeues + self.next_id = 0 + self.wait_icon = QVariant(QIcon(':/images/jobs.svg')) + self.running_icon = QVariant(QIcon(':/images/exec.svg')) + self.error_icon = QVariant(QIcon(':/images/dialog_error.svg')) + self.done_icon = QVariant(QIcon(':/images/ok.svg')) + + self.process_server = Server() + + self.ncpus = detect_ncpus() + self.timer_id = self.startTimer(500) def terminate_device_jobs(self): changed = False @@ -132,34 +156,86 @@ class JobManager(QAbstractTableModel): changed = True job = self.jobs.pop(key) job.terminate() - job.mutex.unlock() - self.emit(SIGNAL('job_done(int)'), job.id) if changed: self.reset() - def create_job(self, job_class, description, lock, *args, **kwargs): - self.job_create_lock.lock() - try: - self.next_id += 1 - job = job_class(self.next_id, description, lock, *args, **kwargs) - QObject.connect(job, SIGNAL('finished()'), self.cleanup_jobs, Qt.QueuedConnection) - QObject.connect(job, SIGNAL('status_update(int, int)'), self.status_update, Qt.QueuedConnection) - self.beginInsertRows(QModelIndex(), len(self.jobs), len(self.jobs)) - self.jobs[self.next_id] = job - self.endInsertRows() - self.emit(SIGNAL('job_added(int)'), self.next_id) - return job - finally: - self.job_create_lock.unlock() + def timerEvent(self, event): + if event.timerId() == self.timer_id: + self.update_lock.lock() + try: + refresh = False + + while self.add_queue: + job = self.add_queue.pop() + self.waiting_jobs.append(job) + self.emit(SIGNAL('job_added(int)'), job.id, Qt.QueuedConnection) + + + for job in [job for job in self.running_jobs if job.isFinished()]: + self.running_jobs.remove(job) + self.finished_jobs.appendleft(job) + job.notify() + self.emit(SIGNAL('job_done(int)'), job.id) + refresh = True + + cjs = list(self.running_conversion_jobs()) + if len(cjs) < self.ncpus: + cj = None + for job in self.waiting_jobs: + if isinstance(job, ConversionJob): + cj = job + break + if cj is not None: + self.waiting_jobs.remove(cj) + cj.start() + self.running_jobs.append(cj) + refresh = True + + djs = list(self.running_device_jobs()) + if len(djs) == 0: + dj = None + for job in self.waiting_jobs: + if isinstance(job, DeviceJob): + dj = job + break + if dj is not None: + self.waiting_jobs.remove(dj) + dj.start() + self.running_jobs.append(dj) + refresh = True + if refresh: + self.reset() + if len(self.running_jobs) == 0: + self.emit(SIGNAL('no_more_jobs()')) + finally: + self.update_lock.unlock() + + def has_jobs(self): + return len(self.waiting_jobs) + len(self.running_jobs) > 0 def has_device_jobs(self): - for job in self.jobs.values(): - if isinstance(job, DeviceJob): - return True - return False + return len(tuple(self.running_device_jobs())) > 0 - def has_jobs(self): - return len(self.jobs.values()) > 0 + def running_device_jobs(self): + for job in self.running_jobs: + if isinstance(job, DeviceJob): + yield job + + def running_conversion_jobs(self): + for job in self.running_jobs: + if isinstance(job, ConversionJob): + yield job + + def create_job(self, job_class, description, slot, priority, *args, **kwargs): + self.next_id += 1 + id = self.next_id + job = job_class(id, description, slot, priority, *args, **kwargs) + job.server = self.process_server + QObject.connect(job, SIGNAL('status_update(int, int)'), self.status_update, Qt.QueuedConnection) + self.update_lock.lock() + self.add_queue.append(job) + self.update_lock.unlock() + return job def run_conversion_job(self, slot, callable, args=[], **kwargs): ''' @@ -170,16 +246,10 @@ class JobManager(QAbstractTableModel): @param kwargs: The keyword arguments to pass to callable ''' desc = kwargs.pop('job_description', '') - job = self.create_job(ConversionJob, desc, self.conversion_lock, - callable, *args, **kwargs) - QObject.connect(job, SIGNAL('jobdone(PyQt_PyObject, PyQt_PyObject, PyQt_PyObject, PyQt_PyObject, PyQt_PyObject, PyQt_PyObject)'), - self.job_done, Qt.QueuedConnection) - if slot: - QObject.connect(job, SIGNAL('jobdone(PyQt_PyObject, PyQt_PyObject, PyQt_PyObject, PyQt_PyObject, PyQt_PyObject, PyQt_PyObject)'), - slot, Qt.QueuedConnection) priority = self.PRIORITY[str(QSettings().value('conversion job priority', QVariant('Normal')).toString())] - job.start(priority) + job = self.create_job(ConversionJob, desc, slot, priority, + callable, *args, **kwargs) return job.id def run_device_job(self, slot, callable, *args, **kwargs): @@ -192,52 +262,12 @@ class JobManager(QAbstractTableModel): ''' desc = callable.__doc__ if callable.__doc__ else '' desc += kwargs.pop('job_extra_description', '') - job = self.create_job(DeviceJob, desc, self.device_lock, callable, *args, **kwargs) - QObject.connect(job, SIGNAL('jobdone(PyQt_PyObject, PyQt_PyObject, PyQt_PyObject, PyQt_PyObject, PyQt_PyObject)'), - self.job_done, Qt.QueuedConnection) - if slot: - QObject.connect(job, SIGNAL('jobdone(PyQt_PyObject, PyQt_PyObject, PyQt_PyObject, PyQt_PyObject, PyQt_PyObject)'), - slot, Qt.QueuedConnection) - job.start() + job = self.create_job(DeviceJob, desc, slot, QThread.NormalPriority, + callable, *args, **kwargs) return job.id - - def job_done(self, id, *args, **kwargs): - ''' - Slot that is called when a job is completed. - ''' - keys = self.jobs.keys() - if not id in keys: # Terminated job - return - self.job_remove_lock.lock() - try: - keys.sort() - idx = keys.index(id) - self.beginRemoveRows(QModelIndex(), idx, idx) - job = self.jobs.pop(id) - self.endRemoveRows() - self.cleanup_lock.lock() - self.cleanup[id] = job - self.cleanup_lock.unlock() - self.emit(SIGNAL('job_done(int)'), id) - if len(self.jobs.keys()) == 0: - self.emit(SIGNAL('no_more_jobs()')) - - finally: - self.job_remove_lock.unlock() - def cleanup_jobs(self): - self.cleanup_lock.lock() - toast = [] - for id in self.cleanup.keys(): - if not self.cleanup[id].isRunning(): - toast.append(id) - for id in toast: - self.cleanup.pop(id) - self.cleanup_lock.unlock() - - def rowCount(self, parent): - return len(self.jobs) + return len(self.running_jobs) + len(self.waiting_jobs) + len(self.finished_jobs) def columnCount(self, parent): return 3 @@ -253,43 +283,65 @@ class JobManager(QAbstractTableModel): else: return QVariant(section+1) + def row_to_job(self, row): + if row < len(self.running_jobs): + return self.running_jobs[row], 0 + row -= len(self.running_jobs) + if row < len(self.waiting_jobs): + return self.waiting_jobs[row], 1 + row -= len(self.running_jobs) + return self.finished_jobs[row], 2 + def data(self, index, role): if role not in (Qt.DisplayRole, Qt.DecorationRole): return NONE row, col = index.row(), index.column() - keys = self.jobs.keys() - keys.sort() - job = self.jobs[keys[row]] + try: + job, status = self.row_to_job(row) + except IndexError: + return NONE + if role == Qt.DisplayRole: if col == 0: return QVariant(job.description) if col == 1: - status = _('Waiting') - if job.isRunning() and not job.is_locked: - status = _('Working') - if job.isFinished(): - status = _('Done') - return QVariant(status) + if status == 2: + st = _('Finished') if job.exception is None else _('Error') + else: + st = [_('Working'), _('Waiting')][status] + return QVariant(st) if col == 2: p = str(job.percent_done) + r'%' if job.percent_done > 0 else _('Unavailable') return QVariant(p) if role == Qt.DecorationRole and col == 0: - return self.device_job_icon if isinstance(job, DeviceJob) else self.job_icon + if status == 1: + return self.wait_icon + if status == 0: + return self.running_icon + if status == 2: + return self.done_icon if job.exception is None else self.error_icon return NONE def status_update(self, id, progress): - keys = self.jobs.keys() - keys.sort() - try: - row = keys.index(id) - index = self.index(row, 2) - self.emit(SIGNAL('dataChanged(QModelIndex, QModelIndex)'), index, index) - except ValueError: - pass + for i in range(len(self.running_jobs)): + job = self.running_jobs[i] + if job.id == id: + index = self.index(i, 2) + self.emit(SIGNAL('dataChanged(QModelIndex, QModelIndex)'), index, index) + break + +class DetailView(QDialog, Ui_Dialog): + + def __init__(self, parent, job): + QDialog.__init__(self, parent) + self.setupUi(self) + self.setWindowTitle(job.description) + self.job = job + txt = self.job.formatted_error() + self.job.formatted_log() - def closeEvent(self, e): - self.jobs_view.write_settings() - e.accept() + if not txt: + txt = 'No details available' + + self.log.setHtml(txt) - \ No newline at end of file diff --git a/src/libprs500/gui2/main.py b/src/libprs500/gui2/main.py index 98f0c08c2b..b2f6899d0a 100644 --- a/src/libprs500/gui2/main.py +++ b/src/libprs500/gui2/main.py @@ -103,8 +103,10 @@ class Main(MainWindow, Ui_MainWindow): ####################### Status Bar ##################### self.status_bar = StatusBar(self.jobs_dialog) self.setStatusBar(self.status_bar) - QObject.connect(self.job_manager, SIGNAL('job_added(int)'), self.status_bar.job_added) - QObject.connect(self.job_manager, SIGNAL('job_done(int)'), self.status_bar.job_done) + QObject.connect(self.job_manager, SIGNAL('job_added(int)'), self.status_bar.job_added, + Qt.QueuedConnection) + QObject.connect(self.job_manager, SIGNAL('job_done(int)'), self.status_bar.job_done, + Qt.QueuedConnection) ####################### Setup Toolbar ##################### sm = QMenu() diff --git a/src/libprs500/gui2/widgets.py b/src/libprs500/gui2/widgets.py index 000efe1c6c..a75b5279d8 100644 --- a/src/libprs500/gui2/widgets.py +++ b/src/libprs500/gui2/widgets.py @@ -12,13 +12,14 @@ ## You should have received a copy of the GNU General Public License along ## with this program; if not, write to the Free Software Foundation, Inc., ## 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. -from libprs500.gui2 import qstring_to_unicode + ''' Miscellanous widgets used in the GUI ''' from PyQt4.QtGui import QListView, QIcon, QFont, QLabel, QListWidget, QListWidgetItem from PyQt4.QtCore import QAbstractListModel, QVariant, Qt, QSize, SIGNAL, QObject +from libprs500.gui2.jobs import ConversionJob, DetailView from libprs500.gui2 import human_readable, NONE, TableView from libprs500 import fit_image, get_font_families @@ -98,7 +99,16 @@ class LocationView(QListView): self.model().location_changed(row) class JobsView(TableView): - pass + + def __init__(self, parent): + TableView.__init__(self, parent) + self.connect(self, SIGNAL('activated(QModelIndex)'), self.show_details) + + def show_details(self, index): + row = index.row() + job = self.model().row_to_job(row)[0] + DetailView(self, job).exec_() + class FontFamilyModel(QAbstractListModel):