Refactor job control system.

This commit is contained in:
Kovid Goyal 2008-08-12 11:10:10 -07:00
parent 7998201e0e
commit 47da65b4c9
12 changed files with 675 additions and 766 deletions

View File

@ -10,6 +10,8 @@ import os, tempfile, atexit, shutil, time
from PyQt4.Qt import QWebPage, QUrl, QApplication, QSize, \
SIGNAL, QPainter, QImage, QObject, Qt
from calibre.parallel import ParallelJob
__app = None
class HTMLTableRenderer(QObject):
@ -80,18 +82,17 @@ def render_table(server, soup, table, css, base_dir, width, height, dpi, factor=
</body>
</html>
'''%(head, width-10, style, unicode(table))
server.run_job(1, 'render_table',
job = ParallelJob('render_table', lambda j : j, None,
args=[html, base_dir, width, height, dpi, factor])
res = None
while res is None:
server.add_job(job)
while not job.has_run:
time.sleep(2)
res = server.result(1)
result, exception, traceback = res
if exception:
if job.exception is not None:
print 'Failed to render table'
print exception
print traceback
images, tdir = result
print job.exception
print job.traceback
images, tdir = job.result
atexit.register(shutil.rmtree, tdir)
return images

View File

@ -2,7 +2,7 @@ __license__ = 'GPL v3'
__copyright__ = '2008, Kovid Goyal <kovid at kovidgoyal.net>'
""" The GUI """
import sys, os, re, StringIO, traceback
from PyQt4.QtCore import QVariant, QFileInfo, QObject, SIGNAL, QBuffer, \
from PyQt4.QtCore import QVariant, QFileInfo, QObject, SIGNAL, QBuffer, Qt, \
QByteArray, QLocale, QUrl, QTranslator, QCoreApplication
from PyQt4.QtGui import QFileDialog, QMessageBox, QPixmap, QFileIconProvider, \
QIcon, QTableView, QDialogButtonBox, QApplication
@ -84,6 +84,21 @@ def human_readable(size):
size = size[:-2]
return size + " " + suffix
class Dispatcher(QObject):
'''Convenience class to ensure that a function call always happens in the GUI thread'''
def __init__(self, func):
QObject.__init__(self)
self.func = func
self.connect(self, SIGNAL('edispatch(PyQt_PyObject, PyQt_PyObject)'),
self.dispatch, Qt.QueuedConnection)
def __call__(self, *args, **kwargs):
self.emit(SIGNAL('edispatch(PyQt_PyObject, PyQt_PyObject)'), args, kwargs)
def dispatch(self, args, kwargs):
self.func(*args, **kwargs)
class TableView(QTableView):
def __init__(self, parent):

View File

@ -1,130 +1,185 @@
__license__ = 'GPL v3'
__copyright__ = '2008, Kovid Goyal <kovid at kovidgoyal.net>'
import os, traceback
from PyQt4.QtCore import QThread, SIGNAL, QObject
import os, traceback, Queue, time
from threading import Thread
from calibre.devices import devices
from calibre.parallel import Job
from calibre.devices.scanner import DeviceScanner
class DeviceDetector(QThread):
class DeviceJob(Job):
def __init__(self, func, *args, **kwargs):
Job.__init__(self, *args, **kwargs)
self.func = func
def run(self):
self.start_work()
try:
self.result = self.func(*self.args, **self.kwargs)
except (Exception, SystemExit), err:
self.exception = err
self.traceback = traceback.format_exc()
finally:
self.job_done()
class DeviceManager(Thread):
'''
Worker thread that polls the USB ports for devices. Emits the
signal connected(PyQt_PyObject, PyQt_PyObject) on connection and
disconnection events.
'''
def __init__(self, sleep_time=2000):
def __init__(self, connected_slot, job_manager, sleep_time=2):
'''
@param sleep_time: Time to sleep between device probes in millisecs
@type sleep_time: integer
'''
self.devices = [[d, False] for d in devices()]
self.sleep_time = sleep_time
QThread.__init__(self)
self.keep_going = True
Thread.__init__(self)
self.setDaemon(True)
self.devices = [[d, False] for d in devices()]
self.device = None
self.device_class = None
self.sleep_time = sleep_time
self.connected_slot = connected_slot
self.jobs = Queue.Queue(0)
self.keep_going = True
self.job_manager = job_manager
self.current_job = None
self.scanner = DeviceScanner()
def detect_device(self):
self.scanner.scan()
for device in self.devices:
connected = self.scanner.is_device_connected(device[0])
if connected and not device[1]:
try:
dev = device[0]()
dev.open()
self.device = dev
self.device_class = dev.__class__
self.connected_slot(True)
except:
print 'Unable to open device'
traceback.print_exc()
finally:
device[1] = True
elif not connected and device[1]:
while True:
try:
job = self.jobs.get_nowait()
job.abort(Exception(_('Device no longer connected.')))
except Queue.Empty:
break
self.device = None
self.connected_slot(False)
device[1] ^= True
def next(self):
if not self.jobs.empty():
try:
return self.jobs.get_nowait()
except Queue.Empty:
pass
def run(self):
scanner = DeviceScanner()
while self.keep_going:
scanner.scan()
for device in self.devices:
connected = scanner.is_device_connected(device[0])
if connected and not device[1]:
try:
dev = device[0]()
dev.open()
self.emit(SIGNAL('connected(PyQt_PyObject, PyQt_PyObject)'), dev, True)
except:
print 'Unable to open device'
traceback.print_exc()
finally:
device[1] = True
elif not connected and device[1]:
self.emit(SIGNAL('connected(PyQt_PyObject, PyQt_PyObject)'), device[0], False)
device[1] ^= True
self.msleep(self.sleep_time)
self.detect_device()
while True:
job = self.next()
if job is not None:
self.current_job = job
self.device.set_progress_reporter(job.update_status)
self.current_job.run()
self.current_job = None
else:
break
time.sleep(self.sleep_time)
def create_job(self, func, done, description, args=[], kwargs={}):
job = DeviceJob(func, done, self.job_manager,
args=args, kwargs=kwargs, description=description)
self.job_manager.add_job(job)
self.jobs.put(job)
return job
def _get_device_information(self):
info = self.device.get_device_information(end_session=False)
info = [i.replace('\x00', '').replace('\x01', '') for i in info]
cp = self.device.card_prefix(end_session=False)
fs = self.device.free_space()
return info, cp, fs
def get_device_information(self, done):
'''Get device information and free space on device'''
return self.create_job(self._get_device_information, done,
description=_('Get device information'))
def _books(self):
'''Get metadata from device'''
mainlist = self.device.books(oncard=False, end_session=False)
cardlist = self.device.books(oncard=True)
return (mainlist, cardlist)
class DeviceManager(QObject):
def __init__(self, device):
QObject.__init__(self)
self.device_class = device.__class__
self.device = device
def device_removed(self):
self.device = None
def info_func(self):
''' Return callable that returns device information and free space on device'''
def get_device_information(updater):
'''Get device information'''
self.device.set_progress_reporter(updater)
info = self.device.get_device_information(end_session=False)
info = [i.replace('\x00', '').replace('\x01', '') for i in info]
cp = self.device.card_prefix(end_session=False)
fs = self.device.free_space()
return info, cp, fs
return get_device_information
def books_func(self):
def books(self, done):
'''Return callable that returns the list of books on device as two booklists'''
def books(updater):
'''Get metadata from device'''
self.device.set_progress_reporter(updater)
mainlist = self.device.books(oncard=False, end_session=False)
cardlist = self.device.books(oncard=True)
return (mainlist, cardlist)
return books
return self.create_job(self._books, done, description=_('Get list of books on device'))
def sync_booklists_func(self):
'''Upload booklists to device'''
def sync_booklists(updater, booklists):
'''Sync metadata to device'''
self.device.set_progress_reporter(updater)
self.device.sync_booklists(booklists, end_session=False)
return self.device.card_prefix(end_session=False), self.device.free_space()
return sync_booklists
def _sync_booklists(self, booklists):
'''Sync metadata to device'''
self.device.sync_booklists(booklists, end_session=False)
return self.device.card_prefix(end_session=False), self.device.free_space()
def upload_books_func(self):
'''Upload books to device'''
def upload_books(updater, files, names, on_card=False):
'''Upload books to device: '''
self.device.set_progress_reporter(updater)
return self.device.upload_books(files, names, on_card, end_session=False)
return upload_books
def sync_booklists(self, done, booklists):
return self.create_job(self._sync_booklists, done, args=[booklists],
description=_('Send metadata to device'))
def _upload_books(self, files, names, on_card=False):
'''Upload books to device: '''
return self.device.upload_books(files, names, on_card, end_session=False)
def upload_books(self, done, files, names, on_card=False, titles=None):
desc = _('Upload %d books to device')%len(names)
if titles:
desc += u':' + u', '.join(titles)
return self.create_job(self._upload_books, done, args=[files, names],
kwargs={'on_card':on_card}, description=desc)
def add_books_to_metadata(self, locations, metadata, booklists):
self.device_class.add_books_to_metadata(locations, metadata, booklists)
self.device.add_books_to_metadata(locations, metadata, booklists)
def delete_books_func(self):
def _delete_books(self, paths):
'''Remove books from device'''
def delete_books(updater, paths):
'''Remove books from device'''
self.device.delete_books(paths, end_session=True)
return delete_books
self.device.delete_books(paths, end_session=True)
def delete_books(self, done, paths):
return self.create_job(self._delete_books, done, args=[paths],
description=_('Delete books from device'))
def remove_books_from_metadata(self, paths, booklists):
self.device_class.remove_books_from_metadata(paths, booklists)
self.device.remove_books_from_metadata(paths, booklists)
def save_books_func(self):
def _save_books(self, paths, target):
'''Copy books from device to disk'''
def save_books(updater, paths, target):
'''Copy books from device to disk'''
self.device.set_progress_reporter(updater)
for path in paths:
name = path.rpartition('/')[2]
f = open(os.path.join(target, name), 'wb')
self.device.get_file(path, f)
f.close()
return save_books
def view_book_func(self):
'''Copy book from device to local hdd for viewing'''
def view_book(updater, path, target):
self.device.set_progress_reporter(updater)
f = open(target, 'wb')
for path in paths:
name = path.rpartition('/')[2]
f = open(os.path.join(target, name), 'wb')
self.device.get_file(path, f)
f.close()
return target
return view_book
def save_books(self, done, paths, target):
return self.create_job(self._save_books, done, args=[paths, target],
description=_('Download books from device'))
def _view_book(self, path, target):
f = open(target, 'wb')
self.device.get_file(path, f)
f.close()
return target
def view_book(self, done, path, target):
return self.create_job(self._view_book, done, args=[path, target],
description=_('View book on device'))

View File

@ -2,7 +2,7 @@ __license__ = 'GPL v3'
__copyright__ = '2008, Kovid Goyal <kovid at kovidgoyal.net>'
'''Display active jobs'''
from PyQt4.QtCore import Qt, QObject, SIGNAL, QSize, QString
from PyQt4.QtCore import Qt, QObject, SIGNAL, QSize, QString, QTimer
from PyQt4.QtGui import QDialog, QAbstractItemDelegate, QStyleOptionProgressBarV2, \
QApplication, QStyle
@ -45,10 +45,20 @@ class JobsDialog(QDialog, Ui_JobsDialog):
self.pb_delegate = ProgressBarDelegate(self)
self.jobs_view.setItemDelegateForColumn(2, self.pb_delegate)
self.running_time_timer = QTimer(self)
self.connect(self.running_time_timer, SIGNAL('timeout()'), self.update_running_time)
self.running_time_timer.start(1000)
def update_running_time(self):
model = self.model
for row, job in enumerate(model.jobs):
if job.is_running:
self.jobs_view.dataChanged(model.index(row, 3), model.index(row, 3))
def kill_job(self):
for index in self.jobs_view.selectedIndexes():
row = index.row()
self.emit(SIGNAL('kill_job(int, PyQt_PyObject)'), row, self)
self.model.kill_job(row, self)
return
def closeEvent(self, e):

View File

@ -1,415 +0,0 @@
__license__ = 'GPL v3'
__copyright__ = '2008, Kovid Goyal <kovid at kovidgoyal.net>'
import traceback, logging, collections, time
from PyQt4.QtCore import QAbstractTableModel, QMutex, QObject, SIGNAL, Qt, \
QVariant, QThread
from PyQt4.QtGui import QIcon, QDialog
from calibre import detect_ncpus, Settings
from calibre.gui2 import NONE, error_dialog
from calibre.parallel import Server
from calibre.gui2.dialogs.job_view_ui import Ui_Dialog
class JobException(Exception):
pass
class Job(QThread):
''' Class to run a function in a separate thread with optional mutex based locking.'''
def __init__(self, id, description, slot, priority, func, *args, **kwargs):
'''
@param id: Number. Id of this thread.
@param description: String. Description of this job.
@param slot: The callable that should be called when the job is done.
@param priority: The priority with which this thread should be run
@param func: A callable that should be executed in this thread.
'''
QThread.__init__(self)
self.id = id
self.func = func
self.description = description if description else 'Job #' + str(self.id)
self.args = args
self.kwargs = kwargs
self.slot, self._priority = slot, priority
self.result = None
self.percent_done = 0
self.logger = logging.getLogger('Job #'+str(id))
self.logger.setLevel(logging.DEBUG)
self.is_locked = False
self.log = self.exception = self.last_traceback = None
self.connect_done_signal()
self.start_time = None
def start(self):
self.start_time = time.time()
QThread.start(self, self._priority)
def progress_update(self, val):
self.percent_done = val
self.emit(SIGNAL('status_update(int, int)'), self.id, int(val))
def formatted_log(self):
if self.log is None:
return ''
return '<h2>Log:</h2><pre>%s</pre>'%self.log
class DeviceJob(Job):
''' Jobs that involve communication with the device. '''
def run(self):
last_traceback, exception = None, None
try:
self.result = self.func(self.progress_update, *self.args, **self.kwargs)
except Exception, err:
exception = err
last_traceback = traceback.format_exc()
self.exception, self.last_traceback = exception, last_traceback
def formatted_error(self):
if self.exception is None:
return ''
ans = u'<p><b>%s</b>: %s</p>'%(self.exception.__class__.__name__, self.exception)
ans += '<h2>Traceback:</h2><pre>%s</pre>'%self.last_traceback
return ans
def notify(self):
self.emit(SIGNAL('jobdone(PyQt_PyObject, PyQt_PyObject, PyQt_PyObject, PyQt_PyObject, PyQt_PyObject)'),
self.id, self.description, self.result, self.exception, self.last_traceback)
def connect_done_signal(self):
if self.slot is not None:
self.connect(self, SIGNAL('jobdone(PyQt_PyObject, PyQt_PyObject, PyQt_PyObject, PyQt_PyObject, PyQt_PyObject)'),
self.slot, Qt.QueuedConnection)
class ConversionJob(Job):
''' Jobs that involve conversion of content.'''
def __init__(self, *args, **kwdargs):
Job.__init__(self, *args, **kwdargs)
self.log = ''
def run(self):
result = None
self.server.run_job(self.id, self.func, progress=self.progress,
args=self.args, kwdargs=self.kwargs,
output=self.output)
res = None
while res is None:
time.sleep(2)
res = self.server.result(self.id)
if res is None:
exception, tb = 'UnknownError: This should not have happened', ''
else:
result, exception, tb = res
self.result, self.last_traceback, self.exception = result, tb, exception
def output(self, msg):
if self.log is None:
self.log = ''
self.log += msg
self.emit(SIGNAL('output_received()'))
def formatted_log(self):
return '<h2>Log:</h2><pre>%s</pre>'%self.log
def notify(self):
self.emit(SIGNAL('jobdone(PyQt_PyObject, PyQt_PyObject, PyQt_PyObject, PyQt_PyObject, PyQt_PyObject, PyQt_PyObject)'),
self.id, self.description, self.result, self.exception, self.last_traceback, self.log)
def connect_done_signal(self):
if self.slot is not None:
self.connect(self, SIGNAL('jobdone(PyQt_PyObject, PyQt_PyObject, PyQt_PyObject, PyQt_PyObject, PyQt_PyObject, PyQt_PyObject)'),
self.slot, Qt.QueuedConnection)
def formatted_error(self):
if self.exception is None:
return ''
ans = u'<p><b>%s</b>:'%repr(self.exception)
ans += '<h2>Traceback:</h2><pre>%s</pre>'%self.last_traceback
return ans
def progress(self, percent, msg):
self.emit(SIGNAL('update_progress(int, PyQt_PyObject)'), self.id, percent)
class JobManager(QAbstractTableModel):
PRIORITY = {'Idle' : QThread.IdlePriority,
'Lowest': QThread.LowestPriority,
'Low' : QThread.LowPriority,
'Normal': QThread.NormalPriority
}
def __init__(self):
QAbstractTableModel.__init__(self)
self.waiting_jobs = collections.deque()
self.running_jobs = collections.deque()
self.finished_jobs = collections.deque()
self.add_queue = collections.deque()
self.update_lock = QMutex() # Protects write access to the above dequeues
self.next_id = 0
self.wait_icon = QVariant(QIcon(':/images/jobs.svg'))
self.running_icon = QVariant(QIcon(':/images/exec.svg'))
self.error_icon = QVariant(QIcon(':/images/dialog_error.svg'))
self.done_icon = QVariant(QIcon(':/images/ok.svg'))
self.process_server = Server()
self.ncpus = detect_ncpus()
self.timer_id = self.startTimer(500)
def terminate_device_jobs(self):
for job in self.running_jobs:
if isinstance(job, DeviceJob):
job.terminate()
def terminate_all_jobs(self):
for job in self.running_jobs:
try:
if isinstance(job, DeviceJob):
job.terminate()
except:
continue
self.process_server.killall()
def timerEvent(self, event):
if event.timerId() == self.timer_id:
self.update_lock.lock()
try:
refresh = False
while self.add_queue:
job = self.add_queue.pop()
self.waiting_jobs.append(job)
self.emit(SIGNAL('job_added(int)'), job.id, Qt.QueuedConnection)
refresh = True
for job in [job for job in self.running_jobs if job.isFinished()]:
self.running_jobs.remove(job)
self.finished_jobs.appendleft(job)
if job.result != self.process_server.KILL_RESULT:
job.notify()
job.running_time = time.time() - job.start_time
self.emit(SIGNAL('job_done(int)'), job.id)
refresh = True
cjs = list(self.running_conversion_jobs())
if len(cjs) < self.ncpus:
cj = None
for job in self.waiting_jobs:
if isinstance(job, ConversionJob):
cj = job
break
if cj is not None:
self.waiting_jobs.remove(cj)
cj.start()
self.running_jobs.append(cj)
refresh = True
djs = list(self.running_device_jobs())
if len(djs) == 0:
dj = None
for job in self.waiting_jobs:
if isinstance(job, DeviceJob):
dj = job
break
if dj is not None:
self.waiting_jobs.remove(dj)
dj.start()
self.running_jobs.append(dj)
refresh = True
if refresh:
self.reset()
if len(self.running_jobs) == 0:
self.emit(SIGNAL('no_more_jobs()'))
for i in range(len(self.running_jobs)):
self.emit(SIGNAL('dataChanged(QModelIndex, QModelIndex)'), self.index(i, 3), self.index(i, 3))
finally:
self.update_lock.unlock()
def has_jobs(self):
return len(self.waiting_jobs) + len(self.running_jobs) > 0
def has_device_jobs(self):
return len(tuple(self.running_device_jobs())) > 0
def running_device_jobs(self):
for job in self.running_jobs:
if isinstance(job, DeviceJob):
yield job
def running_conversion_jobs(self):
for job in self.running_jobs:
if isinstance(job, ConversionJob):
yield job
def update_progress(self, id, percent):
row = -1
for collection in (self.running_jobs, self.waiting_jobs, self.finished_jobs):
for job in collection:
row += 1
if job.id == id:
job.percent_done = percent
index = self.index(row, 2)
self.emit(SIGNAL('dataChanged(QModelIndex, QModelIndex)'), index, index)
return
def create_job(self, job_class, description, slot, priority, *args, **kwargs):
self.next_id += 1
id = self.next_id
job = job_class(id, description, slot, priority, *args, **kwargs)
job.server = self.process_server
QObject.connect(job, SIGNAL('status_update(int, int)'), self.status_update,
Qt.QueuedConnection)
self.connect(job, SIGNAL('update_progress(int, PyQt_PyObject)'),
self.update_progress, Qt.QueuedConnection)
self.update_lock.lock()
self.add_queue.append(job)
self.update_lock.unlock()
return job
def run_conversion_job(self, slot, callable, args=[], **kwargs):
'''
Run a conversion job.
@param slot: The function to call with the job result.
@param callable: The function to call to communicate with the device.
@param args: The arguments to pass to callable
@param kwargs: The keyword arguments to pass to callable
'''
desc = kwargs.pop('job_description', '')
if args and hasattr(args[0], 'append') and '--verbose' not in args[0]:
args[0].append('--verbose')
priority = self.PRIORITY[Settings().get('conversion job priority', 'Normal')]
job = self.create_job(ConversionJob, desc, slot, priority,
callable, *args, **kwargs)
return job.id
def run_device_job(self, slot, callable, *args, **kwargs):
'''
Run a job to communicate with the device.
@param slot: The function to call with the job result.
@param callable: The function to call to communicate with the device.
@param args: The arguments to pass to callable
@param kwargs: The keyword arguments to pass to callable
'''
desc = callable.__doc__ if callable.__doc__ else ''
desc += kwargs.pop('job_extra_description', '')
job = self.create_job(DeviceJob, desc, slot, QThread.NormalPriority,
callable, *args, **kwargs)
return job.id
def rowCount(self, parent):
return len(self.running_jobs) + len(self.waiting_jobs) + len(self.finished_jobs)
def columnCount(self, parent):
return 4
def headerData(self, section, orientation, role):
if role != Qt.DisplayRole:
return NONE
if orientation == Qt.Horizontal:
if section == 0: text = _("Job")
elif section == 1: text = _("Status")
elif section == 2: text = _("Progress")
elif section == 3: text = _('Running time')
return QVariant(text)
else:
return QVariant(section+1)
def row_to_job(self, row):
if row < len(self.running_jobs):
return self.running_jobs[row], 0
row -= len(self.running_jobs)
if row < len(self.waiting_jobs):
return self.waiting_jobs[row], 1
row -= len(self.running_jobs)
return self.finished_jobs[row], 2
def data(self, index, role):
if role not in (Qt.DisplayRole, Qt.DecorationRole):
return NONE
row, col = index.row(), index.column()
try:
job, status = self.row_to_job(row)
except IndexError:
return NONE
if role == Qt.DisplayRole:
if col == 0:
return QVariant(job.description)
if col == 1:
if status == 2:
st = _('Finished') if job.exception is None else _('Error')
else:
st = [_('Working'), _('Waiting')][status]
return QVariant(st)
if col == 2:
return QVariant(int(100*job.percent_done))
if col == 3:
if job.start_time is None:
return NONE
rtime = job.running_time if hasattr(job, 'running_time') else time.time() - job.start_time
return QVariant('%dm %ds'%(int(rtime)//60, int(rtime)%60))
if role == Qt.DecorationRole and col == 0:
if status == 1:
return self.wait_icon
if status == 0:
return self.running_icon
if status == 2:
if job.exception or job.result == self.process_server.KILL_RESULT:
return self.error_icon
return self.done_icon
return NONE
def status_update(self, id, progress):
for i in range(len(self.running_jobs)):
job = self.running_jobs[i]
if job.id == id:
self.emit(SIGNAL('dataChanged(QModelIndex, QModelIndex)'), self.index(i, 2), self.index(i, 3))
break
def kill_job(self, row, gui_parent):
job, status = self.row_to_job(row)
if isinstance(job, DeviceJob):
error_dialog(gui_parent, _('Cannot kill job'),
_('Cannot kill jobs that are communicating with the device as this may cause data corruption.')).exec_()
return
if status == 2:
error_dialog(gui_parent, _('Cannot kill job'),
_('Cannot kill already completed jobs.')).exec_()
return
if status == 1:
self.update_lock.lock()
try:
self.waiting_jobs.remove(job)
self.finished_jobs.append(job)
self.emit(SIGNAL('job_done(int)'), job.id)
job.result = self.process_server.KILL_RESULT
finally:
self.update_lock.unlock()
else:
self.process_server.kill(job.id)
self.reset()
if len(self.running_jobs) + len(self.waiting_jobs) == 0:
self.emit(SIGNAL('no_more_jobs()'))
class DetailView(QDialog, Ui_Dialog):
def __init__(self, parent, job):
QDialog.__init__(self, parent)
self.setupUi(self)
self.setWindowTitle(job.description)
self.job = job
self.connect(self.job, SIGNAL('output_received()'), self.update)
self.update()
def update(self):
txt = self.job.formatted_error() + self.job.formatted_log()
if not txt:
txt = 'No details available'
self.log.setHtml(txt)
vbar = self.log.verticalScrollBar()
vbar.setValue(vbar.maximum())

193
src/calibre/gui2/jobs2.py Normal file
View File

@ -0,0 +1,193 @@
#!/usr/bin/env python
__license__ = 'GPL v3'
__copyright__ = '2008, Kovid Goyal kovid@kovidgoyal.net'
__docformat__ = 'restructuredtext en'
'''
Job management.
'''
import time
from PyQt4.QtCore import QAbstractTableModel, QVariant, QModelIndex, Qt, SIGNAL
from PyQt4.QtGui import QIcon, QDialog
from calibre.parallel import ParallelJob, Server
from calibre.gui2 import Dispatcher, error_dialog
from calibre.gui2.device import DeviceJob
from calibre.gui2.dialogs.job_view_ui import Ui_Dialog
NONE = QVariant()
class JobManager(QAbstractTableModel):
wait_icon = QVariant(QIcon(':/images/jobs.svg'))
running_icon = QVariant(QIcon(':/images/exec.svg'))
error_icon = QVariant(QIcon(':/images/dialog_error.svg'))
done_icon = QVariant(QIcon(':/images/ok.svg'))
def __init__(self):
QAbstractTableModel.__init__(self)
self.jobs = []
self.server = Server()
self.add_job = Dispatcher(self._add_job)
self.status_update = Dispatcher(self._status_update)
self.start_work = Dispatcher(self._start_work)
self.job_done = Dispatcher(self._job_done)
def columnCount(self, parent=QModelIndex()):
return 4
def rowCount(self, parent=QModelIndex()):
return len(self.jobs)
def headerData(self, section, orientation, role):
if role != Qt.DisplayRole:
return NONE
if orientation == Qt.Horizontal:
if section == 0: text = _("Job")
elif section == 1: text = _("Status")
elif section == 2: text = _("Progress")
elif section == 3: text = _('Running time')
return QVariant(text)
else:
return QVariant(section+1)
def data(self, index, role):
try:
if role not in (Qt.DisplayRole, Qt.DecorationRole):
return NONE
row, col = index.row(), index.column()
job = self.jobs[row]
if role == Qt.DisplayRole:
if col == 0:
desc = job.description
if not desc:
desc = _('Unknown job')
return QVariant(desc)
if col == 1:
status = job.status()
if status == 'DONE':
st = _('Finished')
elif status == 'ERROR':
st = _('Error')
elif status == 'WAITING':
st = _('Waiting')
else:
st = _('Working')
return QVariant(st)
if col == 2:
pc = job.percent
if pc <=0:
percent = 0
else:
percent = int(100*pc)
return QVariant(percent)
if col == 3:
if job.start_time is None:
return NONE
rtime = job.running_time if job.running_time is not None else \
time.time() - job.start_time
return QVariant('%dm %ds'%(int(rtime)//60, int(rtime)%60))
if role == Qt.DecorationRole and col == 0:
status = job.status()
if status == 'WAITING':
return self.wait_icon
if status == 'WORKING':
return self.running_icon
if status == 'ERROR':
return self.error_icon
if status == 'DONE':
return self.done_icon
except:
import traceback
traceback.print_exc()
return NONE
def _add_job(self, job):
self.emit(SIGNAL('layoutAboutToBeChanged()'))
self.jobs.append(job)
self.jobs.sort()
self.emit(SIGNAL('job_added(int)'), self.rowCount())
self.emit(SIGNAL('layoutChanged()'))
def done_jobs(self):
return [j for j in self.jobs if j.status() in ['DONE', 'ERROR']]
def row_to_job(self, row):
return self.jobs[row]
def _start_work(self, job):
self.emit(SIGNAL('layoutAboutToBeChanged()'))
self.jobs.sort()
self.emit(SIGNAL('layoutChanged()'))
def _job_done(self, job):
self.emit(SIGNAL('layoutAboutToBeChanged()'))
self.jobs.sort()
self.emit(SIGNAL('job_done(int)'), len(self.jobs) - len(self.done_jobs()))
self.emit(SIGNAL('layoutChanged()'))
def _status_update(self, job):
row = self.jobs.index(job)
self.emit(SIGNAL('dataChanged(QModelIndex, QModelIndex)'),
self.index(row, 0), self.index(row, 3))
def has_device_jobs(self):
for job in self.jobs:
if job.is_running and isinstance(job, DeviceJob):
return True
return False
def has_jobs(self):
for job in self.jobs:
if job.is_running:
return True
return False
def run_job(self, done, func, args=[], kwargs={},
description=None):
job = ParallelJob(func, done, self, args=args, kwargs=kwargs,
description=description)
self.server.add_job(job)
return job
def output(self, job):
self.emit(SIGNAL('output_received()'))
def kill_job(self, row, view):
job = self.jobs[row]
if isinstance(job, DeviceJob):
error_dialog(view, _('Cannot kill job'),
_('Cannot kill jobs that communicate with the device')).exec_()
return
if job.has_run:
error_dialog(view, _('Cannot kill job'),
_('Job has already run')).exec_()
return
if not job.is_running:
error_dialog(view, _('Cannot kill job'),
_('Cannot kill waiting job')).exec_()
return
self.server.kill(job)
def terminate_all_jobs(self):
pass
class DetailView(QDialog, Ui_Dialog):
def __init__(self, parent, job):
QDialog.__init__(self, parent)
self.setupUi(self)
self.setWindowTitle(job.description)
self.job = job
self.update()
def update(self):
self.log.setHtml(self.job.gui_text())
vbar = self.log.verticalScrollBar()
vbar.setValue(vbar.maximum())

View File

@ -4,13 +4,13 @@ import os, sys, textwrap, collections, traceback, shutil, time
from xml.parsers.expat import ExpatError
from functools import partial
from PyQt4.QtCore import Qt, SIGNAL, QObject, QCoreApplication, \
QVariant, QThread, QUrl, QSize
QVariant, QUrl, QSize
from PyQt4.QtGui import QPixmap, QColor, QPainter, QMenu, QIcon, QMessageBox, \
QToolButton, QDialog, QDesktopServices
from PyQt4.QtSvg import QSvgRenderer
from calibre import __version__, __appname__, islinux, sanitize_file_name, \
Settings, iswindows, isosx, preferred_encoding
Settings, iswindows, isosx
from calibre.ptempfile import PersistentTemporaryFile
from calibre.ebooks.metadata.meta import get_metadata, get_filename_pat, set_filename_pat
from calibre.devices.errors import FreeSpaceError
@ -18,16 +18,16 @@ from calibre.devices.interface import Device
from calibre.gui2 import APP_UID, warning_dialog, choose_files, error_dialog, \
initialize_file_icon_provider, question_dialog,\
pixmap_to_data, choose_dir, ORG_NAME, \
set_sidebar_directories, \
set_sidebar_directories, Dispatcher, \
SingleApplication, Application, available_height, max_available_height
from calibre.gui2.cover_flow import CoverFlow, DatabaseImages, pictureflowerror
from calibre.library.database import LibraryDatabase
from calibre.gui2.update import CheckForUpdates
from calibre.gui2.main_window import MainWindow, option_parser
from calibre.gui2.main_ui import Ui_MainWindow
from calibre.gui2.device import DeviceDetector, DeviceManager
from calibre.gui2.device import DeviceManager
from calibre.gui2.status import StatusBar
from calibre.gui2.jobs import JobManager
from calibre.gui2.jobs2 import JobManager
from calibre.gui2.news import NewsMenu
from calibre.gui2.dialogs.metadata_single import MetadataSingleDialog
from calibre.gui2.dialogs.metadata_bulk import MetadataBulkDialog
@ -44,6 +44,7 @@ from calibre.ebooks.metadata.meta import set_metadata
from calibre.ebooks.metadata import MetaInformation
from calibre.ebooks import BOOK_EXTENSIONS
from calibre.ebooks.lrf import preferred_source_formats as LRF_PREFERRED_SOURCE_FORMATS
from calibre.parallel import JobKilled
class Main(MainWindow, Ui_MainWindow):
@ -72,7 +73,6 @@ class Main(MainWindow, Ui_MainWindow):
self.read_settings()
self.job_manager = JobManager()
self.jobs_dialog = JobsDialog(self, self.job_manager)
self.device_manager = None
self.upload_memory = {}
self.delete_memory = {}
self.conversion_jobs = {}
@ -238,10 +238,8 @@ class Main(MainWindow, Ui_MainWindow):
self.setMaximumHeight(max_available_height())
####################### Setup device detection ########################
self.detector = DeviceDetector(sleep_time=2000)
QObject.connect(self.detector, SIGNAL('connected(PyQt_PyObject, PyQt_PyObject)'),
self.device_detected, Qt.QueuedConnection)
self.detector.start(QThread.InheritPriority)
self.device_manager = DeviceManager(Dispatcher(self.device_detected), self.job_manager)
self.device_manager.start()
self.news_menu.set_custom_feeds(self.library_view.model().db.get_feeds())
@ -300,22 +298,19 @@ class Main(MainWindow, Ui_MainWindow):
########################## Connect to device ##############################
def device_detected(self, device, connected):
def device_detected(self, connected):
'''
Called when a device is connected to the computer.
'''
if connected:
self.device_manager = DeviceManager(device)
self.job_manager.run_device_job(self.info_read, self.device_manager.info_func())
self.set_default_thumbnail(device.THUMBNAIL_HEIGHT)
self.status_bar.showMessage(_('Device: ')+device.__class__.__name__+_(' detected.'), 3000)
self.device_manager.get_device_information(Dispatcher(self.info_read))
self.set_default_thumbnail(self.device_manager.device.THUMBNAIL_HEIGHT)
self.status_bar.showMessage(_('Device: ')+\
self.device_manager.device.__class__.__name__+_(' detected.'), 3000)
self.action_sync.setEnabled(True)
self.device_connected = True
else:
self.device_connected = False
self.job_manager.terminate_device_jobs()
if self.device_manager:
self.device_manager.device_removed()
self.location_view.model().update_devices()
self.action_sync.setEnabled(False)
self.vanity.setText(self.vanity_template%dict(version=self.latest_version, device=' '))
@ -324,27 +319,25 @@ class Main(MainWindow, Ui_MainWindow):
self.status_bar.reset_info()
self.location_selected('library')
def info_read(self, id, description, result, exception, formatted_traceback):
def info_read(self, job):
'''
Called once device information has been read.
'''
if exception:
self.device_job_exception(id, description, exception, formatted_traceback)
if job.exception is not None:
self.device_job_exception(job)
return
info, cp, fs = result
info, cp, fs = job.result
self.location_view.model().update_devices(cp, fs)
self.device_info = _('Connected ')+' '.join(info[:-1])
self.vanity.setText(self.vanity_template%dict(version=self.latest_version, device=self.device_info))
func = self.device_manager.books_func()
self.job_manager.run_device_job(self.metadata_downloaded, func)
self.device_manager.books(Dispatcher(self.metadata_downloaded))
def metadata_downloaded(self, id, description, result, exception, formatted_traceback):
def metadata_downloaded(self, job):
'''
Called once metadata has been read for all books on the device.
'''
if exception:
print exception, type(exception)
if isinstance(exception, ExpatError):
if job.exception is not None:
if isinstance(job.exception, ExpatError):
error_dialog(self, _('Device database corrupted'),
_('''
<p>The database of books on the reader is corrupted. Try the following:
@ -354,9 +347,9 @@ class Main(MainWindow, Ui_MainWindow):
</ol>
''')%dict(app=__appname__)).exec_()
else:
self.device_job_exception(id, description, exception, formatted_traceback)
self.device_job_exception(job)
return
mainlist, cardlist = result
mainlist, cardlist = job.result
self.memory_view.set_database(mainlist)
self.card_view.set_database(cardlist)
for view in (self.memory_view, self.card_view):
@ -373,18 +366,17 @@ class Main(MainWindow, Ui_MainWindow):
'''
Upload metadata to device.
'''
self.job_manager.run_device_job(self.metadata_synced,
self.device_manager.sync_booklists_func(),
self.booklists())
self.device_manager.sync_booklists(Dispatcher(self.metadata_synced),
self.booklists())
def metadata_synced(self, id, description, result, exception, formatted_traceback):
def metadata_synced(self, job):
'''
Called once metadata has been uploaded.
'''
if exception:
self.device_job_exception(id, description, exception, formatted_traceback)
if job.exception is not None:
self.device_job_exception(job)
return
cp, fs = result
cp, fs = job.result
self.location_view.model().update_devices(cp, fs)
############################################################################
@ -486,35 +478,34 @@ class Main(MainWindow, Ui_MainWindow):
def upload_books(self, files, names, metadata, on_card=False, memory=None):
'''
Upload books to device.
@param files: List of either paths to files or file like objects
:param files: List of either paths to files or file like objects
'''
titles = ', '.join([i['title'] for i in metadata])
id = self.job_manager.run_device_job(self.books_uploaded,
self.device_manager.upload_books_func(),
titles = [i['title'] for i in metadata]
job = self.device_manager.upload_books(Dispatcher(self.books_uploaded),
files, names, on_card=on_card,
job_extra_description=titles
titles=titles
)
self.upload_memory[id] = (metadata, on_card, memory)
self.upload_memory[job] = (metadata, on_card, memory)
def books_uploaded(self, id, description, result, exception, formatted_traceback):
def books_uploaded(self, job):
'''
Called once books have been uploaded.
'''
metadata, on_card, memory = self.upload_memory.pop(id)
metadata, on_card, memory = self.upload_memory.pop(job)
if exception:
if isinstance(exception, FreeSpaceError):
where = 'in main memory.' if 'memory' in str(exception) else 'on the storage card.'
if job.exception is not None:
if isinstance(job.exception, FreeSpaceError):
where = 'in main memory.' if 'memory' in str(job.exception) else 'on the storage card.'
titles = '\n'.join(['<li>'+mi['title']+'</li>' for mi in metadata])
d = error_dialog(self, _('No space on device'),
_('<p>Cannot upload books to device there is no more free space available ')+where+
'</p>\n<ul>%s</ul>'%(titles,))
d.exec_()
else:
self.device_job_exception(id, description, exception, formatted_traceback)
self.device_job_exception(job)
return
self.device_manager.add_books_to_metadata(result, metadata, self.booklists())
self.device_manager.add_books_to_metadata(job.result, metadata, self.booklists())
self.upload_booklists()
@ -554,22 +545,20 @@ class Main(MainWindow, Ui_MainWindow):
self.status_bar.showMessage(_('Deleting books from device.'), 1000)
def remove_paths(self, paths):
return self.job_manager.run_device_job(self.books_deleted,
self.device_manager.delete_books_func(), paths)
return self.device_manager.delete_books(Dispatcher(self.books_deleted), paths)
def books_deleted(self, id, description, result, exception, formatted_traceback):
def books_deleted(self, job):
'''
Called once deletion is done on the device
'''
for view in (self.memory_view, self.card_view):
view.model().deletion_done(id, bool(exception))
if exception:
self.device_job_exception(id, description, exception, formatted_traceback)
view.model().deletion_done(id, bool(job.exception))
if job.exception is not None:
self.device_job_exception(job)
return
if self.delete_memory.has_key(id):
paths, model = self.delete_memory.pop(id)
if self.delete_memory.has_key(job):
paths, model = self.delete_memory.pop(job)
self.device_manager.remove_books_from_metadata(paths, self.booklists())
model.paths_deleted(paths)
self.upload_booklists()
@ -718,12 +707,11 @@ class Main(MainWindow, Ui_MainWindow):
QDesktopServices.openUrl(QUrl('file:'+dir))
else:
paths = self.current_view().model().paths(rows)
self.job_manager.run_device_job(self.books_saved,
self.device_manager.save_books_func(), paths, dir)
self.device_manager.save_books(Dispatcher(self.books_saved), paths, dir)
def books_saved(self, id, description, result, exception, formatted_traceback):
if exception:
self.device_job_exception(id, description, exception, formatted_traceback)
def books_saved(self, job):
if job.exception is not None:
self.device_job_exception(job)
return
############################################################################
@ -746,15 +734,15 @@ class Main(MainWindow, Ui_MainWindow):
if data['password']:
args.extend(['--password', data['password']])
args.append(data['script'] if data['script'] else data['title'])
id = self.job_manager.run_conversion_job(self.news_fetched, 'feeds2lrf', args=[args],
job_description=_('Fetch news from ')+data['title'])
self.conversion_jobs[id] = (pt, 'lrf')
job = self.job_manager.run_job(Dispatcher(self.news_fetched), 'feeds2lrf', args=[args],
description=_('Fetch news from ')+data['title'])
self.conversion_jobs[job] = (pt, 'lrf')
self.status_bar.showMessage(_('Fetching news from ')+data['title'], 2000)
def news_fetched(self, id, description, result, exception, formatted_traceback, log):
pt, fmt = self.conversion_jobs.pop(id)
if exception:
self.conversion_job_exception(id, description, exception, formatted_traceback, log)
def news_fetched(self, job):
pt, fmt = self.conversion_jobs.pop(job)
if job.exception is not None:
self.job_exception(job)
return
to_device = self.device_connected and fmt in self.device_manager.device_class.FORMATS
self._add_books([pt.name], to_device)
@ -828,12 +816,12 @@ class Main(MainWindow, Ui_MainWindow):
cmdline.extend(['--cover', cf.name])
cmdline.extend(['-o', of.name])
cmdline.append(pt.name)
id = self.job_manager.run_conversion_job(self.book_converted,
job = self.job_manager.run_job(Dispatcher(self.book_converted),
'any2lrf', args=[cmdline],
job_description=_('Convert book %d of %d (%s)')%(i+1, len(rows), repr(mi.title)))
description=_('Convert book %d of %d (%s)')%(i+1, len(rows), repr(mi.title)))
self.conversion_jobs[id] = (d.cover_file, pt, of, d.output_format,
self.conversion_jobs[job] = (d.cover_file, pt, of, d.output_format,
self.library_view.model().db.id(row))
res = []
for row in bad_rows:
@ -874,10 +862,10 @@ class Main(MainWindow, Ui_MainWindow):
setattr(options, 'output', of.name)
options.verbose = 1
args = [pt.name, options]
id = self.job_manager.run_conversion_job(self.book_converted,
'comic2lrf', args=args,
job_description=_('Convert comic %d of %d (%s)')%(i+1, len(comics), repr(options.title)))
self.conversion_jobs[id] = (None, pt, of, 'lrf',
job = self.job_manager.run_job(Dispatcher(self.book_converted),
'comic2lrf', args=args,
description=_('Convert comic %d of %d (%s)')%(i+1, len(comics), repr(options.title)))
self.conversion_jobs[job] = (None, pt, of, 'lrf',
self.library_view.model().db.id(row))
@ -904,12 +892,12 @@ class Main(MainWindow, Ui_MainWindow):
of.close()
cmdline.extend(['-o', of.name])
cmdline.append(pt.name)
id = self.job_manager.run_conversion_job(self.book_converted,
'any2lrf', args=[cmdline],
job_description=_('Convert book: ')+d.title())
job = self.job_manager.run_job(Dispatcher(self.book_converted),
'any2lrf', args=[cmdline],
description=_('Convert book: ')+d.title())
self.conversion_jobs[id] = (d.cover_file, pt, of, d.output_format, d.id)
self.conversion_jobs[job] = (d.cover_file, pt, of, d.output_format, d.id)
changed = True
if changed:
self.library_view.model().resort(reset=False)
@ -949,24 +937,24 @@ class Main(MainWindow, Ui_MainWindow):
opts.verbose = 1
args = [pt.name, opts]
changed = True
id = self.job_manager.run_conversion_job(self.book_converted,
job = self.job_manager.run_job(Dispatcher(self.book_converted),
'comic2lrf', args=args,
job_description=_('Convert comic: ')+opts.title)
self.conversion_jobs[id] = (None, pt, of, 'lrf',
description=_('Convert comic: ')+opts.title)
self.conversion_jobs[job] = (None, pt, of, 'lrf',
self.library_view.model().db.id(row))
if changed:
self.library_view.model().resort(reset=False)
self.library_view.model().research()
def book_converted(self, id, description, result, exception, formatted_traceback, log):
of, fmt, book_id = self.conversion_jobs.pop(id)[2:]
if exception:
self.conversion_job_exception(id, description, exception, formatted_traceback, log)
def book_converted(self, job):
of, fmt, book_id = self.conversion_jobs.pop(job)[2:]
if job.exception is not None:
self.job_exception(job)
return
data = open(of.name, 'rb')
self.library_view.model().db.add_format(book_id, fmt, data, index_is_id=True)
data.close()
self.status_bar.showMessage(description + (' completed'), 2000)
self.status_bar.showMessage(job.description + (' completed'), 2000)
#############################View book######################################
@ -977,19 +965,18 @@ class Main(MainWindow, Ui_MainWindow):
self.persistent_files.append(pt)
self._view_file(pt.name)
def book_downloaded_for_viewing(self, id, description, result, exception, formatted_traceback):
if exception:
self.device_job_exception(id, description, exception, formatted_traceback)
def book_downloaded_for_viewing(self, job):
if job.exception:
self.device_job_exception(job)
return
print result
self._view_file(result)
self._view_file(job.result)
def _view_file(self, name):
self.setCursor(Qt.BusyCursor)
try:
if name.upper().endswith('.LRF'):
args = ['lrfviewer', name]
self.job_manager.process_server.run_free_job('lrfviewer', kwdargs=dict(args=args))
self.job_manager.server.run_free_job('lrfviewer', kwdargs=dict(args=args))
else:
QDesktopServices.openUrl(QUrl('file:'+name))#launch(name)
time.sleep(5) # User feedback
@ -1050,8 +1037,8 @@ class Main(MainWindow, Ui_MainWindow):
pt = PersistentTemporaryFile('_viewer_'+os.path.splitext(paths[0])[1])
self.persistent_files.append(pt)
pt.close()
self.job_manager.run_device_job(self.book_downloaded_for_viewing,
self.device_manager.view_book_func(), paths[0], pt.name)
self.device_manager.view_book(Dispatcher(self.book_downloaded_for_viewing),
paths[0], pt.name)
@ -1176,72 +1163,40 @@ class Main(MainWindow, Ui_MainWindow):
self.action_convert.setEnabled(False)
self.view_menu.actions()[1].setEnabled(False)
def device_job_exception(self, id, description, exception, formatted_traceback):
def device_job_exception(self, job):
'''
Handle exceptions in threaded device jobs.
'''
if 'Could not read 32 bytes on the control bus.' in str(exception):
if 'Could not read 32 bytes on the control bus.' in str(job.exception):
error_dialog(self, _('Error talking to device'),
_('There was a temporary error talking to the device. Please unplug and reconnect the device and or reboot.')).show()
return
print >>sys.stderr, 'Error in job:', description.encode('utf8')
print >>sys.stderr, exception
print >>sys.stderr, formatted_traceback.encode('utf8')
try:
print >>sys.stderr, job.console_text()
except:
pass
if not self.device_error_dialog.isVisible():
msg = u'<p><b>%s</b>: '%(exception.__class__.__name__,) + unicode(str(exception), 'utf8', 'replace') + u'</p>'
msg += u'<p>Failed to perform <b>job</b>: '+description
msg += u'<p>Further device related error messages will not be shown while this message is visible.'
msg += u'<p>Detailed <b>traceback</b>:<pre>'
if isinstance(formatted_traceback, str):
formatted_traceback = unicode(formatted_traceback, 'utf8', 'replace')
msg += formatted_traceback
self.device_error_dialog.set_message(msg)
self.device_error_dialog.set_message(job.gui_text())
self.device_error_dialog.show()
def conversion_job_exception(self, id, description, exception, formatted_traceback, log):
def job_exception(self, job):
def safe_print(msgs, file=sys.stderr):
for i, msg in enumerate(msgs):
if not msg:
msg = ''
if isinstance(msg, unicode):
msgs[i] = msg.encode(preferred_encoding, 'replace')
msg = ' '.join(msgs)
print >>file, msg
def safe_unicode(arg):
if not arg:
arg = unicode(repr(arg))
if isinstance(arg, str):
arg = arg.decode(preferred_encoding, 'replace')
if not isinstance(arg, unicode):
try:
arg = unicode(repr(arg))
except:
arg = u'Could not convert to unicode'
return arg
only_msg = getattr(exception, 'only_msg', False)
description, exception, formatted_traceback, log = map(safe_unicode,
(description, exception, formatted_traceback, log))
only_msg = getattr(job.exception, 'only_msg', False)
try:
safe_print('Error in job:', description)
if log:
safe_print(log)
safe_print(exception)
safe_print(formatted_traceback)
print job.console_text()
except:
pass
if only_msg:
error_dialog(self, _('Conversion Error'), exception).exec_()
try:
exc = unicode(job.exception)
except:
exc = repr(job.exception)
error_dialog(self, _('Conversion Error'), exc).exec_()
return
msg = u'<p><b>%s</b>:'%exception
msg += u'<p>Failed to perform <b>job</b>: '+description
msg += u'<p>Detailed <b>traceback</b>:<pre>'
msg += formatted_traceback + u'</pre>'
msg += u'<p><b>Log:</b></p><pre>'
msg += log
ConversionErrorDialog(self, 'Conversion Error', msg, show=True)
if isinstance(job.exception, JobKilled):
return
ConversionErrorDialog(self, _('Conversion Error'), job.gui_text(),
show=True)
def read_settings(self):
@ -1294,10 +1249,9 @@ class Main(MainWindow, Ui_MainWindow):
self.job_manager.terminate_all_jobs()
self.write_settings()
self.detector.keep_going = False
self.device_manager.keep_going = False
self.hide()
time.sleep(2)
self.detector.terminate()
e.accept()
def update_found(self, version):

View File

@ -163,24 +163,23 @@ class StatusBar(QStatusBar):
def show_book_info(self):
self.emit(SIGNAL('show_book_info()'))
def job_added(self, id):
def job_added(self, nnum):
jobs = self.movie_button.jobs
src = qstring_to_unicode(jobs.text())
num = self.jobs()
nnum = num+1
nnum = num + 1
text = src.replace(str(num), str(nnum))
jobs.setText(text)
if self.movie_button.movie.state() == QMovie.Paused:
self.movie_button.movie.setPaused(False)
def job_done(self, id):
def job_done(self, running):
jobs = self.movie_button.jobs
src = qstring_to_unicode(jobs.text())
num = self.jobs()
nnum = num-1
text = src.replace(str(num), str(nnum))
text = src.replace(str(num), str(running))
jobs.setText(text)
if nnum == 0:
if running == 0:
self.no_more_jobs()
def no_more_jobs(self):

View File

@ -11,7 +11,7 @@ from PyQt4.QtGui import QListView, QIcon, QFont, QLabel, QListWidget, \
from PyQt4.QtCore import QAbstractListModel, QVariant, Qt, SIGNAL, \
QObject, QRegExp, QString
from calibre.gui2.jobs import DetailView
from calibre.gui2.jobs2 import DetailView
from calibre.gui2 import human_readable, NONE, TableView, qstring_to_unicode, error_dialog
from calibre.gui2.filename_pattern_ui import Ui_Form
from calibre import fit_image, Settings
@ -235,8 +235,10 @@ class JobsView(TableView):
def show_details(self, index):
row = index.row()
job = self.model().row_to_job(row)[0]
DetailView(self, job).exec_()
job = self.model().row_to_job(row)
d = DetailView(self, job)
self.connect(self.model(), SIGNAL('output_received()'), d.update)
d.exec_()
class FontFamilyModel(QAbstractListModel):

View File

@ -17,7 +17,7 @@ E-book Format Conversion
What formats does |app| support conversion to/from?
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|app| supports the conversion of the following formats to LRF: HTML, LIT, MOBI, PRC, EPUB, RTF, TXT, PDF and LRS. It also supports the conversion of LRF to LRS and HTML. Note that calibre does not support the conversion of DRMed ebooks.
|app| supports the conversion of the following formats to LRF: HTML, LIT, MOBI, PRC, EPUB, CBR, CBZ, RTF, TXT, PDF and LRS. It also supports the conversion of LRF to LRS and HTML(forthcoming). Note that calibre does not support the conversion of DRMed ebooks.
What are the best formats to convert to LRF?
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

View File

@ -27,7 +27,6 @@ is buffered and asynchronous to prevent the job from being IO bound.
import sys, os, gc, cPickle, traceback, atexit, cStringIO, time, signal, \
subprocess, socket, collections, binascii, re, thread, tempfile
from select import select
from functools import partial
from threading import RLock, Thread, Event
from calibre.ptempfile import PersistentTemporaryFile
@ -334,7 +333,7 @@ class Overseer(object):
def __init__(self, server, port, timeout=5):
self.worker_status = mother.spawn_worker('127.0.0.1:'+str(port))
self.socket = server.accept()[0]
# Needed if terminate called hwen interpreter is shutting down
# Needed if terminate called when interpreter is shutting down
self.os = os
self.signal = signal
self.on_probation = False
@ -343,7 +342,6 @@ class Overseer(object):
self.working = False
self.timeout = timeout
self.last_job_time = time.time()
self.job_id = None
self._stop = False
if not select([self.socket], [], [], 120)[0]:
raise RuntimeError(_('Could not launch worker process.'))
@ -408,16 +406,14 @@ class Overseer(object):
`job`: An instance of :class:`Job`.
'''
self.job_id = job.job_id
self.working = True
self.write('JOB:'+cPickle.dumps((job.func, job.args, job.kwdargs), -1))
self.write('JOB:'+cPickle.dumps((job.func, job.args, job.kwargs), -1))
msg = self.read()
if msg != 'OK':
raise ControlError('Failed to initialize job on worker %d:%s'%(self.worker_pid, msg))
self.output = job.output if callable(job.output) else sys.stdout.write
self.progress = job.progress if callable(job.progress) else None
self.job = job
self.last_report = time.time()
job.start_work()
def control(self):
'''
@ -435,7 +431,9 @@ class Overseer(object):
else:
if self.on_probation:
self.terminate()
return Result(None, ControlError('Worker process died unexpectedly'), '')
self.job.result = None
self.job.exception = ControlError('Worker process died unexpectedly')
return
else:
self.on_probation = True
return
@ -445,13 +443,14 @@ class Overseer(object):
return
elif word == 'RESULT':
self.write('OK')
return Result(cPickle.loads(msg), None, None)
self.job.result = cPickle.loads(msg)
return True
elif word == 'OUTPUT':
self.write('OK')
try:
self.output(''.join(cPickle.loads(msg)))
self.job.output(''.join(cPickle.loads(msg)))
except:
self.output('Bad output message: '+ repr(msg))
self.job.output('Bad output message: '+ repr(msg))
elif word == 'PROGRESS':
self.write('OK')
percent = None
@ -459,45 +458,154 @@ class Overseer(object):
percent, msg = cPickle.loads(msg)[-1]
except:
print 'Bad progress update:', repr(msg)
if self.progress and percent is not None:
self.progress(percent, msg)
if percent is not None:
self.job.update_status(percent, msg)
elif word == 'ERROR':
self.write('OK')
return Result(None, *cPickle.loads(msg))
self.job.excetion, self.job.traceback = cPickle.loads(msg)
return True
else:
self.terminate()
return Result(None, ControlError('Worker sent invalid msg: %s'%repr(msg)), '')
self.job.exception = ControlError('Worker sent invalid msg: %s'%repr(msg))
return
if not self.worker_status.is_alive() or time.time() - self.last_report > 180:
self.terminate()
return Result(None, ControlError('Worker process died unexpectedly with returncode: %s'%str(self.process.returncode)), '')
self.job.exception = ControlError('Worker process died unexpectedly with returncode: %s'%str(self.process.returncode))
return
class JobKilled(Exception):
pass
class Job(object):
def __init__(self, job_id, func, args, kwdargs, output, progress, done):
self.job_id = job_id
def __init__(self, job_done, job_manager=None,
args=[], kwargs={}, description=None):
self.args = args
self.kwargs = kwargs
self._job_done = job_done
self.job_manager = job_manager
self.is_running = False
self.has_run = False
self.percent = -1
self.msg = None
self.description = description
self.start_time = None
self.running_time = None
self.result = self.exception = self.traceback = self.log = None
def __cmp__(self, other):
sstatus, ostatus = self.status(), other.status()
if sstatus == ostatus or (self.has_run and other.has_run):
if self.start_time == other.start_time:
return cmp(id(self), id(other))
return cmp(self.start_time, other.start_time)
if sstatus == 'WORKING':
return -1
if ostatus == 'WORKING':
return 1
if sstatus == 'WAITING':
return -1
if ostatus == 'WAITING':
return 1
def job_done(self):
self.is_running, self.has_run = False, True
self.running_time = (time.time() - self.start_time) if \
self.start_time is not None else 0
if self.job_manager is not None:
self.job_manager.job_done(self)
self._job_done(self)
def start_work(self):
self.is_running = True
self.has_run = False
self.start_time = time.time()
if self.job_manager is not None:
self.job_manager.start_work(self)
def update_status(self, percent, msg=None):
self.percent = percent
self.msg = msg
if self.job_manager is not None:
self.job_manager.status_update(self)
def status(self):
if self.is_running:
return 'WORKING'
if not self.has_run:
return 'WAITING'
if self.has_run:
if self.exception is None:
return 'DONE'
return 'ERROR'
def console_text(self):
ans = [u'Error in job: ']
if self.description:
ans[0] += self.description
if self.log:
if isinstance(self.log, str):
self.log = unicode(self.log, 'utf-8', 'replace')
ans.append(self.log)
header = unicode(self.exception.__class__.__name__) if \
hasattr(self.exception, '__class__') else u'Error'
header += u': '
try:
header += unicode(self.exception)
except:
header += unicode(repr(self.exception))
ans.append(header)
if self.traceback:
ans.append(self.traceback)
return (u'\n'.join(ans)).encode('utf-8')
def gui_text(self):
ans = [u'Job: ']
if self.description:
if not isinstance(self.description, unicode):
self.description = self.description.decode('utf-8', 'replace')
ans[0] += u'<b>%s</b>'%self.description
if self.exception is not None:
header = unicode(self.exception.__class__.__name__) if \
hasattr(self.exception, '__class__') else u'Error'
header = u'<b>%s</b>'%header
header += u': '
try:
header += unicode(self.exception)
except:
header += unicode(repr(self.exception))
ans.append(header)
if self.traceback:
ans.append(u'<b>Traceback</b>:')
ans.extend(self.traceback.split('\n'))
if self.log:
ans.append(u'<b>Log</b>:')
if isinstance(self.log, str):
self.log = unicode(self.log, 'utf-8', 'replace')
ans.extend(self.log.split('\n'))
return '<br>'.join(ans)
class ParallelJob(Job):
def __init__(self, func, *args, **kwargs):
Job.__init__(self, *args, **kwargs)
self.func = func
self.args = args
self.kwdargs = kwdargs
self.output = output
self.progress = progress
self.done = done
self.done = self.job_done
class Result(object):
def output(self, msg):
if not self.log:
self.log = u''
if not isinstance(msg, unicode):
msg = msg.decode('utf-8', 'replace')
if msg:
self.log += msg
if self.job_manager is not None:
self.job_manager.output(self)
def __init__(self, result, exception, traceback):
self.result = result
self.exception = exception
self.traceback = traceback
def __len__(self):
return 3
def __item__(self, i):
return (self.result, self.exception, self.traceback)[i]
def __iter__(self):
return iter((self.result, self.exception, self.traceback))
def remove_ipc_socket(path):
os = __import__('os')
@ -527,7 +635,7 @@ class Server(Thread):
atexit.register(remove_ipc_socket, self.port)
self.server_socket.listen(5)
self.number_of_workers = number_of_workers
self.pool, self.jobs, self.working, self.results = [], collections.deque(), [], {}
self.pool, self.jobs, self.working = [], collections.deque(), []
atexit.register(self.killall)
atexit.register(self.close)
self.job_lock = RLock()
@ -546,15 +654,8 @@ class Server(Thread):
def add_job(self, job):
with self.job_lock:
self.jobs.append(job)
def store_result(self, result, id=None):
if id:
with self.job_lock:
self.results[id] = result
def result(self, id):
with self.result_lock:
return self.results.pop(id, None)
if job.job_manager is not None:
job.job_manager.add_job(job)
def run(self):
while True:
@ -577,8 +678,9 @@ class Server(Thread):
o.initialize_job(job)
except Exception, err:
o.terminate()
res = Result(None, unicode(err), traceback.format_exc())
job.done(res)
job.exception = err
job.traceback = traceback.format_exc()
job.done()
o = None
if o and o.is_viable():
with self.working_lock:
@ -588,12 +690,14 @@ class Server(Thread):
done = []
for o in self.working:
try:
res = o.control()
if o.control() is not None or o.job.exception is not None:
o.job.done()
done.append(o)
except Exception, err:
res = Result(None, unicode(err), traceback.format_exc())
o.job.exception = err
o.job.traceback = traceback.format_exc()
o.terminate()
if isinstance(res, Result):
o.job.done(res)
o.job.done()
done.append(o)
for o in done:
self.working.remove(o)
@ -613,32 +717,23 @@ class Server(Thread):
self.pool = []
def kill(self, job_id):
def kill(self, job):
with self.working_lock:
pop = None
for o in self.working:
if o.job_id == job_id:
o.terminate()
o.job.done(Result(self.KILL_RESULT, None, ''))
if o.job == job or o == job:
try:
o.terminate()
except: pass
o.job.exception = JobKilled(_('Job stopped by user'))
try:
o.job.done()
except: pass
pop = o
break
if pop is not None:
self.working.remove(pop)
def run_job(self, job_id, func, args=[], kwdargs={},
output=None, progress=None, done=None):
'''
Run a job in a separate process. Supports job control, output redirection
and progress reporting.
'''
if done is None:
done = partial(self.store_result, id=job_id)
job = Job(job_id, func, args, kwdargs, output, progress, done)
with self.job_lock:
self.jobs.append(job)
def run_free_job(self, func, args=[], kwdargs={}):
pt = PersistentTemporaryFile('.pickle', '_IPC_')
pt.write(cPickle.dumps((func, args, kwdargs)))