New job system works in linux (needs more testing especially on non-linux systems)

This commit is contained in:
Kovid Goyal 2009-05-13 18:37:51 -07:00
parent 1166406654
commit c3ba827f57
25 changed files with 891 additions and 1416 deletions

View File

@ -2,7 +2,7 @@
__license__ = 'GPL v3'
__copyright__ = '2008, Kovid Goyal <kovid@kovidgoyal.net>'
__docformat__ = 'restructuredtext en'
import sys, os, re, logging, time, subprocess, mimetypes, \
import sys, os, re, logging, time, mimetypes, \
__builtin__, warnings, multiprocessing
__builtin__.__dict__['dynamic_property'] = lambda(func): func(None)
from htmlentitydefs import name2codepoint
@ -91,11 +91,16 @@ def prints(*args, **kwargs):
file = kwargs.get('file', sys.stdout)
sep = kwargs.get('sep', ' ')
end = kwargs.get('end', '\n')
enc = preferred_encoding
if 'CALIBRE_WORKER' in os.environ:
enc = 'utf-8'
for i, arg in enumerate(args):
if isinstance(arg, unicode):
arg = arg.encode(preferred_encoding)
arg = arg.encode(enc)
if not isinstance(arg, str):
arg = str(arg)
if not isinstance(arg, unicode):
arg = arg.decode(preferred_encoding, 'replace').encode(enc)
file.write(arg)
if i != len(args)-1:
file.write(sep)

View File

@ -18,9 +18,9 @@ class BEBOOK(USBMS):
VENDOR_ID = [0x0525]
PRODUCT_ID = [0x8803, 0x6803]
BCD = [0x312]
BCD = [0x312]
VENDOR_NAME = 'LINUX'
VENDOR_NAME = 'LINUX'
WINDOWS_MAIN_MEM = 'FILE-STOR_GADGET'
WINDOWS_CARD_MEM = 'FILE-STOR_GADGET'
@ -51,7 +51,7 @@ class BEBOOK_MINI(BEBOOK):
VENDOR_ID = [0x0492]
PRODUCT_ID = [0x8813]
BCD = [0x319]
BCD = [0x319]
OSX_MAIN_MEM = 'BeBook Mini Internal Memory'
OSX_CARD_MEM = 'BeBook Mini Storage Card'

View File

@ -7,12 +7,14 @@ __docformat__ = 'restructuredtext en'
Based on ideas from comiclrf created by FangornUK.
'''
import os, shutil, traceback, textwrap
import os, shutil, traceback, textwrap, time
from Queue import Empty
from calibre.customize.conversion import InputFormatPlugin, OptionRecommendation
from calibre import extract, CurrentDir
from calibre import extract, CurrentDir, prints
from calibre.ptempfile import PersistentTemporaryDirectory
from calibre.parallel import Server, ParallelJob
from calibre.utils.ipc.server import Server
from calibre.utils.ipc.job import ParallelJob
def extract_comic(path_to_comic_file):
'''
@ -47,8 +49,8 @@ def find_pages(dir, sort_on_mtime=False, verbose=False):
pages.sort(cmp=comparator)
if verbose:
print 'Found comic pages...'
print '\t'+'\n\t'.join([os.path.basename(p) for p in pages])
prints('Found comic pages...')
prints('\t'+'\n\t'.join([os.path.basename(p) for p in pages]))
return pages
class PageProcessor(list):
@ -181,7 +183,7 @@ class PageProcessor(list):
p.DestroyPixelWand(pw)
p.DestroyMagickWand(wand)
def render_pages(tasks, dest, opts, notification=None):
def render_pages(tasks, dest, opts, notification=lambda x, y: x):
'''
Entry point for the job server.
'''
@ -197,30 +199,23 @@ def render_pages(tasks, dest, opts, notification=None):
msg = _('Failed %s')%path
if opts.verbose:
msg += '\n' + traceback.format_exc()
if notification is not None:
notification(0.5, msg)
prints(msg)
notification(0.5, msg)
return pages, failures
class JobManager(object):
'''
Simple job manager responsible for keeping track of overall progress.
'''
class Progress(object):
def __init__(self, total, update):
self.total = total
self.update = update
self.done = 0
self.add_job = lambda j: j
self.output = lambda j: j
self.start_work = lambda j: j
self.job_done = lambda j: j
def status_update(self, job):
def __call__(self, percent, msg=''):
self.done += 1
#msg = msg%os.path.basename(job.args[0])
self.update(float(self.done)/self.total, job.msg)
self.update(float(self.done)/self.total, msg)
def process_pages(pages, opts, update, tdir):
'''
@ -229,22 +224,38 @@ def process_pages(pages, opts, update, tdir):
from calibre.utils.PythonMagickWand import ImageMagick
ImageMagick
job_manager = JobManager(len(pages), update)
progress = Progress(len(pages), update)
server = Server()
jobs = []
tasks = [(p, os.path.join(tdir, os.path.basename(p))) for p in pages]
tasks = server.split(pages)
for task in tasks:
jobs.append(ParallelJob('render_pages', lambda s:s, job_manager=job_manager,
jobs.append(ParallelJob('render_pages', '', progress,
args=[task, tdir, opts]))
server.add_job(jobs[-1])
server.wait()
server.killall()
while True:
time.sleep(1)
running = False
for job in jobs:
while True:
try:
x = job.notifications.get_nowait()
progress(*x)
except Empty:
break
job.update()
if not job.is_finished:
running = True
if not running:
break
server.close()
ans, failures = [], []
for job in jobs:
if job.result is None:
raise Exception(_('Failed to process comic: %s\n\n%s')%(job.exception, job.traceback))
if job.failed:
raw_input()
raise Exception(_('Failed to process comic: \n\n%s')%
job.log_file.read())
pages, failures_ = job.result
ans += pages
failures += failures_

View File

@ -668,6 +668,7 @@ OptionRecommendation(name='list_recipes',
self.output_plugin.convert(self.oeb, self.output, self.input_plugin,
self.opts, self.log)
self.ui_reporter(1.)
self.log(self.output_fmt.upper(), 'output written to', self.output)
def create_oebbook(log, path_or_stream, opts, input_plugin, reader=None):
'''

View File

@ -7,7 +7,8 @@ __copyright__ = '2009, Kovid Goyal <kovid@kovidgoyal.net>'
__docformat__ = 'restructuredtext en'
from calibre.customize.conversion import OutputFormatPlugin
from calibre.customize.conversion import OutputFormatPlugin, \
OptionRecommendation
class LITOutput(OutputFormatPlugin):
@ -15,12 +16,23 @@ class LITOutput(OutputFormatPlugin):
author = 'Marshall T. Vandegrift'
file_type = 'lit'
recommendations = set([
('dont_split_on_page_breaks', False, OptionRecommendation.HIGH),
])
def convert(self, oeb, output_path, input_plugin, opts, log):
self.log, self.opts, self.oeb = log, opts, oeb
from calibre.ebooks.oeb.transforms.manglecase import CaseMangler
from calibre.ebooks.oeb.transforms.rasterize import SVGRasterizer
from calibre.ebooks.oeb.transforms.htmltoc import HTMLTOCAdder
from calibre.ebooks.lit.writer import LitWriter
from calibre.ebooks.oeb.transforms.split import Split
split = Split(not self.opts.dont_split_on_page_breaks,
max_flow_size=0
)
split(self.oeb, self.opts)
tocadder = HTMLTOCAdder()
tocadder(oeb, opts)
mangler = CaseMangler()

View File

@ -96,8 +96,12 @@ def iterlinks(root):
for el in root.iter():
attribs = el.attrib
try:
tag = el.tag
except UnicodeDecodeError:
continue
if el.tag == XHTML('object'):
if tag == XHTML('object'):
codebase = None
## <object> tags have attributes that are relative to
## codebase
@ -122,7 +126,7 @@ def iterlinks(root):
yield (el, attr, attribs[attr], 0)
if el.tag == XHTML('style') and el.text:
if tag == XHTML('style') and el.text:
for match in _css_url_re.finditer(el.text):
yield (el, None, match.group(1), match.start(1))
for match in _css_import_re.finditer(el.text):
@ -801,6 +805,11 @@ class Manifest(object):
self.oeb.logger.warn(
'File %r missing <body/> element' % self.href)
etree.SubElement(data, XHTML('body'))
# Remove microsoft office markup
r = [x for x in data.iterdescendants(etree.Element) if 'microsoft-com' in x.tag]
for x in r:
x.tag = XHTML('span')
return data
def _parse_css(self, data):

View File

@ -12,7 +12,13 @@ from lxml import etree
from urlparse import urlparse
from calibre.ebooks.oeb.base import XPNSMAP, TOC, XHTML
XPath = lambda x: etree.XPath(x, namespaces=XPNSMAP)
from calibre.ebooks import ConversionError
def XPath(x):
try:
return etree.XPath(x, namespaces=XPNSMAP)
except etree.XPathSyntaxError:
raise ConversionError(
'The syntax of the XPath expression %s is invalid.' % repr(x))
class DetectStructure(object):

View File

@ -6,7 +6,7 @@ __copyright__ = '2008, Kovid Goyal <kovid at kovidgoyal.net>, ' \
'2009, John Schember <john@nachtimwald.com>'
__docformat__ = 'restructuredtext en'
import errno, os, re, sys, subprocess
import errno, os, sys, subprocess
from functools import partial
from calibre.ebooks import ConversionError, DRMError
@ -33,7 +33,7 @@ def pdftohtml(pdf_path):
if isinstance(pdf_path, unicode):
pdf_path = pdf_path.encode(sys.getfilesystemencoding())
if not os.access(pdf_path, os.R_OK):
raise ConversionError, 'Cannot read from ' + pdf_path
raise ConversionError('Cannot read from ' + pdf_path)
with TemporaryDirectory('_pdftohtml') as tdir:
index = os.path.join(tdir, 'index.html')
@ -47,7 +47,7 @@ def pdftohtml(pdf_path):
p = popen(cmd, stderr=subprocess.PIPE)
except OSError, err:
if err.errno == 2:
raise ConversionError(_('Could not find pdftohtml, check it is in your PATH'), True)
raise ConversionError(_('Could not find pdftohtml, check it is in your PATH'))
else:
raise
@ -63,13 +63,13 @@ def pdftohtml(pdf_path):
if ret != 0:
err = p.stderr.read()
raise ConversionError, err
raise ConversionError(err)
if not os.path.exists(index) or os.stat(index).st_size < 100:
raise DRMError()
with open(index, 'rb') as i:
raw = i.read()
if not '<br' in raw[:4000]:
raise ConversionError(os.path.basename(pdf_path) + _(' is an image based PDF. Only conversion of text based PDFs is supported.'), True)
raise ConversionError(os.path.basename(pdf_path) + _(' is an image based PDF. Only conversion of text based PDFs is supported.'))
return '<!-- created by calibre\'s pdftohtml -->\n' + raw

View File

@ -176,7 +176,7 @@ class Widget(QWidget):
elif isinstance(g, QCheckBox):
return bool(g.isChecked())
elif isinstance(g, XPathEdit):
return g.xpath
return g.xpath if g.xpath else None
else:
raise Exception('Can\'t get value from %s'%type(g))

View File

