+
+
JobsDialog
-
-
+
+
0
0
@@ -9,31 +10,32 @@
542
-
+
Active Jobs
-
- :/images/jobs.svg
+
+
+ :/images/jobs.svg:/images/jobs.svg
-
+
-
-
-
+
+
Qt::NoContextMenu
-
+
QAbstractItemView::NoEditTriggers
-
+
true
-
+
QAbstractItemView::SingleSelection
-
+
QAbstractItemView::SelectRows
-
+
32
32
@@ -42,12 +44,19 @@
-
-
-
+
+
&Stop selected job
+ -
+
+
+ Show job &details
+
+
+
@@ -58,7 +67,7 @@
-
+
diff --git a/src/calibre/gui2/dialogs/user_profiles.py b/src/calibre/gui2/dialogs/user_profiles.py
index cab28c66eb..c46b4ba392 100644
--- a/src/calibre/gui2/dialogs/user_profiles.py
+++ b/src/calibre/gui2/dialogs/user_profiles.py
@@ -86,7 +86,7 @@ class UserProfiles(ResizableDialog, Ui_Dialog):
self.source_code.setPlainText('')
else:
self.source_code.setPlainText(src)
- #self.highlighter = PythonHighlighter(self.source_code.document())
+ self.highlighter = PythonHighlighter(self.source_code.document())
self.stacks.setCurrentIndex(1)
self.toggle_mode_button.setText(_('Switch to Basic mode'))
diff --git a/src/calibre/gui2/jobs.py b/src/calibre/gui2/jobs.py
new file mode 100644
index 0000000000..6be6188ab9
--- /dev/null
+++ b/src/calibre/gui2/jobs.py
@@ -0,0 +1,257 @@
+#!/usr/bin/env python
+__license__ = 'GPL v3'
+__copyright__ = '2008, Kovid Goyal kovid@kovidgoyal.net'
+__docformat__ = 'restructuredtext en'
+
+'''
+Job management.
+'''
+
+from Queue import Empty, Queue
+
+from PyQt4.Qt import QAbstractTableModel, QVariant, QModelIndex, Qt, \
+ QTimer, SIGNAL, QIcon, QDialog, QAbstractItemDelegate, QApplication, \
+ QSize, QStyleOptionProgressBarV2, QString, QStyle
+
+from calibre.utils.ipc.server import Server
+from calibre.utils.ipc.job import ParallelJob
+from calibre.gui2 import Dispatcher, error_dialog, NONE
+from calibre.gui2.device import DeviceJob
+from calibre.gui2.dialogs.jobs_ui import Ui_JobsDialog
+from calibre import __appname__
+
+class JobManager(QAbstractTableModel):
+
+ def __init__(self):
+ QAbstractTableModel.__init__(self)
+ self.wait_icon = QVariant(QIcon(':/images/jobs.svg'))
+ self.running_icon = QVariant(QIcon(':/images/exec.svg'))
+ self.error_icon = QVariant(QIcon(':/images/dialog_error.svg'))
+ self.done_icon = QVariant(QIcon(':/images/ok.svg'))
+
+ self.jobs = []
+ self.add_job = Dispatcher(self._add_job)
+ self.job_done = Dispatcher(self._job_done)
+ self.server = Server(self.job_done)
+ self.changed_queue = Queue()
+
+ self.timer = QTimer(self)
+ self.connect(self.timer, SIGNAL('timeout()'), self.update,
+ Qt.QueuedConnection)
+ self.timer.start(1000)
+
+ def columnCount(self, parent=QModelIndex()):
+ return 4
+
+ def rowCount(self, parent=QModelIndex()):
+ return len(self.jobs)
+
+ def headerData(self, section, orientation, role):
+ if role != Qt.DisplayRole:
+ return NONE
+ if orientation == Qt.Horizontal:
+ if section == 0: text = _('Job')
+ elif section == 1: text = _('Status')
+ elif section == 2: text = _('Progress')
+ elif section == 3: text = _('Running time')
+ return QVariant(text)
+ else:
+ return QVariant(section+1)
+
+ def data(self, index, role):
+ try:
+ if role not in (Qt.DisplayRole, Qt.DecorationRole):
+ return NONE
+ row, col = index.row(), index.column()
+ job = self.jobs[row]
+
+ if role == Qt.DisplayRole:
+ if col == 0:
+ desc = job.description
+ if not desc:
+ desc = _('Unknown job')
+ return QVariant(desc)
+ if col == 1:
+ return QVariant(job.status_text)
+ if col == 2:
+ return QVariant(job.percent)
+ if col == 3:
+ rtime = job.running_time
+ if rtime is None:
+ return NONE
+ return QVariant('%dm %ds'%(int(rtime)//60, int(rtime)%60))
+ if role == Qt.DecorationRole and col == 0:
+ state = job.run_state
+ if state == job.WAITING:
+ return self.wait_icon
+ if state == job.RUNNING:
+ return self.running_icon
+ if job.killed or job.failed:
+ return self.error_icon
+ return self.done_icon
+ except:
+ import traceback
+ traceback.print_exc()
+ return NONE
+
+ def update(self):
+ try:
+ self._update()
+ except BaseException:
+ pass
+
+ def _update(self):
+ # Update running time
+ rows = set([])
+ for i, j in enumerate(self.jobs):
+ if j.run_state == j.RUNNING:
+ idx = self.index(i, 3)
+ self.emit(SIGNAL('dataChanged(QModelIndex,QModelIndex)'),
+ idx, idx)
+
+ # Update parallel jobs
+ jobs = set([])
+ while True:
+ try:
+ jobs.add(self.server.changed_jobs_queue.get_nowait())
+ except Empty:
+ break
+ while True:
+ try:
+ jobs.add(self.changed_queue.get_nowait())
+ except Empty:
+ break
+
+ if jobs:
+ needs_reset = False
+ for job in jobs:
+ orig_state = job.run_state
+ job.update()
+ if orig_state != job.run_state:
+ needs_reset = True
+ if needs_reset:
+ self.jobs.sort()
+ self.reset()
+ else:
+ for job in jobs:
+ idx = self.jobs.index(job)
+ self.emit(SIGNAL('dataChanged(QModelIndex,QModelIndex)'),
+ self.index(idx, 0), self.index(idx, 3))
+
+
+ def _add_job(self, job):
+ self.emit(SIGNAL('layoutAboutToBeChanged()'))
+ self.jobs.append(job)
+ self.jobs.sort()
+ self.emit(SIGNAL('job_added(int)'), len(self.unfinished_jobs()))
+ self.emit(SIGNAL('layoutChanged()'))
+
+ def done_jobs(self):
+ return [j for j in self.jobs if j.is_finished]
+
+ def unfinished_jobs(self):
+ return [j for j in self.jobs if not j.is_finished]
+
+ def row_to_job(self, row):
+ return self.jobs[row]
+
+ def _job_done(self, job):
+ self.emit(SIGNAL('layoutAboutToBeChanged()'))
+ self.jobs.sort()
+ self.emit(SIGNAL('job_done(int)'), len(self.unfinished_jobs()))
+ self.emit(SIGNAL('layoutChanged()'))
+
+ def has_device_jobs(self):
+ for job in self.jobs:
+ if job.is_running and isinstance(job, DeviceJob):
+ return True
+ return False
+
+ def has_jobs(self):
+ for job in self.jobs:
+ if job.is_running:
+ return True
+ return False
+
+ def run_job(self, done, name, args=[], kwargs={},
+ description=''):
+ job = ParallelJob(name, description, done, args=args, kwargs=kwargs)
+ self.add_job(job)
+ self.server.add_job(job)
+ return job
+
+ def launch_gui_app(self, name, args=[], kwargs={}, description=''):
+ job = ParallelJob(name, description, lambda x: x,
+ args=args, kwargs=kwargs)
+ self.server.run_job(job, gui=True, redirect_output=False)
+
+
+ def kill_job(self, row, view):
+ job = self.jobs[row]
+ if isinstance(job, DeviceJob):
+ return error_dialog(view, _('Cannot kill job'),
+ _('Cannot kill jobs that communicate with the device')).exec_()
+ if job.duration is not None:
+ return error_dialog(view, _('Cannot kill job'),
+ _('Job has already run')).exec_()
+ self.server.kill_job(job)
+
+ def terminate_all_jobs(self):
+ self.server.killall()
+
+
+class ProgressBarDelegate(QAbstractItemDelegate):
+
+ def sizeHint(self, option, index):
+ return QSize(120, 30)
+
+ def paint(self, painter, option, index):
+ opts = QStyleOptionProgressBarV2()
+ opts.rect = option.rect
+ opts.minimum = 1
+ opts.maximum = 100
+ opts.textVisible = True
+ percent, ok = index.model().data(index, Qt.DisplayRole).toInt()
+ if not ok:
+ percent = 0
+ opts.progress = percent
+ opts.text = QString(_('Unavailable') if percent == 0 else '%d%%'%percent)
+ QApplication.style().drawControl(QStyle.CE_ProgressBar, opts, painter)
+
+class JobsDialog(QDialog, Ui_JobsDialog):
+ def __init__(self, window, model):
+ QDialog.__init__(self, window)
+ Ui_JobsDialog.__init__(self)
+ self.setupUi(self)
+ self.jobs_view.setModel(model)
+ self.model = model
+ self.setWindowModality(Qt.NonModal)
+ self.setWindowTitle(__appname__ + _(' - Jobs'))
+ self.connect(self.jobs_view.model(), SIGNAL('modelReset()'),
+ self.jobs_view.resizeColumnsToContents)
+ self.connect(self.kill_button, SIGNAL('clicked()'),
+ self.kill_job)
+ self.connect(self.details_button, SIGNAL('clicked()'),
+ self.show_details)
+ self.connect(self, SIGNAL('kill_job(int, PyQt_PyObject)'),
+ self.jobs_view.model().kill_job)
+ self.pb_delegate = ProgressBarDelegate(self)
+ self.jobs_view.setItemDelegateForColumn(2, self.pb_delegate)
+
+
+ def kill_job(self):
+ for index in self.jobs_view.selectedIndexes():
+ row = index.row()
+ self.model.kill_job(row, self)
+ return
+
+ def show_details(self):
+ for index in self.jobs_view.selectedIndexes():
+ self.jobs_view.show_details(index)
+ return
+
+
+
+ def closeEvent(self, e):
+ self.jobs_view.write_settings()
+ e.accept()
diff --git a/src/calibre/gui2/jobs2.py b/src/calibre/gui2/jobs2.py
deleted file mode 100644
index fc6ddb642e..0000000000
--- a/src/calibre/gui2/jobs2.py
+++ /dev/null
@@ -1,203 +0,0 @@
-#!/usr/bin/env python
-__license__ = 'GPL v3'
-__copyright__ = '2008, Kovid Goyal kovid@kovidgoyal.net'
-__docformat__ = 'restructuredtext en'
-
-'''
-Job management.
-'''
-import time
-from PyQt4.QtCore import QAbstractTableModel, QVariant, QModelIndex, Qt, SIGNAL
-from PyQt4.QtGui import QIcon, QDialog
-
-from calibre.parallel import ParallelJob, Server
-from calibre.gui2 import Dispatcher, error_dialog
-from calibre.gui2.device import DeviceJob
-from calibre.gui2.dialogs.job_view_ui import Ui_Dialog
-
-NONE = QVariant()
-
-class JobManager(QAbstractTableModel):
-
- def __init__(self):
- QAbstractTableModel.__init__(self)
- self.wait_icon = QVariant(QIcon(':/images/jobs.svg'))
- self.running_icon = QVariant(QIcon(':/images/exec.svg'))
- self.error_icon = QVariant(QIcon(':/images/dialog_error.svg'))
- self.done_icon = QVariant(QIcon(':/images/ok.svg'))
-
- self.jobs = []
- self.server = Server()
- self.add_job = Dispatcher(self._add_job)
- self.status_update = Dispatcher(self._status_update)
- self.start_work = Dispatcher(self._start_work)
- self.job_done = Dispatcher(self._job_done)
-
- def columnCount(self, parent=QModelIndex()):
- return 4
-
- def rowCount(self, parent=QModelIndex()):
- return len(self.jobs)
-
- def headerData(self, section, orientation, role):
- if role != Qt.DisplayRole:
- return NONE
- if orientation == Qt.Horizontal:
- if section == 0: text = _("Job")
- elif section == 1: text = _("Status")
- elif section == 2: text = _("Progress")
- elif section == 3: text = _('Running time')
- return QVariant(text)
- else:
- return QVariant(section+1)
-
- def data(self, index, role):
- try:
- if role not in (Qt.DisplayRole, Qt.DecorationRole):
- return NONE
- row, col = index.row(), index.column()
- job = self.jobs[row]
-
- if role == Qt.DisplayRole:
- if col == 0:
- desc = job.description
- if not desc:
- desc = _('Unknown job')
- return QVariant(desc)
- if col == 1:
- status = job.status()
- if status == 'DONE':
- st = _('Finished')
- elif status == 'ERROR':
- st = _('Error')
- elif status == 'WAITING':
- st = _('Waiting')
- else:
- st = _('Working')
- return QVariant(st)
- if col == 2:
- pc = job.percent
- if pc <=0:
- percent = 0
- else:
- percent = int(100*pc)
- return QVariant(percent)
- if col == 3:
- if job.start_time is None:
- return NONE
- rtime = job.running_time if job.running_time is not None else \
- time.time() - job.start_time
- return QVariant('%dm %ds'%(int(rtime)//60, int(rtime)%60))
- if role == Qt.DecorationRole and col == 0:
- status = job.status()
- if status == 'WAITING':
- return self.wait_icon
- if status == 'WORKING':
- return self.running_icon
- if status == 'ERROR':
- return self.error_icon
- if status == 'DONE':
- return self.done_icon
- except:
- import traceback
- traceback.print_exc()
- return NONE
-
- def _add_job(self, job):
- self.emit(SIGNAL('layoutAboutToBeChanged()'))
- self.jobs.append(job)
- self.jobs.sort()
- self.emit(SIGNAL('job_added(int)'), self.rowCount())
- self.emit(SIGNAL('layoutChanged()'))
-
- def done_jobs(self):
- return [j for j in self.jobs if j.status() in ['DONE', 'ERROR']]
-
- def row_to_job(self, row):
- return self.jobs[row]
-
- def _start_work(self, job):
- self.emit(SIGNAL('layoutAboutToBeChanged()'))
- self.jobs.sort()
- self.emit(SIGNAL('layoutChanged()'))
-
- def _job_done(self, job):
- self.emit(SIGNAL('layoutAboutToBeChanged()'))
- self.jobs.sort()
- self.emit(SIGNAL('job_done(int)'), len(self.jobs) - len(self.done_jobs()))
- self.emit(SIGNAL('layoutChanged()'))
-
- def _status_update(self, job):
- try:
- row = self.jobs.index(job)
- except ValueError: # Job has been stopped
- return
- self.emit(SIGNAL('dataChanged(QModelIndex, QModelIndex)'),
- self.index(row, 0), self.index(row, 3))
-
- def running_time_updated(self, *args):
- for job in self.jobs:
- if not job.is_running:
- continue
- row = self.jobs.index(job)
- self.emit(SIGNAL('dataChanged(QModelIndex, QModelIndex)'),
- self.index(row, 3), self.index(row, 3))
-
- def has_device_jobs(self):
- for job in self.jobs:
- if job.is_running and isinstance(job, DeviceJob):
- return True
- return False
-
- def has_jobs(self):
- for job in self.jobs:
- if job.is_running:
- return True
- return False
-
- def run_job(self, done, func, args=[], kwargs={},
- description=None):
- job = ParallelJob(func, done, self, args=args, kwargs=kwargs,
- description=description)
- self.server.add_job(job)
- return job
-
-
- def output(self, job):
- self.emit(SIGNAL('output_received()'))
-
- def kill_job(self, row, view):
- job = self.jobs[row]
- if isinstance(job, DeviceJob):
- error_dialog(view, _('Cannot kill job'),
- _('Cannot kill jobs that communicate with the device')).exec_()
- return
- if job.has_run:
- error_dialog(view, _('Cannot kill job'),
- _('Job has already run')).exec_()
- return
- if not job.is_running:
- self.jobs.remove(job)
- self.reset()
- return
-
-
- self.server.kill(job)
-
- def terminate_all_jobs(self):
- pass
-
-class DetailView(QDialog, Ui_Dialog):
-
- def __init__(self, parent, job):
- QDialog.__init__(self, parent)
- self.setupUi(self)
- self.setWindowTitle(job.description)
- self.job = job
- self.update()
-
-
- def update(self):
- self.log.setPlainText(self.job.console_text())
- vbar = self.log.verticalScrollBar()
- vbar.setValue(vbar.maximum())
diff --git a/src/calibre/gui2/main.py b/src/calibre/gui2/main.py
index 47276d2519..d64591bcd7 100644
--- a/src/calibre/gui2/main.py
+++ b/src/calibre/gui2/main.py
@@ -13,7 +13,7 @@ from PyQt4.Qt import Qt, SIGNAL, QObject, QCoreApplication, QUrl, QTimer, \
from PyQt4.QtSvg import QSvgRenderer
from calibre import __version__, __appname__, islinux, sanitize_file_name, \
- iswindows, isosx
+ iswindows, isosx, prints
from calibre.ptempfile import PersistentTemporaryFile
from calibre.utils.config import prefs, dynamic
from calibre.gui2 import APP_UID, warning_dialog, choose_files, error_dialog, \
@@ -32,10 +32,9 @@ from calibre.gui2.main_window import MainWindow, option_parser as _option_parser
from calibre.gui2.main_ui import Ui_MainWindow
from calibre.gui2.device import DeviceManager, DeviceMenu, DeviceGUI, Emailer
from calibre.gui2.status import StatusBar
-from calibre.gui2.jobs2 import JobManager
+from calibre.gui2.jobs import JobManager, JobsDialog
from calibre.gui2.dialogs.metadata_single import MetadataSingleDialog
from calibre.gui2.dialogs.metadata_bulk import MetadataBulkDialog
-from calibre.gui2.dialogs.jobs import JobsDialog
from calibre.gui2.tools import convert_single_ebook, convert_bulk_ebook, \
fetch_scheduled_recipe
from calibre.gui2.dialogs.config import ConfigDialog
@@ -44,7 +43,6 @@ from calibre.gui2.dialogs.choose_format import ChooseFormatDialog
from calibre.gui2.dialogs.book_info import BookInfo
from calibre.ebooks import BOOK_EXTENSIONS
from calibre.library.database2 import LibraryDatabase2, CoverCache
-from calibre.parallel import JobKilled
from calibre.gui2.dialogs.confirm_delete import confirm
class SaveMenu(QMenu):
@@ -626,9 +624,8 @@ class Main(MainWindow, Ui_MainWindow, DeviceGUI):
'''
Called once device information has been read.
'''
- if job.exception is not None:
- self.device_job_exception(job)
- return
+ if job.failed:
+ return self.device_job_exception(job)
info, cp, fs = job.result
self.location_view.model().update_devices(cp, fs)
self.device_info = _('Connected ')+info[0]
@@ -641,7 +638,7 @@ class Main(MainWindow, Ui_MainWindow, DeviceGUI):
'''
Called once metadata has been read for all books on the device.
'''
- if job.exception is not None:
+ if job.failed:
if isinstance(job.exception, ExpatError):
error_dialog(self, _('Device database corrupted'),
_('''
@@ -823,8 +820,8 @@ class Main(MainWindow, Ui_MainWindow, DeviceGUI):
Called once deletion is done on the device
'''
for view in (self.memory_view, self.card_a_view, self.card_b_view):
- view.model().deletion_done(job, bool(job.exception))
- if job.exception is not None:
+ view.model().deletion_done(job, job.failed)
+ if job.failed:
self.device_job_exception(job)
return
@@ -993,9 +990,8 @@ class Main(MainWindow, Ui_MainWindow, DeviceGUI):
progress.hide()
def books_saved(self, job):
- if job.exception is not None:
- self.device_job_exception(job)
- return
+ if job.failed:
+ return self.device_job_exception(job)
############################################################################
@@ -1013,9 +1009,8 @@ class Main(MainWindow, Ui_MainWindow, DeviceGUI):
def scheduled_recipe_fetched(self, job):
temp_files, fmt, recipe, callback = self.conversion_jobs.pop(job)
pt = temp_files[0]
- if job.exception is not None:
- self.job_exception(job)
- return
+ if job.failed:
+ return self.job_exception(job)
id = self.library_view.model().add_news(pt.name, recipe)
self.library_view.model().reset()
sync = dynamic.get('news_to_be_synced', set([]))
@@ -1098,9 +1093,8 @@ class Main(MainWindow, Ui_MainWindow, DeviceGUI):
def book_auto_converted(self, job):
temp_files, fmt, book_id, on_card = self.conversion_jobs.pop(job)
try:
- if job.exception is not None:
- self.job_exception(job)
- return
+ if job.failed:
+ return self.job_exception(job)
data = open(temp_files[0].name, 'rb')
self.library_view.model().db.add_format(book_id, fmt, data, index_is_id=True)
data.close()
@@ -1122,7 +1116,7 @@ class Main(MainWindow, Ui_MainWindow, DeviceGUI):
def book_converted(self, job):
temp_files, fmt, book_id = self.conversion_jobs.pop(job)
try:
- if job.exception is not None:
+ if job.failed:
self.job_exception(job)
return
data = open(temp_files[-1].name, 'rb')
@@ -1151,7 +1145,7 @@ class Main(MainWindow, Ui_MainWindow, DeviceGUI):
self._view_file(fmt_path)
def book_downloaded_for_viewing(self, job):
- if job.exception:
+ if job.failed:
self.device_job_exception(job)
return
self._view_file(job.result)
@@ -1165,12 +1159,11 @@ class Main(MainWindow, Ui_MainWindow, DeviceGUI):
args.append('--raise-window')
if name is not None:
args.append(name)
- self.job_manager.server.run_free_job(viewer,
- kwdargs=dict(args=args))
+ self.job_manager.launch_gui_app(viewer,
+ kwargs=dict(args=args))
else:
QDesktopServices.openUrl(QUrl.fromLocalFile(name))#launch(name)
-
- time.sleep(5) # User feedback
+ time.sleep(2) # User feedback
finally:
self.unsetCursor()
@@ -1395,7 +1388,7 @@ class Main(MainWindow, Ui_MainWindow, DeviceGUI):
'''
try:
if 'Could not read 32 bytes on the control bus.' in \
- unicode(job.exception):
+ unicode(job.details):
error_dialog(self, _('Error talking to device'),
_('There was a temporary error talking to the '
'device. Please unplug and reconnect the device '
@@ -1404,16 +1397,16 @@ class Main(MainWindow, Ui_MainWindow, DeviceGUI):
except:
pass
try:
- print >>sys.stderr, job.console_text()
+ prints(job.details, file=sys.stderr)
except:
pass
if not self.device_error_dialog.isVisible():
- self.device_error_dialog.set_message(job.gui_text())
+ self.device_error_dialog.set_message(job.details)
self.device_error_dialog.show()
def job_exception(self, job):
try:
- if job.exception[0] == 'DRMError':
+ if 'calibre.ebooks.DRMError' in job.details:
error_dialog(self, _('Conversion Error'),
_('Could not convert: %s
It is a '
'DRMed book. You must first remove the '
@@ -1423,23 +1416,15 @@ class Main(MainWindow, Ui_MainWindow, DeviceGUI):
return
except:
pass
- only_msg = getattr(job.exception, 'only_msg', False)
+ if job.killed:
+ return
try:
- print job.console_text()
+ prints(job.details, file=sys.stderr)
except:
pass
- if only_msg:
- try:
- exc = unicode(job.exception)
- except:
- exc = repr(job.exception)
- error_dialog(self, _('Conversion Error'), exc).exec_()
- return
- if isinstance(job.exception, JobKilled):
- return
error_dialog(self, _('Conversion Error'),
- _('Failed to process')+': '+unicode(job.description),
- det_msg=job.console_text()).exec_()
+ _('Failed')+': '+unicode(job.description),
+ det_msg=job.details).exec_()
def initialize_database(self):
@@ -1555,7 +1540,7 @@ class Main(MainWindow, Ui_MainWindow, DeviceGUI):
def shutdown(self, write_settings=True):
if write_settings:
self.write_settings()
- self.job_manager.terminate_all_jobs()
+ self.job_manager.server.close()
self.device_manager.keep_going = False
self.cover_cache.stop()
self.hide()
diff --git a/src/calibre/gui2/status.py b/src/calibre/gui2/status.py
index 7b3dac4dc8..11b442fd17 100644
--- a/src/calibre/gui2/status.py
+++ b/src/calibre/gui2/status.py
@@ -10,10 +10,10 @@ from calibre.gui2 import qstring_to_unicode, config
class BookInfoDisplay(QWidget):
class BookCoverDisplay(QLabel):
-
+
WIDTH = 81
HEIGHT = 108
-
+
def __init__(self, coverpath=':/images/book.svg'):
QLabel.__init__(self)
self.default_pixmap = QPixmap(coverpath).scaled(self.__class__.WIDTH,
@@ -23,42 +23,42 @@ class BookInfoDisplay(QWidget):
self.setScaledContents(True)
self.setMaximumHeight(self.HEIGHT)
self.setPixmap(self.default_pixmap)
-
-
+
+
def setPixmap(self, pixmap):
width, height = fit_image(pixmap.width(), pixmap.height(),
self.WIDTH, self.HEIGHT)[1:]
self.setMaximumHeight(height)
self.setMaximumWidth(width)
QLabel.setPixmap(self, pixmap)
-
+
try:
aspect_ratio = pixmap.width()/float(pixmap.height())
except ZeroDivisionError:
aspect_ratio = 1
self.setMaximumWidth(int(aspect_ratio*self.HEIGHT))
-
+
def sizeHint(self):
return QSize(self.__class__.WIDTH, self.__class__.HEIGHT)
-
-
+
+
class BookDataDisplay(QLabel):
def __init__(self):
QLabel.__init__(self)
self.setText('')
self.setWordWrap(True)
self.setSizePolicy(QSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding))
-
+
def mouseReleaseEvent(self, ev):
self.emit(SIGNAL('mr(int)'), 1)
-
+
WEIGHTS = collections.defaultdict(lambda : 100)
WEIGHTS[_('Path')] = 0
WEIGHTS[_('Formats')] = 1
WEIGHTS[_('Comments')] = 2
WEIGHTS[_('Series')] = 3
WEIGHTS[_('Tags')] = 4
-
+
def __init__(self, clear_message):
QWidget.__init__(self)
self.setCursor(Qt.PointingHandCursor)
@@ -74,16 +74,16 @@ class BookInfoDisplay(QWidget):
self.data = {}
self.setVisible(False)
self._layout.setAlignment(self.cover_display, Qt.AlignTop|Qt.AlignLeft)
-
+
def mouseReleaseEvent(self, ev):
self.emit(SIGNAL('show_book_info()'))
-
+
def show_data(self, data):
if data.has_key('cover'):
self.cover_display.setPixmap(QPixmap.fromImage(data.pop('cover')))
else:
self.cover_display.setPixmap(self.cover_display.default_pixmap)
-
+
rows = u''
self.book_data.setText('')
self.data = data.copy()
@@ -97,7 +97,7 @@ class BookInfoDisplay(QWidget):
txt = txt.decode(preferred_encoding, 'replace')
rows += u'
%s: | %s |
'%(key, txt)
self.book_data.setText(u'')
-
+
self.clear_message()
self.book_data.updateGeometry()
self.updateGeometry()
@@ -113,7 +113,7 @@ class MovieButton(QFrame):
self.movie = movie
self.layout().addWidget(self.movie_widget)
self.jobs = QLabel(''+_('Jobs:')+' 0')
- self.jobs.setAlignment(Qt.AlignHCenter|Qt.AlignBottom)
+ self.jobs.setAlignment(Qt.AlignHCenter|Qt.AlignBottom)
self.layout().addWidget(self.jobs)
self.layout().setAlignment(self.jobs, Qt.AlignHCenter)
self.jobs.setMargin(0)
@@ -125,8 +125,8 @@ class MovieButton(QFrame):
movie.start()
movie.setPaused(True)
self.jobs_dialog.jobs_view.restore_column_widths()
-
-
+
+
def mouseReleaseEvent(self, event):
if self.jobs_dialog.isVisible():
self.jobs_dialog.jobs_view.write_settings()
@@ -137,7 +137,7 @@ class MovieButton(QFrame):
self.jobs_dialog.jobs_view.restore_column_widths()
class CoverFlowButton(QToolButton):
-
+
def __init__(self, parent=None):
QToolButton.__init__(self, parent)
self.setIconSize(QSize(80, 80))
@@ -149,17 +149,17 @@ class CoverFlowButton(QToolButton):
self.connect(self, SIGNAL('toggled(bool)'), self.adjust_tooltip)
self.adjust_tooltip(False)
self.setCursor(Qt.PointingHandCursor)
-
+
def adjust_tooltip(self, on):
tt = _('Click to turn off Cover Browsing') if on else _('Click to browse books by their covers')
self.setToolTip(tt)
-
+
def disable(self, reason):
self.setDisabled(True)
self.setToolTip(_('Browsing books by their covers is disabled.
Import of pictureflow module failed:
')+reason)
-
+
class TagViewButton(QToolButton):
-
+
def __init__(self, parent=None):
QToolButton.__init__(self, parent)
self.setIconSize(QSize(80, 80))
@@ -170,10 +170,10 @@ class TagViewButton(QToolButton):
self.setCheckable(True)
self.setChecked(False)
self.setAutoRaise(True)
-
+
class StatusBar(QStatusBar):
-
+
def __init__(self, jobs_dialog, systray=None):
QStatusBar.__init__(self)
self.systray = systray
@@ -192,11 +192,11 @@ class StatusBar(QStatusBar):
self.addWidget(self.scroll_area, 100)
self.setMinimumHeight(120)
self.setMaximumHeight(120)
-
-
+
+
def reset_info(self):
self.book_info.show_data({})
-
+
def showMessage(self, msg, timeout=0):
ret = QStatusBar.showMessage(self, msg, timeout)
if self.systray is not None and not config['disable_tray_notification']:
@@ -207,39 +207,38 @@ class StatusBar(QStatusBar):
msg = msg.encode('utf-8')
self.systray.showMessage('calibre', msg, self.systray.Information, 10000)
return ret
-
+
def jobs(self):
src = qstring_to_unicode(self.movie_button.jobs.text())
return int(re.search(r'\d+', src).group())
-
+
def show_book_info(self):
self.emit(SIGNAL('show_book_info()'))
-
+
def job_added(self, nnum):
jobs = self.movie_button.jobs
src = qstring_to_unicode(jobs.text())
num = self.jobs()
- nnum = num + 1
text = src.replace(str(num), str(nnum))
jobs.setText(text)
if self.movie_button.movie.state() == QMovie.Paused:
self.movie_button.movie.setPaused(False)
-
- def job_done(self, running):
+
+ def job_done(self, nnum):
jobs = self.movie_button.jobs
src = qstring_to_unicode(jobs.text())
num = self.jobs()
- text = src.replace(str(num), str(running))
+ text = src.replace(str(num), str(nnum))
jobs.setText(text)
- if running == 0:
+ if nnum == 0:
self.no_more_jobs()
-
+
def no_more_jobs(self):
if self.movie_button.movie.state() == QMovie.Running:
self.movie_button.movie.jumpToFrame(0)
self.movie_button.movie.setPaused(True)
QCoreApplication.instance().alert(self, 5000)
-
+
if __name__ == '__main__':
# Used to create the animated status icon
from PyQt4.Qt import QApplication, QPainter, QSvgRenderer, QColor
@@ -280,4 +279,4 @@ if __name__ == '__main__':
os.remove(file)
import sys
create_mng(sys.argv[1])
-
+
diff --git a/src/calibre/gui2/widgets.py b/src/calibre/gui2/widgets.py
index db36daa784..2e5c982791 100644
--- a/src/calibre/gui2/widgets.py
+++ b/src/calibre/gui2/widgets.py
@@ -4,16 +4,16 @@ __copyright__ = '2008, Kovid Goyal '
Miscellaneous widgets used in the GUI
'''
import re, os, traceback
-from PyQt4.QtGui import QListView, QIcon, QFont, QLabel, QListWidget, \
+from PyQt4.Qt import QListView, QIcon, QFont, QLabel, QListWidget, \
QListWidgetItem, QTextCharFormat, QApplication, \
- QSyntaxHighlighter, QCursor, QColor, QWidget, QDialog, \
- QPixmap, QMovie, QPalette
-from PyQt4.QtCore import QAbstractListModel, QVariant, Qt, SIGNAL, \
+ QSyntaxHighlighter, QCursor, QColor, QWidget, \
+ QPixmap, QMovie, QPalette, QTimer, QDialog, \
+ QAbstractListModel, QVariant, Qt, SIGNAL, \
QRegExp, QSettings, QSize, QModelIndex
-from calibre.gui2.jobs2 import DetailView
from calibre.gui2 import human_readable, NONE, TableView, \
qstring_to_unicode, error_dialog
+from calibre.gui2.dialogs.job_view_ui import Ui_Dialog
from calibre.gui2.filename_pattern_ui import Ui_Form
from calibre import fit_image
from calibre.utils.fontconfig import find_font_families
@@ -249,6 +249,31 @@ class LocationView(QListView):
if 0 <= row and row <= 3:
self.model().location_changed(row)
+class DetailView(QDialog, Ui_Dialog):
+
+ def __init__(self, parent, job):
+ QDialog.__init__(self, parent)
+ self.setupUi(self)
+ self.setWindowTitle(job.description)
+ self.job = job
+ self.next_pos = 0
+ self.update()
+ self.timer = QTimer(self)
+ self.connect(self.timer, SIGNAL('timeout()'), self.update)
+ self.timer.start(1000)
+
+
+ def update(self):
+ f = self.job.log_file
+ f.seek(self.next_pos)
+ more = f.read()
+ self.next_pos = f.tell()
+ if more:
+ self.log.appendPlainText(more.decode('utf-8', 'replace'))
+ vbar = self.log.verticalScrollBar()
+ vbar.setValue(vbar.maximum())
+
+
class JobsView(TableView):
def __init__(self, parent):
@@ -259,7 +284,6 @@ class JobsView(TableView):
row = index.row()
job = self.model().row_to_job(row)
d = DetailView(self, job)
- self.connect(self.model(), SIGNAL('output_received()'), d.update)
d.exec_()
@@ -539,12 +563,12 @@ class PythonHighlighter(QSyntaxHighlighter):
return
for regex, format in PythonHighlighter.Rules:
- i = text.indexOf(regex)
+ i = regex.indexIn(text)
while i >= 0:
length = regex.matchedLength()
self.setFormat(i, length,
PythonHighlighter.Formats[format])
- i = text.indexOf(regex, i + length)
+ i = regex.indexIn(text, i + length)
# Slow but good quality highlighting for comments. For more
# speed, comment this out and add the following to __init__:
@@ -569,12 +593,12 @@ class PythonHighlighter(QSyntaxHighlighter):
self.setCurrentBlockState(NORMAL)
- if text.indexOf(self.stringRe) != -1:
+ if self.stringRe.indexIn(text) != -1:
return
# This is fooled by triple quotes inside single quoted strings
- for i, state in ((text.indexOf(self.tripleSingleRe),
+ for i, state in ((self.tripleSingleRe.indexIn(text),
TRIPLESINGLE),
- (text.indexOf(self.tripleDoubleRe),
+ (self.tripleDoubleRe.indexIn(text),
TRIPLEDOUBLE)):
if self.previousBlockState() == state:
if i == -1:
diff --git a/src/calibre/linux.py b/src/calibre/linux.py
index 215236e83d..9582651ca0 100644
--- a/src/calibre/linux.py
+++ b/src/calibre/linux.py
@@ -29,7 +29,7 @@ entry_points = {
'calibre-debug = calibre.debug:main',
'calibredb = calibre.library.cli:main',
'calibre-fontconfig = calibre.utils.fontconfig:main',
- 'calibre-parallel = calibre.parallel:main',
+ 'calibre-parallel = calibre.utils.ipc.worker:main',
'calibre-customize = calibre.customize.ui:main',
'calibre-complete = calibre.utils.complete:main',
'pdfmanipulate = calibre.ebooks.pdf.manipulate.cli:main',
diff --git a/src/calibre/parallel.py b/src/calibre/parallel.py
deleted file mode 100644
index 0ec7ed09cc..0000000000
--- a/src/calibre/parallel.py
+++ /dev/null
@@ -1,980 +0,0 @@
-from __future__ import with_statement
-__license__ = 'GPL v3'
-__copyright__ = '2008, Kovid Goyal kovid@kovidgoyal.net'
-__docformat__ = 'restructuredtext en'
-
-'''
-Used to run jobs in parallel in separate processes. Features output streaming,
-support for progress notification as well as job killing. The worker processes
-are controlled via a simple protocol run over sockets. The control happens
-mainly in two class, :class:`Server` and :class:`Overseer`. The worker is
-encapsulated in the function :function:`worker`. Every worker process
-has the environment variable :envvar:`CALIBRE_WORKER` defined.
-
-The worker control protocol has two modes of operation. In the first mode, the
-worker process listens for commands from the controller process. The controller
-process can either hand off a job to the worker or tell the worker to die.
-Once a job is handed off to the worker, the protocol enters the second mode, where
-the controller listens for messages from the worker. The worker can send progress updates
-as well as console output (i.e. text that would normally have been written to stdout
-or stderr by the job). Once the job completes (or raises an exception) the worker
-returns the result (or exception) to the controller and the protocol reverts to the first mode.
-
-In the second mode, the controller can also send the worker STOP messages, in which case
-the worker interrupts the job and dies. The sending of progress and console output messages
-is buffered and asynchronous to prevent the job from being IO bound.
-'''
-import sys, os, gc, cPickle, traceback, cStringIO, time, signal, \
- subprocess, socket, collections, binascii, re, thread, tempfile, atexit
-from select import select
-from threading import RLock, Thread, Event
-from math import ceil
-
-from calibre.ptempfile import PersistentTemporaryFile
-from calibre import iswindows, detect_ncpus, isosx, preferred_encoding
-from calibre.utils.config import prefs
-
-DEBUG = False
-
-#: A mapping from job names to functions that perform the jobs
-PARALLEL_FUNCS = {
- 'lrfviewer' :
- ('calibre.gui2.lrf_renderer.main', 'main', {}, None),
-
- 'ebook-viewer' :
- ('calibre.gui2.viewer.main', 'main', {}, None),
-
- 'render_pages' :
- ('calibre.ebooks.comic.input', 'render_pages', {}, 'notification'),
-
- 'ebook-convert' :
- ('calibre.ebooks.conversion.cli', 'main', {}, None),
-
- 'gui_convert' :
- ('calibre.gui2.convert.gui_conversion', 'gui_convert', {}, 'notification'),
-}
-
-
-isfrozen = hasattr(sys, 'frozen')
-isworker = False
-
-win32event = __import__('win32event') if iswindows else None
-win32process = __import__('win32process') if iswindows else None
-msvcrt = __import__('msvcrt') if iswindows else None
-
-SOCKET_TYPE = socket.AF_UNIX if not iswindows else socket.AF_INET
-
-class WorkerStatus(object):
- '''
- A platform independent class to control child processes. Provides the
- methods:
-
- .. method:: WorkerStatus.is_alive()
-
- Return True is the child process is alive (i.e. it hasn't exited and returned a return code).
-
- .. method:: WorkerStatus.returncode()
-
- Wait for the child process to exit and return its return code (blocks until child returns).
-
- .. method:: WorkerStatus.kill()
-
- Forcibly terminates child process using operating system specific semantics.
- '''
-
- def __init__(self, obj):
- '''
- `obj`: On windows a process handle, on unix a subprocess.Popen object.
- '''
- self.obj = obj
- self.win32process = win32process # Needed if kill is called during shutdown of interpreter
- self.os = os
- self.signal = signal
- ext = 'windows' if iswindows else 'unix'
- for func in ('is_alive', 'returncode', 'kill'):
- setattr(self, func, getattr(self, func+'_'+ext))
-
- def is_alive_unix(self):
- return self.obj.poll() == None
-
- def returncode_unix(self):
- return self.obj.wait()
-
- def kill_unix(self):
- os.kill(self.obj.pid, self.signal.SIGKILL)
-
- def is_alive_windows(self):
- return win32event.WaitForSingleObject(self.obj, 0) != win32event.WAIT_OBJECT_0
-
- def returncode_windows(self):
- return win32process.GetExitCodeProcess(self.obj)
-
- def kill_windows(self, returncode=-1):
- self.win32process.TerminateProcess(self.obj, returncode)
-
-class WorkerMother(object):
- '''
- Platform independent object for launching child processes. All processes
- have the environment variable :envvar:`CALIBRE_WORKER` set.
-
- ..method:: WorkerMother.spawn_free_spirit(arg)
-
- Launch a non monitored process with argument `arg`.
-
- ..method:: WorkerMother.spawn_worker(arg)
-
- Launch a monitored and controllable process with argument `arg`.
- '''
-
- def __init__(self):
- ext = 'windows' if iswindows else 'osx' if isosx else 'linux'
- self.os = os # Needed incase cleanup called when interpreter is shutting down
- self.env = {}
- if iswindows:
- self.executable = os.path.join(os.path.dirname(sys.executable),
- 'calibre-parallel.exe' if isfrozen else 'Scripts\\calibre-parallel.exe')
- elif isosx:
- self.executable = self.gui_executable = sys.executable
- self.prefix = ''
- if isfrozen:
- fd = os.path.realpath(getattr(sys, 'frameworks_dir'))
- contents = os.path.dirname(fd)
- self.gui_executable = os.path.join(contents, 'MacOS',
- os.path.basename(sys.executable))
- contents = os.path.join(contents, 'console.app', 'Contents')
- exe = os.path.basename(sys.executable)
- if 'python' not in exe:
- exe = 'python'
- self.executable = os.path.join(contents, 'MacOS', exe)
-
- resources = os.path.join(contents, 'Resources')
- fd = os.path.join(contents, 'Frameworks')
- sp = os.path.join(resources, 'lib', 'python'+sys.version[:3], 'site-packages.zip')
- self.prefix += 'import sys; sys.frameworks_dir = "%s"; sys.frozen = "macosx_app"; '%fd
- self.prefix += 'sys.path.insert(0, %s); '%repr(sp)
- if fd not in os.environ['PATH']:
- self.env['PATH'] = os.environ['PATH']+':'+fd
- self.env['PYTHONHOME'] = resources
- self.env['MAGICK_HOME'] = os.path.join(fd, 'ImageMagick')
- self.env['DYLD_LIBRARY_PATH'] = os.path.join(fd, 'ImageMagick', 'lib')
- else:
- self.executable = os.path.join(getattr(sys, 'frozen_path'), 'calibre-parallel') \
- if isfrozen else 'calibre-parallel'
- if isfrozen:
- self.env['LD_LIBRARY_PATH'] = getattr(sys, 'frozen_path') + ':' + os.environ.get('LD_LIBRARY_PATH', '')
-
- self.spawn_worker_windows = lambda arg : self.spawn_free_spirit_windows(arg, type='worker')
- self.spawn_worker_linux = lambda arg : self.spawn_free_spirit_linux(arg, type='worker')
- self.spawn_worker_osx = lambda arg : self.spawn_free_spirit_osx(arg, type='worker')
-
- for func in ('spawn_free_spirit', 'spawn_worker'):
- setattr(self, func, getattr(self, func+'_'+ext))
-
-
- def cleanup_child_windows(self, child, name=None, fd=None):
- try:
- child.kill()
- except:
- pass
- try:
- if fd is not None:
- self.os.close(fd)
- except:
- pass
- try:
- if name is not None and os.path.exists(name):
- self.os.unlink(name)
- except:
- pass
-
- def cleanup_child_linux(self, child):
- try:
- child.kill()
- except:
- pass
-
- def get_env(self):
- env = dict(os.environ)
- env['CALIBRE_WORKER'] = '1'
- env['ORIGWD'] = os.path.abspath(os.getcwd())
- if hasattr(self, 'env'):
- env.update(self.env)
- return env
-
- def spawn_free_spirit_osx(self, arg, type='free_spirit'):
- script = ('from calibre.parallel import main; '
- 'main(args=["calibre-parallel", %s]);')%repr(arg)
- exe = self.gui_executable if type == 'free_spirit' else self.executable
- cmdline = [exe, '-c', self.prefix+script]
- child = WorkerStatus(subprocess.Popen(cmdline, env=self.get_env()))
- atexit.register(self.cleanup_child_linux, child)
- return child
-
- def spawn_free_spirit_linux(self, arg, type='free_spirit'):
- cmdline = [self.executable, arg]
- child = WorkerStatus(subprocess.Popen(cmdline,
- env=self.get_env(), cwd=getattr(sys, 'frozen_path', None)))
- atexit.register(self.cleanup_child_linux, child)
- return child
-
- def spawn_free_spirit_windows(self, arg, type='free_spirit'):
- priority = {'high':win32process.HIGH_PRIORITY_CLASS, 'normal':win32process.NORMAL_PRIORITY_CLASS,
- 'low':win32process.IDLE_PRIORITY_CLASS}[prefs['worker_process_priority']]
- fd, name = tempfile.mkstemp('.log', 'calibre_'+type+'_')
- handle = msvcrt.get_osfhandle(fd)
- si = win32process.STARTUPINFO()
- si.hStdOutput = handle
- si.hStdError = handle
- cmdline = self.executable + ' ' + str(arg)
- hProcess = \
- win32process.CreateProcess(
- None, # Application Name
- cmdline, # Command line
- None, # processAttributes
- None, # threadAttributes
- 1, # bInheritHandles
- win32process.CREATE_NO_WINDOW|priority, # Dont want ugly console popping up
- self.get_env(), # New environment
- None, # Current directory
- si
- )[0]
- child = WorkerStatus(hProcess)
- atexit.register(self.cleanup_child_windows, child, name, fd)
- return child
-
-
-mother = WorkerMother()
-
-_comm_lock = RLock()
-def write(socket, msg, timeout=5):
- '''
- Write a message on socket. If `msg` is unicode, it is encoded in utf-8.
- Raises a `RuntimeError` if the socket is not ready for writing or the writing fails.
- `msg` is broken into chunks of size 4096 and sent. The :function:`read` function
- automatically re-assembles the chunks into whole message.
- '''
- if isworker:
- _comm_lock.acquire()
- try:
- if isinstance(msg, unicode):
- msg = msg.encode('utf-8')
- if DEBUG:
- print >>sys.__stdout__, 'write(%s):'%('worker' if isworker else 'overseer'), repr(msg)
- length = None
- while len(msg) > 0:
- if length is None:
- length = len(msg)
- chunk = ('%-12d'%length) + msg[:4096-12]
- msg = msg[4096-12:]
- else:
- chunk, msg = msg[:4096], msg[4096:]
- w = select([], [socket], [], timeout)[1]
- if not w:
- raise RuntimeError('Write to socket timed out')
- if socket.sendall(chunk) is not None:
- raise RuntimeError('Failed to write chunk to socket')
- finally:
- if isworker:
- _comm_lock.release()
-
-def read(socket, timeout=5):
- '''
- Read a message from `socket`. The message must have been sent with the :function:`write`
- function. Raises a `RuntimeError` if the message is corrupted. Can return an
- empty string.
- '''
- if isworker:
- _comm_lock.acquire()
- try:
- buf = cStringIO.StringIO()
- length = None
- while select([socket],[],[],timeout)[0]:
- msg = socket.recv(4096)
- if not msg:
- break
- if length is None:
- try:
- length, msg = int(msg[:12]), msg[12:]
- except ValueError:
- if DEBUG:
- print >>sys.__stdout__, 'read(%s):'%('worker' if isworker else 'overseer'), 'no length in', msg
- return ''
- buf.write(msg)
- if buf.tell() >= length:
- break
- if not length:
- if DEBUG:
- print >>sys.__stdout__, 'read(%s):'%('worker' if isworker else 'overseer'), 'nothing'
- return ''
- msg = buf.getvalue()[:length]
- if len(msg) < length:
- raise RuntimeError('Corrupted packet received')
- if DEBUG:
- print >>sys.__stdout__, 'read(%s):'%('worker' if isworker else 'overseer'), repr(msg)
- return msg
- finally:
- if isworker:
- _comm_lock.release()
-
-class RepeatingTimer(Thread):
- '''
- Calls a specified function repeatedly at a specified interval. Runs in a
- daemon thread (i.e. the interpreter can exit while it is still running).
- Call :meth:`start()` to start it.
- '''
-
- def repeat(self):
- while True:
- self.event.wait(self.interval)
- if self.event.isSet():
- break
- self.action()
-
- def __init__(self, interval, func, name):
- self.event = Event()
- self.interval = interval
- self.action = func
- Thread.__init__(self, target=self.repeat, name=name)
- self.setDaemon(True)
-
-class ControlError(Exception):
- pass
-
-class Overseer(object):
- '''
- Responsible for controlling worker processes. The main interface is the
- methods, :meth:`initialize_job`, :meth:`control`.
- '''
-
- KILL_RESULT = 'Server: job killed by user|||#@#$%&*)*(*$#$%#$@&'
- INTERVAL = 0.1
-
- def __init__(self, server, port, timeout=5):
- self.worker_status = mother.spawn_worker('127.0.0.1:'+str(port))
- self.socket = server.accept()[0]
- # Needed if terminate called when interpreter is shutting down
- self.os = os
- self.signal = signal
- self.on_probation = False
- self.terminated = False
-
- self.working = False
- self.timeout = timeout
- self.last_job_time = time.time()
- self._stop = False
- if not select([self.socket], [], [], 120)[0]:
- raise RuntimeError(_('Could not launch worker process.'))
- ID = self.read().split(':')
- if ID[0] != 'CALIBRE_WORKER':
- raise RuntimeError('Impostor')
- self.worker_pid = int(ID[1])
- self.write('OK')
- if self.read() != 'WAITING':
- raise RuntimeError('Worker sulking')
-
- def terminate(self):
- 'Kill worker process.'
- self.terminated = True
- try:
- if self.socket:
- self.write('STOP:')
- time.sleep(1)
- self.socket.shutdown(socket.SHUT_RDWR)
- except:
- pass
- if iswindows:
- win32api = __import__('win32api')
- try:
- handle = win32api.OpenProcess(1, False, self.worker_pid)
- win32api.TerminateProcess(handle, -1)
- except:
- pass
- else:
- try:
- try:
- self.os.kill(self.worker_pid, self.signal.SIGKILL)
- time.sleep(0.5)
- finally:
- self.worker_status.kill()
- except:
- pass
-
-
- def write(self, msg, timeout=None):
- write(self.socket, msg, timeout=self.timeout if timeout is None else timeout)
-
- def read(self, timeout=None):
- return read(self.socket, timeout=self.timeout if timeout is None else timeout)
-
- def __eq__(self, other):
- return hasattr(other, 'process') and hasattr(other, 'worker_pid') and self.worker_pid == other.worker_pid
-
- def is_viable(self):
- if self.terminated:
- return False
- return self.worker_status.is_alive()
-
- def select(self, timeout=0):
- return select([self.socket], [self.socket], [self.socket], timeout)
-
- def initialize_job(self, job):
- '''
- Sends `job` to worker process. Can raise `ControlError` if worker process
- does not respond appropriately. In this case, this Overseer is useless
- and should be discarded.
-
- `job`: An instance of :class:`Job`.
- '''
- self.working = True
- self.write('JOB:'+cPickle.dumps((job.func, job.args, job.kwargs), -1))
- msg = self.read()
- if msg != 'OK':
- raise ControlError('Failed to initialize job on worker %d:%s'%(self.worker_pid, msg))
- self.job = job
- self.last_report = time.time()
- job.start_work()
-
- def control(self):
- '''
- Listens for messages from the worker process and dispatches them
- appropriately. If the worker process dies unexpectedly, returns a result
- of None with a ControlError indicating the worker died.
-
- Returns a :class:`Result` instance or None, if the worker is still working.
- '''
- if select([self.socket],[],[],0)[0]:
- msg = self.read()
- if msg:
- self.on_probation = False
- self.last_report = time.time()
- else:
- if self.on_probation:
- self.terminate()
- self.job.result = None
- self.job.exception = ControlError('Worker process died unexpectedly')
- return
- else:
- self.on_probation = True
- return
- word, msg = msg.partition(':')[0], msg.partition(':')[-1]
- if word == 'PING':
- self.write('OK')
- return
- elif word == 'RESULT':
- self.write('OK')
- self.job.result = cPickle.loads(msg)
- return True
- elif word == 'OUTPUT':
- self.write('OK')
- try:
- self.job.output(''.join(cPickle.loads(msg)))
- except:
- self.job.output('Bad output message: '+ repr(msg))
- elif word == 'PROGRESS':
- self.write('OK')
- percent = None
- try:
- percent, msg = cPickle.loads(msg)[-1]
- except:
- print 'Bad progress update:', repr(msg)
- if percent is not None:
- self.job.update_status(percent, msg)
- elif word == 'ERROR':
- self.write('OK')
- exception, tb = cPickle.loads(msg)
- self.job.output(u'%s\n%s'%(exception, tb))
- self.job.exception, self.job.traceback = exception, tb
- return True
- else:
- self.terminate()
- self.job.exception = ControlError('Worker sent invalid msg: %s'%repr(msg))
- return
- if not self.worker_status.is_alive() or time.time() - self.last_report > 380:
- self.terminate()
- self.job.exception = ControlError('Worker process died unexpectedly')
- return
-
-class JobKilled(Exception):
- pass
-
-class Job(object):
-
- def __init__(self, job_done, job_manager=None,
- args=[], kwargs={}, description=None):
- self.args = args
- self.kwargs = kwargs
- self._job_done = job_done
- self.job_manager = job_manager
- self.is_running = False
- self.has_run = False
- self.percent = -1
- self.msg = None
- self.description = description
- self.start_time = None
- self.running_time = None
-
- self.result = self.exception = self.traceback = self.log = None
-
- def __cmp__(self, other):
- sstatus, ostatus = self.status(), other.status()
- if sstatus == ostatus or (self.has_run and other.has_run):
- if self.start_time == other.start_time:
- return cmp(id(self), id(other))
- return cmp(self.start_time, other.start_time)
- if sstatus == 'WORKING':
- return -1
- if ostatus == 'WORKING':
- return 1
- if sstatus == 'WAITING':
- return -1
- if ostatus == 'WAITING':
- return 1
-
-
- def job_done(self):
- self.is_running, self.has_run = False, True
- self.running_time = (time.time() - self.start_time) if \
- self.start_time is not None else 0
- if self.job_manager is not None:
- self.job_manager.job_done(self)
- self._job_done(self)
-
- def start_work(self):
- self.is_running = True
- self.has_run = False
- self.start_time = time.time()
- if self.job_manager is not None:
- self.job_manager.start_work(self)
-
- def update_status(self, percent, msg=None):
- self.percent = percent
- self.msg = msg
- if self.job_manager is not None:
- try:
- self.job_manager.status_update(self)
- except:
- traceback.print_exc()
-
- def status(self):
- if self.is_running:
- return 'WORKING'
- if not self.has_run:
- return 'WAITING'
- if self.has_run:
- if self.exception is None:
- return 'DONE'
- return 'ERROR'
-
- def console_text(self):
- ans = [u'Job: ']
- if self.description:
- ans[0] += self.description
- if self.exception is not None:
- header = unicode(self.exception.__class__.__name__) if \
- hasattr(self.exception, '__class__') else u'Error'
- header = u'**%s**'%header
- header += u': '
- try:
- header += unicode(self.exception)
- except:
- header += unicode(repr(self.exception))
- ans.append(header)
- if self.traceback:
- ans.append(u'**Traceback**:')
- ans.extend(self.traceback.split('\n'))
-
- if self.log:
- if isinstance(self.log, str):
- self.log = unicode(self.log, 'utf-8', 'replace')
- ans.append(self.log)
- return (u'\n'.join(ans)).encode('utf-8')
-
- def gui_text(self):
- ans = [u'Job: ']
- if self.description:
- if not isinstance(self.description, unicode):
- self.description = self.description.decode('utf-8', 'replace')
- ans[0] += u'**%s**'%self.description
- if self.exception is not None:
- header = unicode(self.exception.__class__.__name__) if \
- hasattr(self.exception, '__class__') else u'Error'
- header = u'**%s**'%header
- header += u': '
- try:
- header += unicode(self.exception)
- except:
- header += unicode(repr(self.exception))
- ans.append(header)
- if self.traceback:
- ans.append(u'**Traceback**:')
- ans.extend(self.traceback.split('\n'))
- if self.log:
- ans.append(u'**Log**:')
- if isinstance(self.log, str):
- self.log = unicode(self.log, 'utf-8', 'replace')
- ans.extend(self.log.split('\n'))
-
- ans = [x.decode(preferred_encoding, 'replace') if isinstance(x, str) else x for x in ans]
-
- return u'
'.join(ans)
-
-
-class ParallelJob(Job):
-
- def __init__(self, func, *args, **kwargs):
- Job.__init__(self, *args, **kwargs)
- self.func = func
- self.done = self.job_done
-
- def output(self, msg):
- if not self.log:
- self.log = u''
- if not isinstance(msg, unicode):
- msg = msg.decode('utf-8', 'replace')
- if msg:
- self.log += msg
- if self.job_manager is not None:
- self.job_manager.output(self)
-
-
-def remove_ipc_socket(path):
- os = __import__('os')
- if os.path.exists(path):
- os.unlink(path)
-
-class Server(Thread):
-
- KILL_RESULT = Overseer.KILL_RESULT
- START_PORT = 10013
- PID = os.getpid()
-
-
- def __init__(self, number_of_workers=detect_ncpus()):
- Thread.__init__(self)
- self.setDaemon(True)
- self.server_socket = socket.socket(SOCKET_TYPE, socket.SOCK_STREAM)
- self.port = tempfile.mktemp(prefix='calibre_server')+'_%d_'%self.PID if not iswindows else self.START_PORT
- while True:
- try:
- address = ('localhost', self.port) if iswindows else self.port
- self.server_socket.bind(address)
- break
- except socket.error:
- self.port += (1 if iswindows else '1')
- if not iswindows:
- atexit.register(remove_ipc_socket, self.port)
- self.server_socket.listen(5)
- self.number_of_workers = number_of_workers
- self.pool, self.jobs, self.working = [], collections.deque(), []
- atexit.register(self.killall)
- atexit.register(self.close)
- self.job_lock = RLock()
- self.overseer_lock = RLock()
- self.working_lock = RLock()
- self.result_lock = RLock()
- self.pool_lock = RLock()
- self.start()
-
- def split(self, tasks):
- '''
- Split a list into a list of sub lists, with the number of sub lists being
- no more than the number of workers this server supports. Each sublist contains
- two tuples of the form (i, x) where x is an element fro the original list
- and i is the index of the element x in the original list.
- '''
- ans, count, pos = [], 0, 0
- delta = int(ceil(len(tasks)/float(self.number_of_workers)))
- while count < len(tasks):
- section = []
- for t in tasks[pos:pos+delta]:
- section.append((count, t))
- count += 1
- ans.append(section)
- pos += delta
- return ans
-
-
- def close(self):
- try:
- self.server_socket.shutdown(socket.SHUT_RDWR)
- except:
- pass
-
- def add_job(self, job):
- with self.job_lock:
- self.jobs.append(job)
- if job.job_manager is not None:
- job.job_manager.add_job(job)
-
- def poll(self):
- '''
- Return True if the server has either working or queued jobs
- '''
- with self.job_lock:
- with self.working_lock:
- return len(self.jobs) + len(self.working) > 0
-
- def wait(self, sleep=1):
- '''
- Wait until job queue is empty
- '''
- while self.poll():
- time.sleep(sleep)
-
- def run(self):
- while True:
- job = None
- with self.job_lock:
- if len(self.jobs) > 0 and len(self.working) < self.number_of_workers:
- job = self.jobs.popleft()
- with self.pool_lock:
- o = None
- while self.pool:
- o = self.pool.pop()
- try:
- o.initialize_job(job)
- break
- except:
- o.terminate()
- if o is None:
- o = Overseer(self.server_socket, self.port)
- try:
- o.initialize_job(job)
- except Exception, err:
- o.terminate()
- job.exception = err
- job.traceback = traceback.format_exc()
- job.done()
- o = None
- if o and o.is_viable():
- with self.working_lock:
- self.working.append(o)
-
- with self.working_lock:
- done = []
- for o in self.working:
- try:
- if o.control() is not None or o.job.exception is not None:
- o.job.done()
- done.append(o)
- except Exception, err:
- o.job.exception = err
- o.job.traceback = traceback.format_exc()
- o.terminate()
- o.job.done()
- done.append(o)
- for o in done:
- self.working.remove(o)
- if o and o.is_viable():
- with self.pool_lock:
- self.pool.append(o)
-
- try:
- time.sleep(1)
- except:
- return
-
-
- def killall(self):
- with self.pool_lock:
- map(lambda x: x.terminate(), self.pool)
- self.pool = []
-
-
- def kill(self, job):
- with self.working_lock:
- pop = None
- for o in self.working:
- if o.job == job or o == job:
- try:
- o.terminate()
- except: pass
- o.job.exception = JobKilled(_('Job stopped by user'))
- try:
- o.job.done()
- except: pass
- pop = o
- break
- if pop is not None:
- self.working.remove(pop)
-
- def run_free_job(self, func, args=[], kwdargs={}):
- pt = PersistentTemporaryFile('.pickle', '_IPC_')
- pt.write(cPickle.dumps((func, args, kwdargs)))
- pt.close()
- mother.spawn_free_spirit(binascii.hexlify(pt.name))
-
-
-##########################################################################################
-##################################### CLIENT CODE #####################################
-##########################################################################################
-
-class BufferedSender(object):
-
- def __init__(self, socket):
- self.socket = socket
- self.wbuf, self.pbuf = [], []
- self.wlock, self.plock = RLock(), RLock()
- self.last_report = None
- self.timer = RepeatingTimer(0.5, self.send, 'BufferedSender')
- self.timer.start()
-
-
- def write(self, msg):
- if not isinstance(msg, basestring):
- msg = unicode(msg)
- with self.wlock:
- self.wbuf.append(msg)
-
- def send(self):
- if callable(select) and select([self.socket], [], [], 0)[0]:
- msg = read(self.socket)
- if msg == 'PING:':
- write(self.socket, 'OK')
- elif msg:
- self.socket.shutdown(socket.SHUT_RDWR)
- thread.interrupt_main()
- time.sleep(1)
- raise SystemExit
- if not select([], [self.socket], [], 30)[1]:
- print >>sys.__stderr__, 'Cannot pipe to overseer'
- return
-
- reported = False
- with self.wlock:
- if self.wbuf:
- msg = cPickle.dumps(self.wbuf, -1)
- self.wbuf = []
- write(self.socket, 'OUTPUT:'+msg)
- read(self.socket, 10)
- reported = True
-
- with self.plock:
- if self.pbuf:
- msg = cPickle.dumps(self.pbuf, -1)
- self.pbuf = []
- write(self.socket, 'PROGRESS:'+msg)
- read(self.socket, 10)
- reported = True
-
- if self.last_report is not None:
- if reported:
- self.last_report = time.time()
- elif time.time() - self.last_report > 60:
- write(self.socket, 'PING:')
- read(self.socket, 10)
- self.last_report = time.time()
-
- def notify(self, percent, msg=''):
- with self.plock:
- self.pbuf.append((percent, msg))
-
- def flush(self):
- pass
-
-def get_func(name):
- module, func, kwdargs, notification = PARALLEL_FUNCS[name]
- module = __import__(module, fromlist=[1])
- func = getattr(module, func)
- return func, kwdargs, notification
-
-_atexit = collections.deque()
-def myatexit(func, *args, **kwargs):
- _atexit.append((func, args, kwargs))
-
-def work(client_socket, func, args, kwdargs):
- sys.stdout.last_report = time.time()
- orig = atexit.register
- atexit.register = myatexit
- try:
- func, kargs, notification = get_func(func)
- if notification is not None and hasattr(sys.stdout, 'notify'):
- kargs[notification] = sys.stdout.notify
- kargs.update(kwdargs)
- res = func(*args, **kargs)
- if hasattr(sys.stdout, 'send'):
- sys.stdout.send()
- return res
- finally:
- atexit.register = orig
- sys.stdout.last_report = None
- while True:
- try:
- func, args, kwargs = _atexit.pop()
- except IndexError:
- break
- try:
- func(*args, **kwargs)
- except (Exception, SystemExit):
- continue
-
- time.sleep(5) # Give any in progress BufferedSend time to complete
-
-
-def worker(host, port):
- client_socket = socket.socket(SOCKET_TYPE, socket.SOCK_STREAM)
- address = (host, port) if iswindows else port
- client_socket.connect(address)
- write(client_socket, 'CALIBRE_WORKER:%d'%os.getpid())
- msg = read(client_socket, timeout=10)
- if msg != 'OK':
- return 1
- write(client_socket, 'WAITING')
-
- sys.stdout = BufferedSender(client_socket)
- sys.stderr = sys.stdout
-
- while True:
- if not select([client_socket], [], [], 60)[0]:
- time.sleep(1)
- continue
- msg = read(client_socket, timeout=60)
- if msg.startswith('JOB:'):
- func, args, kwdargs = cPickle.loads(msg[4:])
- write(client_socket, 'OK')
- try:
- result = work(client_socket, func, args, kwdargs)
- write(client_socket, 'RESULT:'+ cPickle.dumps(result))
- except BaseException, err:
- exception = (err.__class__.__name__, unicode(str(err), 'utf-8', 'replace'))
- tb = unicode(traceback.format_exc(), 'utf-8', 'replace')
- msg = 'ERROR:'+cPickle.dumps((exception, tb),-1)
- write(client_socket, msg)
- res = read(client_socket, 10)
- if res != 'OK':
- break
- gc.collect()
- elif msg == 'PING:':
- write(client_socket, 'OK')
- elif msg == 'STOP:':
- client_socket.shutdown(socket.SHUT_RDWR)
- return 0
- elif not msg:
- time.sleep(1)
- else:
- print >>sys.__stderr__, 'Invalid protocols message', msg
- return 1
-
-def free_spirit(path):
- func, args, kwdargs = cPickle.load(open(path, 'rb'))
- try:
- os.unlink(path)
- except:
- pass
- func, kargs = get_func(func)[:2]
- kargs.update(kwdargs)
- func(*args, **kargs)
-
-def main(args=sys.argv):
- global isworker
- isworker = True
- args = args[1].split(':')
- if len(args) == 1:
- free_spirit(binascii.unhexlify(re.sub(r'[^a-f0-9A-F]', '', args[0])))
- else:
- worker(args[0].replace("'", ''), int(args[1]) if iswindows else args[1])
- return 0
-
-if __name__ == '__main__':
- sys.exit(main())
-
diff --git a/src/calibre/utils/ipc/job.py b/src/calibre/utils/ipc/job.py
new file mode 100644
index 0000000000..6a055706e3
--- /dev/null
+++ b/src/calibre/utils/ipc/job.py
@@ -0,0 +1,137 @@
+#!/usr/bin/env python
+# vim:fileencoding=UTF-8:ts=4:sw=4:sta:et:sts=4:ai
+from __future__ import with_statement
+
+__license__ = 'GPL v3'
+__copyright__ = '2009, Kovid Goyal '
+__docformat__ = 'restructuredtext en'
+
+_count = 0
+
+import time, cStringIO
+from Queue import Queue, Empty
+
+class BaseJob(object):
+
+ WAITING = 0
+ RUNNING = 1
+ FINISHED = 2
+
+ def __init__(self, description, done=lambda x: x):
+ global _count
+ _count += 1
+
+ self.id = _count
+ self.description = description
+ self.done = done
+ self.done2 = None
+ self.killed = False
+ self.failed = False
+ self.start_time = None
+ self.result = None
+ self.duration = None
+ self.log_path = None
+ self.notifications = Queue()
+
+ self._run_state = self.WAITING
+ self.percent = 0
+ self._message = None
+ self._status_text = _('Waiting...')
+ self._done_called = False
+
+ def update(self):
+ if self.duration is not None:
+ self._run_state = self.FINISHED
+ self.percent = 1
+ if self.killed:
+ self._status_text = _('Stopped')
+ else:
+ self._status_text = _('Error') if self.failed else _('Finished')
+ if not self._done_called:
+ self._done_called = True
+ try:
+ self.done(self)
+ except:
+ pass
+ try:
+ if callable(self.done2):
+ self.done2(self)
+ except:
+ pass
+ elif self.start_time is not None:
+ self._run_state = self.RUNNING
+ self._status_text = _('Working...')
+
+ while True:
+ try:
+ self.percent, self._message = self.notifications.get_nowait()
+ self.percent *= 100.
+ except Empty:
+ break
+
+ @property
+ def status_text(self):
+ if self._run_state == self.FINISHED or not self._message:
+ return self._status_text
+ return self._message
+
+ @property
+ def run_state(self):
+ return self._run_state
+
+ @property
+ def running_time(self):
+ if self.duration is not None:
+ return self.duration
+ if self.start_time is not None:
+ return time.time() - self.start_time
+ return None
+
+ @property
+ def is_finished(self):
+ return self._run_state == self.FINISHED
+
+ @property
+ def is_started(self):
+ return self._run_state != self.WAITING
+
+ @property
+ def is_running(self):
+ return self.is_started and not self.is_finished
+
+ def __cmp__(self, other):
+ if self.is_finished == other.is_finished:
+ if self.start_time is None:
+ if other.start_time is None: # Both waiting
+ return cmp(other.id, self.id)
+ else:
+ return 1
+ else:
+ if other.start_time is None:
+ return -1
+ else: # Both running
+ return cmp(other.start_time, self.start_time)
+
+ else:
+ return 1 if self.is_finished else -1
+ return 0
+
+ @property
+ def log_file(self):
+ if self.log_path:
+ return open(self.log_path, 'rb')
+ return cStringIO.StringIO(_('No details available.'))
+
+ @property
+ def details(self):
+ return self.log_file.read().decode('utf-8')
+
+
+class ParallelJob(BaseJob):
+
+ def __init__(self, name, description, done, args=[], kwargs={}):
+ self.name, self.args, self.kwargs = name, args, kwargs
+ BaseJob.__init__(self, description, done)
+
+
+
diff --git a/src/calibre/utils/ipc/launch.py b/src/calibre/utils/ipc/launch.py
index 6c0ba46885..14530d7fea 100644
--- a/src/calibre/utils/ipc/launch.py
+++ b/src/calibre/utils/ipc/launch.py
@@ -70,7 +70,7 @@ class Worker(object):
@property
def is_alive(self):
- return hasattr(self, 'child') and self.child.poll() is not None
+ return hasattr(self, 'child') and self.child.poll() is None
@property
def returncode(self):
@@ -144,6 +144,7 @@ class Worker(object):
self.child = subprocess.Popen(cmd, **args)
+ self.log_path = ret
return ret
diff --git a/src/calibre/utils/ipc/server.py b/src/calibre/utils/ipc/server.py
index 3d1a86922e..3dec90a644 100644
--- a/src/calibre/utils/ipc/server.py
+++ b/src/calibre/utils/ipc/server.py
@@ -6,5 +6,241 @@ __license__ = 'GPL v3'
__copyright__ = '2009, Kovid Goyal '
__docformat__ = 'restructuredtext en'
+import os, cPickle, time, tempfile
+from math import ceil
+from threading import Thread, RLock
+from Queue import Queue, Empty
+from multiprocessing.connection import Listener
+from multiprocessing import cpu_count
+from collections import deque
+from binascii import hexlify
+
+from calibre.utils.ipc.launch import Worker
+from calibre.utils.ipc.worker import PARALLEL_FUNCS
+
+_counter = 0
+
+class ConnectedWorker(Thread):
+
+ def __init__(self, worker, conn, rfile):
+ Thread.__init__(self)
+ self.daemon = True
+ self.conn = conn
+ self.worker = worker
+ self.notifications = Queue()
+ self._returncode = 'dummy'
+ self.killed = False
+ self.log_path = worker.log_path
+ self.rfile = rfile
+
+ def start_job(self, job):
+ notification = PARALLEL_FUNCS[job.name][-1] is not None
+ self.conn.send((job.name, job.args, job.kwargs))
+ if notification:
+ self.start()
+ else:
+ self.conn.close()
+ self.job = job
+
+ def run(self):
+ while True:
+ try:
+ x = self.conn.recv()
+ self.notifications.put(x)
+ except BaseException:
+ break
+ try:
+ self.conn.close()
+ except BaseException:
+ pass
+
+ def kill(self):
+ self.killed = True
+ try:
+ self.worker.kill()
+ except BaseException:
+ pass
+
+ @property
+ def is_alive(self):
+ return not self.killed and self.worker.is_alive
+
+ @property
+ def returncode(self):
+ if self._returncode != 'dummy':
+ return self._returncode
+ r = self.worker.returncode
+ if self.killed and r is None:
+ self._returncode = 1
+ return 1
+ if r is not None:
+ self._returncode = r
+ return r
+
+class Server(Thread):
+
+ def __init__(self, notify_on_job_done=lambda x: x, pool_size=None):
+ Thread.__init__(self)
+ self.daemon = True
+ global _counter
+ self.id = _counter+1
+ _counter += 1
+
+ self.pool_size = cpu_count() if pool_size is None else pool_size
+ self.notify_on_job_done = notify_on_job_done
+ self.auth_key = os.urandom(32)
+ self.listener = Listener(authkey=self.auth_key, backlog=4)
+ self.add_jobs_queue, self.changed_jobs_queue = Queue(), Queue()
+ self.kill_queue = Queue()
+ self.waiting_jobs, self.processing_jobs = deque(), deque()
+ self.pool, self.workers = deque(), deque()
+ self.launched_worker_count = 0
+ self._worker_launch_lock = RLock()
+
+ self.start()
+
+ def launch_worker(self, gui=False, redirect_output=None):
+ with self._worker_launch_lock:
+ self.launched_worker_count += 1
+ id = self.launched_worker_count
+ rfile = os.path.join(tempfile.gettempdir(),
+ 'calibre_ipc_result_%d_%d.pickle'%(self.id, id))
+
+ env = {
+ 'CALIBRE_WORKER_ADDRESS' :
+ hexlify(cPickle.dumps(self.listener.address, -1)),
+ 'CALIBRE_WORKER_KEY' : hexlify(self.auth_key),
+ 'CALIBRE_WORKER_RESULT' : hexlify(rfile),
+ }
+ w = Worker(env, gui=gui)
+ if redirect_output is None:
+ redirect_output = not gui
+ w(redirect_output=redirect_output)
+ conn = self.listener.accept()
+ if conn is None:
+ raise Exception('Failed to launch worker process')
+ return ConnectedWorker(w, conn, rfile)
+
+ def add_job(self, job):
+ job.done2 = self.notify_on_job_done
+ self.add_jobs_queue.put(job)
+
+ def run_job(self, job, gui=True, redirect_output=False):
+ w = self.launch_worker(gui=gui, redirect_output=redirect_output)
+ w.start_job(job)
+
+
+ def run(self):
+ while True:
+ try:
+ job = self.add_jobs_queue.get(True, 0.2)
+ if job is None:
+ break
+ self.waiting_jobs.append(job)
+ except Empty:
+ pass
+
+ for worker in self.workers:
+ while True:
+ try:
+ n = worker.notifications.get_nowait()
+ worker.job.notifications.put(n)
+ self.changed_jobs_queue.put(job)
+ except Empty:
+ break
+
+ for worker in [w for w in self.workers if not w.is_alive]:
+ self.workers.remove(worker)
+ job = worker.job
+ if worker.returncode != 0:
+ job.failed = True
+ job.returncode = worker.returncode
+ elif os.path.exists(worker.rfile):
+ job.result = cPickle.load(open(worker.rfile, 'rb'))
+ os.remove(worker.rfile)
+ job.duration = time.time() - job.start_time
+ self.changed_jobs_queue.put(job)
+
+ if len(self.pool) + len(self.workers) < self.pool_size:
+ try:
+ self.pool.append(self.launch_worker())
+ except:
+ break
+
+ if len(self.pool) > 0 and len(self.waiting_jobs) > 0:
+ job = self.waiting_jobs.pop()
+ worker = self.pool.pop()
+ job.start_time = time.time()
+ worker.start_job(job)
+ self.workers.append(worker)
+ job.log_path = worker.log_path
+ self.changed_jobs_queue.put(job)
+
+ while True:
+ try:
+ j = self.kill_queue.get_nowait()
+ self._kill_job(j)
+ except Empty:
+ break
+
+ def kill_job(self, job):
+ self.kill_queue.put(job)
+
+ def killall(self):
+ for job in self.workers:
+ self.kill_queue.put(job)
+
+ def _kill_job(self, job):
+ if job.start_time is None: return
+ for worker in self.workers:
+ if job is worker.job:
+ worker.kill()
+ job.killed = True
+ break
+
+ def split(self, tasks):
+ '''
+ Split a list into a list of sub lists, with the number of sub lists being
+ no more than the number of workers this server supports. Each sublist contains
+ two tuples of the form (i, x) where x is an element from the original list
+ and i is the index of the element x in the original list.
+ '''
+ ans, count, pos = [], 0, 0
+ delta = int(ceil(len(tasks)/float(self.pool_size)))
+ while count < len(tasks):
+ section = []
+ for t in tasks[pos:pos+delta]:
+ section.append((count, t))
+ count += 1
+ ans.append(section)
+ pos += delta
+ return ans
+
+
+
+ def close(self):
+ try:
+ self.add_jobs_queue.put(None)
+ self.listener.close()
+ except:
+ pass
+ time.sleep(0.2)
+ for worker in self.workers:
+ try:
+ worker.kill()
+ except:
+ pass
+ for worker in self.pool:
+ try:
+ worker.kill()
+ except:
+ pass
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, *args):
+ self.close()
+
diff --git a/src/calibre/utils/ipc/worker.py b/src/calibre/utils/ipc/worker.py
index 75b42c9a25..6c974568ac 100644
--- a/src/calibre/utils/ipc/worker.py
+++ b/src/calibre/utils/ipc/worker.py
@@ -6,11 +6,12 @@ __license__ = 'GPL v3'
__copyright__ = '2009, Kovid Goyal '
__docformat__ = 'restructuredtext en'
-import os, cPickle
+import os, cPickle, sys
from multiprocessing.connection import Client
from threading import Thread
-from queue import Queue
+from Queue import Queue
from contextlib import closing
+from binascii import unhexlify
PARALLEL_FUNCS = {
'lrfviewer' :
@@ -29,8 +30,8 @@ PARALLEL_FUNCS = {
class Progress(Thread):
def __init__(self, conn):
- self.daemon = True
Thread.__init__(self)
+ self.daemon = True
self.conn = conn
self.queue = Queue()
@@ -56,8 +57,9 @@ def get_func(name):
return func, notification
def main():
- address = cPickle.loads(os.environ['CALIBRE_WORKER_ADDRESS'])
- key = os.environ['CALIBRE_WORKER_KEY']
+ address = cPickle.loads(unhexlify(os.environ['CALIBRE_WORKER_ADDRESS']))
+ key = unhexlify(os.environ['CALIBRE_WORKER_KEY'])
+ resultf = unhexlify(os.environ['CALIBRE_WORKER_RESULT'])
with closing(Client(address, authkey=key)) as conn:
name, args, kwargs = conn.recv()
func, notification = get_func(name)
@@ -66,13 +68,17 @@ def main():
kwargs[notification] = notifier
notifier.start()
- func(*args, **kwargs)
+ result = func(*args, **kwargs)
+ if result is not None:
+ cPickle.dump(result, open(resultf, 'wb'), -1)
notifier.queue.put(None)
+ sys.stdout.flush()
+ sys.stderr.flush()
return 0
if __name__ == '__main__':
- raise SystemExit(main())
+ sys.exit(main())
From 2e0ad5d1e082ef7339c4e744ed3156c208f84cf0 Mon Sep 17 00:00:00 2001
From: Kovid Goyal
Date: Wed, 13 May 2009 19:20:47 -0700
Subject: [PATCH 08/15] Allow recipes to specify overrides for conversion
options
---
src/calibre/manual/news_recipe.rst | 2 +
src/calibre/web/feeds/input.py | 2 +
src/calibre/web/feeds/news.py | 17 +--
.../web/feeds/recipes/recipe_barrons.py | 116 +++++++++---------
.../web/feeds/recipes/recipe_winsupersite.py | 5 +-
5 files changed, 74 insertions(+), 68 deletions(-)
diff --git a/src/calibre/manual/news_recipe.rst b/src/calibre/manual/news_recipe.rst
index 6872d8e532..6eb47a26a1 100644
--- a/src/calibre/manual/news_recipe.rst
+++ b/src/calibre/manual/news_recipe.rst
@@ -54,6 +54,8 @@ Customizing e-book download
.. automember:: BasicNewsRecipe.timefmt
+.. automember:: basicNewsRecipe.conversion_options
+
.. automember:: BasicNewsRecipe.feeds
.. automember:: BasicNewsRecipe.no_stylesheets
diff --git a/src/calibre/web/feeds/input.py b/src/calibre/web/feeds/input.py
index ee003be0da..3052ffebed 100644
--- a/src/calibre/web/feeds/input.py
+++ b/src/calibre/web/feeds/input.py
@@ -57,6 +57,8 @@ class RecipeInput(InputFormatPlugin):
ro = recipe(opts, log, self.report_progress)
ro.download()
+ for key, val in recipe.conversion_options.items():
+ setattr(opts, key, val)
opts.output_profile.flow_size = 0
diff --git a/src/calibre/web/feeds/news.py b/src/calibre/web/feeds/news.py
index 3f6b9b9ae1..eca8eb0c93 100644
--- a/src/calibre/web/feeds/news.py
+++ b/src/calibre/web/feeds/news.py
@@ -156,13 +156,16 @@ class BasicNewsRecipe(Recipe):
#: :attr:`BasicNewsRecipe.filter_regexps` should be defined.
filter_regexps = []
- #: List of options to pass to html2lrf, to customize generation of LRF ebooks.
- html2lrf_options = []
-
- #: Options to pass to html2epub to customize generation of EPUB ebooks.
- html2epub_options = ''
- #: Options to pass to oeb2mobi to customize generation of MOBI ebooks.
- oeb2mobi_options = ''
+ #: Recipe specific options to control the conversion of the downloaded
+ #: content into an e-book. These will override any user or plugin specified
+ #: values, so only use if absolutely necessary. For example:
+ #: conversion_options = {
+ #: 'base_font_size' : 16,
+ #: 'tags' : 'mytag1,mytag2',
+ #: 'title' : 'My Title',
+ #: 'linearize_tables' : True,
+ #: }
+ conversion_options = {}
#: List of tags to be removed. Specified tags are removed from downloaded HTML.
#: A tag is specified as a dictionary of the form::
diff --git a/src/calibre/web/feeds/recipes/recipe_barrons.py b/src/calibre/web/feeds/recipes/recipe_barrons.py
index 164be20d3e..f9f501a9c3 100644
--- a/src/calibre/web/feeds/recipes/recipe_barrons.py
+++ b/src/calibre/web/feeds/recipes/recipe_barrons.py
@@ -1,76 +1,76 @@
##
-## web2lrf profile to download articles from Barrons.com
-## can download subscriber-only content if username and
+## web2lrf profile to download articles from Barrons.com
+## can download subscriber-only content if username and
## password are supplied.
##
-'''
-'''
-
-import re
-
-from calibre.web.feeds.news import BasicNewsRecipe
-
-class Barrons(BasicNewsRecipe):
-
- title = 'Barron\'s'
+'''
+'''
+
+import re
+
+from calibre.web.feeds.news import BasicNewsRecipe
+
+class Barrons(BasicNewsRecipe):
+
+ title = 'Barron\'s'
max_articles_per_feed = 50
needs_subscription = True
language = _('English')
__author__ = 'Kovid Goyal'
description = 'Weekly publication for investors from the publisher of the Wall Street Journal'
- timefmt = ' [%a, %b %d, %Y]'
- use_embedded_content = False
+ timefmt = ' [%a, %b %d, %Y]'
+ use_embedded_content = False
no_stylesheets = False
match_regexps = ['http://online.barrons.com/.*?html\?mod=.*?|file:.*']
- html2lrf_options = [('--ignore-tables'),('--base-font-size=10')]
+ conversion_options = {'linearize_tables': True}
##delay = 1
-
- ## Don't grab articles more than 7 days old
- oldest_article = 7
+
+ ## Don't grab articles more than 7 days old
+ oldest_article = 7
- preprocess_regexps = [(re.compile(i[0], re.IGNORECASE | re.DOTALL), i[1]) for i in
- [
- ## Remove anything before the body of the article.
- (r'