diff --git a/src/calibre/__init__.py b/src/calibre/__init__.py index 280bf02cae..a08f0417ee 100644 --- a/src/calibre/__init__.py +++ b/src/calibre/__init__.py @@ -2,7 +2,7 @@ __license__ = 'GPL v3' __copyright__ = '2008, Kovid Goyal ' __docformat__ = 'restructuredtext en' -import sys, os, re, logging, time, subprocess, mimetypes, \ +import sys, os, re, logging, time, mimetypes, \ __builtin__, warnings, multiprocessing __builtin__.__dict__['dynamic_property'] = lambda(func): func(None) from htmlentitydefs import name2codepoint @@ -91,11 +91,16 @@ def prints(*args, **kwargs): file = kwargs.get('file', sys.stdout) sep = kwargs.get('sep', ' ') end = kwargs.get('end', '\n') + enc = preferred_encoding + if 'CALIBRE_WORKER' in os.environ: + enc = 'utf-8' for i, arg in enumerate(args): if isinstance(arg, unicode): - arg = arg.encode(preferred_encoding) + arg = arg.encode(enc) if not isinstance(arg, str): arg = str(arg) + if not isinstance(arg, unicode): + arg = arg.decode(preferred_encoding, 'replace').encode(enc) file.write(arg) if i != len(args)-1: file.write(sep) diff --git a/src/calibre/devices/bebook/driver.py b/src/calibre/devices/bebook/driver.py index 0980554387..d3e887eb74 100644 --- a/src/calibre/devices/bebook/driver.py +++ b/src/calibre/devices/bebook/driver.py @@ -18,9 +18,9 @@ class BEBOOK(USBMS): VENDOR_ID = [0x0525] PRODUCT_ID = [0x8803, 0x6803] - BCD = [0x312] + BCD = [0x312] - VENDOR_NAME = 'LINUX' + VENDOR_NAME = 'LINUX' WINDOWS_MAIN_MEM = 'FILE-STOR_GADGET' WINDOWS_CARD_MEM = 'FILE-STOR_GADGET' @@ -51,7 +51,7 @@ class BEBOOK_MINI(BEBOOK): VENDOR_ID = [0x0492] PRODUCT_ID = [0x8813] - BCD = [0x319] + BCD = [0x319] OSX_MAIN_MEM = 'BeBook Mini Internal Memory' OSX_CARD_MEM = 'BeBook Mini Storage Card' diff --git a/src/calibre/ebooks/comic/input.py b/src/calibre/ebooks/comic/input.py index b228182ef4..bf2aac1162 100755 --- a/src/calibre/ebooks/comic/input.py +++ b/src/calibre/ebooks/comic/input.py @@ -7,12 +7,14 @@ __docformat__ = 'restructuredtext en' Based on ideas from comiclrf created by FangornUK. ''' -import os, shutil, traceback, textwrap +import os, shutil, traceback, textwrap, time +from Queue import Empty from calibre.customize.conversion import InputFormatPlugin, OptionRecommendation -from calibre import extract, CurrentDir +from calibre import extract, CurrentDir, prints from calibre.ptempfile import PersistentTemporaryDirectory -from calibre.parallel import Server, ParallelJob +from calibre.utils.ipc.server import Server +from calibre.utils.ipc.job import ParallelJob def extract_comic(path_to_comic_file): ''' @@ -47,8 +49,8 @@ def find_pages(dir, sort_on_mtime=False, verbose=False): pages.sort(cmp=comparator) if verbose: - print 'Found comic pages...' - print '\t'+'\n\t'.join([os.path.basename(p) for p in pages]) + prints('Found comic pages...') + prints('\t'+'\n\t'.join([os.path.basename(p) for p in pages])) return pages class PageProcessor(list): @@ -181,7 +183,7 @@ class PageProcessor(list): p.DestroyPixelWand(pw) p.DestroyMagickWand(wand) -def render_pages(tasks, dest, opts, notification=None): +def render_pages(tasks, dest, opts, notification=lambda x, y: x): ''' Entry point for the job server. ''' @@ -197,30 +199,23 @@ def render_pages(tasks, dest, opts, notification=None): msg = _('Failed %s')%path if opts.verbose: msg += '\n' + traceback.format_exc() - if notification is not None: - notification(0.5, msg) + prints(msg) + notification(0.5, msg) return pages, failures -class JobManager(object): - ''' - Simple job manager responsible for keeping track of overall progress. - ''' +class Progress(object): def __init__(self, total, update): self.total = total self.update = update self.done = 0 - self.add_job = lambda j: j - self.output = lambda j: j - self.start_work = lambda j: j - self.job_done = lambda j: j - def status_update(self, job): + def __call__(self, percent, msg=''): self.done += 1 #msg = msg%os.path.basename(job.args[0]) - self.update(float(self.done)/self.total, job.msg) + self.update(float(self.done)/self.total, msg) def process_pages(pages, opts, update, tdir): ''' @@ -229,22 +224,38 @@ def process_pages(pages, opts, update, tdir): from calibre.utils.PythonMagickWand import ImageMagick ImageMagick - job_manager = JobManager(len(pages), update) + progress = Progress(len(pages), update) server = Server() jobs = [] + tasks = [(p, os.path.join(tdir, os.path.basename(p))) for p in pages] tasks = server.split(pages) for task in tasks: - jobs.append(ParallelJob('render_pages', lambda s:s, job_manager=job_manager, + jobs.append(ParallelJob('render_pages', '', progress, args=[task, tdir, opts])) server.add_job(jobs[-1]) - server.wait() - server.killall() + while True: + time.sleep(1) + running = False + for job in jobs: + while True: + try: + x = job.notifications.get_nowait() + progress(*x) + except Empty: + break + job.update() + if not job.is_finished: + running = True + if not running: + break server.close() ans, failures = [], [] for job in jobs: - if job.result is None: - raise Exception(_('Failed to process comic: %s\n\n%s')%(job.exception, job.traceback)) + if job.failed: + raw_input() + raise Exception(_('Failed to process comic: \n\n%s')% + job.log_file.read()) pages, failures_ = job.result ans += pages failures += failures_ diff --git a/src/calibre/ebooks/conversion/plumber.py b/src/calibre/ebooks/conversion/plumber.py index ee26c3001c..7387cf158e 100644 --- a/src/calibre/ebooks/conversion/plumber.py +++ b/src/calibre/ebooks/conversion/plumber.py @@ -668,6 +668,7 @@ OptionRecommendation(name='list_recipes', self.output_plugin.convert(self.oeb, self.output, self.input_plugin, self.opts, self.log) self.ui_reporter(1.) + self.log(self.output_fmt.upper(), 'output written to', self.output) def create_oebbook(log, path_or_stream, opts, input_plugin, reader=None): ''' diff --git a/src/calibre/ebooks/lit/output.py b/src/calibre/ebooks/lit/output.py index 42be1ecac7..c72d8eae96 100644 --- a/src/calibre/ebooks/lit/output.py +++ b/src/calibre/ebooks/lit/output.py @@ -7,7 +7,8 @@ __copyright__ = '2009, Kovid Goyal ' __docformat__ = 'restructuredtext en' -from calibre.customize.conversion import OutputFormatPlugin +from calibre.customize.conversion import OutputFormatPlugin, \ + OptionRecommendation class LITOutput(OutputFormatPlugin): @@ -15,12 +16,23 @@ class LITOutput(OutputFormatPlugin): author = 'Marshall T. Vandegrift' file_type = 'lit' + recommendations = set([ + ('dont_split_on_page_breaks', False, OptionRecommendation.HIGH), + ]) + def convert(self, oeb, output_path, input_plugin, opts, log): self.log, self.opts, self.oeb = log, opts, oeb from calibre.ebooks.oeb.transforms.manglecase import CaseMangler from calibre.ebooks.oeb.transforms.rasterize import SVGRasterizer from calibre.ebooks.oeb.transforms.htmltoc import HTMLTOCAdder from calibre.ebooks.lit.writer import LitWriter + from calibre.ebooks.oeb.transforms.split import Split + split = Split(not self.opts.dont_split_on_page_breaks, + max_flow_size=0 + ) + split(self.oeb, self.opts) + + tocadder = HTMLTOCAdder() tocadder(oeb, opts) mangler = CaseMangler() diff --git a/src/calibre/ebooks/oeb/base.py b/src/calibre/ebooks/oeb/base.py index bbac34f0b1..bdf78f96e4 100644 --- a/src/calibre/ebooks/oeb/base.py +++ b/src/calibre/ebooks/oeb/base.py @@ -96,8 +96,12 @@ def iterlinks(root): for el in root.iter(): attribs = el.attrib + try: + tag = el.tag + except UnicodeDecodeError: + continue - if el.tag == XHTML('object'): + if tag == XHTML('object'): codebase = None ## tags have attributes that are relative to ## codebase @@ -122,7 +126,7 @@ def iterlinks(root): yield (el, attr, attribs[attr], 0) - if el.tag == XHTML('style') and el.text: + if tag == XHTML('style') and el.text: for match in _css_url_re.finditer(el.text): yield (el, None, match.group(1), match.start(1)) for match in _css_import_re.finditer(el.text): @@ -801,6 +805,11 @@ class Manifest(object): self.oeb.logger.warn( 'File %r missing element' % self.href) etree.SubElement(data, XHTML('body')) + + # Remove microsoft office markup + r = [x for x in data.iterdescendants(etree.Element) if 'microsoft-com' in x.tag] + for x in r: + x.tag = XHTML('span') return data def _parse_css(self, data): diff --git a/src/calibre/ebooks/oeb/transforms/structure.py b/src/calibre/ebooks/oeb/transforms/structure.py index 8ec3c7737a..5e3322a57a 100644 --- a/src/calibre/ebooks/oeb/transforms/structure.py +++ b/src/calibre/ebooks/oeb/transforms/structure.py @@ -12,7 +12,13 @@ from lxml import etree from urlparse import urlparse from calibre.ebooks.oeb.base import XPNSMAP, TOC, XHTML -XPath = lambda x: etree.XPath(x, namespaces=XPNSMAP) +from calibre.ebooks import ConversionError +def XPath(x): + try: + return etree.XPath(x, namespaces=XPNSMAP) + except etree.XPathSyntaxError: + raise ConversionError( + 'The syntax of the XPath expression %s is invalid.' % repr(x)) class DetectStructure(object): diff --git a/src/calibre/ebooks/pdf/pdftohtml.py b/src/calibre/ebooks/pdf/pdftohtml.py index e03d7d0647..2c0daf05ca 100644 --- a/src/calibre/ebooks/pdf/pdftohtml.py +++ b/src/calibre/ebooks/pdf/pdftohtml.py @@ -6,7 +6,7 @@ __copyright__ = '2008, Kovid Goyal , ' \ '2009, John Schember ' __docformat__ = 'restructuredtext en' -import errno, os, re, sys, subprocess +import errno, os, sys, subprocess from functools import partial from calibre.ebooks import ConversionError, DRMError @@ -33,7 +33,7 @@ def pdftohtml(pdf_path): if isinstance(pdf_path, unicode): pdf_path = pdf_path.encode(sys.getfilesystemencoding()) if not os.access(pdf_path, os.R_OK): - raise ConversionError, 'Cannot read from ' + pdf_path + raise ConversionError('Cannot read from ' + pdf_path) with TemporaryDirectory('_pdftohtml') as tdir: index = os.path.join(tdir, 'index.html') @@ -47,7 +47,7 @@ def pdftohtml(pdf_path): p = popen(cmd, stderr=subprocess.PIPE) except OSError, err: if err.errno == 2: - raise ConversionError(_('Could not find pdftohtml, check it is in your PATH'), True) + raise ConversionError(_('Could not find pdftohtml, check it is in your PATH')) else: raise @@ -63,13 +63,13 @@ def pdftohtml(pdf_path): if ret != 0: err = p.stderr.read() - raise ConversionError, err + raise ConversionError(err) if not os.path.exists(index) or os.stat(index).st_size < 100: raise DRMError() - + with open(index, 'rb') as i: raw = i.read() if not '\n' + raw diff --git a/src/calibre/gui2/convert/__init__.py b/src/calibre/gui2/convert/__init__.py index 67b6e47aa9..d7dde4c190 100644 --- a/src/calibre/gui2/convert/__init__.py +++ b/src/calibre/gui2/convert/__init__.py @@ -176,7 +176,7 @@ class Widget(QWidget): elif isinstance(g, QCheckBox): return bool(g.isChecked()) elif isinstance(g, XPathEdit): - return g.xpath + return g.xpath if g.xpath else None else: raise Exception('Can\'t get value from %s'%type(g)) diff --git a/src/calibre/gui2/convert/single.py b/src/calibre/gui2/convert/single.py index c0da36e5dd..531448b1f3 100644 --- a/src/calibre/gui2/convert/single.py +++ b/src/calibre/gui2/convert/single.py @@ -189,6 +189,8 @@ class Config(ResizableDialog, Ui_Dialog): def setup_input_output_formats(self, db, book_id, preferred_input_format, preferred_output_format): + if preferred_output_format: + preferred_output_format = preferred_output_format.lower() available_formats = db.formats(book_id, index_is_id=True) if not available_formats: available_formats = '' diff --git a/src/calibre/gui2/device.py b/src/calibre/gui2/device.py index b3dcf8a21c..7f2513361a 100644 --- a/src/calibre/gui2/device.py +++ b/src/calibre/gui2/device.py @@ -1,7 +1,7 @@ from __future__ import with_statement __license__ = 'GPL v3' __copyright__ = '2008, Kovid Goyal ' -import os, traceback, Queue, time, socket +import os, traceback, Queue, time, socket, cStringIO from threading import Thread, RLock from itertools import repeat from functools import partial @@ -15,7 +15,7 @@ from calibre.customize.ui import available_input_formats, available_output_forma from calibre.devices.interface import DevicePlugin from calibre.constants import iswindows from calibre.gui2.dialogs.choose_format import ChooseFormatDialog -from calibre.parallel import Job +from calibre.utils.ipc.job import BaseJob from calibre.devices.scanner import DeviceScanner from calibre.gui2 import config, error_dialog, Dispatcher, dynamic, \ pixmap_to_data, warning_dialog, \ @@ -27,22 +27,46 @@ from calibre.devices.errors import FreeSpaceError from calibre.utils.smtp import compose_mail, sendmail, extract_email_address, \ config as email_config -class DeviceJob(Job): +class DeviceJob(BaseJob): - def __init__(self, func, *args, **kwargs): - Job.__init__(self, *args, **kwargs) + def __init__(self, func, done, job_manager, args=[], kwargs={}, + description=''): + BaseJob.__init__(self, description, done=done) self.func = func + self.args, self.kwargs = args, kwargs + self.job_manager = job_manager + self.job_manager.add_job(self) + self.details = _('No details available.') + + def start_work(self): + self.start_time = time.time() + self.job_manager.changed_queue.put(self) + + def job_done(self): + self.duration = time.time() - self.start_time() + self.job_manager.changed_queue.put(self) + self.job_manager.job_done(self) + + def report_progress(self, percent, msg=''): + self.notifications.put((percent, msg)) + self.job_manager.changed_queue.put(self) def run(self): self.start_work() try: self.result = self.func(*self.args, **self.kwargs) except (Exception, SystemExit), err: + self.failed = True + self.details = unicode(err) + '\n\n' + \ + traceback.format_exc() self.exception = err - self.traceback = traceback.format_exc() finally: self.job_done() + @property + def log_file(self): + return cStringIO.StringIO(self.details.encode('utf-8')) + class DeviceManager(Thread): @@ -113,7 +137,7 @@ class DeviceManager(Thread): job = self.next() if job is not None: self.current_job = job - self.device.set_progress_reporter(job.update_status) + self.device.set_progress_reporter(job.report_progress) self.current_job.run() self.current_job = None else: diff --git a/src/calibre/gui2/dialogs/jobs.py b/src/calibre/gui2/dialogs/jobs.py deleted file mode 100644 index c6907c6b60..0000000000 --- a/src/calibre/gui2/dialogs/jobs.py +++ /dev/null @@ -1,66 +0,0 @@ -__license__ = 'GPL v3' -__copyright__ = '2008, Kovid Goyal ' -'''Display active jobs''' - -from PyQt4.QtCore import Qt, QObject, SIGNAL, QSize, QString, QTimer -from PyQt4.QtGui import QDialog, QAbstractItemDelegate, QStyleOptionProgressBarV2, \ - QApplication, QStyle - -from calibre.gui2.dialogs.jobs_ui import Ui_JobsDialog -from calibre import __appname__ - -class ProgressBarDelegate(QAbstractItemDelegate): - - def sizeHint(self, option, index): - return QSize(120, 30) - - def paint(self, painter, option, index): - opts = QStyleOptionProgressBarV2() - opts.rect = option.rect - opts.minimum = 1 - opts.maximum = 100 - opts.textVisible = True - percent, ok = index.model().data(index, Qt.DisplayRole).toInt() - if not ok: - percent = 0 - opts.progress = percent - opts.text = QString(_('Unavailable') if percent == 0 else '%d%%'%percent) - QApplication.style().drawControl(QStyle.CE_ProgressBar, opts, painter) - -class JobsDialog(QDialog, Ui_JobsDialog): - def __init__(self, window, model): - QDialog.__init__(self, window) - Ui_JobsDialog.__init__(self) - self.setupUi(self) - self.jobs_view.setModel(model) - self.model = model - self.setWindowModality(Qt.NonModal) - self.setWindowTitle(__appname__ + _(' - Jobs')) - QObject.connect(self.jobs_view.model(), SIGNAL('modelReset()'), - self.jobs_view.resizeColumnsToContents) - QObject.connect(self.kill_button, SIGNAL('clicked()'), - self.kill_job) - QObject.connect(self, SIGNAL('kill_job(int, PyQt_PyObject)'), - self.jobs_view.model().kill_job) - self.pb_delegate = ProgressBarDelegate(self) - self.jobs_view.setItemDelegateForColumn(2, self.pb_delegate) - - self.running_time_timer = QTimer(self) - self.connect(self.running_time_timer, SIGNAL('timeout()'), self.update_running_time) - self.running_time_timer.start(1000) - - def update_running_time(self, *args): - try: - self.model.running_time_updated() - except: # Raises random exceptions on OS X - pass - - def kill_job(self): - for index in self.jobs_view.selectedIndexes(): - row = index.row() - self.model.kill_job(row, self) - return - - def closeEvent(self, e): - self.jobs_view.write_settings() - e.accept() diff --git a/src/calibre/gui2/dialogs/jobs.ui b/src/calibre/gui2/dialogs/jobs.ui index a14be17f78..3716c9fbb9 100644 --- a/src/calibre/gui2/dialogs/jobs.ui +++ b/src/calibre/gui2/dialogs/jobs.ui @@ -1,7 +1,8 @@ - + + JobsDialog - - + + 0 0 @@ -9,31 +10,32 @@ 542 - + Active Jobs - - :/images/jobs.svg + + + :/images/jobs.svg:/images/jobs.svg - + - - + + Qt::NoContextMenu - + QAbstractItemView::NoEditTriggers - + true - + QAbstractItemView::SingleSelection - + QAbstractItemView::SelectRows - + 32 32 @@ -42,12 +44,19 @@ - - + + &Stop selected job + + + + Show job &details + + + @@ -58,7 +67,7 @@ - + diff --git a/src/calibre/gui2/dialogs/user_profiles.py b/src/calibre/gui2/dialogs/user_profiles.py index cab28c66eb..c46b4ba392 100644 --- a/src/calibre/gui2/dialogs/user_profiles.py +++ b/src/calibre/gui2/dialogs/user_profiles.py @@ -86,7 +86,7 @@ class UserProfiles(ResizableDialog, Ui_Dialog): self.source_code.setPlainText('') else: self.source_code.setPlainText(src) - #self.highlighter = PythonHighlighter(self.source_code.document()) + self.highlighter = PythonHighlighter(self.source_code.document()) self.stacks.setCurrentIndex(1) self.toggle_mode_button.setText(_('Switch to Basic mode')) diff --git a/src/calibre/gui2/jobs.py b/src/calibre/gui2/jobs.py new file mode 100644 index 0000000000..6be6188ab9 --- /dev/null +++ b/src/calibre/gui2/jobs.py @@ -0,0 +1,257 @@ +#!/usr/bin/env python +__license__ = 'GPL v3' +__copyright__ = '2008, Kovid Goyal kovid@kovidgoyal.net' +__docformat__ = 'restructuredtext en' + +''' +Job management. +''' + +from Queue import Empty, Queue + +from PyQt4.Qt import QAbstractTableModel, QVariant, QModelIndex, Qt, \ + QTimer, SIGNAL, QIcon, QDialog, QAbstractItemDelegate, QApplication, \ + QSize, QStyleOptionProgressBarV2, QString, QStyle + +from calibre.utils.ipc.server import Server +from calibre.utils.ipc.job import ParallelJob +from calibre.gui2 import Dispatcher, error_dialog, NONE +from calibre.gui2.device import DeviceJob +from calibre.gui2.dialogs.jobs_ui import Ui_JobsDialog +from calibre import __appname__ + +class JobManager(QAbstractTableModel): + + def __init__(self): + QAbstractTableModel.__init__(self) + self.wait_icon = QVariant(QIcon(':/images/jobs.svg')) + self.running_icon = QVariant(QIcon(':/images/exec.svg')) + self.error_icon = QVariant(QIcon(':/images/dialog_error.svg')) + self.done_icon = QVariant(QIcon(':/images/ok.svg')) + + self.jobs = [] + self.add_job = Dispatcher(self._add_job) + self.job_done = Dispatcher(self._job_done) + self.server = Server(self.job_done) + self.changed_queue = Queue() + + self.timer = QTimer(self) + self.connect(self.timer, SIGNAL('timeout()'), self.update, + Qt.QueuedConnection) + self.timer.start(1000) + + def columnCount(self, parent=QModelIndex()): + return 4 + + def rowCount(self, parent=QModelIndex()): + return len(self.jobs) + + def headerData(self, section, orientation, role): + if role != Qt.DisplayRole: + return NONE + if orientation == Qt.Horizontal: + if section == 0: text = _('Job') + elif section == 1: text = _('Status') + elif section == 2: text = _('Progress') + elif section == 3: text = _('Running time') + return QVariant(text) + else: + return QVariant(section+1) + + def data(self, index, role): + try: + if role not in (Qt.DisplayRole, Qt.DecorationRole): + return NONE + row, col = index.row(), index.column() + job = self.jobs[row] + + if role == Qt.DisplayRole: + if col == 0: + desc = job.description + if not desc: + desc = _('Unknown job') + return QVariant(desc) + if col == 1: + return QVariant(job.status_text) + if col == 2: + return QVariant(job.percent) + if col == 3: + rtime = job.running_time + if rtime is None: + return NONE + return QVariant('%dm %ds'%(int(rtime)//60, int(rtime)%60)) + if role == Qt.DecorationRole and col == 0: + state = job.run_state + if state == job.WAITING: + return self.wait_icon + if state == job.RUNNING: + return self.running_icon + if job.killed or job.failed: + return self.error_icon + return self.done_icon + except: + import traceback + traceback.print_exc() + return NONE + + def update(self): + try: + self._update() + except BaseException: + pass + + def _update(self): + # Update running time + rows = set([]) + for i, j in enumerate(self.jobs): + if j.run_state == j.RUNNING: + idx = self.index(i, 3) + self.emit(SIGNAL('dataChanged(QModelIndex,QModelIndex)'), + idx, idx) + + # Update parallel jobs + jobs = set([]) + while True: + try: + jobs.add(self.server.changed_jobs_queue.get_nowait()) + except Empty: + break + while True: + try: + jobs.add(self.changed_queue.get_nowait()) + except Empty: + break + + if jobs: + needs_reset = False + for job in jobs: + orig_state = job.run_state + job.update() + if orig_state != job.run_state: + needs_reset = True + if needs_reset: + self.jobs.sort() + self.reset() + else: + for job in jobs: + idx = self.jobs.index(job) + self.emit(SIGNAL('dataChanged(QModelIndex,QModelIndex)'), + self.index(idx, 0), self.index(idx, 3)) + + + def _add_job(self, job): + self.emit(SIGNAL('layoutAboutToBeChanged()')) + self.jobs.append(job) + self.jobs.sort() + self.emit(SIGNAL('job_added(int)'), len(self.unfinished_jobs())) + self.emit(SIGNAL('layoutChanged()')) + + def done_jobs(self): + return [j for j in self.jobs if j.is_finished] + + def unfinished_jobs(self): + return [j for j in self.jobs if not j.is_finished] + + def row_to_job(self, row): + return self.jobs[row] + + def _job_done(self, job): + self.emit(SIGNAL('layoutAboutToBeChanged()')) + self.jobs.sort() + self.emit(SIGNAL('job_done(int)'), len(self.unfinished_jobs())) + self.emit(SIGNAL('layoutChanged()')) + + def has_device_jobs(self): + for job in self.jobs: + if job.is_running and isinstance(job, DeviceJob): + return True + return False + + def has_jobs(self): + for job in self.jobs: + if job.is_running: + return True + return False + + def run_job(self, done, name, args=[], kwargs={}, + description=''): + job = ParallelJob(name, description, done, args=args, kwargs=kwargs) + self.add_job(job) + self.server.add_job(job) + return job + + def launch_gui_app(self, name, args=[], kwargs={}, description=''): + job = ParallelJob(name, description, lambda x: x, + args=args, kwargs=kwargs) + self.server.run_job(job, gui=True, redirect_output=False) + + + def kill_job(self, row, view): + job = self.jobs[row] + if isinstance(job, DeviceJob): + return error_dialog(view, _('Cannot kill job'), + _('Cannot kill jobs that communicate with the device')).exec_() + if job.duration is not None: + return error_dialog(view, _('Cannot kill job'), + _('Job has already run')).exec_() + self.server.kill_job(job) + + def terminate_all_jobs(self): + self.server.killall() + + +class ProgressBarDelegate(QAbstractItemDelegate): + + def sizeHint(self, option, index): + return QSize(120, 30) + + def paint(self, painter, option, index): + opts = QStyleOptionProgressBarV2() + opts.rect = option.rect + opts.minimum = 1 + opts.maximum = 100 + opts.textVisible = True + percent, ok = index.model().data(index, Qt.DisplayRole).toInt() + if not ok: + percent = 0 + opts.progress = percent + opts.text = QString(_('Unavailable') if percent == 0 else '%d%%'%percent) + QApplication.style().drawControl(QStyle.CE_ProgressBar, opts, painter) + +class JobsDialog(QDialog, Ui_JobsDialog): + def __init__(self, window, model): + QDialog.__init__(self, window) + Ui_JobsDialog.__init__(self) + self.setupUi(self) + self.jobs_view.setModel(model) + self.model = model + self.setWindowModality(Qt.NonModal) + self.setWindowTitle(__appname__ + _(' - Jobs')) + self.connect(self.jobs_view.model(), SIGNAL('modelReset()'), + self.jobs_view.resizeColumnsToContents) + self.connect(self.kill_button, SIGNAL('clicked()'), + self.kill_job) + self.connect(self.details_button, SIGNAL('clicked()'), + self.show_details) + self.connect(self, SIGNAL('kill_job(int, PyQt_PyObject)'), + self.jobs_view.model().kill_job) + self.pb_delegate = ProgressBarDelegate(self) + self.jobs_view.setItemDelegateForColumn(2, self.pb_delegate) + + + def kill_job(self): + for index in self.jobs_view.selectedIndexes(): + row = index.row() + self.model.kill_job(row, self) + return + + def show_details(self): + for index in self.jobs_view.selectedIndexes(): + self.jobs_view.show_details(index) + return + + + + def closeEvent(self, e): + self.jobs_view.write_settings() + e.accept() diff --git a/src/calibre/gui2/jobs2.py b/src/calibre/gui2/jobs2.py deleted file mode 100644 index fc6ddb642e..0000000000 --- a/src/calibre/gui2/jobs2.py +++ /dev/null @@ -1,203 +0,0 @@ -#!/usr/bin/env python -__license__ = 'GPL v3' -__copyright__ = '2008, Kovid Goyal kovid@kovidgoyal.net' -__docformat__ = 'restructuredtext en' - -''' -Job management. -''' -import time -from PyQt4.QtCore import QAbstractTableModel, QVariant, QModelIndex, Qt, SIGNAL -from PyQt4.QtGui import QIcon, QDialog - -from calibre.parallel import ParallelJob, Server -from calibre.gui2 import Dispatcher, error_dialog -from calibre.gui2.device import DeviceJob -from calibre.gui2.dialogs.job_view_ui import Ui_Dialog - -NONE = QVariant() - -class JobManager(QAbstractTableModel): - - def __init__(self): - QAbstractTableModel.__init__(self) - self.wait_icon = QVariant(QIcon(':/images/jobs.svg')) - self.running_icon = QVariant(QIcon(':/images/exec.svg')) - self.error_icon = QVariant(QIcon(':/images/dialog_error.svg')) - self.done_icon = QVariant(QIcon(':/images/ok.svg')) - - self.jobs = [] - self.server = Server() - self.add_job = Dispatcher(self._add_job) - self.status_update = Dispatcher(self._status_update) - self.start_work = Dispatcher(self._start_work) - self.job_done = Dispatcher(self._job_done) - - def columnCount(self, parent=QModelIndex()): - return 4 - - def rowCount(self, parent=QModelIndex()): - return len(self.jobs) - - def headerData(self, section, orientation, role): - if role != Qt.DisplayRole: - return NONE - if orientation == Qt.Horizontal: - if section == 0: text = _("Job") - elif section == 1: text = _("Status") - elif section == 2: text = _("Progress") - elif section == 3: text = _('Running time') - return QVariant(text) - else: - return QVariant(section+1) - - def data(self, index, role): - try: - if role not in (Qt.DisplayRole, Qt.DecorationRole): - return NONE - row, col = index.row(), index.column() - job = self.jobs[row] - - if role == Qt.DisplayRole: - if col == 0: - desc = job.description - if not desc: - desc = _('Unknown job') - return QVariant(desc) - if col == 1: - status = job.status() - if status == 'DONE': - st = _('Finished') - elif status == 'ERROR': - st = _('Error') - elif status == 'WAITING': - st = _('Waiting') - else: - st = _('Working') - return QVariant(st) - if col == 2: - pc = job.percent - if pc <=0: - percent = 0 - else: - percent = int(100*pc) - return QVariant(percent) - if col == 3: - if job.start_time is None: - return NONE - rtime = job.running_time if job.running_time is not None else \ - time.time() - job.start_time - return QVariant('%dm %ds'%(int(rtime)//60, int(rtime)%60)) - if role == Qt.DecorationRole and col == 0: - status = job.status() - if status == 'WAITING': - return self.wait_icon - if status == 'WORKING': - return self.running_icon - if status == 'ERROR': - return self.error_icon - if status == 'DONE': - return self.done_icon - except: - import traceback - traceback.print_exc() - return NONE - - def _add_job(self, job): - self.emit(SIGNAL('layoutAboutToBeChanged()')) - self.jobs.append(job) - self.jobs.sort() - self.emit(SIGNAL('job_added(int)'), self.rowCount()) - self.emit(SIGNAL('layoutChanged()')) - - def done_jobs(self): - return [j for j in self.jobs if j.status() in ['DONE', 'ERROR']] - - def row_to_job(self, row): - return self.jobs[row] - - def _start_work(self, job): - self.emit(SIGNAL('layoutAboutToBeChanged()')) - self.jobs.sort() - self.emit(SIGNAL('layoutChanged()')) - - def _job_done(self, job): - self.emit(SIGNAL('layoutAboutToBeChanged()')) - self.jobs.sort() - self.emit(SIGNAL('job_done(int)'), len(self.jobs) - len(self.done_jobs())) - self.emit(SIGNAL('layoutChanged()')) - - def _status_update(self, job): - try: - row = self.jobs.index(job) - except ValueError: # Job has been stopped - return - self.emit(SIGNAL('dataChanged(QModelIndex, QModelIndex)'), - self.index(row, 0), self.index(row, 3)) - - def running_time_updated(self, *args): - for job in self.jobs: - if not job.is_running: - continue - row = self.jobs.index(job) - self.emit(SIGNAL('dataChanged(QModelIndex, QModelIndex)'), - self.index(row, 3), self.index(row, 3)) - - def has_device_jobs(self): - for job in self.jobs: - if job.is_running and isinstance(job, DeviceJob): - return True - return False - - def has_jobs(self): - for job in self.jobs: - if job.is_running: - return True - return False - - def run_job(self, done, func, args=[], kwargs={}, - description=None): - job = ParallelJob(func, done, self, args=args, kwargs=kwargs, - description=description) - self.server.add_job(job) - return job - - - def output(self, job): - self.emit(SIGNAL('output_received()')) - - def kill_job(self, row, view): - job = self.jobs[row] - if isinstance(job, DeviceJob): - error_dialog(view, _('Cannot kill job'), - _('Cannot kill jobs that communicate with the device')).exec_() - return - if job.has_run: - error_dialog(view, _('Cannot kill job'), - _('Job has already run')).exec_() - return - if not job.is_running: - self.jobs.remove(job) - self.reset() - return - - - self.server.kill(job) - - def terminate_all_jobs(self): - pass - -class DetailView(QDialog, Ui_Dialog): - - def __init__(self, parent, job): - QDialog.__init__(self, parent) - self.setupUi(self) - self.setWindowTitle(job.description) - self.job = job - self.update() - - - def update(self): - self.log.setPlainText(self.job.console_text()) - vbar = self.log.verticalScrollBar() - vbar.setValue(vbar.maximum()) diff --git a/src/calibre/gui2/main.py b/src/calibre/gui2/main.py index 47276d2519..d64591bcd7 100644 --- a/src/calibre/gui2/main.py +++ b/src/calibre/gui2/main.py @@ -13,7 +13,7 @@ from PyQt4.Qt import Qt, SIGNAL, QObject, QCoreApplication, QUrl, QTimer, \ from PyQt4.QtSvg import QSvgRenderer from calibre import __version__, __appname__, islinux, sanitize_file_name, \ - iswindows, isosx + iswindows, isosx, prints from calibre.ptempfile import PersistentTemporaryFile from calibre.utils.config import prefs, dynamic from calibre.gui2 import APP_UID, warning_dialog, choose_files, error_dialog, \ @@ -32,10 +32,9 @@ from calibre.gui2.main_window import MainWindow, option_parser as _option_parser from calibre.gui2.main_ui import Ui_MainWindow from calibre.gui2.device import DeviceManager, DeviceMenu, DeviceGUI, Emailer from calibre.gui2.status import StatusBar -from calibre.gui2.jobs2 import JobManager +from calibre.gui2.jobs import JobManager, JobsDialog from calibre.gui2.dialogs.metadata_single import MetadataSingleDialog from calibre.gui2.dialogs.metadata_bulk import MetadataBulkDialog -from calibre.gui2.dialogs.jobs import JobsDialog from calibre.gui2.tools import convert_single_ebook, convert_bulk_ebook, \ fetch_scheduled_recipe from calibre.gui2.dialogs.config import ConfigDialog @@ -44,7 +43,6 @@ from calibre.gui2.dialogs.choose_format import ChooseFormatDialog from calibre.gui2.dialogs.book_info import BookInfo from calibre.ebooks import BOOK_EXTENSIONS from calibre.library.database2 import LibraryDatabase2, CoverCache -from calibre.parallel import JobKilled from calibre.gui2.dialogs.confirm_delete import confirm class SaveMenu(QMenu): @@ -626,9 +624,8 @@ class Main(MainWindow, Ui_MainWindow, DeviceGUI): ''' Called once device information has been read. ''' - if job.exception is not None: - self.device_job_exception(job) - return + if job.failed: + return self.device_job_exception(job) info, cp, fs = job.result self.location_view.model().update_devices(cp, fs) self.device_info = _('Connected ')+info[0] @@ -641,7 +638,7 @@ class Main(MainWindow, Ui_MainWindow, DeviceGUI): ''' Called once metadata has been read for all books on the device. ''' - if job.exception is not None: + if job.failed: if isinstance(job.exception, ExpatError): error_dialog(self, _('Device database corrupted'), _(''' @@ -823,8 +820,8 @@ class Main(MainWindow, Ui_MainWindow, DeviceGUI): Called once deletion is done on the device ''' for view in (self.memory_view, self.card_a_view, self.card_b_view): - view.model().deletion_done(job, bool(job.exception)) - if job.exception is not None: + view.model().deletion_done(job, job.failed) + if job.failed: self.device_job_exception(job) return @@ -993,9 +990,8 @@ class Main(MainWindow, Ui_MainWindow, DeviceGUI): progress.hide() def books_saved(self, job): - if job.exception is not None: - self.device_job_exception(job) - return + if job.failed: + return self.device_job_exception(job) ############################################################################ @@ -1013,9 +1009,8 @@ class Main(MainWindow, Ui_MainWindow, DeviceGUI): def scheduled_recipe_fetched(self, job): temp_files, fmt, recipe, callback = self.conversion_jobs.pop(job) pt = temp_files[0] - if job.exception is not None: - self.job_exception(job) - return + if job.failed: + return self.job_exception(job) id = self.library_view.model().add_news(pt.name, recipe) self.library_view.model().reset() sync = dynamic.get('news_to_be_synced', set([])) @@ -1098,9 +1093,8 @@ class Main(MainWindow, Ui_MainWindow, DeviceGUI): def book_auto_converted(self, job): temp_files, fmt, book_id, on_card = self.conversion_jobs.pop(job) try: - if job.exception is not None: - self.job_exception(job) - return + if job.failed: + return self.job_exception(job) data = open(temp_files[0].name, 'rb') self.library_view.model().db.add_format(book_id, fmt, data, index_is_id=True) data.close() @@ -1122,7 +1116,7 @@ class Main(MainWindow, Ui_MainWindow, DeviceGUI): def book_converted(self, job): temp_files, fmt, book_id = self.conversion_jobs.pop(job) try: - if job.exception is not None: + if job.failed: self.job_exception(job) return data = open(temp_files[-1].name, 'rb') @@ -1151,7 +1145,7 @@ class Main(MainWindow, Ui_MainWindow, DeviceGUI): self._view_file(fmt_path) def book_downloaded_for_viewing(self, job): - if job.exception: + if job.failed: self.device_job_exception(job) return self._view_file(job.result) @@ -1165,12 +1159,11 @@ class Main(MainWindow, Ui_MainWindow, DeviceGUI): args.append('--raise-window') if name is not None: args.append(name) - self.job_manager.server.run_free_job(viewer, - kwdargs=dict(args=args)) + self.job_manager.launch_gui_app(viewer, + kwargs=dict(args=args)) else: QDesktopServices.openUrl(QUrl.fromLocalFile(name))#launch(name) - - time.sleep(5) # User feedback + time.sleep(2) # User feedback finally: self.unsetCursor() @@ -1395,7 +1388,7 @@ class Main(MainWindow, Ui_MainWindow, DeviceGUI): ''' try: if 'Could not read 32 bytes on the control bus.' in \ - unicode(job.exception): + unicode(job.details): error_dialog(self, _('Error talking to device'), _('There was a temporary error talking to the ' 'device. Please unplug and reconnect the device ' @@ -1404,16 +1397,16 @@ class Main(MainWindow, Ui_MainWindow, DeviceGUI): except: pass try: - print >>sys.stderr, job.console_text() + prints(job.details, file=sys.stderr) except: pass if not self.device_error_dialog.isVisible(): - self.device_error_dialog.set_message(job.gui_text()) + self.device_error_dialog.set_message(job.details) self.device_error_dialog.show() def job_exception(self, job): try: - if job.exception[0] == 'DRMError': + if 'calibre.ebooks.DRMError' in job.details: error_dialog(self, _('Conversion Error'), _('