@ -189,6 +189,8 @@ class Config(ResizableDialog, Ui_Dialog):
def setup_input_output_formats(self, db, book_id, preferred_input_format,
preferred_output_format):
if preferred_output_format:
preferred_output_format = preferred_output_format.lower()
available_formats = db.formats(book_id, index_is_id=True)
if not available_formats:
available_formats = ''

View File

@ -1,7 +1,7 @@
from __future__ import with_statement
__license__ = 'GPL v3'
__copyright__ = '2008, Kovid Goyal <kovid at kovidgoyal.net>'
import os, traceback, Queue, time, socket
import os, traceback, Queue, time, socket, cStringIO
from threading import Thread, RLock
from itertools import repeat
from functools import partial
@ -15,7 +15,7 @@ from calibre.customize.ui import available_input_formats, available_output_forma
from calibre.devices.interface import DevicePlugin
from calibre.constants import iswindows
from calibre.gui2.dialogs.choose_format import ChooseFormatDialog
from calibre.parallel import Job
from calibre.utils.ipc.job import BaseJob
from calibre.devices.scanner import DeviceScanner
from calibre.gui2 import config, error_dialog, Dispatcher, dynamic, \
pixmap_to_data, warning_dialog, \
@ -27,22 +27,46 @@ from calibre.devices.errors import FreeSpaceError
from calibre.utils.smtp import compose_mail, sendmail, extract_email_address, \
config as email_config
class DeviceJob(Job):
class DeviceJob(BaseJob):
def __init__(self, func, *args, **kwargs):
Job.__init__(self, *args, **kwargs)
def __init__(self, func, done, job_manager, args=[], kwargs={},
description=''):
BaseJob.__init__(self, description, done=done)
self.func = func
self.args, self.kwargs = args, kwargs
self.job_manager = job_manager
self.job_manager.add_job(self)
self.details = _('No details available.')
def start_work(self):
self.start_time = time.time()
self.job_manager.changed_queue.put(self)
def job_done(self):
self.duration = time.time() - self.start_time()
self.job_manager.changed_queue.put(self)
self.job_manager.job_done(self)
def report_progress(self, percent, msg=''):
self.notifications.put((percent, msg))
self.job_manager.changed_queue.put(self)
def run(self):
self.start_work()
try:
self.result = self.func(*self.args, **self.kwargs)
except (Exception, SystemExit), err:
self.failed = True
self.details = unicode(err) + '\n\n' + \
traceback.format_exc()
self.exception = err
self.traceback = traceback.format_exc()
finally:
self.job_done()
@property
def log_file(self):
return cStringIO.StringIO(self.details.encode('utf-8'))
class DeviceManager(Thread):
@ -113,7 +137,7 @@ class DeviceManager(Thread):
job = self.next()
if job is not None:
self.current_job = job
self.device.set_progress_reporter(job.update_status)
self.device.set_progress_reporter(job.report_progress)
self.current_job.run()
self.current_job = None
else:

View File

@ -1,66 +0,0 @@
__license__ = 'GPL v3'
__copyright__ = '2008, Kovid Goyal <kovid at kovidgoyal.net>'
'''Display active jobs'''
from PyQt4.QtCore import Qt, QObject, SIGNAL, QSize, QString, QTimer
from PyQt4.QtGui import QDialog, QAbstractItemDelegate, QStyleOptionProgressBarV2, \
QApplication, QStyle
from calibre.gui2.dialogs.jobs_ui import Ui_JobsDialog
from calibre import __appname__
class ProgressBarDelegate(QAbstractItemDelegate):
def sizeHint(self, option, index):
return QSize(120, 30)
def paint(self, painter, option, index):
opts = QStyleOptionProgressBarV2()
opts.rect = option.rect
opts.minimum = 1
opts.maximum = 100
opts.textVisible = True
percent, ok = index.model().data(index, Qt.DisplayRole).toInt()
if not ok:
percent = 0
opts.progress = percent
opts.text = QString(_('Unavailable') if percent == 0 else '%d%%'%percent)
QApplication.style().drawControl(QStyle.CE_ProgressBar, opts, painter)
class JobsDialog(QDialog, Ui_JobsDialog):
def __init__(self, window, model):
QDialog.__init__(self, window)
Ui_JobsDialog.__init__(self)
self.setupUi(self)
self.jobs_view.setModel(model)
self.model = model
self.setWindowModality(Qt.NonModal)
self.setWindowTitle(__appname__ + _(' - Jobs'))
QObject.connect(self.jobs_view.model(), SIGNAL('modelReset()'),
self.jobs_view.resizeColumnsToContents)
QObject.connect(self.kill_button, SIGNAL('clicked()'),
self.kill_job)
QObject.connect(self, SIGNAL('kill_job(int, PyQt_PyObject)'),
self.jobs_view.model().kill_job)
self.pb_delegate = ProgressBarDelegate(self)
self.jobs_view.setItemDelegateForColumn(2, self.pb_delegate)
self.running_time_timer = QTimer(self)
self.connect(self.running_time_timer, SIGNAL('timeout()'), self.update_running_time)
self.running_time_timer.start(1000)
def update_running_time(self, *args):
try:
self.model.running_time_updated()
except: # Raises random exceptions on OS X
pass
def kill_job(self):
for index in self.jobs_view.selectedIndexes():
row = index.row()
self.model.kill_job(row, self)
return
def closeEvent(self, e):
self.jobs_view.write_settings()
e.accept()

View File

@ -1,7 +1,8 @@
<ui version="4.0" >
<?xml version="1.0" encoding="UTF-8"?>
<ui version="4.0">
<class>JobsDialog</class>
<widget class="QDialog" name="JobsDialog" >
<property name="geometry" >
<widget class="QDialog" name="JobsDialog">
<property name="geometry">
<rect>
<x>0</x>
<y>0</y>
@ -9,31 +10,32 @@
<height>542</height>
</rect>
</property>
<property name="windowTitle" >
<property name="windowTitle">
<string>Active Jobs</string>
</property>
<property name="windowIcon" >
<iconset resource="../images.qrc" >:/images/jobs.svg</iconset>
<property name="windowIcon">
<iconset resource="../images.qrc">
<normaloff>:/images/jobs.svg</normaloff>:/images/jobs.svg</iconset>
</property>
<layout class="QVBoxLayout" >
<layout class="QVBoxLayout">
<item>
<widget class="JobsView" name="jobs_view" >
<property name="contextMenuPolicy" >
<widget class="JobsView" name="jobs_view">
<property name="contextMenuPolicy">
<enum>Qt::NoContextMenu</enum>
</property>
<property name="editTriggers" >
<property name="editTriggers">
<set>QAbstractItemView::NoEditTriggers</set>
</property>
<property name="alternatingRowColors" >
<property name="alternatingRowColors">
<bool>true</bool>
</property>
<property name="selectionMode" >
<property name="selectionMode">
<enum>QAbstractItemView::SingleSelection</enum>
</property>
<property name="selectionBehavior" >
<property name="selectionBehavior">
<enum>QAbstractItemView::SelectRows</enum>
</property>
<property name="iconSize" >
<property name="iconSize">
<size>
<width>32</width>
<height>32</height>
@ -42,12 +44,19 @@
</widget>
</item>
<item>
<widget class="QPushButton" name="kill_button" >
<property name="text" >
<widget class="QPushButton" name="kill_button">
<property name="text">
<string>&amp;Stop selected job</string>
</property>
</widget>
</item>
<item>
<widget class="QPushButton" name="details_button">
<property name="text">
<string>Show job &amp;details</string>
</property>
</widget>
</item>
</layout>
</widget>
<customwidgets>
@ -58,7 +67,7 @@
</customwidget>
</customwidgets>
<resources>
<include location="../images.qrc" />
<include location="../images.qrc"/>
</resources>
<connections/>
</ui>

View File

@ -86,7 +86,7 @@ class UserProfiles(ResizableDialog, Ui_Dialog):
self.source_code.setPlainText('')
else:
self.source_code.setPlainText(src)
#self.highlighter = PythonHighlighter(self.source_code.document())
self.highlighter = PythonHighlighter(self.source_code.document())
self.stacks.setCurrentIndex(1)
self.toggle_mode_button.setText(_('Switch to Basic mode'))

257
src/calibre/gui2/jobs.py Normal file
View File

