IGN:Initial implementation of process pool. Tested on linux

This commit is contained in:
Kovid Goyal 2008-06-19 23:23:34 -07:00
parent 8fcf04a9be
commit 9f1daa37d6
11 changed files with 527 additions and 204 deletions

View File

@ -80,6 +80,8 @@ if not os.path.exists('/etc/fonts/fonts.conf'):
continue continue
bad = True bad = True
break break
if not bad:
bad = os.path.exists('/etc/fonts/fonts.conf')
if bad: if bad:
auth = Authorization(destroyflags=(kAuthorizationFlagDestroyRights,)) auth = Authorization(destroyflags=(kAuthorizationFlagDestroyRights,))
fd, name = tempfile.mkstemp('.py') fd, name = tempfile.mkstemp('.py')

View File

@ -88,6 +88,9 @@ def setup_cli_handlers(logger, level):
handler = logging.StreamHandler(sys.stderr) handler = logging.StreamHandler(sys.stderr)
handler.setLevel(logging.DEBUG) handler.setLevel(logging.DEBUG)
handler.setFormatter(logging.Formatter('[%(levelname)s] %(filename)s:%(lineno)s: %(message)s')) handler.setFormatter(logging.Formatter('[%(levelname)s] %(filename)s:%(lineno)s: %(message)s'))
for hdlr in logger.handlers:
if hdlr.__class__ == handler.__class__:
logger.removeHandler(hdlr)
logger.addHandler(handler) logger.addHandler(handler)
class CustomHelpFormatter(IndentedHelpFormatter): class CustomHelpFormatter(IndentedHelpFormatter):

View File

