diff --git a/src/calibre/ebooks/lrf/html/table_as_image.py b/src/calibre/ebooks/lrf/html/table_as_image.py index f4bdfa973d..6db42865be 100644 --- a/src/calibre/ebooks/lrf/html/table_as_image.py +++ b/src/calibre/ebooks/lrf/html/table_as_image.py @@ -10,6 +10,8 @@ import os, tempfile, atexit, shutil, time from PyQt4.Qt import QWebPage, QUrl, QApplication, QSize, \ SIGNAL, QPainter, QImage, QObject, Qt +from calibre.parallel import ParallelJob + __app = None class HTMLTableRenderer(QObject): @@ -80,18 +82,17 @@ def render_table(server, soup, table, css, base_dir, width, height, dpi, factor= '''%(head, width-10, style, unicode(table)) - server.run_job(1, 'render_table', + job = ParallelJob('render_table', lambda j : j, None, args=[html, base_dir, width, height, dpi, factor]) - res = None - while res is None: + server.add_job(job) + while not job.has_run: time.sleep(2) - res = server.result(1) - result, exception, traceback = res - if exception: + + if job.exception is not None: print 'Failed to render table' - print exception - print traceback - images, tdir = result + print job.exception + print job.traceback + images, tdir = job.result atexit.register(shutil.rmtree, tdir) return images diff --git a/src/calibre/gui2/__init__.py b/src/calibre/gui2/__init__.py index cfa50cdd15..8c2b8e51b4 100644 --- a/src/calibre/gui2/__init__.py +++ b/src/calibre/gui2/__init__.py @@ -2,7 +2,7 @@ __license__ = 'GPL v3' __copyright__ = '2008, Kovid Goyal ' """ The GUI """ import sys, os, re, StringIO, traceback -from PyQt4.QtCore import QVariant, QFileInfo, QObject, SIGNAL, QBuffer, \ +from PyQt4.QtCore import QVariant, QFileInfo, QObject, SIGNAL, QBuffer, Qt, \ QByteArray, QLocale, QUrl, QTranslator, QCoreApplication from PyQt4.QtGui import QFileDialog, QMessageBox, QPixmap, QFileIconProvider, \ QIcon, QTableView, QDialogButtonBox, QApplication @@ -84,6 +84,21 @@ def human_readable(size): size = size[:-2] return size + " " + suffix +class Dispatcher(QObject): + '''Convenience class to ensure that a function call always happens in the GUI thread''' + + def __init__(self, func): + QObject.__init__(self) + self.func = func + self.connect(self, SIGNAL('edispatch(PyQt_PyObject, PyQt_PyObject)'), + self.dispatch, Qt.QueuedConnection) + + def __call__(self, *args, **kwargs): + self.emit(SIGNAL('edispatch(PyQt_PyObject, PyQt_PyObject)'), args, kwargs) + + def dispatch(self, args, kwargs): + self.func(*args, **kwargs) + class TableView(QTableView): def __init__(self, parent): diff --git a/src/calibre/gui2/device.py b/src/calibre/gui2/device.py index 2dab6aceb5..33f5585411 100644 --- a/src/calibre/gui2/device.py +++ b/src/calibre/gui2/device.py @@ -1,130 +1,185 @@ __license__ = 'GPL v3' __copyright__ = '2008, Kovid Goyal ' -import os, traceback - -from PyQt4.QtCore import QThread, SIGNAL, QObject +import os, traceback, Queue, time +from threading import Thread from calibre.devices import devices +from calibre.parallel import Job from calibre.devices.scanner import DeviceScanner -class DeviceDetector(QThread): + +class DeviceJob(Job): + + def __init__(self, func, *args, **kwargs): + Job.__init__(self, *args, **kwargs) + self.func = func + + def run(self): + self.start_work() + try: + self.result = self.func(*self.args, **self.kwargs) + except (Exception, SystemExit), err: + self.exception = err + self.traceback = traceback.format_exc() + finally: + self.job_done() + + +class DeviceManager(Thread): ''' Worker thread that polls the USB ports for devices. Emits the signal connected(PyQt_PyObject, PyQt_PyObject) on connection and disconnection events. ''' - def __init__(self, sleep_time=2000): + def __init__(self, connected_slot, job_manager, sleep_time=2): ''' @param sleep_time: Time to sleep between device probes in millisecs @type sleep_time: integer ''' - self.devices = [[d, False] for d in devices()] - self.sleep_time = sleep_time - QThread.__init__(self) - self.keep_going = True + Thread.__init__(self) + self.setDaemon(True) + self.devices = [[d, False] for d in devices()] + self.device = None + self.device_class = None + self.sleep_time = sleep_time + self.connected_slot = connected_slot + self.jobs = Queue.Queue(0) + self.keep_going = True + self.job_manager = job_manager + self.current_job = None + self.scanner = DeviceScanner() - def run(self): - scanner = DeviceScanner() - while self.keep_going: - scanner.scan() - for device in self.devices: - connected = scanner.is_device_connected(device[0]) - if connected and not device[1]: + def detect_device(self): + self.scanner.scan() + for device in self.devices: + connected = self.scanner.is_device_connected(device[0]) + if connected and not device[1]: + try: + dev = device[0]() + dev.open() + self.device = dev + self.device_class = dev.__class__ + self.connected_slot(True) + except: + print 'Unable to open device' + traceback.print_exc() + finally: + device[1] = True + elif not connected and device[1]: + while True: try: - dev = device[0]() - dev.open() - self.emit(SIGNAL('connected(PyQt_PyObject, PyQt_PyObject)'), dev, True) - except: - print 'Unable to open device' - traceback.print_exc() - finally: - device[1] = True - elif not connected and device[1]: - self.emit(SIGNAL('connected(PyQt_PyObject, PyQt_PyObject)'), device[0], False) - device[1] ^= True - self.msleep(self.sleep_time) + job = self.jobs.get_nowait() + job.abort(Exception(_('Device no longer connected.'))) + except Queue.Empty: + break + self.device = None + self.connected_slot(False) + device[1] ^= True + + def next(self): + if not self.jobs.empty(): + try: + return self.jobs.get_nowait() + except Queue.Empty: + pass + + def run(self): + while self.keep_going: + self.detect_device() + while True: + job = self.next() + if job is not None: + self.current_job = job + self.device.set_progress_reporter(job.update_status) + self.current_job.run() + self.current_job = None + else: + break + time.sleep(self.sleep_time) - - -class DeviceManager(QObject): - def __init__(self, device): - QObject.__init__(self) - self.device_class = device.__class__ - self.device = device - - def device_removed(self): - self.device = None - - def info_func(self): - ''' Return callable that returns device information and free space on device''' - def get_device_information(updater): - '''Get device information''' - self.device.set_progress_reporter(updater) - info = self.device.get_device_information(end_session=False) - info = [i.replace('\x00', '').replace('\x01', '') for i in info] - cp = self.device.card_prefix(end_session=False) - fs = self.device.free_space() - return info, cp, fs - return get_device_information + def create_job(self, func, done, description, args=[], kwargs={}): + job = DeviceJob(func, done, self.job_manager, + args=args, kwargs=kwargs, description=description) + self.job_manager.add_job(job) + self.jobs.put(job) + return job - def books_func(self): + def _get_device_information(self): + info = self.device.get_device_information(end_session=False) + info = [i.replace('\x00', '').replace('\x01', '') for i in info] + cp = self.device.card_prefix(end_session=False) + fs = self.device.free_space() + return info, cp, fs + + def get_device_information(self, done): + '''Get device information and free space on device''' + return self.create_job(self._get_device_information, done, + description=_('Get device information')) + + + def _books(self): + '''Get metadata from device''' + mainlist = self.device.books(oncard=False, end_session=False) + cardlist = self.device.books(oncard=True) + return (mainlist, cardlist) + + def books(self, done): '''Return callable that returns the list of books on device as two booklists''' - def books(updater): - '''Get metadata from device''' - self.device.set_progress_reporter(updater) - mainlist = self.device.books(oncard=False, end_session=False) - cardlist = self.device.books(oncard=True) - return (mainlist, cardlist) - return books + return self.create_job(self._books, done, description=_('Get list of books on device')) - def sync_booklists_func(self): - '''Upload booklists to device''' - def sync_booklists(updater, booklists): - '''Sync metadata to device''' - self.device.set_progress_reporter(updater) - self.device.sync_booklists(booklists, end_session=False) - return self.device.card_prefix(end_session=False), self.device.free_space() - return sync_booklists + def _sync_booklists(self, booklists): + '''Sync metadata to device''' + self.device.sync_booklists(booklists, end_session=False) + return self.device.card_prefix(end_session=False), self.device.free_space() - def upload_books_func(self): - '''Upload books to device''' - def upload_books(updater, files, names, on_card=False): - '''Upload books to device: ''' - self.device.set_progress_reporter(updater) - return self.device.upload_books(files, names, on_card, end_session=False) - return upload_books + def sync_booklists(self, done, booklists): + return self.create_job(self._sync_booklists, done, args=[booklists], + description=_('Send metadata to device')) + def _upload_books(self, files, names, on_card=False): + '''Upload books to device: ''' + return self.device.upload_books(files, names, on_card, end_session=False) + + def upload_books(self, done, files, names, on_card=False, titles=None): + desc = _('Upload %d books to device')%len(names) + if titles: + desc += u':' + u', '.join(titles) + return self.create_job(self._upload_books, done, args=[files, names], + kwargs={'on_card':on_card}, description=desc) + def add_books_to_metadata(self, locations, metadata, booklists): - self.device_class.add_books_to_metadata(locations, metadata, booklists) + self.device.add_books_to_metadata(locations, metadata, booklists) - def delete_books_func(self): + def _delete_books(self, paths): '''Remove books from device''' - def delete_books(updater, paths): - '''Remove books from device''' - self.device.delete_books(paths, end_session=True) - return delete_books + self.device.delete_books(paths, end_session=True) + + def delete_books(self, done, paths): + return self.create_job(self._delete_books, done, args=[paths], + description=_('Delete books from device')) def remove_books_from_metadata(self, paths, booklists): - self.device_class.remove_books_from_metadata(paths, booklists) + self.device.remove_books_from_metadata(paths, booklists) - def save_books_func(self): - '''Copy books from device to disk''' - def save_books(updater, paths, target): - '''Copy books from device to disk''' - self.device.set_progress_reporter(updater) - for path in paths: - name = path.rpartition('/')[2] - f = open(os.path.join(target, name), 'wb') - self.device.get_file(path, f) - f.close() - return save_books - - def view_book_func(self): - '''Copy book from device to local hdd for viewing''' - def view_book(updater, path, target): - self.device.set_progress_reporter(updater) - f = open(target, 'wb') + def _save_books(self, paths, target): + '''Copy books from device to disk''' + for path in paths: + name = path.rpartition('/')[2] + f = open(os.path.join(target, name), 'wb') self.device.get_file(path, f) f.close() - return target - return view_book + + def save_books(self, done, paths, target): + return self.create_job(self._save_books, done, args=[paths, target], + description=_('Download books from device')) + + def _view_book(self, path, target): + f = open(target, 'wb') + self.device.get_file(path, f) + f.close() + return target + + def view_book(self, done, path, target): + return self.create_job(self._view_book, done, args=[path, target], + description=_('View book on device')) + \ No newline at end of file diff --git a/src/calibre/gui2/dialogs/jobs.py b/src/calibre/gui2/dialogs/jobs.py index e0682b6bd8..ced7f2ee10 100644 --- a/src/calibre/gui2/dialogs/jobs.py +++ b/src/calibre/gui2/dialogs/jobs.py @@ -2,7 +2,7 @@ __license__ = 'GPL v3' __copyright__ = '2008, Kovid Goyal ' '''Display active jobs''' -from PyQt4.QtCore import Qt, QObject, SIGNAL, QSize, QString +from PyQt4.QtCore import Qt, QObject, SIGNAL, QSize, QString, QTimer from PyQt4.QtGui import QDialog, QAbstractItemDelegate, QStyleOptionProgressBarV2, \ QApplication, QStyle @@ -44,11 +44,21 @@ class JobsDialog(QDialog, Ui_JobsDialog): self.jobs_view.model().kill_job) self.pb_delegate = ProgressBarDelegate(self) self.jobs_view.setItemDelegateForColumn(2, self.pb_delegate) + + self.running_time_timer = QTimer(self) + self.connect(self.running_time_timer, SIGNAL('timeout()'), self.update_running_time) + self.running_time_timer.start(1000) + + def update_running_time(self): + model = self.model + for row, job in enumerate(model.jobs): + if job.is_running: + self.jobs_view.dataChanged(model.index(row, 3), model.index(row, 3)) def kill_job(self): for index in self.jobs_view.selectedIndexes(): row = index.row() - self.emit(SIGNAL('kill_job(int, PyQt_PyObject)'), row, self) + self.model.kill_job(row, self) return def closeEvent(self, e): diff --git a/src/calibre/gui2/jobs.py b/src/calibre/gui2/jobs.py deleted file mode 100644 index 8e6246d8d9..0000000000 --- a/src/calibre/gui2/jobs.py +++ /dev/null @@ -1,415 +0,0 @@ -__license__ = 'GPL v3' -__copyright__ = '2008, Kovid Goyal ' -import traceback, logging, collections, time - -from PyQt4.QtCore import QAbstractTableModel, QMutex, QObject, SIGNAL, Qt, \ - QVariant, QThread -from PyQt4.QtGui import QIcon, QDialog - -from calibre import detect_ncpus, Settings -from calibre.gui2 import NONE, error_dialog -from calibre.parallel import Server -from calibre.gui2.dialogs.job_view_ui import Ui_Dialog - -class JobException(Exception): - pass - -class Job(QThread): - ''' Class to run a function in a separate thread with optional mutex based locking.''' - def __init__(self, id, description, slot, priority, func, *args, **kwargs): - ''' - @param id: Number. Id of this thread. - @param description: String. Description of this job. - @param slot: The callable that should be called when the job is done. - @param priority: The priority with which this thread should be run - @param func: A callable that should be executed in this thread. - ''' - QThread.__init__(self) - self.id = id - self.func = func - self.description = description if description else 'Job #' + str(self.id) - self.args = args - self.kwargs = kwargs - self.slot, self._priority = slot, priority - self.result = None - self.percent_done = 0 - self.logger = logging.getLogger('Job #'+str(id)) - self.logger.setLevel(logging.DEBUG) - self.is_locked = False - self.log = self.exception = self.last_traceback = None - self.connect_done_signal() - self.start_time = None - - - def start(self): - self.start_time = time.time() - QThread.start(self, self._priority) - - def progress_update(self, val): - self.percent_done = val - self.emit(SIGNAL('status_update(int, int)'), self.id, int(val)) - - def formatted_log(self): - if self.log is None: - return '' - return '

Log:

%s
'%self.log - - -class DeviceJob(Job): - ''' Jobs that involve communication with the device. ''' - def run(self): - last_traceback, exception = None, None - - try: - self.result = self.func(self.progress_update, *self.args, **self.kwargs) - except Exception, err: - exception = err - last_traceback = traceback.format_exc() - - self.exception, self.last_traceback = exception, last_traceback - - def formatted_error(self): - if self.exception is None: - return '' - ans = u'

%s: %s

'%(self.exception.__class__.__name__, self.exception) - ans += '

Traceback:

%s
'%self.last_traceback - return ans - - def notify(self): - self.emit(SIGNAL('jobdone(PyQt_PyObject, PyQt_PyObject, PyQt_PyObject, PyQt_PyObject, PyQt_PyObject)'), - self.id, self.description, self.result, self.exception, self.last_traceback) - - def connect_done_signal(self): - if self.slot is not None: - self.connect(self, SIGNAL('jobdone(PyQt_PyObject, PyQt_PyObject, PyQt_PyObject, PyQt_PyObject, PyQt_PyObject)'), - self.slot, Qt.QueuedConnection) - -class ConversionJob(Job): - ''' Jobs that involve conversion of content.''' - def __init__(self, *args, **kwdargs): - Job.__init__(self, *args, **kwdargs) - self.log = '' - - def run(self): - result = None - self.server.run_job(self.id, self.func, progress=self.progress, - args=self.args, kwdargs=self.kwargs, - output=self.output) - res = None - while res is None: - time.sleep(2) - res = self.server.result(self.id) - if res is None: - exception, tb = 'UnknownError: This should not have happened', '' - else: - result, exception, tb = res - self.result, self.last_traceback, self.exception = result, tb, exception - - def output(self, msg): - if self.log is None: - self.log = '' - self.log += msg - self.emit(SIGNAL('output_received()')) - - def formatted_log(self): - return '

Log:

%s
'%self.log - - def notify(self): - self.emit(SIGNAL('jobdone(PyQt_PyObject, PyQt_PyObject, PyQt_PyObject, PyQt_PyObject, PyQt_PyObject, PyQt_PyObject)'), - self.id, self.description, self.result, self.exception, self.last_traceback, self.log) - - def connect_done_signal(self): - if self.slot is not None: - self.connect(self, SIGNAL('jobdone(PyQt_PyObject, PyQt_PyObject, PyQt_PyObject, PyQt_PyObject, PyQt_PyObject, PyQt_PyObject)'), - self.slot, Qt.QueuedConnection) - - def formatted_error(self): - if self.exception is None: - return '' - ans = u'

%s:'%repr(self.exception) - ans += '

Traceback:

%s
'%self.last_traceback - return ans - - def progress(self, percent, msg): - self.emit(SIGNAL('update_progress(int, PyQt_PyObject)'), self.id, percent) - -class JobManager(QAbstractTableModel): - - PRIORITY = {'Idle' : QThread.IdlePriority, - 'Lowest': QThread.LowestPriority, - 'Low' : QThread.LowPriority, - 'Normal': QThread.NormalPriority - } - - def __init__(self): - QAbstractTableModel.__init__(self) - self.waiting_jobs = collections.deque() - self.running_jobs = collections.deque() - self.finished_jobs = collections.deque() - self.add_queue = collections.deque() - self.update_lock = QMutex() # Protects write access to the above dequeues - self.next_id = 0 - self.wait_icon = QVariant(QIcon(':/images/jobs.svg')) - self.running_icon = QVariant(QIcon(':/images/exec.svg')) - self.error_icon = QVariant(QIcon(':/images/dialog_error.svg')) - self.done_icon = QVariant(QIcon(':/images/ok.svg')) - - self.process_server = Server() - - self.ncpus = detect_ncpus() - self.timer_id = self.startTimer(500) - - def terminate_device_jobs(self): - for job in self.running_jobs: - if isinstance(job, DeviceJob): - job.terminate() - - def terminate_all_jobs(self): - for job in self.running_jobs: - try: - if isinstance(job, DeviceJob): - job.terminate() - except: - continue - self.process_server.killall() - - def timerEvent(self, event): - if event.timerId() == self.timer_id: - self.update_lock.lock() - try: - refresh = False - - while self.add_queue: - job = self.add_queue.pop() - self.waiting_jobs.append(job) - self.emit(SIGNAL('job_added(int)'), job.id, Qt.QueuedConnection) - refresh = True - - for job in [job for job in self.running_jobs if job.isFinished()]: - self.running_jobs.remove(job) - self.finished_jobs.appendleft(job) - if job.result != self.process_server.KILL_RESULT: - job.notify() - job.running_time = time.time() - job.start_time - self.emit(SIGNAL('job_done(int)'), job.id) - refresh = True - - cjs = list(self.running_conversion_jobs()) - if len(cjs) < self.ncpus: - cj = None - for job in self.waiting_jobs: - if isinstance(job, ConversionJob): - cj = job - break - if cj is not None: - self.waiting_jobs.remove(cj) - cj.start() - self.running_jobs.append(cj) - refresh = True - - djs = list(self.running_device_jobs()) - if len(djs) == 0: - dj = None - for job in self.waiting_jobs: - if isinstance(job, DeviceJob): - dj = job - break - if dj is not None: - self.waiting_jobs.remove(dj) - dj.start() - self.running_jobs.append(dj) - refresh = True - if refresh: - self.reset() - if len(self.running_jobs) == 0: - self.emit(SIGNAL('no_more_jobs()')) - for i in range(len(self.running_jobs)): - self.emit(SIGNAL('dataChanged(QModelIndex, QModelIndex)'), self.index(i, 3), self.index(i, 3)) - finally: - self.update_lock.unlock() - - def has_jobs(self): - return len(self.waiting_jobs) + len(self.running_jobs) > 0 - - def has_device_jobs(self): - return len(tuple(self.running_device_jobs())) > 0 - - def running_device_jobs(self): - for job in self.running_jobs: - if isinstance(job, DeviceJob): - yield job - - def running_conversion_jobs(self): - for job in self.running_jobs: - if isinstance(job, ConversionJob): - yield job - - def update_progress(self, id, percent): - row = -1 - for collection in (self.running_jobs, self.waiting_jobs, self.finished_jobs): - for job in collection: - row += 1 - if job.id == id: - job.percent_done = percent - index = self.index(row, 2) - self.emit(SIGNAL('dataChanged(QModelIndex, QModelIndex)'), index, index) - return - - - def create_job(self, job_class, description, slot, priority, *args, **kwargs): - self.next_id += 1 - id = self.next_id - job = job_class(id, description, slot, priority, *args, **kwargs) - job.server = self.process_server - QObject.connect(job, SIGNAL('status_update(int, int)'), self.status_update, - Qt.QueuedConnection) - self.connect(job, SIGNAL('update_progress(int, PyQt_PyObject)'), - self.update_progress, Qt.QueuedConnection) - self.update_lock.lock() - self.add_queue.append(job) - self.update_lock.unlock() - return job - - def run_conversion_job(self, slot, callable, args=[], **kwargs): - ''' - Run a conversion job. - @param slot: The function to call with the job result. - @param callable: The function to call to communicate with the device. - @param args: The arguments to pass to callable - @param kwargs: The keyword arguments to pass to callable - ''' - desc = kwargs.pop('job_description', '') - if args and hasattr(args[0], 'append') and '--verbose' not in args[0]: - args[0].append('--verbose') - priority = self.PRIORITY[Settings().get('conversion job priority', 'Normal')] - job = self.create_job(ConversionJob, desc, slot, priority, - callable, *args, **kwargs) - return job.id - - def run_device_job(self, slot, callable, *args, **kwargs): - ''' - Run a job to communicate with the device. - @param slot: The function to call with the job result. - @param callable: The function to call to communicate with the device. - @param args: The arguments to pass to callable - @param kwargs: The keyword arguments to pass to callable - ''' - desc = callable.__doc__ if callable.__doc__ else '' - desc += kwargs.pop('job_extra_description', '') - job = self.create_job(DeviceJob, desc, slot, QThread.NormalPriority, - callable, *args, **kwargs) - return job.id - - def rowCount(self, parent): - return len(self.running_jobs) + len(self.waiting_jobs) + len(self.finished_jobs) - - def columnCount(self, parent): - return 4 - - def headerData(self, section, orientation, role): - if role != Qt.DisplayRole: - return NONE - if orientation == Qt.Horizontal: - if section == 0: text = _("Job") - elif section == 1: text = _("Status") - elif section == 2: text = _("Progress") - elif section == 3: text = _('Running time') - return QVariant(text) - else: - return QVariant(section+1) - - def row_to_job(self, row): - if row < len(self.running_jobs): - return self.running_jobs[row], 0 - row -= len(self.running_jobs) - if row < len(self.waiting_jobs): - return self.waiting_jobs[row], 1 - row -= len(self.running_jobs) - return self.finished_jobs[row], 2 - - def data(self, index, role): - if role not in (Qt.DisplayRole, Qt.DecorationRole): - return NONE - row, col = index.row(), index.column() - try: - job, status = self.row_to_job(row) - except IndexError: - return NONE - - if role == Qt.DisplayRole: - if col == 0: - return QVariant(job.description) - if col == 1: - if status == 2: - st = _('Finished') if job.exception is None else _('Error') - else: - st = [_('Working'), _('Waiting')][status] - return QVariant(st) - if col == 2: - return QVariant(int(100*job.percent_done)) - if col == 3: - if job.start_time is None: - return NONE - rtime = job.running_time if hasattr(job, 'running_time') else time.time() - job.start_time - return QVariant('%dm %ds'%(int(rtime)//60, int(rtime)%60)) - if role == Qt.DecorationRole and col == 0: - if status == 1: - return self.wait_icon - if status == 0: - return self.running_icon - if status == 2: - if job.exception or job.result == self.process_server.KILL_RESULT: - return self.error_icon - return self.done_icon - return NONE - - def status_update(self, id, progress): - for i in range(len(self.running_jobs)): - job = self.running_jobs[i] - if job.id == id: - self.emit(SIGNAL('dataChanged(QModelIndex, QModelIndex)'), self.index(i, 2), self.index(i, 3)) - break - - def kill_job(self, row, gui_parent): - job, status = self.row_to_job(row) - if isinstance(job, DeviceJob): - error_dialog(gui_parent, _('Cannot kill job'), - _('Cannot kill jobs that are communicating with the device as this may cause data corruption.')).exec_() - return - if status == 2: - error_dialog(gui_parent, _('Cannot kill job'), - _('Cannot kill already completed jobs.')).exec_() - return - if status == 1: - self.update_lock.lock() - try: - self.waiting_jobs.remove(job) - self.finished_jobs.append(job) - self.emit(SIGNAL('job_done(int)'), job.id) - job.result = self.process_server.KILL_RESULT - finally: - self.update_lock.unlock() - else: - self.process_server.kill(job.id) - self.reset() - if len(self.running_jobs) + len(self.waiting_jobs) == 0: - self.emit(SIGNAL('no_more_jobs()')) - -class DetailView(QDialog, Ui_Dialog): - - def __init__(self, parent, job): - QDialog.__init__(self, parent) - self.setupUi(self) - self.setWindowTitle(job.description) - self.job = job - self.connect(self.job, SIGNAL('output_received()'), self.update) - self.update() - - - def update(self): - txt = self.job.formatted_error() + self.job.formatted_log() - if not txt: - txt = 'No details available' - self.log.setHtml(txt) - vbar = self.log.verticalScrollBar() - vbar.setValue(vbar.maximum()) diff --git a/src/calibre/gui2/jobs2.py b/src/calibre/gui2/jobs2.py new file mode 100644 index 0000000000..9b30a3190e --- /dev/null +++ b/src/calibre/gui2/jobs2.py @@ -0,0 +1,193 @@ +#!/usr/bin/env python +__license__ = 'GPL v3' +__copyright__ = '2008, Kovid Goyal kovid@kovidgoyal.net' +__docformat__ = 'restructuredtext en' + +''' +Job management. +''' +import time +from PyQt4.QtCore import QAbstractTableModel, QVariant, QModelIndex, Qt, SIGNAL +from PyQt4.QtGui import QIcon, QDialog + +from calibre.parallel import ParallelJob, Server +from calibre.gui2 import Dispatcher, error_dialog +from calibre.gui2.device import DeviceJob +from calibre.gui2.dialogs.job_view_ui import Ui_Dialog + +NONE = QVariant() + +class JobManager(QAbstractTableModel): + + wait_icon = QVariant(QIcon(':/images/jobs.svg')) + running_icon = QVariant(QIcon(':/images/exec.svg')) + error_icon = QVariant(QIcon(':/images/dialog_error.svg')) + done_icon = QVariant(QIcon(':/images/ok.svg')) + + def __init__(self): + QAbstractTableModel.__init__(self) + self.jobs = [] + self.server = Server() + self.add_job = Dispatcher(self._add_job) + self.status_update = Dispatcher(self._status_update) + self.start_work = Dispatcher(self._start_work) + self.job_done = Dispatcher(self._job_done) + + def columnCount(self, parent=QModelIndex()): + return 4 + + def rowCount(self, parent=QModelIndex()): + return len(self.jobs) + + def headerData(self, section, orientation, role): + if role != Qt.DisplayRole: + return NONE + if orientation == Qt.Horizontal: + if section == 0: text = _("Job") + elif section == 1: text = _("Status") + elif section == 2: text = _("Progress") + elif section == 3: text = _('Running time') + return QVariant(text) + else: + return QVariant(section+1) + + def data(self, index, role): + try: + if role not in (Qt.DisplayRole, Qt.DecorationRole): + return NONE + row, col = index.row(), index.column() + job = self.jobs[row] + + if role == Qt.DisplayRole: + if col == 0: + desc = job.description + if not desc: + desc = _('Unknown job') + return QVariant(desc) + if col == 1: + status = job.status() + if status == 'DONE': + st = _('Finished') + elif status == 'ERROR': + st = _('Error') + elif status == 'WAITING': + st = _('Waiting') + else: + st = _('Working') + return QVariant(st) + if col == 2: + pc = job.percent + if pc <=0: + percent = 0 + else: + percent = int(100*pc) + return QVariant(percent) + if col == 3: + if job.start_time is None: + return NONE + rtime = job.running_time if job.running_time is not None else \ + time.time() - job.start_time + return QVariant('%dm %ds'%(int(rtime)//60, int(rtime)%60)) + if role == Qt.DecorationRole and col == 0: + status = job.status() + if status == 'WAITING': + return self.wait_icon + if status == 'WORKING': + return self.running_icon + if status == 'ERROR': + return self.error_icon + if status == 'DONE': + return self.done_icon + except: + import traceback + traceback.print_exc() + return NONE + + def _add_job(self, job): + self.emit(SIGNAL('layoutAboutToBeChanged()')) + self.jobs.append(job) + self.jobs.sort() + self.emit(SIGNAL('job_added(int)'), self.rowCount()) + self.emit(SIGNAL('layoutChanged()')) + + def done_jobs(self): + return [j for j in self.jobs if j.status() in ['DONE', 'ERROR']] + + def row_to_job(self, row): + return self.jobs[row] + + def _start_work(self, job): + self.emit(SIGNAL('layoutAboutToBeChanged()')) + self.jobs.sort() + self.emit(SIGNAL('layoutChanged()')) + + def _job_done(self, job): + self.emit(SIGNAL('layoutAboutToBeChanged()')) + self.jobs.sort() + self.emit(SIGNAL('job_done(int)'), len(self.jobs) - len(self.done_jobs())) + self.emit(SIGNAL('layoutChanged()')) + + def _status_update(self, job): + row = self.jobs.index(job) + self.emit(SIGNAL('dataChanged(QModelIndex, QModelIndex)'), + self.index(row, 0), self.index(row, 3)) + + + def has_device_jobs(self): + for job in self.jobs: + if job.is_running and isinstance(job, DeviceJob): + return True + return False + + def has_jobs(self): + for job in self.jobs: + if job.is_running: + return True + return False + + def run_job(self, done, func, args=[], kwargs={}, + description=None): + job = ParallelJob(func, done, self, args=args, kwargs=kwargs, + description=description) + self.server.add_job(job) + return job + + + def output(self, job): + self.emit(SIGNAL('output_received()')) + + def kill_job(self, row, view): + job = self.jobs[row] + if isinstance(job, DeviceJob): + error_dialog(view, _('Cannot kill job'), + _('Cannot kill jobs that communicate with the device')).exec_() + return + if job.has_run: + error_dialog(view, _('Cannot kill job'), + _('Job has already run')).exec_() + return + if not job.is_running: + error_dialog(view, _('Cannot kill job'), + _('Cannot kill waiting job')).exec_() + return + + + self.server.kill(job) + + def terminate_all_jobs(self): + pass + +class DetailView(QDialog, Ui_Dialog): + + def __init__(self, parent, job): + QDialog.__init__(self, parent) + self.setupUi(self) + self.setWindowTitle(job.description) + self.job = job + self.update() + + + def update(self): + self.log.setHtml(self.job.gui_text()) + vbar = self.log.verticalScrollBar() + vbar.setValue(vbar.maximum()) diff --git a/src/calibre/gui2/main.py b/src/calibre/gui2/main.py index 475b20a9c0..f0d168de0b 100644 --- a/src/calibre/gui2/main.py +++ b/src/calibre/gui2/main.py @@ -4,13 +4,13 @@ import os, sys, textwrap, collections, traceback, shutil, time from xml.parsers.expat import ExpatError from functools import partial from PyQt4.QtCore import Qt, SIGNAL, QObject, QCoreApplication, \ - QVariant, QThread, QUrl, QSize + QVariant, QUrl, QSize from PyQt4.QtGui import QPixmap, QColor, QPainter, QMenu, QIcon, QMessageBox, \ QToolButton, QDialog, QDesktopServices from PyQt4.QtSvg import QSvgRenderer from calibre import __version__, __appname__, islinux, sanitize_file_name, \ - Settings, iswindows, isosx, preferred_encoding + Settings, iswindows, isosx from calibre.ptempfile import PersistentTemporaryFile from calibre.ebooks.metadata.meta import get_metadata, get_filename_pat, set_filename_pat from calibre.devices.errors import FreeSpaceError @@ -18,16 +18,16 @@ from calibre.devices.interface import Device from calibre.gui2 import APP_UID, warning_dialog, choose_files, error_dialog, \ initialize_file_icon_provider, question_dialog,\ pixmap_to_data, choose_dir, ORG_NAME, \ - set_sidebar_directories, \ + set_sidebar_directories, Dispatcher, \ SingleApplication, Application, available_height, max_available_height from calibre.gui2.cover_flow import CoverFlow, DatabaseImages, pictureflowerror from calibre.library.database import LibraryDatabase from calibre.gui2.update import CheckForUpdates from calibre.gui2.main_window import MainWindow, option_parser from calibre.gui2.main_ui import Ui_MainWindow -from calibre.gui2.device import DeviceDetector, DeviceManager +from calibre.gui2.device import DeviceManager from calibre.gui2.status import StatusBar -from calibre.gui2.jobs import JobManager +from calibre.gui2.jobs2 import JobManager from calibre.gui2.news import NewsMenu from calibre.gui2.dialogs.metadata_single import MetadataSingleDialog from calibre.gui2.dialogs.metadata_bulk import MetadataBulkDialog @@ -44,6 +44,7 @@ from calibre.ebooks.metadata.meta import set_metadata from calibre.ebooks.metadata import MetaInformation from calibre.ebooks import BOOK_EXTENSIONS from calibre.ebooks.lrf import preferred_source_formats as LRF_PREFERRED_SOURCE_FORMATS +from calibre.parallel import JobKilled class Main(MainWindow, Ui_MainWindow): @@ -72,7 +73,6 @@ class Main(MainWindow, Ui_MainWindow): self.read_settings() self.job_manager = JobManager() self.jobs_dialog = JobsDialog(self, self.job_manager) - self.device_manager = None self.upload_memory = {} self.delete_memory = {} self.conversion_jobs = {} @@ -238,10 +238,8 @@ class Main(MainWindow, Ui_MainWindow): self.setMaximumHeight(max_available_height()) ####################### Setup device detection ######################## - self.detector = DeviceDetector(sleep_time=2000) - QObject.connect(self.detector, SIGNAL('connected(PyQt_PyObject, PyQt_PyObject)'), - self.device_detected, Qt.QueuedConnection) - self.detector.start(QThread.InheritPriority) + self.device_manager = DeviceManager(Dispatcher(self.device_detected), self.job_manager) + self.device_manager.start() self.news_menu.set_custom_feeds(self.library_view.model().db.get_feeds()) @@ -300,22 +298,19 @@ class Main(MainWindow, Ui_MainWindow): ########################## Connect to device ############################## - def device_detected(self, device, connected): + def device_detected(self, connected): ''' Called when a device is connected to the computer. ''' if connected: - self.device_manager = DeviceManager(device) - self.job_manager.run_device_job(self.info_read, self.device_manager.info_func()) - self.set_default_thumbnail(device.THUMBNAIL_HEIGHT) - self.status_bar.showMessage(_('Device: ')+device.__class__.__name__+_(' detected.'), 3000) + self.device_manager.get_device_information(Dispatcher(self.info_read)) + self.set_default_thumbnail(self.device_manager.device.THUMBNAIL_HEIGHT) + self.status_bar.showMessage(_('Device: ')+\ + self.device_manager.device.__class__.__name__+_(' detected.'), 3000) self.action_sync.setEnabled(True) self.device_connected = True else: self.device_connected = False - self.job_manager.terminate_device_jobs() - if self.device_manager: - self.device_manager.device_removed() self.location_view.model().update_devices() self.action_sync.setEnabled(False) self.vanity.setText(self.vanity_template%dict(version=self.latest_version, device=' ')) @@ -324,27 +319,25 @@ class Main(MainWindow, Ui_MainWindow): self.status_bar.reset_info() self.location_selected('library') - def info_read(self, id, description, result, exception, formatted_traceback): + def info_read(self, job): ''' Called once device information has been read. ''' - if exception: - self.device_job_exception(id, description, exception, formatted_traceback) + if job.exception is not None: + self.device_job_exception(job) return - info, cp, fs = result + info, cp, fs = job.result self.location_view.model().update_devices(cp, fs) self.device_info = _('Connected ')+' '.join(info[:-1]) self.vanity.setText(self.vanity_template%dict(version=self.latest_version, device=self.device_info)) - func = self.device_manager.books_func() - self.job_manager.run_device_job(self.metadata_downloaded, func) + self.device_manager.books(Dispatcher(self.metadata_downloaded)) - def metadata_downloaded(self, id, description, result, exception, formatted_traceback): + def metadata_downloaded(self, job): ''' Called once metadata has been read for all books on the device. ''' - if exception: - print exception, type(exception) - if isinstance(exception, ExpatError): + if job.exception is not None: + if isinstance(job.exception, ExpatError): error_dialog(self, _('Device database corrupted'), _('''

The database of books on the reader is corrupted. Try the following: @@ -354,9 +347,9 @@ class Main(MainWindow, Ui_MainWindow): ''')%dict(app=__appname__)).exec_() else: - self.device_job_exception(id, description, exception, formatted_traceback) + self.device_job_exception(job) return - mainlist, cardlist = result + mainlist, cardlist = job.result self.memory_view.set_database(mainlist) self.card_view.set_database(cardlist) for view in (self.memory_view, self.card_view): @@ -373,18 +366,17 @@ class Main(MainWindow, Ui_MainWindow): ''' Upload metadata to device. ''' - self.job_manager.run_device_job(self.metadata_synced, - self.device_manager.sync_booklists_func(), - self.booklists()) + self.device_manager.sync_booklists(Dispatcher(self.metadata_synced), + self.booklists()) - def metadata_synced(self, id, description, result, exception, formatted_traceback): + def metadata_synced(self, job): ''' Called once metadata has been uploaded. ''' - if exception: - self.device_job_exception(id, description, exception, formatted_traceback) + if job.exception is not None: + self.device_job_exception(job) return - cp, fs = result + cp, fs = job.result self.location_view.model().update_devices(cp, fs) ############################################################################ @@ -486,35 +478,34 @@ class Main(MainWindow, Ui_MainWindow): def upload_books(self, files, names, metadata, on_card=False, memory=None): ''' Upload books to device. - @param files: List of either paths to files or file like objects + :param files: List of either paths to files or file like objects ''' - titles = ', '.join([i['title'] for i in metadata]) - id = self.job_manager.run_device_job(self.books_uploaded, - self.device_manager.upload_books_func(), + titles = [i['title'] for i in metadata] + job = self.device_manager.upload_books(Dispatcher(self.books_uploaded), files, names, on_card=on_card, - job_extra_description=titles + titles=titles ) - self.upload_memory[id] = (metadata, on_card, memory) + self.upload_memory[job] = (metadata, on_card, memory) - def books_uploaded(self, id, description, result, exception, formatted_traceback): + def books_uploaded(self, job): ''' Called once books have been uploaded. ''' - metadata, on_card, memory = self.upload_memory.pop(id) + metadata, on_card, memory = self.upload_memory.pop(job) - if exception: - if isinstance(exception, FreeSpaceError): - where = 'in main memory.' if 'memory' in str(exception) else 'on the storage card.' + if job.exception is not None: + if isinstance(job.exception, FreeSpaceError): + where = 'in main memory.' if 'memory' in str(job.exception) else 'on the storage card.' titles = '\n'.join(['

  • '+mi['title']+'
  • ' for mi in metadata]) d = error_dialog(self, _('No space on device'), _('

    Cannot upload books to device there is no more free space available ')+where+ '

    \n
      %s
    '%(titles,)) d.exec_() else: - self.device_job_exception(id, description, exception, formatted_traceback) + self.device_job_exception(job) return - self.device_manager.add_books_to_metadata(result, metadata, self.booklists()) + self.device_manager.add_books_to_metadata(job.result, metadata, self.booklists()) self.upload_booklists() @@ -554,22 +545,20 @@ class Main(MainWindow, Ui_MainWindow): self.status_bar.showMessage(_('Deleting books from device.'), 1000) def remove_paths(self, paths): - return self.job_manager.run_device_job(self.books_deleted, - self.device_manager.delete_books_func(), paths) - + return self.device_manager.delete_books(Dispatcher(self.books_deleted), paths) - def books_deleted(self, id, description, result, exception, formatted_traceback): + def books_deleted(self, job): ''' Called once deletion is done on the device ''' for view in (self.memory_view, self.card_view): - view.model().deletion_done(id, bool(exception)) - if exception: - self.device_job_exception(id, description, exception, formatted_traceback) + view.model().deletion_done(id, bool(job.exception)) + if job.exception is not None: + self.device_job_exception(job) return - if self.delete_memory.has_key(id): - paths, model = self.delete_memory.pop(id) + if self.delete_memory.has_key(job): + paths, model = self.delete_memory.pop(job) self.device_manager.remove_books_from_metadata(paths, self.booklists()) model.paths_deleted(paths) self.upload_booklists() @@ -718,12 +707,11 @@ class Main(MainWindow, Ui_MainWindow): QDesktopServices.openUrl(QUrl('file:'+dir)) else: paths = self.current_view().model().paths(rows) - self.job_manager.run_device_job(self.books_saved, - self.device_manager.save_books_func(), paths, dir) - - def books_saved(self, id, description, result, exception, formatted_traceback): - if exception: - self.device_job_exception(id, description, exception, formatted_traceback) + self.device_manager.save_books(Dispatcher(self.books_saved), paths, dir) + + def books_saved(self, job): + if job.exception is not None: + self.device_job_exception(job) return ############################################################################ @@ -746,15 +734,15 @@ class Main(MainWindow, Ui_MainWindow): if data['password']: args.extend(['--password', data['password']]) args.append(data['script'] if data['script'] else data['title']) - id = self.job_manager.run_conversion_job(self.news_fetched, 'feeds2lrf', args=[args], - job_description=_('Fetch news from ')+data['title']) - self.conversion_jobs[id] = (pt, 'lrf') + job = self.job_manager.run_job(Dispatcher(self.news_fetched), 'feeds2lrf', args=[args], + description=_('Fetch news from ')+data['title']) + self.conversion_jobs[job] = (pt, 'lrf') self.status_bar.showMessage(_('Fetching news from ')+data['title'], 2000) - def news_fetched(self, id, description, result, exception, formatted_traceback, log): - pt, fmt = self.conversion_jobs.pop(id) - if exception: - self.conversion_job_exception(id, description, exception, formatted_traceback, log) + def news_fetched(self, job): + pt, fmt = self.conversion_jobs.pop(job) + if job.exception is not None: + self.job_exception(job) return to_device = self.device_connected and fmt in self.device_manager.device_class.FORMATS self._add_books([pt.name], to_device) @@ -828,12 +816,12 @@ class Main(MainWindow, Ui_MainWindow): cmdline.extend(['--cover', cf.name]) cmdline.extend(['-o', of.name]) cmdline.append(pt.name) - id = self.job_manager.run_conversion_job(self.book_converted, + job = self.job_manager.run_job(Dispatcher(self.book_converted), 'any2lrf', args=[cmdline], - job_description=_('Convert book %d of %d (%s)')%(i+1, len(rows), repr(mi.title))) + description=_('Convert book %d of %d (%s)')%(i+1, len(rows), repr(mi.title))) - self.conversion_jobs[id] = (d.cover_file, pt, of, d.output_format, + self.conversion_jobs[job] = (d.cover_file, pt, of, d.output_format, self.library_view.model().db.id(row)) res = [] for row in bad_rows: @@ -874,10 +862,10 @@ class Main(MainWindow, Ui_MainWindow): setattr(options, 'output', of.name) options.verbose = 1 args = [pt.name, options] - id = self.job_manager.run_conversion_job(self.book_converted, - 'comic2lrf', args=args, - job_description=_('Convert comic %d of %d (%s)')%(i+1, len(comics), repr(options.title))) - self.conversion_jobs[id] = (None, pt, of, 'lrf', + job = self.job_manager.run_job(Dispatcher(self.book_converted), + 'comic2lrf', args=args, + description=_('Convert comic %d of %d (%s)')%(i+1, len(comics), repr(options.title))) + self.conversion_jobs[job] = (None, pt, of, 'lrf', self.library_view.model().db.id(row)) @@ -904,12 +892,12 @@ class Main(MainWindow, Ui_MainWindow): of.close() cmdline.extend(['-o', of.name]) cmdline.append(pt.name) - id = self.job_manager.run_conversion_job(self.book_converted, - 'any2lrf', args=[cmdline], - job_description=_('Convert book: ')+d.title()) + job = self.job_manager.run_job(Dispatcher(self.book_converted), + 'any2lrf', args=[cmdline], + description=_('Convert book: ')+d.title()) - self.conversion_jobs[id] = (d.cover_file, pt, of, d.output_format, d.id) + self.conversion_jobs[job] = (d.cover_file, pt, of, d.output_format, d.id) changed = True if changed: self.library_view.model().resort(reset=False) @@ -949,24 +937,24 @@ class Main(MainWindow, Ui_MainWindow): opts.verbose = 1 args = [pt.name, opts] changed = True - id = self.job_manager.run_conversion_job(self.book_converted, + job = self.job_manager.run_job(Dispatcher(self.book_converted), 'comic2lrf', args=args, - job_description=_('Convert comic: ')+opts.title) - self.conversion_jobs[id] = (None, pt, of, 'lrf', + description=_('Convert comic: ')+opts.title) + self.conversion_jobs[job] = (None, pt, of, 'lrf', self.library_view.model().db.id(row)) if changed: self.library_view.model().resort(reset=False) self.library_view.model().research() - def book_converted(self, id, description, result, exception, formatted_traceback, log): - of, fmt, book_id = self.conversion_jobs.pop(id)[2:] - if exception: - self.conversion_job_exception(id, description, exception, formatted_traceback, log) + def book_converted(self, job): + of, fmt, book_id = self.conversion_jobs.pop(job)[2:] + if job.exception is not None: + self.job_exception(job) return data = open(of.name, 'rb') self.library_view.model().db.add_format(book_id, fmt, data, index_is_id=True) data.close() - self.status_bar.showMessage(description + (' completed'), 2000) + self.status_bar.showMessage(job.description + (' completed'), 2000) #############################View book###################################### @@ -977,19 +965,18 @@ class Main(MainWindow, Ui_MainWindow): self.persistent_files.append(pt) self._view_file(pt.name) - def book_downloaded_for_viewing(self, id, description, result, exception, formatted_traceback): - if exception: - self.device_job_exception(id, description, exception, formatted_traceback) + def book_downloaded_for_viewing(self, job): + if job.exception: + self.device_job_exception(job) return - print result - self._view_file(result) + self._view_file(job.result) def _view_file(self, name): self.setCursor(Qt.BusyCursor) try: if name.upper().endswith('.LRF'): args = ['lrfviewer', name] - self.job_manager.process_server.run_free_job('lrfviewer', kwdargs=dict(args=args)) + self.job_manager.server.run_free_job('lrfviewer', kwdargs=dict(args=args)) else: QDesktopServices.openUrl(QUrl('file:'+name))#launch(name) time.sleep(5) # User feedback @@ -1050,9 +1037,9 @@ class Main(MainWindow, Ui_MainWindow): pt = PersistentTemporaryFile('_viewer_'+os.path.splitext(paths[0])[1]) self.persistent_files.append(pt) pt.close() - self.job_manager.run_device_job(self.book_downloaded_for_viewing, - self.device_manager.view_book_func(), paths[0], pt.name) - + self.device_manager.view_book(Dispatcher(self.book_downloaded_for_viewing), + paths[0], pt.name) + ############################################################################ @@ -1176,72 +1163,40 @@ class Main(MainWindow, Ui_MainWindow): self.action_convert.setEnabled(False) self.view_menu.actions()[1].setEnabled(False) - def device_job_exception(self, id, description, exception, formatted_traceback): + def device_job_exception(self, job): ''' Handle exceptions in threaded device jobs. ''' - if 'Could not read 32 bytes on the control bus.' in str(exception): + if 'Could not read 32 bytes on the control bus.' in str(job.exception): error_dialog(self, _('Error talking to device'), _('There was a temporary error talking to the device. Please unplug and reconnect the device and or reboot.')).show() return - print >>sys.stderr, 'Error in job:', description.encode('utf8') - print >>sys.stderr, exception - print >>sys.stderr, formatted_traceback.encode('utf8') + try: + print >>sys.stderr, job.console_text() + except: + pass if not self.device_error_dialog.isVisible(): - msg = u'

    %s: '%(exception.__class__.__name__,) + unicode(str(exception), 'utf8', 'replace') + u'

    ' - msg += u'

    Failed to perform job: '+description - msg += u'

    Further device related error messages will not be shown while this message is visible.' - msg += u'

    Detailed traceback:

    '
    -            if isinstance(formatted_traceback, str):
    -                formatted_traceback = unicode(formatted_traceback, 'utf8', 'replace')
    -            msg += formatted_traceback
    -            self.device_error_dialog.set_message(msg)
    +            self.device_error_dialog.set_message(job.gui_text())
                 self.device_error_dialog.show()
                 
    -    def conversion_job_exception(self, id, description, exception, formatted_traceback, log):
    +    def job_exception(self, job):
             
    -        def safe_print(msgs, file=sys.stderr):
    -            for i, msg in enumerate(msgs):
    -                if not msg:
    -                    msg = ''
    -                if isinstance(msg, unicode):
    -                    msgs[i] = msg.encode(preferred_encoding, 'replace')
    -            msg = ' '.join(msgs)
    -            print >>file, msg
    -        
    -        def safe_unicode(arg):
    -            if not arg:
    -                arg = unicode(repr(arg))
    -            if isinstance(arg, str):
    -                arg = arg.decode(preferred_encoding, 'replace')
    -            if not isinstance(arg, unicode):
    -                try:
    -                    arg = unicode(repr(arg))
    -                except:
    -                    arg = u'Could not convert to unicode'
    -            return arg
    -        
    -        only_msg = getattr(exception, 'only_msg', False)
    -        description, exception, formatted_traceback, log = map(safe_unicode,
    -                            (description, exception, formatted_traceback, log))
    +        only_msg = getattr(job.exception, 'only_msg', False)
             try:
    -            safe_print('Error in job:', description)
    -            if log:
    -                safe_print(log)
    -            safe_print(exception)
    -            safe_print(formatted_traceback)
    +            print job.console_text()
             except:
                 pass
             if only_msg:
    -            error_dialog(self, _('Conversion Error'), exception).exec_()
    +            try:
    +                exc = unicode(job.exception)
    +            except:
    +                exc = repr(job.exception)
    +            error_dialog(self, _('Conversion Error'), exc).exec_()
                 return
    -        msg =  u'

    %s:'%exception - msg += u'

    Failed to perform job: '+description - msg += u'

    Detailed traceback:

    '
    -        msg += formatted_traceback + u'
    ' - msg += u'

    Log:

    '
    -        msg += log
    -        ConversionErrorDialog(self, 'Conversion Error', msg, show=True)
    +        if isinstance(job.exception, JobKilled):
    +            return
    +        ConversionErrorDialog(self, _('Conversion Error'), job.gui_text(), 
    +                              show=True)
             
         
         def read_settings(self):
    @@ -1294,10 +1249,9 @@ class Main(MainWindow, Ui_MainWindow):
                 
             self.job_manager.terminate_all_jobs()
             self.write_settings()
    -        self.detector.keep_going = False
    +        self.device_manager.keep_going = False
             self.hide()
             time.sleep(2)
    -        self.detector.terminate()
             e.accept()
             
         def update_found(self, version):
    diff --git a/src/calibre/gui2/status.py b/src/calibre/gui2/status.py
    index 20df329364..1a322f7e31 100644
    --- a/src/calibre/gui2/status.py
    +++ b/src/calibre/gui2/status.py
    @@ -163,24 +163,23 @@ class StatusBar(QStatusBar):
         def show_book_info(self):
             self.emit(SIGNAL('show_book_info()'))
         
    -    def job_added(self, id):
    +    def job_added(self, nnum):
             jobs = self.movie_button.jobs
             src = qstring_to_unicode(jobs.text())
             num = self.jobs()
    -        nnum = num+1
    +        nnum = num + 1
             text = src.replace(str(num), str(nnum))
             jobs.setText(text)
             if self.movie_button.movie.state() == QMovie.Paused:
                 self.movie_button.movie.setPaused(False)
                 
    -    def job_done(self, id):
    +    def job_done(self, running):
             jobs = self.movie_button.jobs
             src = qstring_to_unicode(jobs.text())
             num = self.jobs()
    -        nnum = num-1
    -        text = src.replace(str(num), str(nnum))
    +        text = src.replace(str(num), str(running))
             jobs.setText(text)
    -        if nnum == 0:
    +        if running == 0:
                 self.no_more_jobs()
                 
         def no_more_jobs(self):
    diff --git a/src/calibre/gui2/widgets.py b/src/calibre/gui2/widgets.py
    index 06a85128bd..7fe8a278e8 100644
    --- a/src/calibre/gui2/widgets.py
    +++ b/src/calibre/gui2/widgets.py
    @@ -11,7 +11,7 @@ from PyQt4.QtGui import QListView, QIcon, QFont, QLabel, QListWidget, \
     from PyQt4.QtCore import QAbstractListModel, QVariant, Qt, SIGNAL, \
                              QObject, QRegExp, QString
     
    -from calibre.gui2.jobs import DetailView
    +from calibre.gui2.jobs2 import DetailView
     from calibre.gui2 import human_readable, NONE, TableView, qstring_to_unicode, error_dialog
     from calibre.gui2.filename_pattern_ui import Ui_Form
     from calibre import fit_image, Settings
    @@ -235,8 +235,10 @@ class JobsView(TableView):
             
         def show_details(self, index):
             row = index.row()
    -        job = self.model().row_to_job(row)[0]
    -        DetailView(self, job).exec_()
    +        job = self.model().row_to_job(row)
    +        d = DetailView(self, job)
    +        self.connect(self.model(), SIGNAL('output_received()'), d.update)
    +        d.exec_()
                 
     
     class FontFamilyModel(QAbstractListModel):
    diff --git a/src/calibre/manual/faq.rst b/src/calibre/manual/faq.rst
    index 780de1d85b..f8c4d744db 100644
    --- a/src/calibre/manual/faq.rst
    +++ b/src/calibre/manual/faq.rst
    @@ -17,7 +17,7 @@ E-book Format Conversion
     
     What formats does |app| support conversion to/from?
     ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    -|app| supports the conversion of the following formats to LRF: HTML, LIT, MOBI, PRC, EPUB, RTF, TXT, PDF and LRS. It also supports the conversion of LRF to LRS and HTML. Note that calibre does not support the conversion of DRMed ebooks.
    +|app| supports the conversion of the following formats to LRF: HTML, LIT, MOBI, PRC, EPUB, CBR, CBZ, RTF, TXT, PDF and LRS. It also supports the conversion of LRF to LRS and HTML(forthcoming). Note that calibre does not support the conversion of DRMed ebooks.
     
     What are the best formats to convert to LRF?
     ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    diff --git a/src/calibre/parallel.py b/src/calibre/parallel.py
    index cde249a779..0a028a16bd 100644
    --- a/src/calibre/parallel.py
    +++ b/src/calibre/parallel.py
    @@ -27,7 +27,6 @@ is buffered and asynchronous to prevent the job from being IO bound.
     import sys, os, gc, cPickle, traceback, atexit, cStringIO, time, signal, \
            subprocess, socket, collections, binascii, re, thread, tempfile
     from select import select
    -from functools import partial
     from threading import RLock, Thread, Event
     
     from calibre.ptempfile import PersistentTemporaryFile
    @@ -334,7 +333,7 @@ class Overseer(object):
         def __init__(self, server, port, timeout=5):
             self.worker_status = mother.spawn_worker('127.0.0.1:'+str(port))
             self.socket = server.accept()[0]
    -        # Needed if terminate called hwen interpreter is shutting down
    +        # Needed if terminate called when interpreter is shutting down
             self.os = os
             self.signal = signal
             self.on_probation = False
    @@ -343,7 +342,6 @@ class Overseer(object):
             self.working = False
             self.timeout = timeout
             self.last_job_time = time.time()
    -        self.job_id = None
             self._stop = False
             if not select([self.socket], [], [], 120)[0]:
                 raise RuntimeError(_('Could not launch worker process.'))
    @@ -408,16 +406,14 @@ class Overseer(object):
     
             `job`: An instance of :class:`Job`.
             '''
    -        self.job_id = job.job_id
             self.working = True
    -        self.write('JOB:'+cPickle.dumps((job.func, job.args, job.kwdargs), -1))
    +        self.write('JOB:'+cPickle.dumps((job.func, job.args, job.kwargs), -1))
             msg = self.read()
             if msg != 'OK':
                 raise ControlError('Failed to initialize job on worker %d:%s'%(self.worker_pid, msg))
    -        self.output = job.output if callable(job.output) else sys.stdout.write
    -        self.progress = job.progress if callable(job.progress) else None
             self.job =  job
             self.last_report = time.time()
    +        job.start_work()
     
         def control(self):
             '''
    @@ -435,7 +431,9 @@ class Overseer(object):
                 else:
                     if self.on_probation:
                         self.terminate()
    -                    return Result(None, ControlError('Worker process died unexpectedly'), '')
    +                    self.job.result = None
    +                    self.job.exception = ControlError('Worker process died unexpectedly')
    +                    return
                     else:
                         self.on_probation = True
                         return
    @@ -445,13 +443,14 @@ class Overseer(object):
                     return
                 elif word == 'RESULT':
                     self.write('OK')
    -                return Result(cPickle.loads(msg), None, None)
    +                self.job.result = cPickle.loads(msg)
    +                return True
                 elif word == 'OUTPUT':
                     self.write('OK')
                     try:
    -                    self.output(''.join(cPickle.loads(msg)))
    +                    self.job.output(''.join(cPickle.loads(msg)))
                     except:
    -                    self.output('Bad output message: '+ repr(msg))
    +                    self.job.output('Bad output message: '+ repr(msg))
                 elif word == 'PROGRESS':
                     self.write('OK')
                     percent = None
    @@ -459,45 +458,154 @@ class Overseer(object):
                         percent, msg = cPickle.loads(msg)[-1]
                     except:
                         print 'Bad progress update:', repr(msg)
    -                if self.progress and percent is not None:
    -                    self.progress(percent, msg)
    +                if percent is not None:
    +                    self.job.update_status(percent, msg)
                 elif word == 'ERROR':
                     self.write('OK')
    -                return Result(None, *cPickle.loads(msg))
    +                self.job.excetion, self.job.traceback = cPickle.loads(msg)
    +                return True
                 else:
                     self.terminate()
    -                return Result(None, ControlError('Worker sent invalid msg: %s'%repr(msg)), '')
    +                self.job.exception = ControlError('Worker sent invalid msg: %s'%repr(msg))
    +                return
             if not self.worker_status.is_alive() or time.time() - self.last_report > 180:
                 self.terminate()
    -            return Result(None, ControlError('Worker process died unexpectedly with returncode: %s'%str(self.process.returncode)), '')
    +            self.job.exception = ControlError('Worker process died unexpectedly with returncode: %s'%str(self.process.returncode))
    +            return
     
    +class JobKilled(Exception):
    +    pass
     
     class Job(object):
    +    
    +    def __init__(self, job_done, job_manager=None, 
    +                 args=[], kwargs={}, description=None):
    +        self.args            = args
    +        self.kwargs          = kwargs
    +        self._job_done       = job_done
    +        self.job_manager     = job_manager
    +        self.is_running      = False
    +        self.has_run         = False
    +        self.percent         = -1
    +        self.msg             = None
    +        self.description     = description
    +        self.start_time      = None
    +        self.running_time    = None
    +        
    +        self.result = self.exception = self.traceback = self.log = None
    +    
    +    def __cmp__(self, other):
    +        sstatus, ostatus = self.status(), other.status()
    +        if sstatus == ostatus or (self.has_run and other.has_run):
    +            if self.start_time == other.start_time:
    +                return cmp(id(self), id(other))
    +            return cmp(self.start_time, other.start_time)
    +        if sstatus == 'WORKING':
    +            return -1
    +        if ostatus == 'WORKING':
    +            return 1
    +        if sstatus == 'WAITING':
    +            return -1
    +        if ostatus == 'WAITING':
    +            return 1
    +        
    +    
    +    def job_done(self):
    +        self.is_running, self.has_run = False, True
    +        self.running_time = (time.time() - self.start_time) if \
    +                                    self.start_time is not None else 0
    +        if self.job_manager is not None:
    +            self.job_manager.job_done(self)
    +        self._job_done(self)
    +        
    +    def start_work(self):
    +        self.is_running = True
    +        self.has_run    = False
    +        self.start_time = time.time()
    +        if self.job_manager is not None:
    +            self.job_manager.start_work(self)
    +    
    +    def update_status(self, percent, msg=None):
    +        self.percent = percent
    +        self.msg     = msg
    +        if self.job_manager is not None:
    +            self.job_manager.status_update(self)
    +        
    +    def status(self):
    +        if self.is_running:
    +            return 'WORKING'
    +        if not self.has_run:
    +            return 'WAITING'
    +        if self.has_run:
    +            if self.exception is None:
    +                return 'DONE'
    +            return 'ERROR'
    +            
    +    def console_text(self):
    +        ans = [u'Error in job: ']
    +        if self.description:
    +            ans[0] += self.description
    +        if self.log:
    +            if isinstance(self.log, str):
    +                self.log = unicode(self.log, 'utf-8', 'replace')
    +            ans.append(self.log)
    +        header = unicode(self.exception.__class__.__name__) if \
    +                hasattr(self.exception, '__class__') else u'Error'
    +        header += u': '
    +        try:
    +            header += unicode(self.exception)
    +        except:
    +            header += unicode(repr(self.exception))
    +        ans.append(header)
    +        if self.traceback:
    +            ans.append(self.traceback)
    +        return (u'\n'.join(ans)).encode('utf-8')
    +    
    +    def gui_text(self):
    +        ans = [u'Job: ']
    +        if self.description:
    +            if not isinstance(self.description, unicode):
    +                self.description = self.description.decode('utf-8', 'replace')
    +            ans[0] += u'%s'%self.description
    +        if self.exception is not None:
    +            header = unicode(self.exception.__class__.__name__) if \
    +                    hasattr(self.exception, '__class__') else u'Error'
    +            header = u'%s'%header
    +            header += u': '
    +            try:
    +                header += unicode(self.exception)
    +            except:
    +                header += unicode(repr(self.exception))
    +            ans.append(header)
    +            if self.traceback:
    +                ans.append(u'Traceback:')
    +                ans.extend(self.traceback.split('\n'))
    +        if self.log:
    +            ans.append(u'Log:')
    +            if isinstance(self.log, str):
    +                self.log = unicode(self.log, 'utf-8', 'replace')
    +            ans.extend(self.log.split('\n'))
    +            
    +        return '
    '.join(ans) - def __init__(self, job_id, func, args, kwdargs, output, progress, done): - self.job_id = job_id + +class ParallelJob(Job): + + def __init__(self, func, *args, **kwargs): + Job.__init__(self, *args, **kwargs) self.func = func - self.args = args - self.kwdargs = kwdargs - self.output = output - self.progress = progress - self.done = done - -class Result(object): - - def __init__(self, result, exception, traceback): - self.result = result - self.exception = exception - self.traceback = traceback - - def __len__(self): - return 3 - - def __item__(self, i): - return (self.result, self.exception, self.traceback)[i] - - def __iter__(self): - return iter((self.result, self.exception, self.traceback)) + self.done = self.job_done + + def output(self, msg): + if not self.log: + self.log = u'' + if not isinstance(msg, unicode): + msg = msg.decode('utf-8', 'replace') + if msg: + self.log += msg + if self.job_manager is not None: + self.job_manager.output(self) + def remove_ipc_socket(path): os = __import__('os') @@ -527,7 +635,7 @@ class Server(Thread): atexit.register(remove_ipc_socket, self.port) self.server_socket.listen(5) self.number_of_workers = number_of_workers - self.pool, self.jobs, self.working, self.results = [], collections.deque(), [], {} + self.pool, self.jobs, self.working = [], collections.deque(), [] atexit.register(self.killall) atexit.register(self.close) self.job_lock = RLock() @@ -546,16 +654,9 @@ class Server(Thread): def add_job(self, job): with self.job_lock: self.jobs.append(job) - - def store_result(self, result, id=None): - if id: - with self.job_lock: - self.results[id] = result - - def result(self, id): - with self.result_lock: - return self.results.pop(id, None) - + if job.job_manager is not None: + job.job_manager.add_job(job) + def run(self): while True: job = None @@ -577,8 +678,9 @@ class Server(Thread): o.initialize_job(job) except Exception, err: o.terminate() - res = Result(None, unicode(err), traceback.format_exc()) - job.done(res) + job.exception = err + job.traceback = traceback.format_exc() + job.done() o = None if o and o.is_viable(): with self.working_lock: @@ -588,12 +690,14 @@ class Server(Thread): done = [] for o in self.working: try: - res = o.control() + if o.control() is not None or o.job.exception is not None: + o.job.done() + done.append(o) except Exception, err: - res = Result(None, unicode(err), traceback.format_exc()) + o.job.exception = err + o.job.traceback = traceback.format_exc() o.terminate() - if isinstance(res, Result): - o.job.done(res) + o.job.done() done.append(o) for o in done: self.working.remove(o) @@ -613,32 +717,23 @@ class Server(Thread): self.pool = [] - def kill(self, job_id): + def kill(self, job): with self.working_lock: pop = None for o in self.working: - if o.job_id == job_id: - o.terminate() - o.job.done(Result(self.KILL_RESULT, None, '')) + if o.job == job or o == job: + try: + o.terminate() + except: pass + o.job.exception = JobKilled(_('Job stopped by user')) + try: + o.job.done() + except: pass pop = o break if pop is not None: self.working.remove(pop) - - - def run_job(self, job_id, func, args=[], kwdargs={}, - output=None, progress=None, done=None): - ''' - Run a job in a separate process. Supports job control, output redirection - and progress reporting. - ''' - if done is None: - done = partial(self.store_result, id=job_id) - job = Job(job_id, func, args, kwdargs, output, progress, done) - with self.job_lock: - self.jobs.append(job) - def run_free_job(self, func, args=[], kwdargs={}): pt = PersistentTemporaryFile('.pickle', '_IPC_') pt.write(cPickle.dumps((func, args, kwdargs))) @@ -744,7 +839,7 @@ def worker(host, port): if msg != 'OK': return 1 write(client_socket, 'WAITING') - + sys.stdout = BufferedSender(client_socket) sys.stderr = sys.stdout diff --git a/src/calibre/web/feeds/news.py b/src/calibre/web/feeds/news.py index 33f24b5a05..dabdd3547d 100644 --- a/src/calibre/web/feeds/news.py +++ b/src/calibre/web/feeds/news.py @@ -573,7 +573,7 @@ class BasicNewsRecipe(object, LoggingInterface): open(pt.name, 'wb').write(raw) pt.close() url = ('file:'+pt.name) if iswindows else ('file://'+pt.name) - return self._fetch_article(url, dir, logger, f, a, num_of_feeds) + return self._fetch_article(url, dir, logger, f, a, num_of_feeds) def build_index(self): @@ -586,7 +586,7 @@ class BasicNewsRecipe(object, LoggingInterface): feeds = self.parse_feeds() self.report_progress(0, _('Trying to download cover...')) - self.download_cover() + self.download_cover() if self.test: feeds = feeds[:2] self.has_single_feed = len(feeds) == 1