diff --git a/osx_installer.py b/osx_installer.py index 6fb8b3a4e8..c42fe5f0a1 100644 --- a/osx_installer.py +++ b/osx_installer.py @@ -80,6 +80,8 @@ if not os.path.exists('/etc/fonts/fonts.conf'): continue bad = True break + if not bad: + bad = os.path.exists('/etc/fonts/fonts.conf') if bad: auth = Authorization(destroyflags=(kAuthorizationFlagDestroyRights,)) fd, name = tempfile.mkstemp('.py') diff --git a/src/calibre/__init__.py b/src/calibre/__init__.py index 12eeb2f625..d70637d84f 100644 --- a/src/calibre/__init__.py +++ b/src/calibre/__init__.py @@ -88,6 +88,9 @@ def setup_cli_handlers(logger, level): handler = logging.StreamHandler(sys.stderr) handler.setLevel(logging.DEBUG) handler.setFormatter(logging.Formatter('[%(levelname)s] %(filename)s:%(lineno)s: %(message)s')) + for hdlr in logger.handlers: + if hdlr.__class__ == handler.__class__: + logger.removeHandler(hdlr) logger.addHandler(handler) class CustomHelpFormatter(IndentedHelpFormatter): diff --git a/src/calibre/ebooks/lrf/html/convert_from.py b/src/calibre/ebooks/lrf/html/convert_from.py index 457837aa30..4d93b1a56b 100644 --- a/src/calibre/ebooks/lrf/html/convert_from.py +++ b/src/calibre/ebooks/lrf/html/convert_from.py @@ -242,6 +242,7 @@ class HTMLConverter(object, LoggingInterface): self.override_css = {} self.override_pcss = {} + self.table_render_job_server = None if self._override_css is not None: if os.access(self._override_css, os.R_OK): @@ -262,37 +263,41 @@ class HTMLConverter(object, LoggingInterface): paths = [os.path.abspath(path) for path in paths] paths = [path.decode(sys.getfilesystemencoding()) if not isinstance(path, unicode) else path for path in paths] - while len(paths) > 0 and self.link_level <= self.link_levels: - for path in paths: - if path in self.processed_files: - continue - try: - self.add_file(path) - except KeyboardInterrupt: - raise - except: - if self.link_level == 0: # Die on errors in the first level + try: + while len(paths) > 0 and self.link_level <= self.link_levels: + for path in paths: + if path in self.processed_files: + continue + try: + self.add_file(path) + except KeyboardInterrupt: raise - for link in self.links: - if link['path'] == path: - self.links.remove(link) - break - self.log_warn('Could not process '+path) - if self.verbose: - self.log_exception(' ') - self.links = self.process_links() - self.link_level += 1 - paths = [link['path'] for link in self.links] - - if self.current_page is not None and self.current_page.has_text(): - self.book.append(self.current_page) - - for text, tb in self.extra_toc_entries: - self.book.addTocEntry(text, tb) - - if self.base_font_size > 0: - self.log_info('\tRationalizing font sizes...') - self.book.rationalize_font_sizes(self.base_font_size) + except: + if self.link_level == 0: # Die on errors in the first level + raise + for link in self.links: + if link['path'] == path: + self.links.remove(link) + break + self.log_warn('Could not process '+path) + if self.verbose: + self.log_exception(' ') + self.links = self.process_links() + self.link_level += 1 + paths = [link['path'] for link in self.links] + + if self.current_page is not None and self.current_page.has_text(): + self.book.append(self.current_page) + + for text, tb in self.extra_toc_entries: + self.book.addTocEntry(text, tb) + + if self.base_font_size > 0: + self.log_info('\tRationalizing font sizes...') + self.book.rationalize_font_sizes(self.base_font_size) + finally: + if self.table_render_job_server is not None: + self.table_render_job_server.killall() def is_baen(self, soup): return bool(soup.find('meta', attrs={'name':'Publisher', @@ -1701,11 +1706,15 @@ class HTMLConverter(object, LoggingInterface): self.process_children(tag, tag_css, tag_pseudo_css) elif tagname == 'table' and not self.ignore_tables and not self.in_table: if self.render_tables_as_images: + if self.table_render_job_server is None: + from calibre.parallel import Server + self.table_render_job_server = Server(number_of_workers=1) print 'Rendering table...' from calibre.ebooks.lrf.html.table_as_image import render_table pheight = int(self.current_page.pageStyle.attrs['textheight']) pwidth = int(self.current_page.pageStyle.attrs['textwidth']) - images = render_table(self.soup, tag, tag_css, + images = render_table(self.table_render_job_server, + self.soup, tag, tag_css, os.path.dirname(self.target_prefix), pwidth, pheight, self.profile.dpi, self.text_size_multiplier_for_rendered_tables) diff --git a/src/calibre/ebooks/lrf/html/table_as_image.py b/src/calibre/ebooks/lrf/html/table_as_image.py index 501a049832..4c5a79eab8 100644 --- a/src/calibre/ebooks/lrf/html/table_as_image.py +++ b/src/calibre/ebooks/lrf/html/table_as_image.py @@ -6,7 +6,7 @@ __docformat__ = 'restructuredtext en' ''' Render HTML tables as images. ''' -import os, tempfile, atexit, shutil +import os, tempfile, atexit, shutil, time from PyQt4.Qt import QWebPage, QUrl, QApplication, QSize, \ SIGNAL, QPainter, QImage, QObject, Qt @@ -58,7 +58,7 @@ class HTMLTableRenderer(QObject): finally: QApplication.quit() -def render_table(soup, table, css, base_dir, width, height, dpi, factor=1.0): +def render_table(server, soup, table, css, base_dir, width, height, dpi, factor=1.0): head = '' for e in soup.findAll(['link', 'style']): head += unicode(e)+'\n\n' @@ -78,14 +78,17 @@ def render_table(soup, table, css, base_dir, width, height, dpi, factor=1.0): '''%(head, width-10, style, unicode(table)) - from calibre.parallel import Server - s = Server() - result, exception, traceback, log = s.run(1, 'render_table', qapp=True, report_progress=False, - args=[html, base_dir, width, height, dpi, factor]) + server.run_job(1, 'render_table', + args=[html, base_dir, width, height, dpi, factor]) + res = None + while res is None: + time.sleep(2) + res = server.result(1) + result, exception, traceback = res if exception: print 'Failed to render table' + print exception print traceback - print log images, tdir = result atexit.register(shutil.rmtree, tdir) return images diff --git a/src/calibre/gui2/dialogs/jobs.py b/src/calibre/gui2/dialogs/jobs.py index b601cf2c7c..e0682b6bd8 100644 --- a/src/calibre/gui2/dialogs/jobs.py +++ b/src/calibre/gui2/dialogs/jobs.py @@ -35,7 +35,7 @@ class JobsDialog(QDialog, Ui_JobsDialog): self.jobs_view.setModel(model) self.model = model self.setWindowModality(Qt.NonModal) - self.setWindowTitle(__appname__ + ' - Active Jobs') + self.setWindowTitle(__appname__ + _(' - Jobs')) QObject.connect(self.jobs_view.model(), SIGNAL('modelReset()'), self.jobs_view.resizeColumnsToContents) QObject.connect(self.kill_button, SIGNAL('clicked()'), diff --git a/src/calibre/gui2/jobs.py b/src/calibre/gui2/jobs.py index d135b7d6cd..794a9d8f17 100644 --- a/src/calibre/gui2/jobs.py +++ b/src/calibre/gui2/jobs.py @@ -86,17 +86,34 @@ class DeviceJob(Job): class ConversionJob(Job): ''' Jobs that involve conversion of content.''' - def run(self): - last_traceback, exception = None, None - try: - self.result, exception, last_traceback, self.log = \ - self.server.run(self.id, self.func, self.args, self.kwargs) - except Exception, err: - last_traceback = traceback.format_exc() - exception = (exception.__class__.__name__, unicode(str(err), 'utf8', 'replace')) - - self.last_traceback, self.exception = last_traceback, exception + def __init__(self, *args, **kwdargs): + Job.__init__(self, *args, **kwdargs) + self.log = '' + def run(self): + result = None + self.server.run_job(self.id, self.func, progress=self.progress, + args=self.args, kwdargs=self.kwargs, + output=self.output) + res = None + while res is None: + time.sleep(2) + res = self.server.result(self.id) + if res is None: + exception, tb = 'UnknownError: This should not have happened', '' + else: + result, exception, tb = res + self.result, self.last_traceback, self.exception = result, tb, exception + + def output(self, msg): + if self.log is None: + self.log = '' + self.log += msg + self.emit(SIGNAL('output_received()')) + + def formatted_log(self): + return '