Could not convert: %s

It is a ' 'DRMed book. You must first remove the ' @@ -1423,23 +1416,15 @@ class Main(MainWindow, Ui_MainWindow, DeviceGUI): return except: pass - only_msg = getattr(job.exception, 'only_msg', False) + if job.killed: + return try: - print job.console_text() + prints(job.details, file=sys.stderr) except: pass - if only_msg: - try: - exc = unicode(job.exception) - except: - exc = repr(job.exception) - error_dialog(self, _('Conversion Error'), exc).exec_() - return - if isinstance(job.exception, JobKilled): - return error_dialog(self, _('Conversion Error'), - _('Failed to process')+': '+unicode(job.description), - det_msg=job.console_text()).exec_() + _('Failed')+': '+unicode(job.description), + det_msg=job.details).exec_() def initialize_database(self): @@ -1555,7 +1540,7 @@ class Main(MainWindow, Ui_MainWindow, DeviceGUI): def shutdown(self, write_settings=True): if write_settings: self.write_settings() - self.job_manager.terminate_all_jobs() + self.job_manager.server.close() self.device_manager.keep_going = False self.cover_cache.stop() self.hide() diff --git a/src/calibre/gui2/status.py b/src/calibre/gui2/status.py index 7b3dac4dc8..11b442fd17 100644 --- a/src/calibre/gui2/status.py +++ b/src/calibre/gui2/status.py @@ -10,10 +10,10 @@ from calibre.gui2 import qstring_to_unicode, config class BookInfoDisplay(QWidget): class BookCoverDisplay(QLabel): - + WIDTH = 81 HEIGHT = 108 - + def __init__(self, coverpath=':/images/book.svg'): QLabel.__init__(self) self.default_pixmap = QPixmap(coverpath).scaled(self.__class__.WIDTH, @@ -23,42 +23,42 @@ class BookInfoDisplay(QWidget): self.setScaledContents(True) self.setMaximumHeight(self.HEIGHT) self.setPixmap(self.default_pixmap) - - + + def setPixmap(self, pixmap): width, height = fit_image(pixmap.width(), pixmap.height(), self.WIDTH, self.HEIGHT)[1:] self.setMaximumHeight(height) self.setMaximumWidth(width) QLabel.setPixmap(self, pixmap) - + try: aspect_ratio = pixmap.width()/float(pixmap.height()) except ZeroDivisionError: aspect_ratio = 1 self.setMaximumWidth(int(aspect_ratio*self.HEIGHT)) - + def sizeHint(self): return QSize(self.__class__.WIDTH, self.__class__.HEIGHT) - - + + class BookDataDisplay(QLabel): def __init__(self): QLabel.__init__(self) self.setText('') self.setWordWrap(True) self.setSizePolicy(QSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding)) - + def mouseReleaseEvent(self, ev): self.emit(SIGNAL('mr(int)'), 1) - + WEIGHTS = collections.defaultdict(lambda : 100) WEIGHTS[_('Path')] = 0 WEIGHTS[_('Formats')] = 1 WEIGHTS[_('Comments')] = 2 WEIGHTS[_('Series')] = 3 WEIGHTS[_('Tags')] = 4 - + def __init__(self, clear_message): QWidget.__init__(self) self.setCursor(Qt.PointingHandCursor) @@ -74,16 +74,16 @@ class BookInfoDisplay(QWidget): self.data = {} self.setVisible(False) self._layout.setAlignment(self.cover_display, Qt.AlignTop|Qt.AlignLeft) - + def mouseReleaseEvent(self, ev): self.emit(SIGNAL('show_book_info()')) - + def show_data(self, data): if data.has_key('cover'): self.cover_display.setPixmap(QPixmap.fromImage(data.pop('cover'))) else: self.cover_display.setPixmap(self.cover_display.default_pixmap) - + rows = u'' self.book_data.setText('') self.data = data.copy() @@ -97,7 +97,7 @@ class BookInfoDisplay(QWidget): txt = txt.decode(preferred_encoding, 'replace') rows += u'%s:%s'%(key, txt) self.book_data.setText(u''+rows+u'
') - + self.clear_message() self.book_data.updateGeometry() self.updateGeometry() @@ -113,7 +113,7 @@ class MovieButton(QFrame): self.movie = movie self.layout().addWidget(self.movie_widget) self.jobs = QLabel(''+_('Jobs:')+' 0') - self.jobs.setAlignment(Qt.AlignHCenter|Qt.AlignBottom) + self.jobs.setAlignment(Qt.AlignHCenter|Qt.AlignBottom) self.layout().addWidget(self.jobs) self.layout().setAlignment(self.jobs, Qt.AlignHCenter) self.jobs.setMargin(0) @@ -125,8 +125,8 @@ class MovieButton(QFrame): movie.start() movie.setPaused(True) self.jobs_dialog.jobs_view.restore_column_widths() - - + + def mouseReleaseEvent(self, event): if self.jobs_dialog.isVisible(): self.jobs_dialog.jobs_view.write_settings() @@ -137,7 +137,7 @@ class MovieButton(QFrame): self.jobs_dialog.jobs_view.restore_column_widths() class CoverFlowButton(QToolButton): - + def __init__(self, parent=None): QToolButton.__init__(self, parent) self.setIconSize(QSize(80, 80)) @@ -149,17 +149,17 @@ class CoverFlowButton(QToolButton): self.connect(self, SIGNAL('toggled(bool)'), self.adjust_tooltip) self.adjust_tooltip(False) self.setCursor(Qt.PointingHandCursor) - + def adjust_tooltip(self, on): tt = _('Click to turn off Cover Browsing') if on else _('Click to browse books by their covers') self.setToolTip(tt) - + def disable(self, reason): self.setDisabled(True) self.setToolTip(_('