@ -242,6 +242,7 @@ class HTMLConverter(object, LoggingInterface):
self.override_css = {} self.override_css = {}
self.override_pcss = {} self.override_pcss = {}
self.table_render_job_server = None
if self._override_css is not None: if self._override_css is not None:
if os.access(self._override_css, os.R_OK): if os.access(self._override_css, os.R_OK):
@ -262,37 +263,41 @@ class HTMLConverter(object, LoggingInterface):
paths = [os.path.abspath(path) for path in paths] paths = [os.path.abspath(path) for path in paths]
paths = [path.decode(sys.getfilesystemencoding()) if not isinstance(path, unicode) else path for path in paths] paths = [path.decode(sys.getfilesystemencoding()) if not isinstance(path, unicode) else path for path in paths]
while len(paths) > 0 and self.link_level <= self.link_levels: try:
for path in paths: while len(paths) > 0 and self.link_level <= self.link_levels:
if path in self.processed_files: for path in paths:
continue if path in self.processed_files:
try: continue
self.add_file(path) try:
except KeyboardInterrupt: self.add_file(path)
raise except KeyboardInterrupt:
except:
if self.link_level == 0: # Die on errors in the first level
raise raise
for link in self.links: except:
if link['path'] == path: if self.link_level == 0: # Die on errors in the first level
self.links.remove(link) raise
break for link in self.links:
self.log_warn('Could not process '+path) if link['path'] == path:
if self.verbose: self.links.remove(link)
self.log_exception(' ') break
self.links = self.process_links() self.log_warn('Could not process '+path)
self.link_level += 1 if self.verbose:
paths = [link['path'] for link in self.links] self.log_exception(' ')
self.links = self.process_links()
if self.current_page is not None and self.current_page.has_text(): self.link_level += 1
self.book.append(self.current_page) paths = [link['path'] for link in self.links]
for text, tb in self.extra_toc_entries: if self.current_page is not None and self.current_page.has_text():
self.book.addTocEntry(text, tb) self.book.append(self.current_page)
if self.base_font_size > 0: for text, tb in self.extra_toc_entries:
self.log_info('\tRationalizing font sizes...') self.book.addTocEntry(text, tb)
self.book.rationalize_font_sizes(self.base_font_size)
if self.base_font_size > 0:
self.log_info('\tRationalizing font sizes...')
self.book.rationalize_font_sizes(self.base_font_size)
finally:
if self.table_render_job_server is not None:
self.table_render_job_server.killall()
def is_baen(self, soup): def is_baen(self, soup):
return bool(soup.find('meta', attrs={'name':'Publisher', return bool(soup.find('meta', attrs={'name':'Publisher',
@ -1701,11 +1706,15 @@ class HTMLConverter(object, LoggingInterface):
self.process_children(tag, tag_css, tag_pseudo_css) self.process_children(tag, tag_css, tag_pseudo_css)
elif tagname == 'table' and not self.ignore_tables and not self.in_table: elif tagname == 'table' and not self.ignore_tables and not self.in_table:
if self.render_tables_as_images: if self.render_tables_as_images:
if self.table_render_job_server is None:
from calibre.parallel import Server
self.table_render_job_server = Server(number_of_workers=1)
print 'Rendering table...' print 'Rendering table...'
from calibre.ebooks.lrf.html.table_as_image import render_table from calibre.ebooks.lrf.html.table_as_image import render_table
pheight = int(self.current_page.pageStyle.attrs['textheight']) pheight = int(self.current_page.pageStyle.attrs['textheight'])
pwidth = int(self.current_page.pageStyle.attrs['textwidth']) pwidth = int(self.current_page.pageStyle.attrs['textwidth'])
images = render_table(self.soup, tag, tag_css, images = render_table(self.table_render_job_server,
self.soup, tag, tag_css,
os.path.dirname(self.target_prefix), os.path.dirname(self.target_prefix),
pwidth, pheight, self.profile.dpi, pwidth, pheight, self.profile.dpi,
self.text_size_multiplier_for_rendered_tables) self.text_size_multiplier_for_rendered_tables)

View File

@ -6,7 +6,7 @@ __docformat__ = 'restructuredtext en'
''' '''
Render HTML tables as images. Render HTML tables as images.
''' '''
import os, tempfile, atexit, shutil import os, tempfile, atexit, shutil, time
from PyQt4.Qt import QWebPage, QUrl, QApplication, QSize, \ from PyQt4.Qt import QWebPage, QUrl, QApplication, QSize, \
SIGNAL, QPainter, QImage, QObject, Qt SIGNAL, QPainter, QImage, QObject, Qt
@ -58,7 +58,7 @@ class HTMLTableRenderer(QObject):
finally: finally:
QApplication.quit() QApplication.quit()
def render_table(soup, table, css, base_dir, width, height, dpi, factor=1.0): def render_table(server, soup, table, css, base_dir, width, height, dpi, factor=1.0):
head = '' head = ''
for e in soup.findAll(['link', 'style']): for e in soup.findAll(['link', 'style']):
head += unicode(e)+'\n\n' head += unicode(e)+'\n\n'
@ -78,14 +78,17 @@ def render_table(soup, table, css, base_dir, width, height, dpi, factor=1.0):
</body> </body>
</html> </html>
'''%(head, width-10, style, unicode(table)) '''%(head, width-10, style, unicode(table))
from calibre.parallel import Server server.run_job(1, 'render_table',
s = Server() args=[html, base_dir, width, height, dpi, factor])
result, exception, traceback, log = s.run(1, 'render_table', qapp=True, report_progress=False, res = None
args=[html, base_dir, width, height, dpi, factor]) while res is None:
time.sleep(2)
res = server.result(1)
result, exception, traceback = res
if exception: if exception:
print 'Failed to render table' print 'Failed to render table'
print exception
print traceback print traceback
print log
images, tdir = result images, tdir = result
atexit.register(shutil.rmtree, tdir) atexit.register(shutil.rmtree, tdir)
return images return images

View File

@ -35,7 +35,7 @@ class JobsDialog(QDialog, Ui_JobsDialog):
self.jobs_view.setModel(model) self.jobs_view.setModel(model)
self.model = model self.model = model
self.setWindowModality(Qt.NonModal) self.setWindowModality(Qt.NonModal)
self.setWindowTitle(__appname__ + ' - Active Jobs') self.setWindowTitle(__appname__ + _(' - Jobs'))
QObject.connect(self.jobs_view.model(), SIGNAL('modelReset()'), QObject.connect(self.jobs_view.model(), SIGNAL('modelReset()'),
self.jobs_view.resizeColumnsToContents) self.jobs_view.resizeColumnsToContents)
QObject.connect(self.kill_button, SIGNAL('clicked()'), QObject.connect(self.kill_button, SIGNAL('clicked()'),

View File

@ -86,17 +86,34 @@ class DeviceJob(Job):
class ConversionJob(Job): class ConversionJob(Job):
''' Jobs that involve conversion of content.''' ''' Jobs that involve conversion of content.'''
def run(self): def __init__(self, *args, **kwdargs):
last_traceback, exception = None, None Job.__init__(self, *args, **kwdargs)
try: self.log = ''
self.result, exception, last_traceback, self.log = \
self.server.run(self.id, self.func, self.args, self.kwargs)
except Exception, err:
last_traceback = traceback.format_exc()
exception = (exception.__class__.__name__, unicode(str(err), 'utf8', 'replace'))
self.last_traceback, self.exception = last_traceback, exception
def run(self):
result = None
self.server.run_job(self.id, self.func, progress=self.progress,
args=self.args, kwdargs=self.kwargs,
output=self.output)
res = None
while res is None:
time.sleep(2)
res = self.server.result(self.id)
if res is None:
exception, tb = 'UnknownError: This should not have happened', ''
else:
result, exception, tb = res
self.result, self.last_traceback, self.exception = result, tb, exception
def output(self, msg):
if self.log is None:
self.log = ''
self.log += msg
self.emit(SIGNAL('output_received()'))
def formatted_log(self):
return '<h2>Log:</h2><pre>%s</pre>'%self.log
def notify(self): def notify(self):
self.emit(SIGNAL('jobdone(PyQt_PyObject, PyQt_PyObject, PyQt_PyObject, PyQt_PyObject, PyQt_PyObject, PyQt_PyObject)'), self.emit(SIGNAL('jobdone(PyQt_PyObject, PyQt_PyObject, PyQt_PyObject, PyQt_PyObject, PyQt_PyObject, PyQt_PyObject)'),
self.id, self.description, self.result, self.exception, self.last_traceback, self.log) self.id, self.description, self.result, self.exception, self.last_traceback, self.log)
@ -112,6 +129,9 @@ class ConversionJob(Job):
ans = u'<p><b>%s</b>: %s</p>'%self.exception ans = u'<p><b>%s</b>: %s</p>'%self.exception
ans += '<h2>Traceback:</h2><pre>%s</pre>'%self.last_traceback ans += '<h2>Traceback:</h2><pre>%s</pre>'%self.last_traceback
return ans return ans
def progress(self, percent, msg):
self.emit(SIGNAL('update_progress(int, PyQt_PyObject)'), self.id, percent)
class JobManager(QAbstractTableModel): class JobManager(QAbstractTableModel):
@ -149,9 +169,9 @@ class JobManager(QAbstractTableModel):
try: try:
if isinstance(job, DeviceJob): if isinstance(job, DeviceJob):
job.terminate() job.terminate()
self.process_server.kill(job.id)
except: except:
continue continue
self.process_server.killall()
def timerEvent(self, event): def timerEvent(self, event):
if event.timerId() == self.timer_id: if event.timerId() == self.timer_id:
@ -241,7 +261,10 @@ class JobManager(QAbstractTableModel):
id = self.next_id id = self.next_id
job = job_class(id, description, slot, priority, *args, **kwargs) job = job_class(id, description, slot, priority, *args, **kwargs)
job.server = self.process_server job.server = self.process_server
QObject.connect(job, SIGNAL('status_update(int, int)'), self.status_update, Qt.QueuedConnection) QObject.connect(job, SIGNAL('status_update(int, int)'), self.status_update,
Qt.QueuedConnection)
self.connect(job, SIGNAL('update_progress(int, PyQt_PyObject)'),
self.update_progress, Qt.QueuedConnection)
self.update_lock.lock() self.update_lock.lock()
self.add_queue.append(job) self.add_queue.append(job)
self.update_lock.unlock() self.update_lock.unlock()
@ -370,11 +393,14 @@ class DetailView(QDialog, Ui_Dialog):
self.setupUi(self) self.setupUi(self)
self.setWindowTitle(job.description) self.setWindowTitle(job.description)
self.job = job self.job = job
txt = self.job.formatted_error() + self.job.formatted_log() self.connect(self.job, SIGNAL('output_received()'), self.update)
self.update()
def update(self):
txt = self.job.formatted_error() + self.job.formatted_log()
if not txt: if not txt:
txt = 'No details available' txt = 'No details available'
self.log.setHtml(txt) self.log.setHtml(txt)
vbar = self.log.verticalScrollBar()
vbar.setValue(vbar.maximum())

View File

@ -77,7 +77,6 @@ class Main(MainWindow, Ui_MainWindow):
self.conversion_jobs = {} self.conversion_jobs = {}
self.persistent_files = [] self.persistent_files = []
self.metadata_dialogs = [] self.metadata_dialogs = []
self.viewer_job_id = 1
self.default_thumbnail = None self.default_thumbnail = None
self.device_error_dialog = ConversionErrorDialog(self, _('Error communicating with device'), ' ') self.device_error_dialog = ConversionErrorDialog(self, _('Error communicating with device'), ' ')
self.device_error_dialog.setModal(Qt.NonModal) self.device_error_dialog.setModal(Qt.NonModal)
@ -264,14 +263,6 @@ class Main(MainWindow, Ui_MainWindow):
elif msg.startswith('refreshdb:'): elif msg.startswith('refreshdb:'):
self.library_view.model().resort() self.library_view.model().resort()
self.library_view.model().research() self.library_view.model().research()
elif msg.startswith('progress:'):
try:
fields = msg.split(':')
job_id, percent = fields[1:3]
job_id, percent = int(job_id), float(percent)
self.job_manager.update_progress(job_id, percent)
except:
pass
else: else:
print msg print msg
@ -780,7 +771,7 @@ class Main(MainWindow, Ui_MainWindow):
cmdline.append(pt.name) cmdline.append(pt.name)
id = self.job_manager.run_conversion_job(self.book_converted, id = self.job_manager.run_conversion_job(self.book_converted,
'any2lrf', args=[cmdline], 'any2lrf', args=[cmdline],
job_description='Convert book %d of %d'%(i, len(rows))) job_description='Convert book %d of %d'%(i+1, len(rows)))
self.conversion_jobs[id] = (d.cover_file, pt, of, d.output_format, self.conversion_jobs[id] = (d.cover_file, pt, of, d.output_format,
@ -860,15 +851,16 @@ class Main(MainWindow, Ui_MainWindow):
self._view_file(result) self._view_file(result)
def _view_file(self, name): def _view_file(self, name):
if name.upper().endswith('.LRF'): self.setCursor(Qt.BusyCursor)
args = ['lrfviewer', name] try:
self.job_manager.process_server.run('viewer%d'%self.viewer_job_id, if name.upper().endswith('.LRF'):
'lrfviewer', kwdargs=dict(args=args), args = ['lrfviewer', name]
monitor=False) self.job_manager.process_server.run_free_job('lrfviewer', kwdargs=dict(args=args))
self.viewer_job_id += 1 else:
else: QDesktopServices.openUrl(QUrl('file:'+name))#launch(name)
QDesktopServices.openUrl(QUrl('file:'+name))#launch(name) time.sleep(5) # User feedback
time.sleep(2) # User feedback finally:
self.unsetCursor()
def view_specific_format(self, triggered): def view_specific_format(self, triggered):
rows = self.library_view.selectionModel().selectedRows() rows = self.library_view.selectionModel().selectedRows()
@ -1084,7 +1076,7 @@ class Main(MainWindow, Ui_MainWindow):
if getattr(exception, 'only_msg', False): if getattr(exception, 'only_msg', False):
error_dialog(self, _('Conversion Error'), unicode(exception)).exec_() error_dialog(self, _('Conversion Error'), unicode(exception)).exec_()
return return
msg = u'<p><b>%s</b>: %s</p>'%exception msg = u'<p><b>%s</b>: </p>'%exception
msg += u'<p>Failed to perform <b>job</b>: '+description msg += u'<p>Failed to perform <b>job</b>: '+description
msg += u'<p>Detailed <b>traceback</b>:<pre>' msg += u'<p>Detailed <b>traceback</b>:<pre>'
msg += formatted_traceback + '</pre>' msg += formatted_traceback + '</pre>'

View File

@ -1,6 +1,6 @@
__license__ = 'GPL v3' __license__ = 'GPL v3'
__copyright__ = '2008, Kovid Goyal <kovid at kovidgoyal.net>' __copyright__ = '2008, Kovid Goyal <kovid at kovidgoyal.net>'
import textwrap, re import re
from PyQt4.QtGui import QStatusBar, QMovie, QLabel, QFrame, QHBoxLayout, QPixmap, \ from PyQt4.QtGui import QStatusBar, QMovie, QLabel, QFrame, QHBoxLayout, QPixmap, \
QVBoxLayout, QSizePolicy, QToolButton, QIcon QVBoxLayout, QSizePolicy, QToolButton, QIcon

View File

@ -1,25 +1,25 @@
from __future__ import with_statement
__license__ = 'GPL v3' __license__ = 'GPL v3'
__copyright__ = '2008, Kovid Goyal <kovid at kovidgoyal.net>' __copyright__ = '2008, Kovid Goyal <kovid at kovidgoyal.net>'
''' '''
Used to run jobs in parallel in separate processes. Used to run jobs in parallel in separate processes.
''' '''
import sys, tempfile, os, cPickle, traceback, atexit, binascii, time, subprocess import sys, os, gc, cPickle, traceback, atexit, cStringIO, time, subprocess, socket, collections
from select import select
from functools import partial from functools import partial
from threading import RLock, Thread, Event
from calibre.ebooks.lrf.any.convert_from import main as any2lrf from calibre.ebooks.lrf.any.convert_from import main as any2lrf
from calibre.ebooks.lrf.web.convert_from import main as web2lrf from calibre.ebooks.lrf.web.convert_from import main as web2lrf
from calibre.ebooks.lrf.feeds.convert_from import main as feeds2lrf from calibre.ebooks.lrf.feeds.convert_from import main as feeds2lrf
from calibre.gui2.lrf_renderer.main import main as lrfviewer from calibre.gui2.lrf_renderer.main import main as lrfviewer
from calibre.ptempfile import PersistentTemporaryFile
try: try:
from calibre.ebooks.lrf.html.table_as_image import do_render as render_table from calibre.ebooks.lrf.html.table_as_image import do_render as render_table
except: # Dont fail is PyQt4.4 not present except: # Dont fail is PyQt4.4 not present
render_table = None render_table = None
from calibre import iswindows, __appname__, islinux from calibre import iswindows, islinux, detect_ncpus
try:
from calibre.utils.single_qt_application import SingleApplication
except:
SingleApplication = None
sa = None sa = None
job_id = None job_id = None
@ -29,12 +29,13 @@ def report_progress(percent, msg=''):
msg = 'progress:%s:%f:%s'%(job_id, percent, msg) msg = 'progress:%s:%f:%s'%(job_id, percent, msg)
sa.send_message(msg) sa.send_message(msg)
_notify = 'fskjhwseiuyweoiu987435935-0342'
PARALLEL_FUNCS = { PARALLEL_FUNCS = {
'any2lrf' : partial(any2lrf, gui_mode=True), 'any2lrf' : partial(any2lrf, gui_mode=True),
'web2lrf' : web2lrf, 'web2lrf' : web2lrf,
'lrfviewer' : lrfviewer, 'lrfviewer' : lrfviewer,
'feeds2lrf' : partial(feeds2lrf, notification=report_progress), 'feeds2lrf' : partial(feeds2lrf, notification=_notify),
'render_table': render_table, 'render_table': render_table,
} }
@ -52,144 +53,422 @@ if islinux and hasattr(sys, 'frozen_path'):
python = os.path.join(getattr(sys, 'frozen_path'), 'parallel') python = os.path.join(getattr(sys, 'frozen_path'), 'parallel')
popen = partial(subprocess.Popen, cwd=getattr(sys, 'frozen_path')) popen = partial(subprocess.Popen, cwd=getattr(sys, 'frozen_path'))
def cleanup(tdir): prefix = 'import sys; sys.in_worker = True; '
try: if hasattr(sys, 'frameworks_dir'):
import shutil fd = getattr(sys, 'frameworks_dir')
shutil.rmtree(tdir, True) prefix += 'sys.frameworks_dir = "%s"; sys.frozen = "macosx_app"; '%fd
except: if fd not in os.environ['PATH']:
pass os.environ['PATH'] += ':'+fd
class Server(object):
def write(socket, msg, timeout=5):
if isinstance(msg, unicode):
msg = msg.encode('utf-8')
length = None
while len(msg) > 0:
if length is None:
length = len(msg)
chunk = ('%-12d'%length) + msg[:4096-12]
msg = msg[4096-12:]
else:
chunk, msg = msg[:4096], msg[4096:]
w = select([], [socket], [], timeout)[1]
if not w:
raise RuntimeError('Write to socket timed out')
if socket.sendall(chunk) is not None:
raise RuntimeError('Failed to write chunk to socket')
def read(socket, timeout=5):
buf = cStringIO.StringIO()
length = None
while select([socket],[],[],timeout)[0]:
msg = socket.recv(4096)
if not msg:
break
if length is None:
length, msg = int(msg[:12]), msg[12:]
buf.write(msg)
if buf.tell() >= length:
break
if not length:
return ''
msg = buf.getvalue()[:length]
if len(msg) < length:
raise RuntimeError('Corrupted packet received')
return msg
class RepeatingTimer(Thread):
def repeat(self):
while True:
self.event.wait(self.interval)
if self.event.isSet():
break
self.action()
def __init__(self, interval, func):
self.event = Event()
self.interval = interval
self.action = func
Thread.__init__(self, target=self.repeat)
self.setDaemon(True)
class ControlError(Exception):
pass
class Overseer(object):
#: Interval in seconds at which child processes are polled for status information
INTERVAL = 0.1
KILL_RESULT = 'Server: job killed by user|||#@#$%&*)*(*$#$%#$@&' KILL_RESULT = 'Server: job killed by user|||#@#$%&*)*(*$#$%#$@&'
INTERVAL = 0.1
def __init__(self): def __init__(self, server, port, timeout=5):
self.tdir = tempfile.mkdtemp('', '%s_IPC_'%__appname__) self.cmd = prefix + 'from calibre.parallel import worker; worker(%s, %s)'%(repr('localhost'), repr(port))
atexit.register(cleanup, self.tdir) self.process = popen([python, '-c', self.cmd])
self.kill_jobs = [] self.socket = server.accept()[0]
def kill(self, job_id): self.working = False
''' self.timeout = timeout
Kill the job identified by job_id. self.last_job_time = time.time()
''' self.job_id = None
self.kill_jobs.append(str(job_id)) self._stop = False
if not select([self.socket], [], [], 120)[0]:
raise RuntimeError(_('Could not launch worker process.'))
if int(self.read()) != self.process.pid:
raise RuntimeError('PID mismatch')
self.write('OK')
if self.read() != 'WAITING':
raise RuntimeError('Worker sulking')
def _terminate(self, process): def terminate(self):
''' '''
Kill process. Kill process.
''' '''
try:
if self.socket:
self.socket.close()
except:
pass
if iswindows: if iswindows:
win32api = __import__('win32api') win32api = __import__('win32api')
try: try:
win32api.TerminateProcess(int(process.pid), -1) win32api.TerminateProcess(int(self.process.pid), -1)
except: except:
pass pass
else: else:
import signal import signal
os.kill(process.pid, signal.SIGKILL) try:
time.sleep(0.05) os.kill(self.process.pid, signal.SIGKILL)
time.sleep(0.05)
except:
pass
def run(self, job_id, func, args=[], kwdargs={}, monitor=True,
report_progress=True, qapp=True):
'''
Run a job in a separate process.
@param job_id: A unique (per server) identifier
@param func: One of C{PARALLEL_FUNCS.keys()}
@param args: A list of arguments to pass of C{func}
@param kwdargs: A dictionary of keyword arguments to pass to C{func}
@param monitor: If False launch the child process and return.
Do not monitor/communicate with it. Automatically sets
`report_progress` and `qapp` to False.
@param report_progess: If True progress is reported to the GUI
@param qapp: If True, A QApplication is created. If False, progress reporting will also be disabled.
@return: (result, exception, formatted_traceback, log) where log is the combined
stdout + stderr of the child process; or None if monitor is True. If a job is killed
by a call to L{kill()} then result will be L{KILL_RESULT}
'''
job_id = str(job_id)
job_dir = os.path.join(self.tdir, job_id)
if os.path.exists(job_dir):
raise ValueError('Cannot run job. The job_id %s has already been used.'%job_id)
os.mkdir(job_dir)
job_data = os.path.join(job_dir, 'job_data.pickle') def write(self, msg, timeout=None):
if not monitor: write(self.socket, msg, timeout=self.timeout if timeout is None else timeout)
report_progress = qapp = False
cPickle.dump((job_id, func, args, kwdargs, report_progress, qapp),
open(job_data, 'wb'), -1)
prefix = ''
if hasattr(sys, 'frameworks_dir'):
fd = getattr(sys, 'frameworks_dir')
prefix = 'import sys; sys.frameworks_dir = "%s"; sys.frozen = "macosx_app"; '%fd
if fd not in os.environ['PATH']:
os.environ['PATH'] += ':'+fd
cmd = prefix + 'from calibre.parallel import main; main(\'%s\')'%binascii.hexlify(job_data)
if not monitor: def read(self, timeout=None):
popen([python, '-c', cmd]) return read(self.socket, timeout=self.timeout if timeout is None else timeout)
def __eq__(self, other):
return hasattr(other, 'process') and hasattr(other.process, 'pid') and self.process.pid == other.process.pid
def __bool__(self):
self.process.poll()
return self.process.returncode is None
def pid(self):
return self.process.pid
def select(self, timeout=0):
return select([self.socket], [self.socket], [self.socket], timeout)
def initialize_job(self, job):
self.job_id = job.job_id
self.working = True
self.write('JOB:'+cPickle.dumps((job.func, job.args, job.kwdargs), -1))
msg = self.read()
if msg != 'OK':
raise ControlError('Failed to initialize job on worker %d:%s'%(self.process.pid, msg))
self.output = job.output if callable(job.output) else sys.stdout.write
self.progress = job.progress if callable(job.progress) else None
self.job = job
def control(self):
try:
if select([self.socket],[],[],0)[0]:
msg = self.read()
word, msg = msg.partition(':')[0], msg.partition(':')[-1]
if word == 'RESULT':
self.write('OK')
return Result(cPickle.loads(msg), None, None)
elif word == 'OUTPUT':
self.write('OK')
try:
self.output(''.join(cPickle.loads(msg)))
except:
self.output('Bad output message: '+ repr(msg))
elif word == 'PROGRESS':
self.write('OK')
percent = None
try:
percent, msg = cPickle.loads(msg)[-1]
except:
print 'Bad progress update:', repr(msg)
if self.progress and percent is not None:
self.progress(percent, msg)
elif word == 'ERROR':
self.write('OK')
return Result(None, *cPickle.loads(msg))
else:
self.terminate()
return Result(None, ControlError('Worker sent invalid msg: %s', repr(msg)), '')
self.process.poll()
if self.process.returncode is not None:
return Result(None, ControlError('Worker process died unexpectedly with returncode: %d'%self.process.returncode), '')
finally:
self.working = False
self.last_job_time = time.time()
class Job(object):
def __init__(self, job_id, func, args, kwdargs, output, progress, done):
self.job_id = job_id
self.func = func
self.args = args
self.kwdargs = kwdargs
self.output = output
self.progress = progress
self.done = done
class Result(object):
def __init__(self, result, exception, traceback):
self.result = result
self.exception = exception
self.traceback = traceback
def __len__(self):
return 3
def __item__(self, i):
return (self.result, self.exception, self.traceback)[i]
def __iter__(self):
return iter((self.result, self.exception, self.traceback))
class Server(Thread):
KILL_RESULT = Overseer.KILL_RESULT
START_PORT = 10013
def __init__(self, number_of_workers=detect_ncpus()):
Thread.__init__(self)
self.setDaemon(True)
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.port = self.START_PORT
while True:
try:
self.server_socket.bind(('localhost', self.port))
break
except:
self.port += 1
self.server_socket.listen(5)
self.number_of_workers = number_of_workers
self.pool, self.jobs, self.working, self.results = [], collections.deque(), [], {}
atexit.register(self.killall)
self.job_lock = RLock()
self.overseer_lock = RLock()
self.working_lock = RLock()
self.result_lock = RLock()
self.pool_lock = RLock()
self.start()
def add_job(self, job):
with self.job_lock:
self.jobs.append(job)
def store_result(self, result, id=None):
if id:
with self.job_lock:
self.results[id] = result
def result(self, id):
with self.result_lock:
return self.results.pop(id, None)
def run(self):
while True:
job = None
with self.job_lock:
if len(self.jobs) > 0 and len(self.working) < self.number_of_workers:
job = self.jobs.popleft()
with self.pool_lock:
o = self.pool.pop() if self.pool else Overseer(self.server_socket, self.port)
try:
o.initialize_job(job)
except Exception, err:
res = Result(None, unicode(err), traceback.format_exc())
job.done(res)
o.terminate()
o = None
if o:
with self.working_lock:
self.working.append(o)
with self.working_lock:
done = []
for o in self.working:
try:
res = o.control()
except Exception, err:
res = Result(None, unicode(err), traceback.format_exc())
o.terminate()
if isinstance(res, Result):
o.job.done(res)
done.append(o)
for o in done:
self.working.remove(o)
if o:
with self.pool_lock:
self.pool.append(o)
time.sleep(1)
def killall(self):
with self.pool_lock:
map(lambda x: x.terminate(), self.pool)
self.pool = []
def kill(self, job_id):
with self.working_lock:
pop = None
for o in self.working:
if o.job_id == job_id:
o.terminate()
o.job.done(Result(self.KILL_RESULT, None, ''))
pop = o
break
if pop is not None:
self.working.remove(pop)
def run_job(self, job_id, func, args=[], kwdargs={},
output=None, progress=None, done=None):
'''
Run a job in a separate process. Supports job control, output redirection
and progress reporting.
'''
if done is None:
done = partial(self.store_result, id=job_id)
job = Job(job_id, func, args, kwdargs, output, progress, done)
with self.job_lock:
self.jobs.append(job)
def run_free_job(self, func, args=[], kwdargs={}):
pt = PersistentTemporaryFile('.pickle', '_IPC_')
pt.write(cPickle.dumps((func, args, kwdargs)))
pt.close()
cmd = prefix + 'from calibre.parallel import free_spirit; free_spirit(%s)'%repr(pt.name)
popen([python, '-c', cmd])
##########################################################################################
##################################### CLIENT CODE #####################################
##########################################################################################
class BufferedSender(object):
def __init__(self, socket):
self.socket = socket
self.wbuf, self.pbuf = [], []
self.wlock, self.plock = RLock(), RLock()
self.timer = RepeatingTimer(0.5, self.send)
self.prefix = prefix
self.timer.start()
def write(self, msg):
if not isinstance(msg, basestring):
msg = unicode(msg)
with self.wlock:
self.wbuf.append(msg)
def send(self):
if not select([], [self.socket], [], 30)[1]:
print >>sys.__stderr__, 'Cannot pipe to overseer'
return return
output = open(os.path.join(job_dir, 'output.txt'), 'wb') with self.wlock:
p = popen([python, '-c', cmd], stdout=output, stderr=output, if self.wbuf:
stdin=subprocess.PIPE) msg = cPickle.dumps(self.wbuf, -1)
p.stdin.close() self.wbuf = []
while p.returncode is None: write(self.socket, 'OUTPUT:'+msg)
if job_id in self.kill_jobs: read(self.socket, 10)
self._terminate(p)
return self.KILL_RESULT, None, None, _('Job killed by user') with self.plock:
time.sleep(0.1) if self.pbuf:
p.poll() msg = cPickle.dumps(self.pbuf, -1)
self.pbuf = []
write(self.socket, 'PROGRESS:'+msg)
read(self.socket, 10)
def notify(self, percent, msg=''):
with self.plock:
self.pbuf.append((percent, msg))
def flush(self):
output.close() pass
job_result = os.path.join(job_dir, 'job_result.pickle')
if not os.path.exists(job_result):
result, exception, traceback = None, ('ParallelRuntimeError',
'The worker process died unexpectedly.'), ''
else:
result, exception, traceback = cPickle.load(open(job_result, 'rb'))
log = open(output.name, 'rb').read()
return result, exception, traceback, log
def run_job(base, id, func, args, kwdargs): def work(client_socket, func, args, kwdargs):
global job_id
job_id = id
job_result = os.path.join(base, 'job_result.pickle')
func = PARALLEL_FUNCS[func] func = PARALLEL_FUNCS[func]
exception, tb = None, None if hasattr(func, 'keywords'):
for key, val in func.keywords.items():
if val == _notify:
func.keywords[key] = sys.stdout.notify
res = func(*args, **kwdargs)
sys.stdout.send()
return res
def worker(host, port):
client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client_socket.connect((host, port))
write(client_socket, str(os.getpid()))
msg = read(client_socket, timeout=10)
if msg != 'OK':
return 1
write(client_socket, 'WAITING')
sys.stdout = BufferedSender(client_socket)
sys.stderr = sys.stdout
while True:
msg = read(client_socket, timeout=60)
if msg.startswith('JOB:'):
func, args, kwdargs = cPickle.loads(msg[4:])
write(client_socket, 'OK')
try:
result = work(client_socket, func, args, kwdargs)
write(client_socket, 'RESULT:'+ cPickle.dumps(result))
except (Exception, SystemExit), err:
exception = (err.__class__.__name__, unicode(str(err), 'utf-8', 'replace'))
tb = traceback.format_exc()
write(client_socket, 'ERROR:'+cPickle.dumps((exception, tb),-1))
if read(client_socket, 10) != 'OK':
break
gc.collect()
elif msg == 'STOP:':
return 0
def free_spirit(path):
func, args, kwdargs = cPickle.load(open(path, 'rb'))
try: try:
result = func(*args, **kwdargs) os.unlink(path)
except (Exception, SystemExit), err: except:
result = None pass
exception = (err.__class__.__name__, unicode(str(err), 'utf-8', 'replace')) PARALLEL_FUNCS[func](*args, **kwdargs)
tb = traceback.format_exc()
if os.path.exists(os.path.dirname(job_result)):
cPickle.dump((result, exception, tb), open(job_result, 'wb'))
def main(src):
from PyQt4.QtGui import QApplication
job_data = binascii.unhexlify(src)
global sa
job_id, func, args, kwdargs, rp, qapp = cPickle.load(open(job_data, 'rb'))
if qapp and QApplication.instance() is None:
QApplication([])
if SingleApplication is not None and rp and QApplication.instance() is not None:
sa = SingleApplication('calibre GUI')
run_job(os.path.dirname(job_data), job_id, func, args, kwdargs)
return 0
if __name__ == '__main__':
sys.exit(main(sys.argv[2]))

View File

@ -94,7 +94,7 @@ class TerminalController:
except: return except: return
# If the stream isn't a tty, then assume it has no capabilities. # If the stream isn't a tty, then assume it has no capabilities.
if not hasattr(term_stream, 'isatty') or not term_stream.isatty(): return if hasattr(sys, 'in_worker') or not hasattr(term_stream, 'isatty') or not term_stream.isatty(): return
# Check the terminal type. If we fail, then assume that the # Check the terminal type. If we fail, then assume that the
# terminal has no capabilities. # terminal has no capabilities.

View File

@ -7,7 +7,7 @@ __docformat__ = 'restructuredtext en'
Enforces running of only a single application instance and allows for messaging between Enforces running of only a single application instance and allows for messaging between
applications using a local socket. applications using a local socket.
''' '''
import atexit import atexit, os
from PyQt4.QtCore import QByteArray, QDataStream, QIODevice, SIGNAL, QObject, Qt, QString from PyQt4.QtCore import QByteArray, QDataStream, QIODevice, SIGNAL, QObject, Qt, QString
from PyQt4.QtNetwork import QLocalSocket, QLocalServer from PyQt4.QtNetwork import QLocalSocket, QLocalServer
@ -93,6 +93,16 @@ class LocalServer(QLocalServer):
for conn in pop: for conn in pop:
self.connections.remove(conn) self.connections.remove(conn)
def listen(self, name):
if not QLocalServer.listen(self, name):
try:
os.unlink(self.fullServerName())
except:
pass
return QLocalServer.listen(self, name)
return True
@ -124,8 +134,7 @@ class SingleApplication(QObject):
self.mr, Qt.QueuedConnection) self.mr, Qt.QueuedConnection)
if not self.server.listen(self.server_name): if not self.server.listen(self.server_name):
if not self.server.listen(self.server_name): self.server = None
self.server = None
if self.server is not None: if self.server is not None:
atexit.register(self.server.close) atexit.register(self.server.close)