Log:

%s
'%self.log + def notify(self): self.emit(SIGNAL('jobdone(PyQt_PyObject, PyQt_PyObject, PyQt_PyObject, PyQt_PyObject, PyQt_PyObject, PyQt_PyObject)'), self.id, self.description, self.result, self.exception, self.last_traceback, self.log) @@ -112,6 +129,9 @@ class ConversionJob(Job): ans = u'

%s: %s

'%self.exception ans += '

Traceback:

%s
'%self.last_traceback return ans + + def progress(self, percent, msg): + self.emit(SIGNAL('update_progress(int, PyQt_PyObject)'), self.id, percent) class JobManager(QAbstractTableModel): @@ -149,9 +169,9 @@ class JobManager(QAbstractTableModel): try: if isinstance(job, DeviceJob): job.terminate() - self.process_server.kill(job.id) except: continue + self.process_server.killall() def timerEvent(self, event): if event.timerId() == self.timer_id: @@ -241,7 +261,10 @@ class JobManager(QAbstractTableModel): id = self.next_id job = job_class(id, description, slot, priority, *args, **kwargs) job.server = self.process_server - QObject.connect(job, SIGNAL('status_update(int, int)'), self.status_update, Qt.QueuedConnection) + QObject.connect(job, SIGNAL('status_update(int, int)'), self.status_update, + Qt.QueuedConnection) + self.connect(job, SIGNAL('update_progress(int, PyQt_PyObject)'), + self.update_progress, Qt.QueuedConnection) self.update_lock.lock() self.add_queue.append(job) self.update_lock.unlock() @@ -370,11 +393,14 @@ class DetailView(QDialog, Ui_Dialog): self.setupUi(self) self.setWindowTitle(job.description) self.job = job - txt = self.job.formatted_error() + self.job.formatted_log() + self.connect(self.job, SIGNAL('output_received()'), self.update) + self.update() + + def update(self): + txt = self.job.formatted_error() + self.job.formatted_log() if not txt: txt = 'No details available' - self.log.setHtml(txt) - - + vbar = self.log.verticalScrollBar() + vbar.setValue(vbar.maximum()) diff --git a/src/calibre/gui2/main.py b/src/calibre/gui2/main.py index 6d63e0ad67..a567bc567d 100644 --- a/src/calibre/gui2/main.py +++ b/src/calibre/gui2/main.py @@ -77,7 +77,6 @@ class Main(MainWindow, Ui_MainWindow): self.conversion_jobs = {} self.persistent_files = [] self.metadata_dialogs = [] - self.viewer_job_id = 1 self.default_thumbnail = None self.device_error_dialog = ConversionErrorDialog(self, _('Error communicating with device'), ' ') self.device_error_dialog.setModal(Qt.NonModal) @@ -264,14 +263,6 @@ class Main(MainWindow, Ui_MainWindow): elif msg.startswith('refreshdb:'): self.library_view.model().resort() self.library_view.model().research() - elif msg.startswith('progress:'): - try: - fields = msg.split(':') - job_id, percent = fields[1:3] - job_id, percent = int(job_id), float(percent) - self.job_manager.update_progress(job_id, percent) - except: - pass else: print msg @@ -780,7 +771,7 @@ class Main(MainWindow, Ui_MainWindow): cmdline.append(pt.name) id = self.job_manager.run_conversion_job(self.book_converted, 'any2lrf', args=[cmdline], - job_description='Convert book %d of %d'%(i, len(rows))) + job_description='Convert book %d of %d'%(i+1, len(rows))) self.conversion_jobs[id] = (d.cover_file, pt, of, d.output_format, @@ -860,15 +851,16 @@ class Main(MainWindow, Ui_MainWindow): self._view_file(result) def _view_file(self, name): - if name.upper().endswith('.LRF'): - args = ['lrfviewer', name] - self.job_manager.process_server.run('viewer%d'%self.viewer_job_id, - 'lrfviewer', kwdargs=dict(args=args), - monitor=False) - self.viewer_job_id += 1 - else: - QDesktopServices.openUrl(QUrl('file:'+name))#launch(name) - time.sleep(2) # User feedback + self.setCursor(Qt.BusyCursor) + try: + if name.upper().endswith('.LRF'): + args = ['lrfviewer', name] + self.job_manager.process_server.run_free_job('lrfviewer', kwdargs=dict(args=args)) + else: + QDesktopServices.openUrl(QUrl('file:'+name))#launch(name) + time.sleep(5) # User feedback + finally: + self.unsetCursor() def view_specific_format(self, triggered): rows = self.library_view.selectionModel().selectedRows() @@ -1084,7 +1076,7 @@ class Main(MainWindow, Ui_MainWindow): if getattr(exception, 'only_msg', False): error_dialog(self, _('Conversion Error'), unicode(exception)).exec_() return - msg = u'