Browsing books by their covers is disabled.
Import of pictureflow module failed:
')+reason) - + class TagViewButton(QToolButton): - + def __init__(self, parent=None): QToolButton.__init__(self, parent) self.setIconSize(QSize(80, 80)) @@ -170,10 +170,10 @@ class TagViewButton(QToolButton): self.setCheckable(True) self.setChecked(False) self.setAutoRaise(True) - + class StatusBar(QStatusBar): - + def __init__(self, jobs_dialog, systray=None): QStatusBar.__init__(self) self.systray = systray @@ -192,11 +192,11 @@ class StatusBar(QStatusBar): self.addWidget(self.scroll_area, 100) self.setMinimumHeight(120) self.setMaximumHeight(120) - - + + def reset_info(self): self.book_info.show_data({}) - + def showMessage(self, msg, timeout=0): ret = QStatusBar.showMessage(self, msg, timeout) if self.systray is not None and not config['disable_tray_notification']: @@ -207,39 +207,38 @@ class StatusBar(QStatusBar): msg = msg.encode('utf-8') self.systray.showMessage('calibre', msg, self.systray.Information, 10000) return ret - + def jobs(self): src = qstring_to_unicode(self.movie_button.jobs.text()) return int(re.search(r'\d+', src).group()) - + def show_book_info(self): self.emit(SIGNAL('show_book_info()')) - + def job_added(self, nnum): jobs = self.movie_button.jobs src = qstring_to_unicode(jobs.text()) num = self.jobs() - nnum = num + 1 text = src.replace(str(num), str(nnum)) jobs.setText(text) if self.movie_button.movie.state() == QMovie.Paused: self.movie_button.movie.setPaused(False) - - def job_done(self, running): + + def job_done(self, nnum): jobs = self.movie_button.jobs src = qstring_to_unicode(jobs.text()) num = self.jobs() - text = src.replace(str(num), str(running)) + text = src.replace(str(num), str(nnum)) jobs.setText(text) - if running == 0: + if nnum == 0: self.no_more_jobs() - + def no_more_jobs(self): if self.movie_button.movie.state() == QMovie.Running: self.movie_button.movie.jumpToFrame(0) self.movie_button.movie.setPaused(True) QCoreApplication.instance().alert(self, 5000) - + if __name__ == '__main__': # Used to create the animated status icon from PyQt4.Qt import QApplication, QPainter, QSvgRenderer, QColor @@ -280,4 +279,4 @@ if __name__ == '__main__': os.remove(file) import sys create_mng(sys.argv[1]) - + diff --git a/src/calibre/gui2/widgets.py b/src/calibre/gui2/widgets.py index db36daa784..2e5c982791 100644 --- a/src/calibre/gui2/widgets.py +++ b/src/calibre/gui2/widgets.py @@ -4,16 +4,16 @@ __copyright__ = '2008, Kovid Goyal ' Miscellaneous widgets used in the GUI ''' import re, os, traceback -from PyQt4.QtGui import QListView, QIcon, QFont, QLabel, QListWidget, \ +from PyQt4.Qt import QListView, QIcon, QFont, QLabel, QListWidget, \ QListWidgetItem, QTextCharFormat, QApplication, \ - QSyntaxHighlighter, QCursor, QColor, QWidget, QDialog, \ - QPixmap, QMovie, QPalette -from PyQt4.QtCore import QAbstractListModel, QVariant, Qt, SIGNAL, \ + QSyntaxHighlighter, QCursor, QColor, QWidget, \ + QPixmap, QMovie, QPalette, QTimer, QDialog, \ + QAbstractListModel, QVariant, Qt, SIGNAL, \ QRegExp, QSettings, QSize, QModelIndex -from calibre.gui2.jobs2 import DetailView from calibre.gui2 import human_readable, NONE, TableView, \ qstring_to_unicode, error_dialog +from calibre.gui2.dialogs.job_view_ui import Ui_Dialog from calibre.gui2.filename_pattern_ui import Ui_Form from calibre import fit_image from calibre.utils.fontconfig import find_font_families @@ -249,6 +249,31 @@ class LocationView(QListView): if 0 <= row and row <= 3: self.model().location_changed(row) +class DetailView(QDialog, Ui_Dialog): + + def __init__(self, parent, job): + QDialog.__init__(self, parent) + self.setupUi(self) + self.setWindowTitle(job.description) + self.job = job + self.next_pos = 0 + self.update() + self.timer = QTimer(self) + self.connect(self.timer, SIGNAL('timeout()'), self.update) + self.timer.start(1000) + + + def update(self): + f = self.job.log_file + f.seek(self.next_pos) + more = f.read() + self.next_pos = f.tell() + if more: + self.log.appendPlainText(more.decode('utf-8', 'replace')) + vbar = self.log.verticalScrollBar() + vbar.setValue(vbar.maximum()) + + class JobsView(TableView): def __init__(self, parent): @@ -259,7 +284,6 @@ class JobsView(TableView): row = index.row() job = self.model().row_to_job(row) d = DetailView(self, job) - self.connect(self.model(), SIGNAL('output_received()'), d.update) d.exec_() @@ -539,12 +563,12 @@ class PythonHighlighter(QSyntaxHighlighter): return for regex, format in PythonHighlighter.Rules: - i = text.indexOf(regex) + i = regex.indexIn(text) while i >= 0: length = regex.matchedLength() self.setFormat(i, length, PythonHighlighter.Formats[format]) - i = text.indexOf(regex, i + length) + i = regex.indexIn(text, i + length) # Slow but good quality highlighting for comments. For more # speed, comment this out and add the following to __init__: @@ -569,12 +593,12 @@ class PythonHighlighter(QSyntaxHighlighter): self.setCurrentBlockState(NORMAL) - if text.indexOf(self.stringRe) != -1: + if self.stringRe.indexIn(text) != -1: return # This is fooled by triple quotes inside single quoted strings - for i, state in ((text.indexOf(self.tripleSingleRe), + for i, state in ((self.tripleSingleRe.indexIn(text), TRIPLESINGLE), - (text.indexOf(self.tripleDoubleRe), + (self.tripleDoubleRe.indexIn(text), TRIPLEDOUBLE)): if self.previousBlockState() == state: if i == -1: diff --git a/src/calibre/linux.py b/src/calibre/linux.py index 215236e83d..9582651ca0 100644 --- a/src/calibre/linux.py +++ b/src/calibre/linux.py @@ -29,7 +29,7 @@ entry_points = { 'calibre-debug = calibre.debug:main', 'calibredb = calibre.library.cli:main', 'calibre-fontconfig = calibre.utils.fontconfig:main', - 'calibre-parallel = calibre.parallel:main', + 'calibre-parallel = calibre.utils.ipc.worker:main', 'calibre-customize = calibre.customize.ui:main', 'calibre-complete = calibre.utils.complete:main', 'pdfmanipulate = calibre.ebooks.pdf.manipulate.cli:main', diff --git a/src/calibre/parallel.py b/src/calibre/parallel.py deleted file mode 100644 index 0ec7ed09cc..0000000000 --- a/src/calibre/parallel.py +++ /dev/null @@ -1,980 +0,0 @@ -from __future__ import with_statement -__license__ = 'GPL v3' -__copyright__ = '2008, Kovid Goyal kovid@kovidgoyal.net' -__docformat__ = 'restructuredtext en' - -''' -Used to run jobs in parallel in separate processes. Features output streaming, -support for progress notification as well as job killing. The worker processes -are controlled via a simple protocol run over sockets. The control happens -mainly in two class, :class:`Server` and :class:`Overseer`. The worker is -encapsulated in the function :function:`worker`. Every worker process -has the environment variable :envvar:`CALIBRE_WORKER` defined. - -The worker control protocol has two modes of operation. In the first mode, the -worker process listens for commands from the controller process. The controller -process can either hand off a job to the worker or tell the worker to die. -Once a job is handed off to the worker, the protocol enters the second mode, where -the controller listens for messages from the worker. The worker can send progress updates -as well as console output (i.e. text that would normally have been written to stdout -or stderr by the job). Once the job completes (or raises an exception) the worker -returns the result (or exception) to the controller and the protocol reverts to the first mode. - -In the second mode, the controller can also send the worker STOP messages, in which case -the worker interrupts the job and dies. The sending of progress and console output messages -is buffered and asynchronous to prevent the job from being IO bound. -''' -import sys, os, gc, cPickle, traceback, cStringIO, time, signal, \ - subprocess, socket, collections, binascii, re, thread, tempfile, atexit -from select import select -from threading import RLock, Thread, Event -from math import ceil - -from calibre.ptempfile import PersistentTemporaryFile -from calibre import iswindows, detect_ncpus, isosx, preferred_encoding -from calibre.utils.config import prefs - -DEBUG = False - -#: A mapping from job names to functions that perform the jobs -PARALLEL_FUNCS = { - 'lrfviewer' : - ('calibre.gui2.lrf_renderer.main', 'main', {}, None), - - 'ebook-viewer' : - ('calibre.gui2.viewer.main', 'main', {}, None), - - 'render_pages' : - ('calibre.ebooks.comic.input', 'render_pages', {}, 'notification'), - - 'ebook-convert' : - ('calibre.ebooks.conversion.cli', 'main', {}, None), - - 'gui_convert' : - ('calibre.gui2.convert.gui_conversion', 'gui_convert', {}, 'notification'), -} - - -isfrozen = hasattr(sys, 'frozen') -isworker = False - -win32event = __import__('win32event') if iswindows else None -win32process = __import__('win32process') if iswindows else None -msvcrt = __import__('msvcrt') if iswindows else None - -SOCKET_TYPE = socket.AF_UNIX if not iswindows else socket.AF_INET - -class WorkerStatus(object): - ''' - A platform independent class to control child processes. Provides the - methods: - - .. method:: WorkerStatus.is_alive() - - Return True is the child process is alive (i.e. it hasn't exited and returned a return code). - - .. method:: WorkerStatus.returncode() - - Wait for the child process to exit and return its return code (blocks until child returns). - - .. method:: WorkerStatus.kill() - - Forcibly terminates child process using operating system specific semantics. - ''' - - def __init__(self, obj): - ''' - `obj`: On windows a process handle, on unix a subprocess.Popen object. - ''' - self.obj = obj - self.win32process = win32process # Needed if kill is called during shutdown of interpreter - self.os = os - self.signal = signal - ext = 'windows' if iswindows else 'unix' - for func in ('is_alive', 'returncode', 'kill'): - setattr(self, func, getattr(self, func+'_'+ext)) - - def is_alive_unix(self): - return self.obj.poll() == None - - def returncode_unix(self): - return self.obj.wait() - - def kill_unix(self): - os.kill(self.obj.pid, self.signal.SIGKILL) - - def is_alive_windows(self): - return win32event.WaitForSingleObject(self.obj, 0) != win32event.WAIT_OBJECT_0 - - def returncode_windows(self): - return win32process.GetExitCodeProcess(self.obj) - - def kill_windows(self, returncode=-1): - self.win32process.TerminateProcess(self.obj, returncode) - -class WorkerMother(object): - ''' - Platform independent object for launching child processes. All processes - have the environment variable :envvar:`CALIBRE_WORKER` set. - - ..method:: WorkerMother.spawn_free_spirit(arg) - - Launch a non monitored process with argument `arg`. - - ..method:: WorkerMother.spawn_worker(arg) - - Launch a monitored and controllable process with argument `arg`. - ''' - - def __init__(self): - ext = 'windows' if iswindows else 'osx' if isosx else 'linux' - self.os = os # Needed incase cleanup called when interpreter is shutting down - self.env = {} - if iswindows: - self.executable = os.path.join(os.path.dirname(sys.executable), - 'calibre-parallel.exe' if isfrozen else 'Scripts\\calibre-parallel.exe') - elif isosx: - self.executable = self.gui_executable = sys.executable - self.prefix = '' - if isfrozen: - fd = os.path.realpath(getattr(sys, 'frameworks_dir')) - contents = os.path.dirname(fd) - self.gui_executable = os.path.join(contents, 'MacOS', - os.path.basename(sys.executable)) - contents = os.path.join(contents, 'console.app', 'Contents') - exe = os.path.basename(sys.executable) - if 'python' not in exe: - exe = 'python' - self.executable = os.path.join(contents, 'MacOS', exe) - - resources = os.path.join(contents, 'Resources') - fd = os.path.join(contents, 'Frameworks') - sp = os.path.join(resources, 'lib', 'python'+sys.version[:3], 'site-packages.zip') - self.prefix += 'import sys; sys.frameworks_dir = "%s"; sys.frozen = "macosx_app"; '%fd - self.prefix += 'sys.path.insert(0, %s); '%repr(sp) - if fd not in os.environ['PATH']: - self.env['PATH'] = os.environ['PATH']+':'+fd - self.env['PYTHONHOME'] = resources - self.env['MAGICK_HOME'] = os.path.join(fd, 'ImageMagick') - self.env['DYLD_LIBRARY_PATH'] = os.path.join(fd, 'ImageMagick', 'lib') - else: - self.executable = os.path.join(getattr(sys, 'frozen_path'), 'calibre-parallel') \ - if isfrozen else 'calibre-parallel' - if isfrozen: - self.env['LD_LIBRARY_PATH'] = getattr(sys, 'frozen_path') + ':' + os.environ.get('LD_LIBRARY_PATH', '') - - self.spawn_worker_windows = lambda arg : self.spawn_free_spirit_windows(arg, type='worker') - self.spawn_worker_linux = lambda arg : self.spawn_free_spirit_linux(arg, type='worker') - self.spawn_worker_osx = lambda arg : self.spawn_free_spirit_osx(arg, type='worker') - - for func in ('spawn_free_spirit', 'spawn_worker'): - setattr(self, func, getattr(self, func+'_'+ext)) - - - def cleanup_child_windows(self, child, name=None, fd=None): - try: - child.kill() - except: - pass - try: - if fd is not None: - self.os.close(fd) - except: - pass - try: - if name is not None and os.path.exists(name): - self.os.unlink(name) - except: - pass - - def cleanup_child_linux(self, child): - try: - child.kill() - except: - pass - - def get_env(self): - env = dict(os.environ) - env['CALIBRE_WORKER'] = '1' - env['ORIGWD'] = os.path.abspath(os.getcwd()) - if hasattr(self, 'env'): - env.update(self.env) - return env - - def spawn_free_spirit_osx(self, arg, type='free_spirit'): - script = ('from calibre.parallel import main; ' - 'main(args=["calibre-parallel", %s]);')%repr(arg) - exe = self.gui_executable if type == 'free_spirit' else self.executable - cmdline = [exe, '-c', self.prefix+script] - child = WorkerStatus(subprocess.Popen(cmdline, env=self.get_env())) - atexit.register(self.cleanup_child_linux, child) - return child - - def spawn_free_spirit_linux(self, arg, type='free_spirit'): - cmdline = [self.executable, arg] - child = WorkerStatus(subprocess.Popen(cmdline, - env=self.get_env(), cwd=getattr(sys, 'frozen_path', None))) - atexit.register(self.cleanup_child_linux, child) - return child - - def spawn_free_spirit_windows(self, arg, type='free_spirit'): - priority = {'high':win32process.HIGH_PRIORITY_CLASS, 'normal':win32process.NORMAL_PRIORITY_CLASS, - 'low':win32process.IDLE_PRIORITY_CLASS}[prefs['worker_process_priority']] - fd, name = tempfile.mkstemp('.log', 'calibre_'+type+'_') - handle = msvcrt.get_osfhandle(fd) - si = win32process.STARTUPINFO() - si.hStdOutput = handle - si.hStdError = handle - cmdline = self.executable + ' ' + str(arg) - hProcess = \ - win32process.CreateProcess( - None, # Application Name - cmdline, # Command line - None, # processAttributes - None, # threadAttributes - 1, # bInheritHandles - win32process.CREATE_NO_WINDOW|priority, # Dont want ugly console popping up - self.get_env(), # New environment - None, # Current directory - si - )[0] - child = WorkerStatus(hProcess) - atexit.register(self.cleanup_child_windows, child, name, fd) - return child - - -mother = WorkerMother() - -_comm_lock = RLock() -def write(socket, msg, timeout=5): - ''' - Write a message on socket. If `msg` is unicode, it is encoded in utf-8. - Raises a `RuntimeError` if the socket is not ready for writing or the writing fails. - `msg` is broken into chunks of size 4096 and sent. The :function:`read` function - automatically re-assembles the chunks into whole message. - ''' - if isworker: - _comm_lock.acquire() - try: - if isinstance(msg, unicode): - msg = msg.encode('utf-8') - if DEBUG: - print >>sys.__stdout__, 'write(%s):'%('worker' if isworker else 'overseer'), repr(msg) - length = None - while len(msg) > 0: - if length is None: - length = len(msg) - chunk = ('%-12d'%length) + msg[:4096-12] - msg = msg[4096-12:] - else: - chunk, msg = msg[:4096], msg[4096:] - w = select([], [socket], [], timeout)[1] - if not w: - raise RuntimeError('Write to socket timed out') - if socket.sendall(chunk) is not None: - raise RuntimeError('Failed to write chunk to socket') - finally: - if isworker: - _comm_lock.release() - -def read(socket, timeout=5): - ''' - Read a message from `socket`. The message must have been sent with the :function:`write` - function. Raises a `RuntimeError` if the message is corrupted. Can return an - empty string. - ''' - if isworker: - _comm_lock.acquire() - try: - buf = cStringIO.StringIO() - length = None - while select([socket],[],[],timeout)[0]: - msg = socket.recv(4096) - if not msg: - break - if length is None: - try: - length, msg = int(msg[:12]), msg[12:] - except ValueError: - if DEBUG: - print >>sys.__stdout__, 'read(%s):'%('worker' if isworker else 'overseer'), 'no length in', msg - return '' - buf.write(msg) - if buf.tell() >= length: - break - if not length: - if DEBUG: - print >>sys.__stdout__, 'read(%s):'%('worker' if isworker else 'overseer'), 'nothing' - return '' - msg = buf.getvalue()[:length] - if len(msg) < length: - raise RuntimeError('Corrupted packet received') - if DEBUG: - print >>sys.__stdout__, 'read(%s):'%('worker' if isworker else 'overseer'), repr(msg) - return msg - finally: - if isworker: - _comm_lock.release() - -class RepeatingTimer(Thread): - ''' - Calls a specified function repeatedly at a specified interval. Runs in a - daemon thread (i.e. the interpreter can exit while it is still running). - Call :meth:`start()` to start it. - ''' - - def repeat(self): - while True: - self.event.wait(self.interval) - if self.event.isSet(): - break - self.action() - - def __init__(self, interval, func, name): - self.event = Event() - self.interval = interval - self.action = func - Thread.__init__(self, target=self.repeat, name=name) - self.setDaemon(True) - -class ControlError(Exception): - pass - -class Overseer(object): - ''' - Responsible for controlling worker processes. The main interface is the - methods, :meth:`initialize_job`, :meth:`control`. - ''' - - KILL_RESULT = 'Server: job killed by user|||#@#$%&*)*(*$#$%#$@&' - INTERVAL = 0.1 - - def __init__(self, server, port, timeout=5): - self.worker_status = mother.spawn_worker('127.0.0.1:'+str(port)) - self.socket = server.accept()[0] - # Needed if terminate called when interpreter is shutting down - self.os = os - self.signal = signal - self.on_probation = False - self.terminated = False - - self.working = False - self.timeout = timeout - self.last_job_time = time.time() - self._stop = False - if not select([self.socket], [], [], 120)[0]: - raise RuntimeError(_('Could not launch worker process.')) - ID = self.read().split(':') - if ID[0] != 'CALIBRE_WORKER': - raise RuntimeError('Impostor') - self.worker_pid = int(ID[1]) - self.write('OK') - if self.read() != 'WAITING': - raise RuntimeError('Worker sulking') - - def terminate(self): - 'Kill worker process.' - self.terminated = True - try: - if self.socket: - self.write('STOP:') - time.sleep(1) - self.socket.shutdown(socket.SHUT_RDWR) - except: - pass - if iswindows: - win32api = __import__('win32api') - try: - handle = win32api.OpenProcess(1, False, self.worker_pid) - win32api.TerminateProcess(handle, -1) - except: - pass - else: - try: - try: - self.os.kill(self.worker_pid, self.signal.SIGKILL) - time.sleep(0.5) - finally: - self.worker_status.kill() - except: - pass - - - def write(self, msg, timeout=None): - write(self.socket, msg, timeout=self.timeout if timeout is None else timeout) - - def read(self, timeout=None): - return read(self.socket, timeout=self.timeout if timeout is None else timeout) - - def __eq__(self, other): - return hasattr(other, 'process') and hasattr(other, 'worker_pid') and self.worker_pid == other.worker_pid - - def is_viable(self): - if self.terminated: - return False - return self.worker_status.is_alive() - - def select(self, timeout=0): - return select([self.socket], [self.socket], [self.socket], timeout) - - def initialize_job(self, job): - ''' - Sends `job` to worker process. Can raise `ControlError` if worker process - does not respond appropriately. In this case, this Overseer is useless - and should be discarded. - - `job`: An instance of :class:`Job`. - ''' - self.working = True - self.write('JOB:'+cPickle.dumps((job.func, job.args, job.kwargs), -1)) - msg = self.read() - if msg != 'OK': - raise ControlError('Failed to initialize job on worker %d:%s'%(self.worker_pid, msg)) - self.job = job - self.last_report = time.time() - job.start_work() - - def control(self): - ''' - Listens for messages from the worker process and dispatches them - appropriately. If the worker process dies unexpectedly, returns a result - of None with a ControlError indicating the worker died. - - Returns a :class:`Result` instance or None, if the worker is still working. - ''' - if select([self.socket],[],[],0)[0]: - msg = self.read() - if msg: - self.on_probation = False - self.last_report = time.time() - else: - if self.on_probation: - self.terminate() - self.job.result = None - self.job.exception = ControlError('Worker process died unexpectedly') - return - else: - self.on_probation = True - return - word, msg = msg.partition(':')[0], msg.partition(':')[-1] - if word == 'PING': - self.write('OK') - return - elif word == 'RESULT': - self.write('OK') - self.job.result = cPickle.loads(msg) - return True - elif word == 'OUTPUT': - self.write('OK') - try: - self.job.output(''.join(cPickle.loads(msg))) - except: - self.job.output('Bad output message: '+ repr(msg)) - elif word == 'PROGRESS': - self.write('OK') - percent = None - try: - percent, msg = cPickle.loads(msg)[-1] - except: - print 'Bad progress update:', repr(msg) - if percent is not None: - self.job.update_status(percent, msg) - elif word == 'ERROR': - self.write('OK') - exception, tb = cPickle.loads(msg) - self.job.output(u'%s\n%s'%(exception, tb)) - self.job.exception, self.job.traceback = exception, tb - return True - else: - self.terminate() - self.job.exception = ControlError('Worker sent invalid msg: %s'%repr(msg)) - return - if not self.worker_status.is_alive() or time.time() - self.last_report > 380: - self.terminate() - self.job.exception = ControlError('Worker process died unexpectedly') - return - -class JobKilled(Exception): - pass - -class Job(object): - - def __init__(self, job_done, job_manager=None, - args=[], kwargs={}, description=None): - self.args = args - self.kwargs = kwargs - self._job_done = job_done - self.job_manager = job_manager - self.is_running = False - self.has_run = False - self.percent = -1 - self.msg = None - self.description = description - self.start_time = None - self.running_time = None - - self.result = self.exception = self.traceback = self.log = None - - def __cmp__(self, other): - sstatus, ostatus = self.status(), other.status() - if sstatus == ostatus or (self.has_run and other.has_run): - if self.start_time == other.start_time: - return cmp(id(self), id(other)) - return cmp(self.start_time, other.start_time) - if sstatus == 'WORKING': - return -1 - if ostatus == 'WORKING': - return 1 - if sstatus == 'WAITING': - return -1 - if ostatus == 'WAITING': - return 1 - - - def job_done(self): - self.is_running, self.has_run = False, True - self.running_time = (time.time() - self.start_time) if \ - self.start_time is not None else 0 - if self.job_manager is not None: - self.job_manager.job_done(self) - self._job_done(self) - - def start_work(self): - self.is_running = True - self.has_run = False - self.start_time = time.time() - if self.job_manager is not None: - self.job_manager.start_work(self) - - def update_status(self, percent, msg=None): - self.percent = percent - self.msg = msg - if self.job_manager is not None: - try: - self.job_manager.status_update(self) - except: - traceback.print_exc() - - def status(self): - if self.is_running: - return 'WORKING' - if not self.has_run: - return 'WAITING' - if self.has_run: - if self.exception is None: - return 'DONE' - return 'ERROR' - - def console_text(self): - ans = [u'Job: '] - if self.description: - ans[0] += self.description - if self.exception is not None: - header = unicode(self.exception.__class__.__name__) if \ - hasattr(self.exception, '__class__') else u'Error' - header = u'**%s**'%header - header += u': ' - try: - header += unicode(self.exception) - except: - header += unicode(repr(self.exception)) - ans.append(header) - if self.traceback: - ans.append(u'**Traceback**:') - ans.extend(self.traceback.split('\n')) - - if self.log: - if isinstance(self.log, str): - self.log = unicode(self.log, 'utf-8', 'replace') - ans.append(self.log) - return (u'\n'.join(ans)).encode('utf-8') - - def gui_text(self): - ans = [u'Job: '] - if self.description: - if not isinstance(self.description, unicode): - self.description = self.description.decode('utf-8', 'replace') - ans[0] += u'**%s**'%self.description - if self.exception is not None: - header = unicode(self.exception.__class__.__name__) if \ - hasattr(self.exception, '__class__') else u'Error' - header = u'**%s**'%header - header += u': ' - try: - header += unicode(self.exception) - except: - header += unicode(repr(self.exception)) - ans.append(header) - if self.traceback: - ans.append(u'**Traceback**:') - ans.extend(self.traceback.split('\n')) - if self.log: - ans.append(u'**Log**:') - if isinstance(self.log, str): - self.log = unicode(self.log, 'utf-8', 'replace') - ans.extend(self.log.split('\n')) - - ans = [x.decode(preferred_encoding, 'replace') if isinstance(x, str) else x for x in ans] - - return u'
'.join(ans) - - -class ParallelJob(Job): - - def __init__(self, func, *args, **kwargs): - Job.__init__(self, *args, **kwargs) - self.func = func - self.done = self.job_done - - def output(self, msg): - if not self.log: - self.log = u'' - if not isinstance(msg, unicode): - msg = msg.decode('utf-8', 'replace') - if msg: - self.log += msg - if self.job_manager is not None: - self.job_manager.output(self) - - -def remove_ipc_socket(path): - os = __import__('os') - if os.path.exists(path): - os.unlink(path) - -class Server(Thread): - - KILL_RESULT = Overseer.KILL_RESULT - START_PORT = 10013 - PID = os.getpid() - - - def __init__(self, number_of_workers=detect_ncpus()): - Thread.__init__(self) - self.setDaemon(True) - self.server_socket = socket.socket(SOCKET_TYPE, socket.SOCK_STREAM) - self.port = tempfile.mktemp(prefix='calibre_server')+'_%d_'%self.PID if not iswindows else self.START_PORT - while True: - try: - address = ('localhost', self.port) if iswindows else self.port - self.server_socket.bind(address) - break - except socket.error: - self.port += (1 if iswindows else '1') - if not iswindows: - atexit.register(remove_ipc_socket, self.port) - self.server_socket.listen(5) - self.number_of_workers = number_of_workers - self.pool, self.jobs, self.working = [], collections.deque(), [] - atexit.register(self.killall) - atexit.register(self.close) - self.job_lock = RLock() - self.overseer_lock = RLock() - self.working_lock = RLock() - self.result_lock = RLock() - self.pool_lock = RLock() - self.start() - - def split(self, tasks): - ''' - Split a list into a list of sub lists, with the number of sub lists being - no more than the number of workers this server supports. Each sublist contains - two tuples of the form (i, x) where x is an element fro the original list - and i is the index of the element x in the original list. - ''' - ans, count, pos = [], 0, 0 - delta = int(ceil(len(tasks)/float(self.number_of_workers))) - while count < len(tasks): - section = [] - for t in tasks[pos:pos+delta]: - section.append((count, t)) - count += 1 - ans.append(section) - pos += delta - return ans - - - def close(self): - try: - self.server_socket.shutdown(socket.SHUT_RDWR) - except: - pass - - def add_job(self, job): - with self.job_lock: - self.jobs.append(job) - if job.job_manager is not None: - job.job_manager.add_job(job) - - def poll(self): - ''' - Return True if the server has either working or queued jobs - ''' - with self.job_lock: - with self.working_lock: - return len(self.jobs) + len(self.working) > 0 - - def wait(self, sleep=1): - ''' - Wait until job queue is empty - ''' - while self.poll(): - time.sleep(sleep) - - def run(self): - while True: - job = None - with self.job_lock: - if len(self.jobs) > 0 and len(self.working) < self.number_of_workers: - job = self.jobs.popleft() - with self.pool_lock: - o = None - while self.pool: - o = self.pool.pop() - try: - o.initialize_job(job) - break - except: - o.terminate() - if o is None: - o = Overseer(self.server_socket, self.port) - try: - o.initialize_job(job) - except Exception, err: - o.terminate() - job.exception = err - job.traceback = traceback.format_exc() - job.done() - o = None - if o and o.is_viable(): - with self.working_lock: - self.working.append(o) - - with self.working_lock: - done = [] - for o in self.working: - try: - if o.control() is not None or o.job.exception is not None: - o.job.done() - done.append(o) - except Exception, err: - o.job.exception = err - o.job.traceback = traceback.format_exc() - o.terminate() - o.job.done() - done.append(o) - for o in done: - self.working.remove(o) - if o and o.is_viable(): - with self.pool_lock: - self.pool.append(o) - - try: - time.sleep(1) - except: - return - - - def killall(self): - with self.pool_lock: - map(lambda x: x.terminate(), self.pool) - self.pool = [] - - - def kill(self, job): - with self.working_lock: - pop = None - for o in self.working: - if o.job == job or o == job: - try: - o.terminate() - except: pass - o.job.exception = JobKilled(_('Job stopped by user')) - try: - o.job.done() - except: pass - pop = o - break - if pop is not None: - self.working.remove(pop) - - def run_free_job(self, func, args=[], kwdargs={}): - pt = PersistentTemporaryFile('.pickle', '_IPC_') - pt.write(cPickle.dumps((func, args, kwdargs))) - pt.close() - mother.spawn_free_spirit(binascii.hexlify(pt.name)) - - -########################################################################################## -##################################### CLIENT CODE ##################################### -########################################################################################## - -class BufferedSender(object): - - def __init__(self, socket): - self.socket = socket - self.wbuf, self.pbuf = [], [] - self.wlock, self.plock = RLock(), RLock() - self.last_report = None - self.timer = RepeatingTimer(0.5, self.send, 'BufferedSender') - self.timer.start() - - - def write(self, msg): - if not isinstance(msg, basestring): - msg = unicode(msg) - with self.wlock: - self.wbuf.append(msg) - - def send(self): - if callable(select) and select([self.socket], [], [], 0)[0]: - msg = read(self.socket) - if msg == 'PING:': - write(self.socket, 'OK') - elif msg: - self.socket.shutdown(socket.SHUT_RDWR) - thread.interrupt_main() - time.sleep(1) - raise SystemExit - if not select([], [self.socket], [], 30)[1]: - print >>sys.__stderr__, 'Cannot pipe to overseer' - return - - reported = False - with self.wlock: - if self.wbuf: - msg = cPickle.dumps(self.wbuf, -1) - self.wbuf = [] - write(self.socket, 'OUTPUT:'+msg) - read(self.socket, 10) - reported = True - - with self.plock: - if self.pbuf: - msg = cPickle.dumps(self.pbuf, -1) - self.pbuf = [] - write(self.socket, 'PROGRESS:'+msg) - read(self.socket, 10) - reported = True - - if self.last_report is not None: - if reported: - self.last_report = time.time() - elif time.time() - self.last_report > 60: - write(self.socket, 'PING:') - read(self.socket, 10) - self.last_report = time.time() - - def notify(self, percent, msg=''): - with self.plock: - self.pbuf.append((percent, msg)) - - def flush(self): - pass - -def get_func(name): - module, func, kwdargs, notification = PARALLEL_FUNCS[name] - module = __import__(module, fromlist=[1]) - func = getattr(module, func) - return func, kwdargs, notification - -_atexit = collections.deque() -def myatexit(func, *args, **kwargs): - _atexit.append((func, args, kwargs)) - -def work(client_socket, func, args, kwdargs): - sys.stdout.last_report = time.time() - orig = atexit.register - atexit.register = myatexit - try: - func, kargs, notification = get_func(func) - if notification is not None and hasattr(sys.stdout, 'notify'): - kargs[notification] = sys.stdout.notify - kargs.update(kwdargs) - res = func(*args, **kargs) - if hasattr(sys.stdout, 'send'): - sys.stdout.send() - return res - finally: - atexit.register = orig - sys.stdout.last_report = None - while True: - try: - func, args, kwargs = _atexit.pop() - except IndexError: - break - try: - func(*args, **kwargs) - except (Exception, SystemExit): - continue - - time.sleep(5) # Give any in progress BufferedSend time to complete - - -def worker(host, port): - client_socket = socket.socket(SOCKET_TYPE, socket.SOCK_STREAM) - address = (host, port) if iswindows else port - client_socket.connect(address) - write(client_socket, 'CALIBRE_WORKER:%d'%os.getpid()) - msg = read(client_socket, timeout=10) - if msg != 'OK': - return 1 - write(client_socket, 'WAITING') - - sys.stdout = BufferedSender(client_socket) - sys.stderr = sys.stdout - - while True: - if not select([client_socket], [], [], 60)[0]: - time.sleep(1) - continue - msg = read(client_socket, timeout=60) - if msg.startswith('JOB:'): - func, args, kwdargs = cPickle.loads(msg[4:]) - write(client_socket, 'OK') - try: - result = work(client_socket, func, args, kwdargs) - write(client_socket, 'RESULT:'+ cPickle.dumps(result)) - except BaseException, err: - exception = (err.__class__.__name__, unicode(str(err), 'utf-8', 'replace')) - tb = unicode(traceback.format_exc(), 'utf-8', 'replace') - msg = 'ERROR:'+cPickle.dumps((exception, tb),-1) - write(client_socket, msg) - res = read(client_socket, 10) - if res != 'OK': - break - gc.collect() - elif msg == 'PING:': - write(client_socket, 'OK') - elif msg == 'STOP:': - client_socket.shutdown(socket.SHUT_RDWR) - return 0 - elif not msg: - time.sleep(1) - else: - print >>sys.__stderr__, 'Invalid protocols message', msg - return 1 - -def free_spirit(path): - func, args, kwdargs = cPickle.load(open(path, 'rb')) - try: - os.unlink(path) - except: - pass - func, kargs = get_func(func)[:2] - kargs.update(kwdargs) - func(*args, **kargs) - -def main(args=sys.argv): - global isworker - isworker = True - args = args[1].split(':') - if len(args) == 1: - free_spirit(binascii.unhexlify(re.sub(r'[^a-f0-9A-F]', '', args[0]))) - else: - worker(args[0].replace("'", ''), int(args[1]) if iswindows else args[1]) - return 0 - -if __name__ == '__main__': - sys.exit(main()) - diff --git a/src/calibre/utils/ipc/job.py b/src/calibre/utils/ipc/job.py new file mode 100644 index 0000000000..6a055706e3 --- /dev/null +++ b/src/calibre/utils/ipc/job.py @@ -0,0 +1,137 @@ +#!/usr/bin/env python +# vim:fileencoding=UTF-8:ts=4:sw=4:sta:et:sts=4:ai +from __future__ import with_statement + +__license__ = 'GPL v3' +__copyright__ = '2009, Kovid Goyal ' +__docformat__ = 'restructuredtext en' + +_count = 0 + +import time, cStringIO +from Queue import Queue, Empty + +class BaseJob(object): + + WAITING = 0 + RUNNING = 1 + FINISHED = 2 + + def __init__(self, description, done=lambda x: x): + global _count + _count += 1 + + self.id = _count + self.description = description + self.done = done + self.done2 = None + self.killed = False + self.failed = False + self.start_time = None + self.result = None + self.duration = None + self.log_path = None + self.notifications = Queue() + + self._run_state = self.WAITING + self.percent = 0 + self._message = None + self._status_text = _('Waiting...') + self._done_called = False + + def update(self): + if self.duration is not None: + self._run_state = self.FINISHED + self.percent = 1 + if self.killed: + self._status_text = _('Stopped') + else: + self._status_text = _('Error') if self.failed else _('Finished') + if not self._done_called: + self._done_called = True + try: + self.done(self) + except: + pass + try: + if callable(self.done2): + self.done2(self) + except: + pass + elif self.start_time is not None: + self._run_state = self.RUNNING + self._status_text = _('Working...') + + while True: + try: + self.percent, self._message = self.notifications.get_nowait() + self.percent *= 100. + except Empty: + break + + @property + def status_text(self): + if self._run_state == self.FINISHED or not self._message: + return self._status_text + return self._message + + @property + def run_state(self): + return self._run_state + + @property + def running_time(self): + if self.duration is not None: + return self.duration + if self.start_time is not None: + return time.time() - self.start_time + return None + + @property + def is_finished(self): + return self._run_state == self.FINISHED + + @property + def is_started(self): + return self._run_state != self.WAITING + + @property + def is_running(self): + return self.is_started and not self.is_finished + + def __cmp__(self, other): + if self.is_finished == other.is_finished: + if self.start_time is None: + if other.start_time is None: # Both waiting + return cmp(other.id, self.id) + else: + return 1 + else: + if other.start_time is None: + return -1 + else: # Both running + return cmp(other.start_time, self.start_time) + + else: + return 1 if self.is_finished else -1 + return 0 + + @property + def log_file(self): + if self.log_path: + return open(self.log_path, 'rb') + return cStringIO.StringIO(_('No details available.')) + + @property + def details(self): + return self.log_file.read().decode('utf-8') + + +class ParallelJob(BaseJob): + + def __init__(self, name, description, done, args=[], kwargs={}): + self.name, self.args, self.kwargs = name, args, kwargs + BaseJob.__init__(self, description, done) + + + diff --git a/src/calibre/utils/ipc/launch.py b/src/calibre/utils/ipc/launch.py index 6c0ba46885..14530d7fea 100644 --- a/src/calibre/utils/ipc/launch.py +++ b/src/calibre/utils/ipc/launch.py @@ -70,7 +70,7 @@ class Worker(object): @property def is_alive(self): - return hasattr(self, 'child') and self.child.poll() is not None + return hasattr(self, 'child') and self.child.poll() is None @property def returncode(self): @@ -144,6 +144,7 @@ class Worker(object): self.child = subprocess.Popen(cmd, **args) + self.log_path = ret return ret diff --git a/src/calibre/utils/ipc/server.py b/src/calibre/utils/ipc/server.py index 3d1a86922e..3dec90a644 100644 --- a/src/calibre/utils/ipc/server.py +++ b/src/calibre/utils/ipc/server.py @@ -6,5 +6,241 @@ __license__ = 'GPL v3' __copyright__ = '2009, Kovid Goyal ' __docformat__ = 'restructuredtext en' +import os, cPickle, time, tempfile +from math import ceil +from threading import Thread, RLock +from Queue import Queue, Empty +from multiprocessing.connection import Listener +from multiprocessing import cpu_count +from collections import deque +from binascii import hexlify + +from calibre.utils.ipc.launch import Worker +from calibre.utils.ipc.worker import PARALLEL_FUNCS + +_counter = 0 + +class ConnectedWorker(Thread): + + def __init__(self, worker, conn, rfile): + Thread.__init__(self) + self.daemon = True + self.conn = conn + self.worker = worker + self.notifications = Queue() + self._returncode = 'dummy' + self.killed = False + self.log_path = worker.log_path + self.rfile = rfile + + def start_job(self, job): + notification = PARALLEL_FUNCS[job.name][-1] is not None + self.conn.send((job.name, job.args, job.kwargs)) + if notification: + self.start() + else: + self.conn.close() + self.job = job + + def run(self): + while True: + try: + x = self.conn.recv() + self.notifications.put(x) + except BaseException: + break + try: + self.conn.close() + except BaseException: + pass + + def kill(self): + self.killed = True + try: + self.worker.kill() + except BaseException: + pass + + @property + def is_alive(self): + return not self.killed and self.worker.is_alive + + @property + def returncode(self): + if self._returncode != 'dummy': + return self._returncode + r = self.worker.returncode + if self.killed and r is None: + self._returncode = 1 + return 1 + if r is not None: + self._returncode = r + return r + +class Server(Thread): + + def __init__(self, notify_on_job_done=lambda x: x, pool_size=None): + Thread.__init__(self) + self.daemon = True + global _counter + self.id = _counter+1 + _counter += 1 + + self.pool_size = cpu_count() if pool_size is None else pool_size + self.notify_on_job_done = notify_on_job_done + self.auth_key = os.urandom(32) + self.listener = Listener(authkey=self.auth_key, backlog=4) + self.add_jobs_queue, self.changed_jobs_queue = Queue(), Queue() + self.kill_queue = Queue() + self.waiting_jobs, self.processing_jobs = deque(), deque() + self.pool, self.workers = deque(), deque() + self.launched_worker_count = 0 + self._worker_launch_lock = RLock() + + self.start() + + def launch_worker(self, gui=False, redirect_output=None): + with self._worker_launch_lock: + self.launched_worker_count += 1 + id = self.launched_worker_count + rfile = os.path.join(tempfile.gettempdir(), + 'calibre_ipc_result_%d_%d.pickle'%(self.id, id)) + + env = { + 'CALIBRE_WORKER_ADDRESS' : + hexlify(cPickle.dumps(self.listener.address, -1)), + 'CALIBRE_WORKER_KEY' : hexlify(self.auth_key), + 'CALIBRE_WORKER_RESULT' : hexlify(rfile), + } + w = Worker(env, gui=gui) + if redirect_output is None: + redirect_output = not gui + w(redirect_output=redirect_output) + conn = self.listener.accept() + if conn is None: + raise Exception('Failed to launch worker process') + return ConnectedWorker(w, conn, rfile) + + def add_job(self, job): + job.done2 = self.notify_on_job_done + self.add_jobs_queue.put(job) + + def run_job(self, job, gui=True, redirect_output=False): + w = self.launch_worker(gui=gui, redirect_output=redirect_output) + w.start_job(job) + + + def run(self): + while True: + try: + job = self.add_jobs_queue.get(True, 0.2) + if job is None: + break + self.waiting_jobs.append(job) + except Empty: + pass + + for worker in self.workers: + while True: + try: + n = worker.notifications.get_nowait() + worker.job.notifications.put(n) + self.changed_jobs_queue.put(job) + except Empty: + break + + for worker in [w for w in self.workers if not w.is_alive]: + self.workers.remove(worker) + job = worker.job + if worker.returncode != 0: + job.failed = True + job.returncode = worker.returncode + elif os.path.exists(worker.rfile): + job.result = cPickle.load(open(worker.rfile, 'rb')) + os.remove(worker.rfile) + job.duration = time.time() - job.start_time + self.changed_jobs_queue.put(job) + + if len(self.pool) + len(self.workers) < self.pool_size: + try: + self.pool.append(self.launch_worker()) + except: + break + + if len(self.pool) > 0 and len(self.waiting_jobs) > 0: + job = self.waiting_jobs.pop() + worker = self.pool.pop() + job.start_time = time.time() + worker.start_job(job) + self.workers.append(worker) + job.log_path = worker.log_path + self.changed_jobs_queue.put(job) + + while True: + try: + j = self.kill_queue.get_nowait() + self._kill_job(j) + except Empty: + break + + def kill_job(self, job): + self.kill_queue.put(job) + + def killall(self): + for job in self.workers: + self.kill_queue.put(job) + + def _kill_job(self, job): + if job.start_time is None: return + for worker in self.workers: + if job is worker.job: + worker.kill() + job.killed = True + break + + def split(self, tasks): + ''' + Split a list into a list of sub lists, with the number of sub lists being + no more than the number of workers this server supports. Each sublist contains + two tuples of the form (i, x) where x is an element from the original list + and i is the index of the element x in the original list. + ''' + ans, count, pos = [], 0, 0 + delta = int(ceil(len(tasks)/float(self.pool_size))) + while count < len(tasks): + section = [] + for t in tasks[pos:pos+delta]: + section.append((count, t)) + count += 1 + ans.append(section) + pos += delta + return ans + + + + def close(self): + try: + self.add_jobs_queue.put(None) + self.listener.close() + except: + pass + time.sleep(0.2) + for worker in self.workers: + try: + worker.kill() + except: + pass + for worker in self.pool: + try: + worker.kill() + except: + pass + + def __enter__(self): + return self + + def __exit__(self, *args): + self.close() + diff --git a/src/calibre/utils/ipc/worker.py b/src/calibre/utils/ipc/worker.py index 75b42c9a25..6c974568ac 100644 --- a/src/calibre/utils/ipc/worker.py +++ b/src/calibre/utils/ipc/worker.py @@ -6,11 +6,12 @@ __license__ = 'GPL v3' __copyright__ = '2009, Kovid Goyal ' __docformat__ = 'restructuredtext en' -import os, cPickle +import os, cPickle, sys from multiprocessing.connection import Client from threading import Thread -from queue import Queue +from Queue import Queue from contextlib import closing +from binascii import unhexlify PARALLEL_FUNCS = { 'lrfviewer' : @@ -29,8 +30,8 @@ PARALLEL_FUNCS = { class Progress(Thread): def __init__(self, conn): - self.daemon = True Thread.__init__(self) + self.daemon = True self.conn = conn self.queue = Queue() @@ -56,8 +57,9 @@ def get_func(name): return func, notification def main(): - address = cPickle.loads(os.environ['CALIBRE_WORKER_ADDRESS']) - key = os.environ['CALIBRE_WORKER_KEY'] + address = cPickle.loads(unhexlify(os.environ['CALIBRE_WORKER_ADDRESS'])) + key = unhexlify(os.environ['CALIBRE_WORKER_KEY']) + resultf = unhexlify(os.environ['CALIBRE_WORKER_RESULT']) with closing(Client(address, authkey=key)) as conn: name, args, kwargs = conn.recv() func, notification = get_func(name) @@ -66,13 +68,17 @@ def main(): kwargs[notification] = notifier notifier.start() - func(*args, **kwargs) + result = func(*args, **kwargs) + if result is not None: + cPickle.dump(result, open(resultf, 'wb'), -1) notifier.queue.put(None) + sys.stdout.flush() + sys.stderr.flush() return 0 if __name__ == '__main__': - raise SystemExit(main()) + sys.exit(main())