@ -0,0 +1,257 @@
#!/usr/bin/env python
__license__ = 'GPL v3'
__copyright__ = '2008, Kovid Goyal kovid@kovidgoyal.net'
__docformat__ = 'restructuredtext en'
'''
Job management.
'''
from Queue import Empty, Queue
from PyQt4.Qt import QAbstractTableModel, QVariant, QModelIndex, Qt, \
QTimer, SIGNAL, QIcon, QDialog, QAbstractItemDelegate, QApplication, \
QSize, QStyleOptionProgressBarV2, QString, QStyle
from calibre.utils.ipc.server import Server
from calibre.utils.ipc.job import ParallelJob
from calibre.gui2 import Dispatcher, error_dialog, NONE
from calibre.gui2.device import DeviceJob
from calibre.gui2.dialogs.jobs_ui import Ui_JobsDialog
from calibre import __appname__
class JobManager(QAbstractTableModel):
def __init__(self):
QAbstractTableModel.__init__(self)
self.wait_icon = QVariant(QIcon(':/images/jobs.svg'))
self.running_icon = QVariant(QIcon(':/images/exec.svg'))
self.error_icon = QVariant(QIcon(':/images/dialog_error.svg'))
self.done_icon = QVariant(QIcon(':/images/ok.svg'))
self.jobs = []
self.add_job = Dispatcher(self._add_job)
self.job_done = Dispatcher(self._job_done)
self.server = Server(self.job_done)
self.changed_queue = Queue()
self.timer = QTimer(self)
self.connect(self.timer, SIGNAL('timeout()'), self.update,
Qt.QueuedConnection)
self.timer.start(1000)
def columnCount(self, parent=QModelIndex()):
return 4
def rowCount(self, parent=QModelIndex()):
return len(self.jobs)
def headerData(self, section, orientation, role):
if role != Qt.DisplayRole:
return NONE
if orientation == Qt.Horizontal:
if section == 0: text = _('Job')
elif section == 1: text = _('Status')
elif section == 2: text = _('Progress')
elif section == 3: text = _('Running time')
return QVariant(text)
else:
return QVariant(section+1)
def data(self, index, role):
try:
if role not in (Qt.DisplayRole, Qt.DecorationRole):
return NONE
row, col = index.row(), index.column()
job = self.jobs[row]
if role == Qt.DisplayRole:
if col == 0:
desc = job.description
if not desc:
desc = _('Unknown job')
return QVariant(desc)
if col == 1:
return QVariant(job.status_text)
if col == 2:
return QVariant(job.percent)
if col == 3:
rtime = job.running_time
if rtime is None:
return NONE
return QVariant('%dm %ds'%(int(rtime)//60, int(rtime)%60))
if role == Qt.DecorationRole and col == 0:
state = job.run_state
if state == job.WAITING:
return self.wait_icon
if state == job.RUNNING:
return self.running_icon
if job.killed or job.failed:
return self.error_icon
return self.done_icon
except:
import traceback
traceback.print_exc()
return NONE
def update(self):
try:
self._update()
except BaseException:
pass
def _update(self):
# Update running time
rows = set([])
for i, j in enumerate(self.jobs):
if j.run_state == j.RUNNING:
idx = self.index(i, 3)
self.emit(SIGNAL('dataChanged(QModelIndex,QModelIndex)'),
idx, idx)
# Update parallel jobs
jobs = set([])
while True:
try:
jobs.add(self.server.changed_jobs_queue.get_nowait())
except Empty:
break
while True:
try:
jobs.add(self.changed_queue.get_nowait())
except Empty:
break
if jobs:
needs_reset = False
for job in jobs:
orig_state = job.run_state
job.update()
if orig_state != job.run_state:
needs_reset = True
if needs_reset:
self.jobs.sort()
self.reset()
else:
for job in jobs:
idx = self.jobs.index(job)
self.emit(SIGNAL('dataChanged(QModelIndex,QModelIndex)'),
self.index(idx, 0), self.index(idx, 3))
def _add_job(self, job):
self.emit(SIGNAL('layoutAboutToBeChanged()'))
self.jobs.append(job)
self.jobs.sort()
self.emit(SIGNAL('job_added(int)'), len(self.unfinished_jobs()))
self.emit(SIGNAL('layoutChanged()'))
def done_jobs(self):
return [j for j in self.jobs if j.is_finished]
def unfinished_jobs(self):
return [j for j in self.jobs if not j.is_finished]
def row_to_job(self, row):
return self.jobs[row]
def _job_done(self, job):
self.emit(SIGNAL('layoutAboutToBeChanged()'))
self.jobs.sort()
self.emit(SIGNAL('job_done(int)'), len(self.unfinished_jobs()))
self.emit(SIGNAL('layoutChanged()'))
def has_device_jobs(self):
for job in self.jobs:
if job.is_running and isinstance(job, DeviceJob):
return True
return False
def has_jobs(self):
for job in self.jobs:
if job.is_running:
return True
return False
def run_job(self, done, name, args=[], kwargs={},
description=''):
job = ParallelJob(name, description, done, args=args, kwargs=kwargs)
self.add_job(job)
self.server.add_job(job)
return job
def launch_gui_app(self, name, args=[], kwargs={}, description=''):
job = ParallelJob(name, description, lambda x: x,
args=args, kwargs=kwargs)
self.server.run_job(job, gui=True, redirect_output=False)
def kill_job(self, row, view):
job = self.jobs[row]
if isinstance(job, DeviceJob):
return error_dialog(view, _('Cannot kill job'),
_('Cannot kill jobs that communicate with the device')).exec_()
if job.duration is not None:
return error_dialog(view, _('Cannot kill job'),
_('Job has already run')).exec_()
self.server.kill_job(job)
def terminate_all_jobs(self):
self.server.killall()
class ProgressBarDelegate(QAbstractItemDelegate):
def sizeHint(self, option, index):
return QSize(120, 30)
def paint(self, painter, option, index):
opts = QStyleOptionProgressBarV2()
opts.rect = option.rect
opts.minimum = 1
opts.maximum = 100
opts.textVisible = True
percent, ok = index.model().data(index, Qt.DisplayRole).toInt()
if not ok:
percent = 0
opts.progress = percent
opts.text = QString(_('Unavailable') if percent == 0 else '%d%%'%percent)
QApplication.style().drawControl(QStyle.CE_ProgressBar, opts, painter)
class JobsDialog(QDialog, Ui_JobsDialog):
def __init__(self, window, model):
QDialog.__init__(self, window)
Ui_JobsDialog.__init__(self)
self.setupUi(self)
self.jobs_view.setModel(model)
self.model = model
self.setWindowModality(Qt.NonModal)
self.setWindowTitle(__appname__ + _(' - Jobs'))
self.connect(self.jobs_view.model(), SIGNAL('modelReset()'),
self.jobs_view.resizeColumnsToContents)
self.connect(self.kill_button, SIGNAL('clicked()'),
self.kill_job)
self.connect(self.details_button, SIGNAL('clicked()'),
self.show_details)
self.connect(self, SIGNAL('kill_job(int, PyQt_PyObject)'),
self.jobs_view.model().kill_job)
self.pb_delegate = ProgressBarDelegate(self)
self.jobs_view.setItemDelegateForColumn(2, self.pb_delegate)
def kill_job(self):
for index in self.jobs_view.selectedIndexes():
row = index.row()
self.model.kill_job(row, self)
return
def show_details(self):
for index in self.jobs_view.selectedIndexes():
self.jobs_view.show_details(index)
return
def closeEvent(self, e):
self.jobs_view.write_settings()
e.accept()

View File

@ -1,203 +0,0 @@
#!/usr/bin/env python
__license__ = 'GPL v3'
__copyright__ = '2008, Kovid Goyal kovid@kovidgoyal.net'
__docformat__ = 'restructuredtext en'
'''
Job management.
'''
import time
from PyQt4.QtCore import QAbstractTableModel, QVariant, QModelIndex, Qt, SIGNAL
from PyQt4.QtGui import QIcon, QDialog
from calibre.parallel import ParallelJob, Server
from calibre.gui2 import Dispatcher, error_dialog
from calibre.gui2.device import DeviceJob
from calibre.gui2.dialogs.job_view_ui import Ui_Dialog
NONE = QVariant()
class JobManager(QAbstractTableModel):
def __init__(self):
QAbstractTableModel.__init__(self)
self.wait_icon = QVariant(QIcon(':/images/jobs.svg'))
self.running_icon = QVariant(QIcon(':/images/exec.svg'))
self.error_icon = QVariant(QIcon(':/images/dialog_error.svg'))
self.done_icon = QVariant(QIcon(':/images/ok.svg'))
self.jobs = []
self.server = Server()
self.add_job = Dispatcher(self._add_job)
self.status_update = Dispatcher(self._status_update)
self.start_work = Dispatcher(self._start_work)
self.job_done = Dispatcher(self._job_done)
def columnCount(self, parent=QModelIndex()):
return 4
def rowCount(self, parent=QModelIndex()):
return len(self.jobs)
def headerData(self, section, orientation, role):
if role != Qt.DisplayRole:
return NONE
if orientation == Qt.Horizontal:
if section == 0: text = _("Job")
elif section == 1: text = _("Status")
elif section == 2: text = _("Progress")
elif section == 3: text = _('Running time')
return QVariant(text)
else:
return QVariant(section+1)
def data(self, index, role):
try:
if role not in (Qt.DisplayRole, Qt.DecorationRole):
return NONE
row, col = index.row(), index.column()
job = self.jobs[row]
if role == Qt.DisplayRole:
if col == 0:
desc = job.description
if not desc:
desc = _('Unknown job')
return QVariant(desc)
if col == 1:
status = job.status()
if status == 'DONE':
st = _('Finished')
elif status == 'ERROR':
st = _('Error')
elif status == 'WAITING':
st = _('Waiting')
else:
st = _('Working')
return QVariant(st)
if col == 2:
pc = job.percent
if pc <=0:
percent = 0
else:
percent = int(100*pc)
return QVariant(percent)
if col == 3:
if job.start_time is None:
return NONE
rtime = job.running_time if job.running_time is not None else \
time.time() - job.start_time
return QVariant('%dm %ds'%(int(rtime)//60, int(rtime)%60))
if role == Qt.DecorationRole and col == 0:
status = job.status()
if status == 'WAITING':
return self.wait_icon
if status == 'WORKING':
return self.running_icon
if status == 'ERROR':
return self.error_icon
if status == 'DONE':
return self.done_icon
except:
import traceback
traceback.print_exc()
return NONE
def _add_job(self, job):
self.emit(SIGNAL('layoutAboutToBeChanged()'))
self.jobs.append(job)
self.jobs.sort()
self.emit(SIGNAL('job_added(int)'), self.rowCount())
self.emit(SIGNAL('layoutChanged()'))
def done_jobs(self):
return [j for j in self.jobs if j.status() in ['DONE', 'ERROR']]
def row_to_job(self, row):
return self.jobs[row]
def _start_work(self, job):
self.emit(SIGNAL('layoutAboutToBeChanged()'))
self.jobs.sort()
self.emit(SIGNAL('layoutChanged()'))
def _job_done(self, job):
self.emit(SIGNAL('layoutAboutToBeChanged()'))
self.jobs.sort()
self.emit(SIGNAL('job_done(int)'), len(self.jobs) - len(self.done_jobs()))
self.emit(SIGNAL('layoutChanged()'))
def _status_update(self, job):
try:
row = self.jobs.index(job)
except ValueError: # Job has been stopped
return
self.emit(SIGNAL('dataChanged(QModelIndex, QModelIndex)'),
self.index(row, 0), self.index(row, 3))
def running_time_updated(self, *args):
for job in self.jobs:
if not job.is_running:
continue
row = self.jobs.index(job)
self.emit(SIGNAL('dataChanged(QModelIndex, QModelIndex)'),
self.index(row, 3), self.index(row, 3))
def has_device_jobs(self):
for job in self.jobs:
if job.is_running and isinstance(job, DeviceJob):
return True
return False
def has_jobs(self):
for job in self.jobs:
if job.is_running:
return True
return False
def run_job(self, done, func, args=[], kwargs={},
description=None):
job = ParallelJob(func, done, self, args=args, kwargs=kwargs,
description=description)
self.server.add_job(job)
return job
def output(self, job):
self.emit(SIGNAL('output_received()'))
def kill_job(self, row, view):
job = self.jobs[row]
if isinstance(job, DeviceJob):
error_dialog(view, _('Cannot kill job'),
_('Cannot kill jobs that communicate with the device')).exec_()
return
if job.has_run:
error_dialog(view, _('Cannot kill job'),
_('Job has already run')).exec_()
return
if not job.is_running:
self.jobs.remove(job)
self.reset()
return
self.server.kill(job)
def terminate_all_jobs(self):
pass
class DetailView(QDialog, Ui_Dialog):
def __init__(self, parent, job):
QDialog.__init__(self, parent)
self.setupUi(self)
self.setWindowTitle(job.description)
self.job = job
self.update()
def update(self):
self.log.setPlainText(self.job.console_text())
vbar = self.log.verticalScrollBar()
vbar.setValue(vbar.maximum())

View File

@ -13,7 +13,7 @@ from PyQt4.Qt import Qt, SIGNAL, QObject, QCoreApplication, QUrl, QTimer, \
from PyQt4.QtSvg import QSvgRenderer
from calibre import __version__, __appname__, islinux, sanitize_file_name, \
iswindows, isosx
iswindows, isosx, prints
from calibre.ptempfile import PersistentTemporaryFile
from calibre.utils.config import prefs, dynamic
from calibre.gui2 import APP_UID, warning_dialog, choose_files, error_dialog, \
@ -32,10 +32,9 @@ from calibre.gui2.main_window import MainWindow, option_parser as _option_parser
from calibre.gui2.main_ui import Ui_MainWindow
from calibre.gui2.device import DeviceManager, DeviceMenu, DeviceGUI, Emailer
from calibre.gui2.status import StatusBar
from calibre.gui2.jobs2 import JobManager
from calibre.gui2.jobs import JobManager, JobsDialog
from calibre.gui2.dialogs.metadata_single import MetadataSingleDialog
from calibre.gui2.dialogs.metadata_bulk import MetadataBulkDialog
from calibre.gui2.dialogs.jobs import JobsDialog
from calibre.gui2.tools import convert_single_ebook, convert_bulk_ebook, \
fetch_scheduled_recipe
from calibre.gui2.dialogs.config import ConfigDialog
@ -44,7 +43,6 @@ from calibre.gui2.dialogs.choose_format import ChooseFormatDialog
from calibre.gui2.dialogs.book_info import BookInfo
from calibre.ebooks import BOOK_EXTENSIONS
from calibre.library.database2 import LibraryDatabase2, CoverCache
from calibre.parallel import JobKilled
from calibre.gui2.dialogs.confirm_delete import confirm
class SaveMenu(QMenu):
@ -626,9 +624,8 @@ class Main(MainWindow, Ui_MainWindow, DeviceGUI):
'''
Called once device information has been read.
'''
if job.exception is not None:
self.device_job_exception(job)
return
if job.failed:
return self.device_job_exception(job)
info, cp, fs = job.result
self.location_view.model().update_devices(cp, fs)
self.device_info = _('Connected ')+info[0]
@ -641,7 +638,7 @@ class Main(MainWindow, Ui_MainWindow, DeviceGUI):
'''
Called once metadata has been read for all books on the device.
'''
if job.exception is not None:
if job.failed:
if isinstance(job.exception, ExpatError):
error_dialog(self, _('Device database corrupted'),
_('''
@ -823,8 +820,8 @@ class Main(MainWindow, Ui_MainWindow, DeviceGUI):
Called once deletion is done on the device
'''
for view in (self.memory_view, self.card_a_view, self.card_b_view):
view.model().deletion_done(job, bool(job.exception))
if job.exception is not None:
view.model().deletion_done(job, job.failed)
if job.failed:
self.device_job_exception(job)
return
@ -993,9 +990,8 @@ class Main(MainWindow, Ui_MainWindow, DeviceGUI):
progress.hide()
def books_saved(self, job):
if job.exception is not None:
self.device_job_exception(job)
return
if job.failed:
return self.device_job_exception(job)
############################################################################
@ -1013,9 +1009,8 @@ class Main(MainWindow, Ui_MainWindow, DeviceGUI):
def scheduled_recipe_fetched(self, job):
temp_files, fmt, recipe, callback = self.conversion_jobs.pop(job)
pt = temp_files[0]
if job.exception is not None:
self.job_exception(job)
return
if job.failed:
return self.job_exception(job)
id = self.library_view.model().add_news(pt.name, recipe)
self.library_view.model().reset()
sync = dynamic.get('news_to_be_synced', set([]))
@ -1098,9 +1093,8 @@ class Main(MainWindow, Ui_MainWindow, DeviceGUI):
def book_auto_converted(self, job):
temp_files, fmt, book_id, on_card = self.conversion_jobs.pop(job)
try:
if job.exception is not None:
self.job_exception(job)
return
if job.failed:
return self.job_exception(job)
data = open(temp_files[0].name, 'rb')
self.library_view.model().db.add_format(book_id, fmt, data, index_is_id=True)
data.close()
@ -1122,7 +1116,7 @@ class Main(MainWindow, Ui_MainWindow, DeviceGUI):
def book_converted(self, job):
temp_files, fmt, book_id = self.conversion_jobs.pop(job)
try:
if job.exception is not None:
if job.failed:
self.job_exception(job)
return
data = open(temp_files[-1].name, 'rb')
@ -1151,7 +1145,7 @@ class Main(MainWindow, Ui_MainWindow, DeviceGUI):
self._view_file(fmt_path)
def book_downloaded_for_viewing(self, job):
if job.exception:
if job.failed:
self.device_job_exception(job)
return
self._view_file(job.result)
@ -1165,12 +1159,11 @@ class Main(MainWindow, Ui_MainWindow, DeviceGUI):
args.append('--raise-window')
if name is not None:
args.append(name)
self.job_manager.server.run_free_job(viewer,
kwdargs=dict(args=args))
self.job_manager.launch_gui_app(viewer,
kwargs=dict(args=args))
else:
QDesktopServices.openUrl(QUrl.fromLocalFile(name))#launch(name)
time.sleep(5) # User feedback
time.sleep(2) # User feedback
finally:
self.unsetCursor()
@ -1395,7 +1388,7 @@ class Main(MainWindow, Ui_MainWindow, DeviceGUI):
'''
try:
if 'Could not read 32 bytes on the control bus.' in \
unicode(job.exception):
unicode(job.details):
error_dialog(self, _('Error talking to device'),
_('There was a temporary error talking to the '
'device. Please unplug and reconnect the device '
@ -1404,16 +1397,16 @@ class Main(MainWindow, Ui_MainWindow, DeviceGUI):
except:
pass
try:
print >>sys.stderr, job.console_text()
prints(job.details, file=sys.stderr)
except:
pass
if not self.device_error_dialog.isVisible():
self.device_error_dialog.set_message(job.gui_text())
self.device_error_dialog.set_message(job.details)
self.device_error_dialog.show()
def job_exception(self, job):
try:
if job.exception[0] == 'DRMError':
if 'calibre.ebooks.DRMError' in job.details:
error_dialog(self, _('Conversion Error'),
_('<p>Could not convert: %s<p>It is a '
'<a href="%s">DRM</a>ed book. You must first remove the '
@ -1423,23 +1416,15 @@ class Main(MainWindow, Ui_MainWindow, DeviceGUI):
return
except:
pass
only_msg = getattr(job.exception, 'only_msg', False)
if job.killed:
return
try:
print job.console_text()
prints(job.details, file=sys.stderr)
except:
pass
if only_msg:
try:
exc = unicode(job.exception)
except:
exc = repr(job.exception)
error_dialog(self, _('Conversion Error'), exc).exec_()
return
if isinstance(job.exception, JobKilled):
return
error_dialog(self, _('Conversion Error'),
_('Failed to process')+': '+unicode(job.description),
det_msg=job.console_text()).exec_()
_('<b>Failed</b>')+': '+unicode(job.description),
det_msg=job.details).exec_()
def initialize_database(self):
@ -1555,7 +1540,7 @@ class Main(MainWindow, Ui_MainWindow, DeviceGUI):
def shutdown(self, write_settings=True):
if write_settings:
self.write_settings()
self.job_manager.terminate_all_jobs()
self.job_manager.server.close()
self.device_manager.keep_going = False
self.cover_cache.stop()
self.hide()

View File

@ -10,10 +10,10 @@ from calibre.gui2 import qstring_to_unicode, config
class BookInfoDisplay(QWidget):
class BookCoverDisplay(QLabel):
WIDTH = 81
HEIGHT = 108
def __init__(self, coverpath=':/images/book.svg'):
QLabel.__init__(self)
self.default_pixmap = QPixmap(coverpath).scaled(self.__class__.WIDTH,
@ -23,42 +23,42 @@ class BookInfoDisplay(QWidget):
self.setScaledContents(True)
self.setMaximumHeight(self.HEIGHT)
self.setPixmap(self.default_pixmap)
def setPixmap(self, pixmap):
width, height = fit_image(pixmap.width(), pixmap.height(),
self.WIDTH, self.HEIGHT)[1:]
self.setMaximumHeight(height)
self.setMaximumWidth(width)
QLabel.setPixmap(self, pixmap)
try:
aspect_ratio = pixmap.width()/float(pixmap.height())
except ZeroDivisionError:
aspect_ratio = 1
self.setMaximumWidth(int(aspect_ratio*self.HEIGHT))
def sizeHint(self):
return QSize(self.__class__.WIDTH, self.__class__.HEIGHT)
class BookDataDisplay(QLabel):
def __init__(self):
QLabel.__init__(self)
self.setText('')
self.setWordWrap(True)
self.setSizePolicy(QSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding))
def mouseReleaseEvent(self, ev):
self.emit(SIGNAL('mr(int)'), 1)
WEIGHTS = collections.defaultdict(lambda : 100)
WEIGHTS[_('Path')] = 0
WEIGHTS[_('Formats')] = 1
WEIGHTS[_('Comments')] = 2
WEIGHTS[_('Series')] = 3
WEIGHTS[_('Tags')] = 4
def __init__(self, clear_message):
QWidget.__init__(self)
self.setCursor(Qt.PointingHandCursor)
@ -74,16 +74,16 @@ class BookInfoDisplay(QWidget):
self.data = {}
self.setVisible(False)
self._layout.setAlignment(self.cover_display, Qt.AlignTop|Qt.AlignLeft)
def mouseReleaseEvent(self, ev):
self.emit(SIGNAL('show_book_info()'))
def show_data(self, data):
if data.has_key('cover'):
self.cover_display.setPixmap(QPixmap.fromImage(data.pop('cover')))
else:
self.cover_display.setPixmap(self.cover_display.default_pixmap)
rows = u''
self.book_data.setText('')
self.data = data.copy()
@ -97,7 +97,7 @@ class BookInfoDisplay(QWidget):
txt = txt.decode(preferred_encoding, 'replace')
rows += u'<tr><td><b>%s:</b></td><td>%s</td></tr>'%(key, txt)
self.book_data.setText(u'<table>'+rows+u'</table>')
self.clear_message()
self.book_data.updateGeometry()
self.updateGeometry()
@ -113,7 +113,7 @@ class MovieButton(QFrame):
self.movie = movie
self.layout().addWidget(self.movie_widget)
self.jobs = QLabel('<b>'+_('Jobs:')+' 0')
self.jobs.setAlignment(Qt.AlignHCenter|Qt.AlignBottom)
self.jobs.setAlignment(Qt.AlignHCenter|Qt.AlignBottom)
self.layout().addWidget(self.jobs)
self.layout().setAlignment(self.jobs, Qt.AlignHCenter)
self.jobs.setMargin(0)
@ -125,8 +125,8 @@ class MovieButton(QFrame):
movie.start()
movie.setPaused(True)
self.jobs_dialog.jobs_view.restore_column_widths()
def mouseReleaseEvent(self, event):
if self.jobs_dialog.isVisible():
self.jobs_dialog.jobs_view.write_settings()
@ -137,7 +137,7 @@ class MovieButton(QFrame):
self.jobs_dialog.jobs_view.restore_column_widths()
class CoverFlowButton(QToolButton):
def __init__(self, parent=None):
QToolButton.__init__(self, parent)
self.setIconSize(QSize(80, 80))
@ -149,17 +149,17 @@ class CoverFlowButton(QToolButton):
self.connect(self, SIGNAL('toggled(bool)'), self.adjust_tooltip)
self.adjust_tooltip(False)
self.setCursor(Qt.PointingHandCursor)
def adjust_tooltip(self, on):
tt = _('Click to turn off Cover Browsing') if on else _('Click to browse books by their covers')
self.setToolTip(tt)
def disable(self, reason):
self.setDisabled(True)
self.setToolTip(_('<p>Browsing books by their covers is disabled.<br>Import of pictureflow module failed:<br>')+reason)
class TagViewButton(QToolButton):
def __init__(self, parent=None):
QToolButton.__init__(self, parent)
self.setIconSize(QSize(80, 80))
@ -170,10 +170,10 @@ class TagViewButton(QToolButton):
self.setCheckable(True)
self.setChecked(False)
self.setAutoRaise(True)
class StatusBar(QStatusBar):
def __init__(self, jobs_dialog, systray=None):
QStatusBar.__init__(self)
self.systray = systray
@ -192,11 +192,11 @@ class StatusBar(QStatusBar):
self.addWidget(self.scroll_area, 100)
self.setMinimumHeight(120)
self.setMaximumHeight(120)
def reset_info(self):
self.book_info.show_data({})
def showMessage(self, msg, timeout=0):
ret = QStatusBar.showMessage(self, msg, timeout)
if self.systray is not None and not config['disable_tray_notification']:
@ -207,39 +207,38 @@ class StatusBar(QStatusBar):
msg = msg.encode('utf-8')
self.systray.showMessage('calibre', msg, self.systray.Information, 10000)
return ret
def jobs(self):
src = qstring_to_unicode(self.movie_button.jobs.text())
return int(re.search(r'\d+', src).group())
def show_book_info(self):
self.emit(SIGNAL('show_book_info()'))
def job_added(self, nnum):
jobs = self.movie_button.jobs
src = qstring_to_unicode(jobs.text())
num = self.jobs()
nnum = num + 1
text = src.replace(str(num), str(nnum))
jobs.setText(text)
if self.movie_button.movie.state() == QMovie.Paused:
self.movie_button.movie.setPaused(False)
def job_done(self, running):
def job_done(self, nnum):
jobs = self.movie_button.jobs
src = qstring_to_unicode(jobs.text())
num = self.jobs()
text = src.replace(str(num), str(running))
text = src.replace(str(num), str(nnum))
jobs.setText(text)
if running == 0:
if nnum == 0:
self.no_more_jobs()
def no_more_jobs(self):
if self.movie_button.movie.state() == QMovie.Running:
self.movie_button.movie.jumpToFrame(0)
self.movie_button.movie.setPaused(True)
QCoreApplication.instance().alert(self, 5000)
if __name__ == '__main__':
# Used to create the animated status icon
from PyQt4.Qt import QApplication, QPainter, QSvgRenderer, QColor
@ -280,4 +279,4 @@ if __name__ == '__main__':
os.remove(file)
import sys
create_mng(sys.argv[1])

View File

@ -4,16 +4,16 @@ __copyright__ = '2008, Kovid Goyal <kovid at kovidgoyal.net>'
Miscellaneous widgets used in the GUI
'''
import re, os, traceback
from PyQt4.QtGui import QListView, QIcon, QFont, QLabel, QListWidget, \
from PyQt4.Qt import QListView, QIcon, QFont, QLabel, QListWidget, \
QListWidgetItem, QTextCharFormat, QApplication, \
QSyntaxHighlighter, QCursor, QColor, QWidget, QDialog, \
QPixmap, QMovie, QPalette
from PyQt4.QtCore import QAbstractListModel, QVariant, Qt, SIGNAL, \
QSyntaxHighlighter, QCursor, QColor, QWidget, \
QPixmap, QMovie, QPalette, QTimer, QDialog, \
QAbstractListModel, QVariant, Qt, SIGNAL, \
QRegExp, QSettings, QSize, QModelIndex
from calibre.gui2.jobs2 import DetailView
from calibre.gui2 import human_readable, NONE, TableView, \
qstring_to_unicode, error_dialog
from calibre.gui2.dialogs.job_view_ui import Ui_Dialog
from calibre.gui2.filename_pattern_ui import Ui_Form
from calibre import fit_image
from calibre.utils.fontconfig import find_font_families
@ -249,6 +249,31 @@ class LocationView(QListView):
if 0 <= row and row <= 3:
self.model().location_changed(row)
class DetailView(QDialog, Ui_Dialog):
def __init__(self, parent, job):
QDialog.__init__(self, parent)
self.setupUi(self)
self.setWindowTitle(job.description)
self.job = job
self.next_pos = 0
self.update()
self.timer = QTimer(self)
self.connect(self.timer, SIGNAL('timeout()'), self.update)
self.timer.start(1000)
def update(self):
f = self.job.log_file
f.seek(self.next_pos)
more = f.read()
self.next_pos = f.tell()
if more:
self.log.appendPlainText(more.decode('utf-8', 'replace'))
vbar = self.log.verticalScrollBar()
vbar.setValue(vbar.maximum())
class JobsView(TableView):
def __init__(self, parent):
@ -259,7 +284,6 @@ class JobsView(TableView):
row = index.row()
job = self.model().row_to_job(row)
d = DetailView(self, job)
self.connect(self.model(), SIGNAL('output_received()'), d.update)
d.exec_()
@ -539,12 +563,12 @@ class PythonHighlighter(QSyntaxHighlighter):
return
for regex, format in PythonHighlighter.Rules:
i = text.indexOf(regex)
i = regex.indexIn(text)
while i >= 0:
length = regex.matchedLength()
self.setFormat(i, length,
PythonHighlighter.Formats[format])
i = text.indexOf(regex, i + length)
i = regex.indexIn(text, i + length)
# Slow but good quality highlighting for comments. For more
# speed, comment this out and add the following to __init__:
@ -569,12 +593,12 @@ class PythonHighlighter(QSyntaxHighlighter):
self.setCurrentBlockState(NORMAL)
if text.indexOf(self.stringRe) != -1:
if self.stringRe.indexIn(text) != -1:
return
# This is fooled by triple quotes inside single quoted strings
for i, state in ((text.indexOf(self.tripleSingleRe),
for i, state in ((self.tripleSingleRe.indexIn(text),
TRIPLESINGLE),
(text.indexOf(self.tripleDoubleRe),
(self.tripleDoubleRe.indexIn(text),
TRIPLEDOUBLE)):
if self.previousBlockState() == state:
if i == -1:

View File

@ -29,7 +29,7 @@ entry_points = {
'calibre-debug = calibre.debug:main',
'calibredb = calibre.library.cli:main',
'calibre-fontconfig = calibre.utils.fontconfig:main',
'calibre-parallel = calibre.parallel:main',
'calibre-parallel = calibre.utils.ipc.worker:main',
'calibre-customize = calibre.customize.ui:main',
'calibre-complete = calibre.utils.complete:main',
'pdfmanipulate = calibre.ebooks.pdf.manipulate.cli:main',

View File

@ -1,980 +0,0 @@
from __future__ import with_statement
__license__ = 'GPL v3'
__copyright__ = '2008, Kovid Goyal kovid@kovidgoyal.net'
__docformat__ = 'restructuredtext en'
'''
Used to run jobs in parallel in separate processes. Features output streaming,
support for progress notification as well as job killing. The worker processes
are controlled via a simple protocol run over sockets. The control happens
mainly in two class, :class:`Server` and :class:`Overseer`. The worker is
encapsulated in the function :function:`worker`. Every worker process
has the environment variable :envvar:`CALIBRE_WORKER` defined.
The worker control protocol has two modes of operation. In the first mode, the
worker process listens for commands from the controller process. The controller
process can either hand off a job to the worker or tell the worker to die.
Once a job is handed off to the worker, the protocol enters the second mode, where
the controller listens for messages from the worker. The worker can send progress updates
as well as console output (i.e. text that would normally have been written to stdout
or stderr by the job). Once the job completes (or raises an exception) the worker
returns the result (or exception) to the controller and the protocol reverts to the first mode.
In the second mode, the controller can also send the worker STOP messages, in which case
the worker interrupts the job and dies. The sending of progress and console output messages
is buffered and asynchronous to prevent the job from being IO bound.
'''
import sys, os, gc, cPickle, traceback, cStringIO, time, signal, \
subprocess, socket, collections, binascii, re, thread, tempfile, atexit
from select import select
from threading import RLock, Thread, Event
from math import ceil
from calibre.ptempfile import PersistentTemporaryFile
from calibre import iswindows, detect_ncpus, isosx, preferred_encoding
from calibre.utils.config import prefs
DEBUG = False
#: A mapping from job names to functions that perform the jobs
PARALLEL_FUNCS = {
'lrfviewer' :
('calibre.gui2.lrf_renderer.main', 'main', {}, None),
'ebook-viewer' :
('calibre.gui2.viewer.main', 'main', {}, None),
'render_pages' :
('calibre.ebooks.comic.input', 'render_pages', {}, 'notification'),
'ebook-convert' :
('calibre.ebooks.conversion.cli', 'main', {}, None),
'gui_convert' :
('calibre.gui2.convert.gui_conversion', 'gui_convert', {}, 'notification'),
}
isfrozen = hasattr(sys, 'frozen')
isworker = False
win32event = __import__('win32event') if iswindows else None
win32process = __import__('win32process') if iswindows else None
msvcrt = __import__('msvcrt') if iswindows else None
SOCKET_TYPE = socket.AF_UNIX if not iswindows else socket.AF_INET
class WorkerStatus(object):
'''
A platform independent class to control child processes. Provides the
methods:
.. method:: WorkerStatus.is_alive()
Return True is the child process is alive (i.e. it hasn't exited and returned a return code).
.. method:: WorkerStatus.returncode()
Wait for the child process to exit and return its return code (blocks until child returns).
.. method:: WorkerStatus.kill()
Forcibly terminates child process using operating system specific semantics.
'''
def __init__(self, obj):
'''
`obj`: On windows a process handle, on unix a subprocess.Popen object.
'''
self.obj = obj
self.win32process = win32process # Needed if kill is called during shutdown of interpreter
self.os = os
self.signal = signal
ext = 'windows' if iswindows else 'unix'
for func in ('is_alive', 'returncode', 'kill'):
setattr(self, func, getattr(self, func+'_'+ext))
def is_alive_unix(self):
return self.obj.poll() == None
def returncode_unix(self):
return self.obj.wait()
def kill_unix(self):
os.kill(self.obj.pid, self.signal.SIGKILL)
def is_alive_windows(self):
return win32event.WaitForSingleObject(self.obj, 0) != win32event.WAIT_OBJECT_0
def returncode_windows(self):
return win32process.GetExitCodeProcess(self.obj)
def kill_windows(self, returncode=-1):
self.win32process.TerminateProcess(self.obj, returncode)
class WorkerMother(object):
'''
Platform independent object for launching child processes. All processes
have the environment variable :envvar:`CALIBRE_WORKER` set.
..method:: WorkerMother.spawn_free_spirit(arg)
Launch a non monitored process with argument `arg`.
..method:: WorkerMother.spawn_worker(arg)
Launch a monitored and controllable process with argument `arg`.
'''
def __init__(self):
ext = 'windows' if iswindows else 'osx' if isosx else 'linux'
self.os = os # Needed incase cleanup called when interpreter is shutting down
self.env = {}
if iswindows:
self.executable = os.path.join(os.path.dirname(sys.executable),
'calibre-parallel.exe' if isfrozen else 'Scripts\\calibre-parallel.exe')
elif isosx:
self.executable = self.gui_executable = sys.executable
self.prefix = ''
if isfrozen:
fd = os.path.realpath(getattr(sys, 'frameworks_dir'))
contents = os.path.dirname(fd)
self.gui_executable = os.path.join(contents, 'MacOS',
os.path.basename(sys.executable))
contents = os.path.join(contents, 'console.app', 'Contents')
exe = os.path.basename(sys.executable)
if 'python' not in exe:
exe = 'python'
self.executable = os.path.join(contents, 'MacOS', exe)
resources = os.path.join(contents, 'Resources')
fd = os.path.join(contents, 'Frameworks')
sp = os.path.join(resources, 'lib', 'python'+sys.version[:3], 'site-packages.zip')
self.prefix += 'import sys; sys.frameworks_dir = "%s"; sys.frozen = "macosx_app"; '%fd
self.prefix += 'sys.path.insert(0, %s); '%repr(sp)
if fd not in os.environ['PATH']:
self.env['PATH'] = os.environ['PATH']+':'+fd
self.env['PYTHONHOME'] = resources
self.env['MAGICK_HOME'] = os.path.join(fd, 'ImageMagick')
self.env['DYLD_LIBRARY_PATH'] = os.path.join(fd, 'ImageMagick', 'lib')
else:
self.executable = os.path.join(getattr(sys, 'frozen_path'), 'calibre-parallel') \
if isfrozen else 'calibre-parallel'
if isfrozen:
self.env['LD_LIBRARY_PATH'] = getattr(sys, 'frozen_path') + ':' + os.environ.get('LD_LIBRARY_PATH', '')
self.spawn_worker_windows = lambda arg : self.spawn_free_spirit_windows(arg, type='worker')
self.spawn_worker_linux = lambda arg : self.spawn_free_spirit_linux(arg, type='worker')
self.spawn_worker_osx = lambda arg : self.spawn_free_spirit_osx(arg, type='worker')
for func in ('spawn_free_spirit', 'spawn_worker'):
setattr(self, func, getattr(self, func+'_'+ext))
def cleanup_child_windows(self, child, name=None, fd=None):
try:
child.kill()
except:
pass
try:
if fd is not None:
self.os.close(fd)
except:
pass
try:
if name is not None and os.path.exists(name):
self.os.unlink(name)
except:
pass
def cleanup_child_linux(self, child):
try:
child.kill()
except:
pass
def get_env(self):
env = dict(os.environ)
env['CALIBRE_WORKER'] = '1'
env['ORIGWD'] = os.path.abspath(os.getcwd())
if hasattr(self, 'env'):
env.update(self.env)
return env
def spawn_free_spirit_osx(self, arg, type='free_spirit'):
script = ('from calibre.parallel import main; '
'main(args=["calibre-parallel", %s]);')%repr(arg)
exe = self.gui_executable if type == 'free_spirit' else self.executable
cmdline = [exe, '-c', self.prefix+script]
child = WorkerStatus(subprocess.Popen(cmdline, env=self.get_env()))
atexit.register(self.cleanup_child_linux, child)
return child
def spawn_free_spirit_linux(self, arg, type='free_spirit'):
cmdline = [self.executable, arg]
child = WorkerStatus(subprocess.Popen(cmdline,
env=self.get_env(), cwd=getattr(sys, 'frozen_path', None)))
atexit.register(self.cleanup_child_linux, child)
return child
def spawn_free_spirit_windows(self, arg, type='free_spirit'):
priority = {'high':win32process.HIGH_PRIORITY_CLASS, 'normal':win32process.NORMAL_PRIORITY_CLASS,
'low':win32process.IDLE_PRIORITY_CLASS}[prefs['worker_process_priority']]
fd, name = tempfile.mkstemp('.log', 'calibre_'+type+'_')
handle = msvcrt.get_osfhandle(fd)
si = win32process.STARTUPINFO()
si.hStdOutput = handle
si.hStdError = handle
cmdline = self.executable + ' ' + str(arg)
hProcess = \
win32process.CreateProcess(
None, # Application Name
cmdline, # Command line
None, # processAttributes
None, # threadAttributes
1, # bInheritHandles
win32process.CREATE_NO_WINDOW|priority, # Dont want ugly console popping up
self.get_env(), # New environment
None, # Current directory
si
)[0]
child = WorkerStatus(hProcess)
atexit.register(self.cleanup_child_windows, child, name, fd)
return child
mother = WorkerMother()
_comm_lock = RLock()
def write(socket, msg, timeout=5):
'''
Write a message on socket. If `msg` is unicode, it is encoded in utf-8.
Raises a `RuntimeError` if the socket is not ready for writing or the writing fails.
`msg` is broken into chunks of size 4096 and sent. The :function:`read` function
automatically re-assembles the chunks into whole message.
'''
if isworker:
_comm_lock.acquire()
try:
if isinstance(msg, unicode):
msg = msg.encode('utf-8')
if DEBUG:
print >>sys.__stdout__, 'write(%s):'%('worker' if isworker else 'overseer'), repr(msg)
length = None
while len(msg) > 0:
if length is None:
length = len(msg)
chunk = ('%-12d'%length) + msg[:4096-12]
msg = msg[4096-12:]
else:
chunk, msg = msg[:4096], msg[4096:]
w = select([], [socket], [], timeout)[1]
if not w:
raise RuntimeError('Write to socket timed out')
if socket.sendall(chunk) is not None:
raise RuntimeError('Failed to write chunk to socket')
finally:
if isworker:
_comm_lock.release()
def read(socket, timeout=5):
'''
Read a message from `socket`. The message must have been sent with the :function:`write`
function. Raises a `RuntimeError` if the message is corrupted. Can return an
empty string.
'''
if isworker:
_comm_lock.acquire()
try:
buf = cStringIO.StringIO()
length = None
while select([socket],[],[],timeout)[0]:
msg = socket.recv(4096)
if not msg:
break
if length is None:
try:
length, msg = int(msg[:12]), msg[12:]
except ValueError:
if DEBUG:
print >>sys.__stdout__, 'read(%s):'%('worker' if isworker else 'overseer'), 'no length in', msg
return ''
buf.write(msg)
if buf.tell() >= length:
break
if not length:
if DEBUG:
print >>sys.__stdout__, 'read(%s):'%('worker' if isworker else 'overseer'), 'nothing'
return ''
msg = buf.getvalue()[:length]
if len(msg) < length:
raise RuntimeError('Corrupted packet received')
if DEBUG:
print >>sys.__stdout__, 'read(%s):'%('worker' if isworker else 'overseer'), repr(msg)
return msg
finally:
if isworker:
_comm_lock.release()
class RepeatingTimer(Thread):
'''
Calls a specified function repeatedly at a specified interval. Runs in a
daemon thread (i.e. the interpreter can exit while it is still running).
Call :meth:`start()` to start it.
'''
def repeat(self):
while True:
self.event.wait(self.interval)
if self.event.isSet():
break
self.action()
def __init__(self, interval, func, name):
self.event = Event()
self.interval = interval
self.action = func
Thread.__init__(self, target=self.repeat, name=name)
self.setDaemon(True)
class ControlError(Exception):
pass
class Overseer(object):
'''
Responsible for controlling worker processes. The main interface is the
methods, :meth:`initialize_job`, :meth:`control`.
'''
KILL_RESULT = 'Server: job killed by user|||#@#$%&*)*(*$#$%#$@&'
INTERVAL = 0.1
def __init__(self, server, port, timeout=5):
self.worker_status = mother.spawn_worker('127.0.0.1:'+str(port))
self.socket = server.accept()[0]
# Needed if terminate called when interpreter is shutting down
self.os = os
self.signal = signal
self.on_probation = False
self.terminated = False
self.working = False
self.timeout = timeout
self.last_job_time = time.time()
self._stop = False
if not select([self.socket], [], [], 120)[0]:
raise RuntimeError(_('Could not launch worker process.'))
ID = self.read().split(':')
if ID[0] != 'CALIBRE_WORKER':
raise RuntimeError('Impostor')
self.worker_pid = int(ID[1])
self.write('OK')
if self.read() != 'WAITING':
raise RuntimeError('Worker sulking')
def terminate(self):
'Kill worker process.'
self.terminated = True
try:
if self.socket:
self.write('STOP:')
time.sleep(1)
self.socket.shutdown(socket.SHUT_RDWR)
except:
pass
if iswindows:
win32api = __import__('win32api')
try:
handle = win32api.OpenProcess(1, False, self.worker_pid)
win32api.TerminateProcess(handle, -1)
except:
pass
else:
try:
try:
self.os.kill(self.worker_pid, self.signal.SIGKILL)
time.sleep(0.5)
finally:
self.worker_status.kill()
except:
pass
def write(self, msg, timeout=None):
write(self.socket, msg, timeout=self.timeout if timeout is None else timeout)
def read(self, timeout=None):
return read(self.socket, timeout=self.timeout if timeout is None else timeout)
def __eq__(self, other):
return hasattr(other, 'process') and hasattr(other, 'worker_pid') and self.worker_pid == other.worker_pid
def is_viable(self):
if self.terminated:
return False
return self.worker_status.is_alive()
def select(self, timeout=0):
return select([self.socket], [self.socket], [self.socket], timeout)
def initialize_job(self, job):
'''
Sends `job` to worker process. Can raise `ControlError` if worker process
does not respond appropriately. In this case, this Overseer is useless
and should be discarded.
`job`: An instance of :class:`Job`.
'''
self.working = True
self.write('JOB:'+cPickle.dumps((job.func, job.args, job.kwargs), -1))
msg = self.read()
if msg != 'OK':
raise ControlError('Failed to initialize job on worker %d:%s'%(self.worker_pid, msg))
self.job = job
self.last_report = time.time()
job.start_work()
def control(self):
'''
Listens for messages from the worker process and dispatches them
appropriately. If the worker process dies unexpectedly, returns a result
of None with a ControlError indicating the worker died.
Returns a :class:`Result` instance or None, if the worker is still working.
'''
if select([self.socket],[],[],0)[0]:
msg = self.read()
if msg:
self.on_probation = False
self.last_report = time.time()
else:
if self.on_probation:
self.terminate()
self.job.result = None
self.job.exception = ControlError('Worker process died unexpectedly')
return
else:
self.on_probation = True
return
word, msg = msg.partition(':')[0], msg.partition(':')[-1]
if word == 'PING':
self.write('OK')
return
elif word == 'RESULT':
self.write('OK')
self.job.result = cPickle.loads(msg)
return True
elif word == 'OUTPUT':
self.write('OK')
try:
self.job.output(''.join(cPickle.loads(msg)))
except:
self.job.output('Bad output message: '+ repr(msg))
elif word == 'PROGRESS':
self.write('OK')
percent = None
try:
percent, msg = cPickle.loads(msg)[-1]
except:
print 'Bad progress update:', repr(msg)
if percent is not None:
self.job.update_status(percent, msg)
elif word == 'ERROR':
self.write('OK')
exception, tb = cPickle.loads(msg)
self.job.output(u'%s\n%s'%(exception, tb))
self.job.exception, self.job.traceback = exception, tb
return True
else:
self.terminate()
self.job.exception = ControlError('Worker sent invalid msg: %s'%repr(msg))
return
if not self.worker_status.is_alive() or time.time() - self.last_report > 380:
self.terminate()
self.job.exception = ControlError('Worker process died unexpectedly')
return
class JobKilled(Exception):
pass
class Job(object):
def __init__(self, job_done, job_manager=None,
args=[], kwargs={}, description=None):
self.args = args
self.kwargs = kwargs
self._job_done = job_done
self.job_manager = job_manager
self.is_running = False
self.has_run = False
self.percent = -1
self.msg = None
self.description = description
self.start_time = None
self.running_time = None
self.result = self.exception = self.traceback = self.log = None
def __cmp__(self, other):
sstatus, ostatus = self.status(), other.status()
if sstatus == ostatus or (self.has_run and other.has_run):
if self.start_time == other.start_time:
return cmp(id(self), id(other))
return cmp(self.start_time, other.start_time)
if sstatus == 'WORKING':
return -1
if ostatus == 'WORKING':
return 1
if sstatus == 'WAITING':
return -1
if ostatus == 'WAITING':
return 1
def job_done(self):
self.is_running, self.has_run = False, True
self.running_time = (time.time() - self.start_time) if \
self.start_time is not None else 0
if self.job_manager is not None:
self.job_manager.job_done(self)
self._job_done(self)
def start_work(self):
self.is_running = True
self.has_run = False
self.start_time = time.time()
if self.job_manager is not None:
self.job_manager.start_work(self)
def update_status(self, percent, msg=None):
self.percent = percent
self.msg = msg
if self.job_manager is not None:
try:
self.job_manager.status_update(self)
except:
traceback.print_exc()
def status(self):
if self.is_running:
return 'WORKING'
if not self.has_run:
return 'WAITING'
if self.has_run:
if self.exception is None:
return 'DONE'
return 'ERROR'
def console_text(self):
ans = [u'Job: ']
if self.description:
ans[0] += self.description
if self.exception is not None:
header = unicode(self.exception.__class__.__name__) if \
hasattr(self.exception, '__class__') else u'Error'
header = u'**%s**'%header
header += u': '
try:
header += unicode(self.exception)
except:
header += unicode(repr(self.exception))
ans.append(header)
if self.traceback:
ans.append(u'**Traceback**:')
ans.extend(self.traceback.split('\n'))
if self.log:
if isinstance(self.log, str):
self.log = unicode(self.log, 'utf-8', 'replace')
ans.append(self.log)
return (u'\n'.join(ans)).encode('utf-8')
def gui_text(self):
ans = [u'Job: ']
if self.description:
if not isinstance(self.description, unicode):
self.description = self.description.decode('utf-8', 'replace')
ans[0] += u'**%s**'%self.description
if self.exception is not None:
header = unicode(self.exception.__class__.__name__) if \
hasattr(self.exception, '__class__') else u'Error'
header = u'**%s**'%header
header += u': '
try:
header += unicode(self.exception)
except:
header += unicode(repr(self.exception))
ans.append(header)
if self.traceback:
ans.append(u'**Traceback**:')
ans.extend(self.traceback.split('\n'))
if self.log:
ans.append(u'**Log**:')
if isinstance(self.log, str):
self.log = unicode(self.log, 'utf-8', 'replace')
ans.extend(self.log.split('\n'))
ans = [x.decode(preferred_encoding, 'replace') if isinstance(x, str) else x for x in ans]
return u'<br>'.join(ans)
class ParallelJob(Job):
def __init__(self, func, *args, **kwargs):
Job.__init__(self, *args, **kwargs)
self.func = func
self.done = self.job_done
def output(self, msg):
if not self.log:
self.log = u''
if not isinstance(msg, unicode):
msg = msg.decode('utf-8', 'replace')
if msg:
self.log += msg
if self.job_manager is not None:
self.job_manager.output(self)
def remove_ipc_socket(path):
os = __import__('os')
if os.path.exists(path):
os.unlink(path)
class Server(Thread):
KILL_RESULT = Overseer.KILL_RESULT
START_PORT = 10013
PID = os.getpid()
def __init__(self, number_of_workers=detect_ncpus()):
Thread.__init__(self)
self.setDaemon(True)
self.server_socket = socket.socket(SOCKET_TYPE, socket.SOCK_STREAM)
self.port = tempfile.mktemp(prefix='calibre_server')+'_%d_'%self.PID if not iswindows else self.START_PORT
while True:
try:
address = ('localhost', self.port) if iswindows else self.port
self.server_socket.bind(address)
break
except socket.error:
self.port += (1 if iswindows else '1')
if not iswindows:
atexit.register(remove_ipc_socket, self.port)
self.server_socket.listen(5)
self.number_of_workers = number_of_workers
self.pool, self.jobs, self.working = [], collections.deque(), []
atexit.register(self.killall)
atexit.register(self.close)
self.job_lock = RLock()
self.overseer_lock = RLock()
self.working_lock = RLock()
self.result_lock = RLock()
self.pool_lock = RLock()
self.start()
def split(self, tasks):
'''
Split a list into a list of sub lists, with the number of sub lists being
no more than the number of workers this server supports. Each sublist contains
two tuples of the form (i, x) where x is an element fro the original list
and i is the index of the element x in the original list.
'''
ans, count, pos = [], 0, 0
delta = int(ceil(len(tasks)/float(self.number_of_workers)))
while count < len(tasks):
section = []
for t in tasks[pos:pos+delta]:
section.append((count, t))
count += 1
ans.append(section)
pos += delta
return ans
def close(self):
try:
self.server_socket.shutdown(socket.SHUT_RDWR)
except:
pass
def add_job(self, job):
with self.job_lock:
self.jobs.append(job)
if job.job_manager is not None:
job.job_manager.add_job(job)
def poll(self):
'''
Return True if the server has either working or queued jobs
'''
with self.job_lock:
with self.working_lock:
return len(self.jobs) + len(self.working) > 0
def wait(self, sleep=1):
'''
Wait until job queue is empty
'''
while self.poll():
time.sleep(sleep)
def run(self):
while True:
job = None
with self.job_lock:
if len(self.jobs) > 0 and len(self.working) < self.number_of_workers:
job = self.jobs.popleft()
with self.pool_lock:
o = None
while self.pool:
o = self.pool.pop()
try:
o.initialize_job(job)
break
except:
o.terminate()
if o is None:
o = Overseer(self.server_socket, self.port)
try:
o.initialize_job(job)
except Exception, err:
o.terminate()
job.exception = err
job.traceback = traceback.format_exc()
job.done()
o = None
if o and o.is_viable():
with self.working_lock:
self.working.append(o)
with self.working_lock:
done = []
for o in self.working:
try:
if o.control() is not None or o.job.exception is not None:
o.job.done()
done.append(o)
except Exception, err:
o.job.exception = err
o.job.traceback = traceback.format_exc()
o.terminate()
o.job.done()
done.append(o)
for o in done:
self.working.remove(o)
if o and o.is_viable():
with self.pool_lock:
self.pool.append(o)
try:
time.sleep(1)
except:
return
def killall(self):
with self.pool_lock:
map(lambda x: x.terminate(), self.pool)
self.pool = []
def kill(self, job):
with self.working_lock:
pop = None
for o in self.working:
if o.job == job or o == job:
try:
o.terminate()
except: pass
o.job.exception = JobKilled(_('Job stopped by user'))
try:
o.job.done()
except: pass
pop = o
break
if pop is not None:
self.working.remove(pop)
def run_free_job(self, func, args=[], kwdargs={}):
pt = PersistentTemporaryFile('.pickle', '_IPC_')
pt.write(cPickle.dumps((func, args, kwdargs)))
pt.close()
mother.spawn_free_spirit(binascii.hexlify(pt.name))
##########################################################################################
##################################### CLIENT CODE #####################################
##########################################################################################
class BufferedSender(object):
def __init__(self, socket):
self.socket = socket
self.wbuf, self.pbuf = [], []
self.wlock, self.plock = RLock(), RLock()
self.last_report = None
self.timer = RepeatingTimer(0.5, self.send, 'BufferedSender')
self.timer.start()
def write(self, msg):
if not isinstance(msg, basestring):
msg = unicode(msg)
with self.wlock:
self.wbuf.append(msg)
def send(self):
if callable(select) and select([self.socket], [], [], 0)[0]:
msg = read(self.socket)
if msg == 'PING:':
write(self.socket, 'OK')
elif msg:
self.socket.shutdown(socket.SHUT_RDWR)
thread.interrupt_main()
time.sleep(1)
raise SystemExit
if not select([], [self.socket], [], 30)[1]:
print >>sys.__stderr__, 'Cannot pipe to overseer'
return
reported = False
with self.wlock:
if self.wbuf:
msg = cPickle.dumps(self.wbuf, -1)
self.wbuf = []
write(self.socket, 'OUTPUT:'+msg)
read(self.socket, 10)
reported = True
with self.plock:
if self.pbuf:
msg = cPickle.dumps(self.pbuf, -1)
self.pbuf = []
write(self.socket, 'PROGRESS:'+msg)
read(self.socket, 10)
reported = True
if self.last_report is not None:
if reported:
self.last_report = time.time()
elif time.time() - self.last_report > 60:
write(self.socket, 'PING:')
read(self.socket, 10)
self.last_report = time.time()
def notify(self, percent, msg=''):
with self.plock:
self.pbuf.append((percent, msg))
def flush(self):
pass
def get_func(name):
module, func, kwdargs, notification = PARALLEL_FUNCS[name]
module = __import__(module, fromlist=[1])
func = getattr(module, func)
return func, kwdargs, notification
_atexit = collections.deque()
def myatexit(func, *args, **kwargs):
_atexit.append((func, args, kwargs))
def work(client_socket, func, args, kwdargs):
sys.stdout.last_report = time.time()
orig = atexit.register
atexit.register = myatexit
try:
func, kargs, notification = get_func(func)
if notification is not None and hasattr(sys.stdout, 'notify'):
kargs[notification] = sys.stdout.notify
kargs.update(kwdargs)
res = func(*args, **kargs)
if hasattr(sys.stdout, 'send'):
sys.stdout.send()
return res
finally:
atexit.register = orig
sys.stdout.last_report = None
while True:
try:
func, args, kwargs = _atexit.pop()
except IndexError:
break
try:
func(*args, **kwargs)
except (Exception, SystemExit):
continue
time.sleep(5) # Give any in progress BufferedSend time to complete
def worker(host, port):
client_socket = socket.socket(SOCKET_TYPE, socket.SOCK_STREAM)
address = (host, port) if iswindows else port
client_socket.connect(address)
write(client_socket, 'CALIBRE_WORKER:%d'%os.getpid())
msg = read(client_socket, timeout=10)
if msg != 'OK':
return 1
write(client_socket, 'WAITING')
sys.stdout = BufferedSender(client_socket)
sys.stderr = sys.stdout
while True:
if not select([client_socket], [], [], 60)[0]:
time.sleep(1)
continue
msg = read(client_socket, timeout=60)
if msg.startswith('JOB:'):
func, args, kwdargs = cPickle.loads(msg[4:])
write(client_socket, 'OK')
try:
result = work(client_socket, func, args, kwdargs)
write(client_socket, 'RESULT:'+ cPickle.dumps(result))
except BaseException, err:
exception = (err.__class__.__name__, unicode(str(err), 'utf-8', 'replace'))
tb = unicode(traceback.format_exc(), 'utf-8', 'replace')
msg = 'ERROR:'+cPickle.dumps((exception, tb),-1)
write(client_socket, msg)
res = read(client_socket, 10)
if res != 'OK':
break
gc.collect()
elif msg == 'PING:':
write(client_socket, 'OK')
elif msg == 'STOP:':
client_socket.shutdown(socket.SHUT_RDWR)
return 0
elif not msg:
time.sleep(1)
else:
print >>sys.__stderr__, 'Invalid protocols message', msg
return 1
def free_spirit(path):
func, args, kwdargs = cPickle.load(open(path, 'rb'))
try:
os.unlink(path)
except:
pass
func, kargs = get_func(func)[:2]
kargs.update(kwdargs)
func(*args, **kargs)
def main(args=sys.argv):
global isworker
isworker = True
args = args[1].split(':')
if len(args) == 1:
free_spirit(binascii.unhexlify(re.sub(r'[^a-f0-9A-F]', '', args[0])))
else:
worker(args[0].replace("'", ''), int(args[1]) if iswindows else args[1])
return 0
if __name__ == '__main__':
sys.exit(main())

View File

@ -0,0 +1,137 @@
#!/usr/bin/env python
# vim:fileencoding=UTF-8:ts=4:sw=4:sta:et:sts=4:ai
from __future__ import with_statement
__license__ = 'GPL v3'
__copyright__ = '2009, Kovid Goyal <kovid@kovidgoyal.net>'
__docformat__ = 'restructuredtext en'
_count = 0
import time, cStringIO
from Queue import Queue, Empty
class BaseJob(object):
WAITING = 0
RUNNING = 1
FINISHED = 2
def __init__(self, description, done=lambda x: x):
global _count
_count += 1
self.id = _count
self.description = description
self.done = done
self.done2 = None
self.killed = False
self.failed = False
self.start_time = None
self.result = None
self.duration = None
self.log_path = None
self.notifications = Queue()
self._run_state = self.WAITING
self.percent = 0
self._message = None
self._status_text = _('Waiting...')
self._done_called = False
def update(self):
if self.duration is not None:
self._run_state = self.FINISHED
self.percent = 1
if self.killed:
self._status_text = _('Stopped')
else:
self._status_text = _('Error') if self.failed else _('Finished')
if not self._done_called:
self._done_called = True
try:
self.done(self)
except:
pass
try:
if callable(self.done2):
self.done2(self)
except:
pass
elif self.start_time is not None:
self._run_state = self.RUNNING
self._status_text = _('Working...')
while True:
try:
self.percent, self._message = self.notifications.get_nowait()
self.percent *= 100.
except Empty:
break
@property
def status_text(self):
if self._run_state == self.FINISHED or not self._message:
return self._status_text
return self._message
@property
def run_state(self):
return self._run_state
@property
def running_time(self):
if self.duration is not None:
return self.duration
if self.start_time is not None:
return time.time() - self.start_time
return None
@property
def is_finished(self):
return self._run_state == self.FINISHED
@property
def is_started(self):
return self._run_state != self.WAITING
@property
def is_running(self):
return self.is_started and not self.is_finished
def __cmp__(self, other):
if self.is_finished == other.is_finished:
if self.start_time is None:
if other.start_time is None: # Both waiting
return cmp(other.id, self.id)
else:
return 1
else:
if other.start_time is None:
return -1
else: # Both running
return cmp(other.start_time, self.start_time)
else:
return 1 if self.is_finished else -1
return 0
@property
def log_file(self):
if self.log_path:
return open(self.log_path, 'rb')
return cStringIO.StringIO(_('No details available.'))
@property
def details(self):
return self.log_file.read().decode('utf-8')
class ParallelJob(BaseJob):
def __init__(self, name, description, done, args=[], kwargs={}):
self.name, self.args, self.kwargs = name, args, kwargs
BaseJob.__init__(self, description, done)

View File

@ -70,7 +70,7 @@ class Worker(object):
@property
def is_alive(self):
return hasattr(self, 'child') and self.child.poll() is not None
return hasattr(self, 'child') and self.child.poll() is None
@property
def returncode(self):
@ -144,6 +144,7 @@ class Worker(object):
self.child = subprocess.Popen(cmd, **args)
self.log_path = ret
return ret

View File

@ -6,5 +6,241 @@ __license__ = 'GPL v3'
__copyright__ = '2009, Kovid Goyal <kovid@kovidgoyal.net>'
__docformat__ = 'restructuredtext en'
import os, cPickle, time, tempfile
from math import ceil
from threading import Thread, RLock
from Queue import Queue, Empty
from multiprocessing.connection import Listener
from multiprocessing import cpu_count
from collections import deque
from binascii import hexlify
from calibre.utils.ipc.launch import Worker
from calibre.utils.ipc.worker import PARALLEL_FUNCS
_counter = 0
class ConnectedWorker(Thread):
def __init__(self, worker, conn, rfile):
Thread.__init__(self)
self.daemon = True
self.conn = conn
self.worker = worker
self.notifications = Queue()
self._returncode = 'dummy'
self.killed = False
self.log_path = worker.log_path
self.rfile = rfile
def start_job(self, job):
notification = PARALLEL_FUNCS[job.name][-1] is not None
self.conn.send((job.name, job.args, job.kwargs))
if notification:
self.start()
else:
self.conn.close()
self.job = job
def run(self):
while True:
try:
x = self.conn.recv()
self.notifications.put(x)
except BaseException:
break
try:
self.conn.close()
except BaseException:
pass
def kill(self):
self.killed = True
try:
self.worker.kill()
except BaseException:
pass
@property
def is_alive(self):
return not self.killed and self.worker.is_alive
@property
def returncode(self):
if self._returncode != 'dummy':
return self._returncode
r = self.worker.returncode
if self.killed and r is None:
self._returncode = 1
return 1
if r is not None:
self._returncode = r
return r
class Server(Thread):
def __init__(self, notify_on_job_done=lambda x: x, pool_size=None):
Thread.__init__(self)
self.daemon = True
global _counter
self.id = _counter+1
_counter += 1
self.pool_size = cpu_count() if pool_size is None else pool_size
self.notify_on_job_done = notify_on_job_done
self.auth_key = os.urandom(32)
self.listener = Listener(authkey=self.auth_key, backlog=4)
self.add_jobs_queue, self.changed_jobs_queue = Queue(), Queue()
self.kill_queue = Queue()
self.waiting_jobs, self.processing_jobs = deque(), deque()
self.pool, self.workers = deque(), deque()
self.launched_worker_count = 0
self._worker_launch_lock = RLock()
self.start()
def launch_worker(self, gui=False, redirect_output=None):
with self._worker_launch_lock:
self.launched_worker_count += 1
id = self.launched_worker_count
rfile = os.path.join(tempfile.gettempdir(),
'calibre_ipc_result_%d_%d.pickle'%(self.id, id))
env = {
'CALIBRE_WORKER_ADDRESS' :
hexlify(cPickle.dumps(self.listener.address, -1)),
'CALIBRE_WORKER_KEY' : hexlify(self.auth_key),
'CALIBRE_WORKER_RESULT' : hexlify(rfile),
}
w = Worker(env, gui=gui)
if redirect_output is None:
redirect_output = not gui
w(redirect_output=redirect_output)
conn = self.listener.accept()
if conn is None:
raise Exception('Failed to launch worker process')
return ConnectedWorker(w, conn, rfile)
def add_job(self, job):
job.done2 = self.notify_on_job_done
self.add_jobs_queue.put(job)
def run_job(self, job, gui=True, redirect_output=False):
w = self.launch_worker(gui=gui, redirect_output=redirect_output)
w.start_job(job)
def run(self):
while True:
try:
job = self.add_jobs_queue.get(True, 0.2)
if job is None:
break
self.waiting_jobs.append(job)
except Empty:
pass
for worker in self.workers:
while True:
try:
n = worker.notifications.get_nowait()
worker.job.notifications.put(n)
self.changed_jobs_queue.put(job)
except Empty:
break
for worker in [w for w in self.workers if not w.is_alive]:
self.workers.remove(worker)
job = worker.job
if worker.returncode != 0:
job.failed = True
job.returncode = worker.returncode
elif os.path.exists(worker.rfile):
job.result = cPickle.load(open(worker.rfile, 'rb'))
os.remove(worker.rfile)
job.duration = time.time() - job.start_time
self.changed_jobs_queue.put(job)
if len(self.pool) + len(self.workers) < self.pool_size:
try:
self.pool.append(self.launch_worker())
except:
break
if len(self.pool) > 0 and len(self.waiting_jobs) > 0:
job = self.waiting_jobs.pop()
worker = self.pool.pop()
job.start_time = time.time()
worker.start_job(job)
self.workers.append(worker)
job.log_path = worker.log_path
self.changed_jobs_queue.put(job)
while True:
try:
j = self.kill_queue.get_nowait()
self._kill_job(j)
except Empty:
break
def kill_job(self, job):
self.kill_queue.put(job)
def killall(self):
for job in self.workers:
self.kill_queue.put(job)
def _kill_job(self, job):
if job.start_time is None: return
for worker in self.workers:
if job is worker.job:
worker.kill()
job.killed = True
break
def split(self, tasks):
'''
Split a list into a list of sub lists, with the number of sub lists being
no more than the number of workers this server supports. Each sublist contains
two tuples of the form (i, x) where x is an element from the original list
and i is the index of the element x in the original list.
'''
ans, count, pos = [], 0, 0
delta = int(ceil(len(tasks)/float(self.pool_size)))
while count < len(tasks):
section = []
for t in tasks[pos:pos+delta]:
section.append((count, t))
count += 1
ans.append(section)
pos += delta
return ans
def close(self):
try:
self.add_jobs_queue.put(None)
self.listener.close()
except:
pass
time.sleep(0.2)
for worker in self.workers:
try:
worker.kill()
except:
pass
for worker in self.pool:
try:
worker.kill()
except:
pass
def __enter__(self):
return self
def __exit__(self, *args):
self.close()

View File

@ -6,11 +6,12 @@ __license__ = 'GPL v3'
__copyright__ = '2009, Kovid Goyal <kovid@kovidgoyal.net>'
__docformat__ = 'restructuredtext en'
import os, cPickle
import os, cPickle, sys
from multiprocessing.connection import Client
from threading import Thread
from queue import Queue
from Queue import Queue
from contextlib import closing
from binascii import unhexlify
PARALLEL_FUNCS = {
'lrfviewer' :
@ -29,8 +30,8 @@ PARALLEL_FUNCS = {
class Progress(Thread):
def __init__(self, conn):
self.daemon = True
Thread.__init__(self)
self.daemon = True
self.conn = conn
self.queue = Queue()
@ -56,8 +57,9 @@ def get_func(name):
return func, notification
def main():
address = cPickle.loads(os.environ['CALIBRE_WORKER_ADDRESS'])
key = os.environ['CALIBRE_WORKER_KEY']
address = cPickle.loads(unhexlify(os.environ['CALIBRE_WORKER_ADDRESS']))
key = unhexlify(os.environ['CALIBRE_WORKER_KEY'])
resultf = unhexlify(os.environ['CALIBRE_WORKER_RESULT'])
with closing(Client(address, authkey=key)) as conn:
name, args, kwargs = conn.recv()
func, notification = get_func(name)
@ -66,13 +68,17 @@ def main():
kwargs[notification] = notifier
notifier.start()
func(*args, **kwargs)
result = func(*args, **kwargs)
if result is not None:
cPickle.dump(result, open(resultf, 'wb'), -1)
notifier.queue.put(None)
sys.stdout.flush()
sys.stderr.flush()
return 0
if __name__ == '__main__':
raise SystemExit(main())
sys.exit(main())