%s: %s

'%exception + msg = u'

%s:

'%exception msg += u'

Failed to perform job: '+description msg += u'

Detailed traceback:

'
         msg += formatted_traceback + '
' diff --git a/src/calibre/gui2/status.py b/src/calibre/gui2/status.py index 55e5c3f901..8b059f5711 100644 --- a/src/calibre/gui2/status.py +++ b/src/calibre/gui2/status.py @@ -1,6 +1,6 @@ __license__ = 'GPL v3' __copyright__ = '2008, Kovid Goyal ' -import textwrap, re +import re from PyQt4.QtGui import QStatusBar, QMovie, QLabel, QFrame, QHBoxLayout, QPixmap, \ QVBoxLayout, QSizePolicy, QToolButton, QIcon diff --git a/src/calibre/parallel.py b/src/calibre/parallel.py index 405aa36b0a..1c9593b94f 100644 --- a/src/calibre/parallel.py +++ b/src/calibre/parallel.py @@ -1,25 +1,25 @@ +from __future__ import with_statement __license__ = 'GPL v3' __copyright__ = '2008, Kovid Goyal ' ''' Used to run jobs in parallel in separate processes. ''' -import sys, tempfile, os, cPickle, traceback, atexit, binascii, time, subprocess +import sys, os, gc, cPickle, traceback, atexit, cStringIO, time, subprocess, socket, collections +from select import select from functools import partial - +from threading import RLock, Thread, Event from calibre.ebooks.lrf.any.convert_from import main as any2lrf from calibre.ebooks.lrf.web.convert_from import main as web2lrf from calibre.ebooks.lrf.feeds.convert_from import main as feeds2lrf from calibre.gui2.lrf_renderer.main import main as lrfviewer +from calibre.ptempfile import PersistentTemporaryFile + try: from calibre.ebooks.lrf.html.table_as_image import do_render as render_table except: # Dont fail is PyQt4.4 not present render_table = None -from calibre import iswindows, __appname__, islinux -try: - from calibre.utils.single_qt_application import SingleApplication -except: - SingleApplication = None +from calibre import iswindows, islinux, detect_ncpus sa = None job_id = None @@ -29,12 +29,13 @@ def report_progress(percent, msg=''): msg = 'progress:%s:%f:%s'%(job_id, percent, msg) sa.send_message(msg) +_notify = 'fskjhwseiuyweoiu987435935-0342' PARALLEL_FUNCS = { 'any2lrf' : partial(any2lrf, gui_mode=True), 'web2lrf' : web2lrf, 'lrfviewer' : lrfviewer, - 'feeds2lrf' : partial(feeds2lrf, notification=report_progress), + 'feeds2lrf' : partial(feeds2lrf, notification=_notify), 'render_table': render_table, } @@ -52,144 +53,422 @@ if islinux and hasattr(sys, 'frozen_path'): python = os.path.join(getattr(sys, 'frozen_path'), 'parallel') popen = partial(subprocess.Popen, cwd=getattr(sys, 'frozen_path')) -def cleanup(tdir): - try: - import shutil - shutil.rmtree(tdir, True) - except: - pass +prefix = 'import sys; sys.in_worker = True; ' +if hasattr(sys, 'frameworks_dir'): + fd = getattr(sys, 'frameworks_dir') + prefix += 'sys.frameworks_dir = "%s"; sys.frozen = "macosx_app"; '%fd + if fd not in os.environ['PATH']: + os.environ['PATH'] += ':'+fd -class Server(object): + +def write(socket, msg, timeout=5): + if isinstance(msg, unicode): + msg = msg.encode('utf-8') + length = None + while len(msg) > 0: + if length is None: + length = len(msg) + chunk = ('%-12d'%length) + msg[:4096-12] + msg = msg[4096-12:] + else: + chunk, msg = msg[:4096], msg[4096:] + w = select([], [socket], [], timeout)[1] + if not w: + raise RuntimeError('Write to socket timed out') + if socket.sendall(chunk) is not None: + raise RuntimeError('Failed to write chunk to socket') + + +def read(socket, timeout=5): + buf = cStringIO.StringIO() + length = None + while select([socket],[],[],timeout)[0]: + msg = socket.recv(4096) + if not msg: + break + if length is None: + length, msg = int(msg[:12]), msg[12:] + buf.write(msg) + if buf.tell() >= length: + break + if not length: + return '' + msg = buf.getvalue()[:length] + if len(msg) < length: + raise RuntimeError('Corrupted packet received') + + return msg + +class RepeatingTimer(Thread): + + def repeat(self): + while True: + self.event.wait(self.interval) + if self.event.isSet(): + break + self.action() + + def __init__(self, interval, func): + self.event = Event() + self.interval = interval + self.action = func + Thread.__init__(self, target=self.repeat) + self.setDaemon(True) + +class ControlError(Exception): + pass + +class Overseer(object): - #: Interval in seconds at which child processes are polled for status information - INTERVAL = 0.1 KILL_RESULT = 'Server: job killed by user|||#@#$%&*)*(*$#$%#$@&' + INTERVAL = 0.1 - def __init__(self): - self.tdir = tempfile.mkdtemp('', '%s_IPC_'%__appname__) - atexit.register(cleanup, self.tdir) - self.kill_jobs = [] + def __init__(self, server, port, timeout=5): + self.cmd = prefix + 'from calibre.parallel import worker; worker(%s, %s)'%(repr('localhost'), repr(port)) + self.process = popen([python, '-c', self.cmd]) + self.socket = server.accept()[0] - def kill(self, job_id): - ''' - Kill the job identified by job_id. - ''' - self.kill_jobs.append(str(job_id)) + self.working = False + self.timeout = timeout + self.last_job_time = time.time() + self.job_id = None + self._stop = False + if not select([self.socket], [], [], 120)[0]: + raise RuntimeError(_('Could not launch worker process.')) + if int(self.read()) != self.process.pid: + raise RuntimeError('PID mismatch') + self.write('OK') + if self.read() != 'WAITING': + raise RuntimeError('Worker sulking') - def _terminate(self, process): + def terminate(self): ''' Kill process. ''' + try: + if self.socket: + self.socket.close() + except: + pass if iswindows: win32api = __import__('win32api') try: - win32api.TerminateProcess(int(process.pid), -1) + win32api.TerminateProcess(int(self.process.pid), -1) except: pass else: import signal - os.kill(process.pid, signal.SIGKILL) - time.sleep(0.05) - - + try: + os.kill(self.process.pid, signal.SIGKILL) + time.sleep(0.05) + except: + pass - def run(self, job_id, func, args=[], kwdargs={}, monitor=True, - report_progress=True, qapp=True): - ''' - Run a job in a separate process. - @param job_id: A unique (per server) identifier - @param func: One of C{PARALLEL_FUNCS.keys()} - @param args: A list of arguments to pass of C{func} - @param kwdargs: A dictionary of keyword arguments to pass to C{func} - @param monitor: If False launch the child process and return. - Do not monitor/communicate with it. Automatically sets - `report_progress` and `qapp` to False. - @param report_progess: If True progress is reported to the GUI - @param qapp: If True, A QApplication is created. If False, progress reporting will also be disabled. - @return: (result, exception, formatted_traceback, log) where log is the combined - stdout + stderr of the child process; or None if monitor is True. If a job is killed - by a call to L{kill()} then result will be L{KILL_RESULT} - ''' - job_id = str(job_id) - job_dir = os.path.join(self.tdir, job_id) - if os.path.exists(job_dir): - raise ValueError('Cannot run job. The job_id %s has already been used.'%job_id) - os.mkdir(job_dir) - job_data = os.path.join(job_dir, 'job_data.pickle') - if not monitor: - report_progress = qapp = False - cPickle.dump((job_id, func, args, kwdargs, report_progress, qapp), - open(job_data, 'wb'), -1) - prefix = '' - if hasattr(sys, 'frameworks_dir'): - fd = getattr(sys, 'frameworks_dir') - prefix = 'import sys; sys.frameworks_dir = "%s"; sys.frozen = "macosx_app"; '%fd - if fd not in os.environ['PATH']: - os.environ['PATH'] += ':'+fd - cmd = prefix + 'from calibre.parallel import main; main(\'%s\')'%binascii.hexlify(job_data) + def write(self, msg, timeout=None): + write(self.socket, msg, timeout=self.timeout if timeout is None else timeout) - if not monitor: - popen([python, '-c', cmd]) + def read(self, timeout=None): + return read(self.socket, timeout=self.timeout if timeout is None else timeout) + + def __eq__(self, other): + return hasattr(other, 'process') and hasattr(other.process, 'pid') and self.process.pid == other.process.pid + + def __bool__(self): + self.process.poll() + return self.process.returncode is None + + def pid(self): + return self.process.pid + + def select(self, timeout=0): + return select([self.socket], [self.socket], [self.socket], timeout) + + def initialize_job(self, job): + self.job_id = job.job_id + self.working = True + self.write('JOB:'+cPickle.dumps((job.func, job.args, job.kwdargs), -1)) + msg = self.read() + if msg != 'OK': + raise ControlError('Failed to initialize job on worker %d:%s'%(self.process.pid, msg)) + self.output = job.output if callable(job.output) else sys.stdout.write + self.progress = job.progress if callable(job.progress) else None + self.job = job + + def control(self): + try: + if select([self.socket],[],[],0)[0]: + msg = self.read() + word, msg = msg.partition(':')[0], msg.partition(':')[-1] + if word == 'RESULT': + self.write('OK') + return Result(cPickle.loads(msg), None, None) + elif word == 'OUTPUT': + self.write('OK') + try: + self.output(''.join(cPickle.loads(msg))) + except: + self.output('Bad output message: '+ repr(msg)) + elif word == 'PROGRESS': + self.write('OK') + percent = None + try: + percent, msg = cPickle.loads(msg)[-1] + except: + print 'Bad progress update:', repr(msg) + if self.progress and percent is not None: + self.progress(percent, msg) + elif word == 'ERROR': + self.write('OK') + return Result(None, *cPickle.loads(msg)) + else: + self.terminate() + return Result(None, ControlError('Worker sent invalid msg: %s', repr(msg)), '') + self.process.poll() + if self.process.returncode is not None: + return Result(None, ControlError('Worker process died unexpectedly with returncode: %d'%self.process.returncode), '') + finally: + self.working = False + self.last_job_time = time.time() + +class Job(object): + + def __init__(self, job_id, func, args, kwdargs, output, progress, done): + self.job_id = job_id + self.func = func + self.args = args + self.kwdargs = kwdargs + self.output = output + self.progress = progress + self.done = done + +class Result(object): + + def __init__(self, result, exception, traceback): + self.result = result + self.exception = exception + self.traceback = traceback + + def __len__(self): + return 3 + + def __item__(self, i): + return (self.result, self.exception, self.traceback)[i] + + def __iter__(self): + return iter((self.result, self.exception, self.traceback)) + +class Server(Thread): + + KILL_RESULT = Overseer.KILL_RESULT + START_PORT = 10013 + + def __init__(self, number_of_workers=detect_ncpus()): + Thread.__init__(self) + self.setDaemon(True) + self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.port = self.START_PORT + while True: + try: + self.server_socket.bind(('localhost', self.port)) + break + except: + self.port += 1 + self.server_socket.listen(5) + self.number_of_workers = number_of_workers + self.pool, self.jobs, self.working, self.results = [], collections.deque(), [], {} + atexit.register(self.killall) + self.job_lock = RLock() + self.overseer_lock = RLock() + self.working_lock = RLock() + self.result_lock = RLock() + self.pool_lock = RLock() + self.start() + + def add_job(self, job): + with self.job_lock: + self.jobs.append(job) + + def store_result(self, result, id=None): + if id: + with self.job_lock: + self.results[id] = result + + def result(self, id): + with self.result_lock: + return self.results.pop(id, None) + + def run(self): + while True: + job = None + with self.job_lock: + if len(self.jobs) > 0 and len(self.working) < self.number_of_workers: + job = self.jobs.popleft() + with self.pool_lock: + o = self.pool.pop() if self.pool else Overseer(self.server_socket, self.port) + try: + o.initialize_job(job) + except Exception, err: + res = Result(None, unicode(err), traceback.format_exc()) + job.done(res) + o.terminate() + o = None + if o: + with self.working_lock: + self.working.append(o) + + with self.working_lock: + done = [] + for o in self.working: + try: + res = o.control() + except Exception, err: + res = Result(None, unicode(err), traceback.format_exc()) + o.terminate() + if isinstance(res, Result): + o.job.done(res) + done.append(o) + for o in done: + self.working.remove(o) + if o: + with self.pool_lock: + self.pool.append(o) + + time.sleep(1) + + + def killall(self): + with self.pool_lock: + map(lambda x: x.terminate(), self.pool) + self.pool = [] + + + def kill(self, job_id): + with self.working_lock: + pop = None + for o in self.working: + if o.job_id == job_id: + o.terminate() + o.job.done(Result(self.KILL_RESULT, None, '')) + pop = o + break + if pop is not None: + self.working.remove(pop) + + + + def run_job(self, job_id, func, args=[], kwdargs={}, + output=None, progress=None, done=None): + ''' + Run a job in a separate process. Supports job control, output redirection + and progress reporting. + ''' + if done is None: + done = partial(self.store_result, id=job_id) + job = Job(job_id, func, args, kwdargs, output, progress, done) + with self.job_lock: + self.jobs.append(job) + + def run_free_job(self, func, args=[], kwdargs={}): + pt = PersistentTemporaryFile('.pickle', '_IPC_') + pt.write(cPickle.dumps((func, args, kwdargs))) + pt.close() + cmd = prefix + 'from calibre.parallel import free_spirit; free_spirit(%s)'%repr(pt.name) + popen([python, '-c', cmd]) + +########################################################################################## +##################################### CLIENT CODE ##################################### +########################################################################################## + +class BufferedSender(object): + + def __init__(self, socket): + self.socket = socket + self.wbuf, self.pbuf = [], [] + self.wlock, self.plock = RLock(), RLock() + self.timer = RepeatingTimer(0.5, self.send) + self.prefix = prefix + self.timer.start() + + def write(self, msg): + if not isinstance(msg, basestring): + msg = unicode(msg) + with self.wlock: + self.wbuf.append(msg) + + def send(self): + if not select([], [self.socket], [], 30)[1]: + print >>sys.__stderr__, 'Cannot pipe to overseer' return - output = open(os.path.join(job_dir, 'output.txt'), 'wb') - p = popen([python, '-c', cmd], stdout=output, stderr=output, - stdin=subprocess.PIPE) - p.stdin.close() - while p.returncode is None: - if job_id in self.kill_jobs: - self._terminate(p) - return self.KILL_RESULT, None, None, _('Job killed by user') - time.sleep(0.1) - p.poll() + with self.wlock: + if self.wbuf: + msg = cPickle.dumps(self.wbuf, -1) + self.wbuf = [] + write(self.socket, 'OUTPUT:'+msg) + read(self.socket, 10) + + with self.plock: + if self.pbuf: + msg = cPickle.dumps(self.pbuf, -1) + self.pbuf = [] + write(self.socket, 'PROGRESS:'+msg) + read(self.socket, 10) + + def notify(self, percent, msg=''): + with self.plock: + self.pbuf.append((percent, msg)) - - output.close() - job_result = os.path.join(job_dir, 'job_result.pickle') - if not os.path.exists(job_result): - result, exception, traceback = None, ('ParallelRuntimeError', - 'The worker process died unexpectedly.'), '' - else: - result, exception, traceback = cPickle.load(open(job_result, 'rb')) - log = open(output.name, 'rb').read() - - return result, exception, traceback, log - + def flush(self): + pass -def run_job(base, id, func, args, kwdargs): - global job_id - job_id = id - job_result = os.path.join(base, 'job_result.pickle') - +def work(client_socket, func, args, kwdargs): func = PARALLEL_FUNCS[func] - exception, tb = None, None + if hasattr(func, 'keywords'): + for key, val in func.keywords.items(): + if val == _notify: + func.keywords[key] = sys.stdout.notify + res = func(*args, **kwdargs) + sys.stdout.send() + return res + + + + +def worker(host, port): + client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + client_socket.connect((host, port)) + write(client_socket, str(os.getpid())) + msg = read(client_socket, timeout=10) + if msg != 'OK': + return 1 + write(client_socket, 'WAITING') + sys.stdout = BufferedSender(client_socket) + sys.stderr = sys.stdout + + while True: + msg = read(client_socket, timeout=60) + if msg.startswith('JOB:'): + func, args, kwdargs = cPickle.loads(msg[4:]) + write(client_socket, 'OK') + try: + result = work(client_socket, func, args, kwdargs) + write(client_socket, 'RESULT:'+ cPickle.dumps(result)) + except (Exception, SystemExit), err: + exception = (err.__class__.__name__, unicode(str(err), 'utf-8', 'replace')) + tb = traceback.format_exc() + write(client_socket, 'ERROR:'+cPickle.dumps((exception, tb),-1)) + if read(client_socket, 10) != 'OK': + break + gc.collect() + elif msg == 'STOP:': + return 0 + +def free_spirit(path): + func, args, kwdargs = cPickle.load(open(path, 'rb')) try: - result = func(*args, **kwdargs) - except (Exception, SystemExit), err: - result = None - exception = (err.__class__.__name__, unicode(str(err), 'utf-8', 'replace')) - tb = traceback.format_exc() - - if os.path.exists(os.path.dirname(job_result)): - cPickle.dump((result, exception, tb), open(job_result, 'wb')) - -def main(src): - from PyQt4.QtGui import QApplication - job_data = binascii.unhexlify(src) - global sa - job_id, func, args, kwdargs, rp, qapp = cPickle.load(open(job_data, 'rb')) - - if qapp and QApplication.instance() is None: - QApplication([]) - if SingleApplication is not None and rp and QApplication.instance() is not None: - sa = SingleApplication('calibre GUI') - - run_job(os.path.dirname(job_data), job_id, func, args, kwdargs) - - return 0 - -if __name__ == '__main__': - sys.exit(main(sys.argv[2])) - - + os.unlink(path) + except: + pass + PARALLEL_FUNCS[func](*args, **kwdargs) + \ No newline at end of file diff --git a/src/calibre/terminfo.py b/src/calibre/terminfo.py index 2141b9115b..2ed03a3077 100644 --- a/src/calibre/terminfo.py +++ b/src/calibre/terminfo.py @@ -94,7 +94,7 @@ class TerminalController: except: return # If the stream isn't a tty, then assume it has no capabilities. - if not hasattr(term_stream, 'isatty') or not term_stream.isatty(): return + if hasattr(sys, 'in_worker') or not hasattr(term_stream, 'isatty') or not term_stream.isatty(): return # Check the terminal type. If we fail, then assume that the # terminal has no capabilities. diff --git a/src/calibre/utils/single_qt_application.py b/src/calibre/utils/single_qt_application.py index 0c1eb0936a..78171a3dec 100644 --- a/src/calibre/utils/single_qt_application.py +++ b/src/calibre/utils/single_qt_application.py @@ -7,7 +7,7 @@ __docformat__ = 'restructuredtext en' Enforces running of only a single application instance and allows for messaging between applications using a local socket. ''' -import atexit +import atexit, os from PyQt4.QtCore import QByteArray, QDataStream, QIODevice, SIGNAL, QObject, Qt, QString from PyQt4.QtNetwork import QLocalSocket, QLocalServer @@ -93,6 +93,16 @@ class LocalServer(QLocalServer): for conn in pop: self.connections.remove(conn) + + def listen(self, name): + if not QLocalServer.listen(self, name): + try: + os.unlink(self.fullServerName()) + except: + pass + return QLocalServer.listen(self, name) + return True + @@ -124,8 +134,7 @@ class SingleApplication(QObject): self.mr, Qt.QueuedConnection) if not self.server.listen(self.server_name): - if not self.server.listen(self.server_name): - self.server = None + self.server = None if self.server is not None: atexit.register(self.server.close)