New parallel processing framework. Parallel processes are controlled via TCP/IP sockets making distributed computing possible.

This commit is contained in:
Kovid Goyal 2008-06-21 20:02:11 -07:00
parent 233662ebd7
commit a1afe65f2c
13 changed files with 434 additions and 178 deletions

View File

@ -48,7 +48,7 @@ def _check_symlinks_prescript():
from Authorization import Authorization, kAuthorizationFlagDestroyRights
AUTHTOOL="""#!%(sp)s
import os
import os, shutil
scripts = %(sp)s
links = %(sp)s
fonts_conf = %(sp)s
@ -64,7 +64,8 @@ if not os.path.exists('/etc/fonts/fonts.conf'):
print 'Creating default fonts.conf'
if not os.path.exists('/etc/fonts'):
os.makedirs('/etc/fonts')
os.link(fonts_conf, '/etc/fonts/fonts.conf')
shutil.copyfile(fonts_conf, '/etc/fonts/fonts.conf')
shutil.copyfile(fonts_conf.replace('conf', 'dtd'), '/etc/fonts/fonts.dtd')
"""
dest_path = %(dest_path)s
@ -80,8 +81,7 @@ if not os.path.exists('/etc/fonts/fonts.conf'):
continue
bad = True
break
if not bad:
bad = os.path.exists('/etc/fonts/fonts.conf')
bad = bad or not os.path.exists('/etc/fonts/fonts.conf')
if bad:
auth = Authorization(destroyflags=(kAuthorizationFlagDestroyRights,))
fd, name = tempfile.mkstemp('.py')
@ -280,13 +280,15 @@ sys.frameworks_dir = os.path.join(os.path.dirname(os.environ['RESOURCEPATH']), '
f.write(src)
f.close()
print
print 'Adding GUI main.py'
print 'Adding GUI scripts to site-packages'
f = zipfile.ZipFile(os.path.join(self.dist_dir, APPNAME+'.app', 'Contents', 'Resources', 'lib', 'python2.5', 'site-packages.zip'), 'a', zipfile.ZIP_DEFLATED)
f.write('src/calibre/gui2/main.py', 'calibre/gui2/main.py')
for script in scripts['gui']:
f.write(script, script.partition('/')[-1])
f.close()
print
print 'Adding default fonts.conf'
open(os.path.join(self.dist_dir, APPNAME+'.app', 'Contents', 'Resources', 'fonts.conf'), 'wb').write(open('/etc/fonts/fonts.conf').read())
open(os.path.join(self.dist_dir, APPNAME+'.app', 'Contents', 'Resources', 'fonts.dtd'), 'wb').write(open('/etc/fonts/fonts.dtd').read())
print
print 'Building disk image'
BuildAPP.makedmg(os.path.join(self.dist_dir, APPNAME+'.app'), APPNAME+'-'+VERSION)

View File

@ -13,9 +13,6 @@ print 'Setup', APPNAME, 'version:', VERSION
epsrc = re.compile(r'entry_points = (\{.*?\})', re.DOTALL).search(open('src/%s/linux.py'%APPNAME, 'rb').read()).group(1)
entry_points = eval(epsrc, {'__appname__': APPNAME})
if 'win32' in sys.platform.lower() or 'win64' in sys.platform.lower():
entry_points['console_scripts'].append('parallel = %s.parallel:main'%APPNAME)
def _ep_to_script(ep, base='src'):
return (base+os.path.sep+re.search(r'.*=\s*(.*?):', ep).group(1).replace('.', '/')+'.py').strip()

View File

@ -75,6 +75,8 @@ class ColoredFormatter(Formatter):
def setup_cli_handlers(logger, level):
if os.environ.get('CALIBRE_WORKER', None) is not None and logger.handlers:
return
logger.setLevel(level)
if level == logging.WARNING:
handler = logging.StreamHandler(sys.stdout)
@ -88,9 +90,7 @@ def setup_cli_handlers(logger, level):
handler = logging.StreamHandler(sys.stderr)
handler.setLevel(logging.DEBUG)
handler.setFormatter(logging.Formatter('[%(levelname)s] %(filename)s:%(lineno)s: %(message)s'))
for hdlr in logger.handlers:
if hdlr.__class__ == handler.__class__:
logger.removeHandler(hdlr)
logger.addHandler(handler)
class CustomHelpFormatter(IndentedHelpFormatter):

View File

@ -13,13 +13,17 @@
<string>Details of job</string>
</property>
<property name="windowIcon" >
<iconset resource="../images.qrc" >:/images/view.svg</iconset>
<iconset resource="../images.qrc" >
<normaloff>:/images/view.svg</normaloff>:/images/view.svg</iconset>
</property>
<layout class="QGridLayout" >
<item row="0" column="0" >
<widget class="QTextBrowser" name="log" >
<property name="lineWrapMode" >
<enum>QTextEdit::NoWrap</enum>
<widget class="QTextEdit" name="log" >
<property name="undoRedoEnabled" >
<bool>false</bool>
</property>
<property name="readOnly" >
<bool>true</bool>
</property>
</widget>
</item>

View File

@ -126,7 +126,7 @@ class ConversionJob(Job):
def formatted_error(self):
if self.exception is None:
return ''
ans = u'<p><b>%s</b>: %s</p>'%self.exception
ans = u'<p><b>%s</b>:'%self.exception
ans += '<h2>Traceback:</h2><pre>%s</pre>'%self.last_traceback
return ans

View File

@ -1,13 +1,12 @@
from calibre.gui2.library import SearchBox
__license__ = 'GPL v3'
__copyright__ = '2008, Kovid Goyal <kovid at kovidgoyal.net>'
import sys, logging, os, traceback, time, cPickle
import sys, logging, os, traceback, time
from PyQt4.QtGui import QKeySequence, QPainter, QDialog, QSpinBox, QSlider
from PyQt4.QtCore import Qt, QObject, SIGNAL, QCoreApplication, QThread, \
QVariant
from PyQt4.QtCore import Qt, QObject, SIGNAL, QCoreApplication, QThread
from calibre import __appname__, __version__, __author__, setup_cli_handlers, islinux, Settings
from calibre import __appname__, setup_cli_handlers, islinux, Settings
from calibre.ebooks.lrf.lrfparser import LRFDocument
from calibre.gui2 import ORG_NAME, APP_UID, error_dialog, choose_files, Application
@ -57,7 +56,7 @@ class Config(QDialog, Ui_ViewerConfig):
class Main(MainWindow, Ui_MainWindow):
def __init__(self, logger, opts, parent=None):
MainWindow.__init__(self, parent)
MainWindow.__init__(self, opts, parent)
Ui_MainWindow.__init__(self)
self.setupUi(self)
self.setAttribute(Qt.WA_DeleteOnClose)
@ -263,9 +262,12 @@ def file_renderer(stream, opts, parent=None, logger=None):
def option_parser():
from optparse import OptionParser
parser = OptionParser(usage='%prog book.lrf', version=__appname__+' '+__version__,
epilog='Created by ' + __author__)
from calibre.gui2.main_window import option_parser
parser = option_parser('''\
%prog [options] book.lrf
Read the LRF ebook book.lrf
''')
parser.add_option('--verbose', default=False, action='store_true', dest='verbose',
help='Print more information about the rendering process')
parser.add_option('--visual-debug', help='Turn on visual aids to debugging the rendering engine',

View File

@ -23,7 +23,7 @@ from calibre.gui2 import APP_UID, warning_dialog, choose_files, error_dialog, \
from calibre.gui2.cover_flow import CoverFlow, DatabaseImages
from calibre.library.database import LibraryDatabase
from calibre.gui2.update import CheckForUpdates
from calibre.gui2.main_window import MainWindow
from calibre.gui2.main_window import MainWindow, option_parser
from calibre.gui2.main_ui import Ui_MainWindow
from calibre.gui2.device import DeviceDetector, DeviceManager
from calibre.gui2.status import StatusBar
@ -58,8 +58,8 @@ class Main(MainWindow, Ui_MainWindow):
p.end()
self.default_thumbnail = (pixmap.width(), pixmap.height(), pixmap_to_data(pixmap))
def __init__(self, single_instance, parent=None):
MainWindow.__init__(self, parent)
def __init__(self, single_instance, opts, parent=None):
MainWindow.__init__(self, opts, parent)
self.single_instance = single_instance
if self.single_instance is not None:
self.connect(self.single_instance, SIGNAL('message_received(PyQt_PyObject)'),
@ -1079,7 +1079,7 @@ class Main(MainWindow, Ui_MainWindow):
if getattr(exception, 'only_msg', False):
error_dialog(self, _('Conversion Error'), unicode(exception)).exec_()
return
msg = u'<p><b>%s</b>: </p>'%exception
msg = u'<p><b>%s</b>:'%exception
msg += u'<p>Failed to perform <b>job</b>: '+description
msg += u'<p>Detailed <b>traceback</b>:<pre>'
msg += formatted_traceback + '</pre>'
@ -1166,6 +1166,13 @@ def main(args=sys.argv):
pid = os.fork() if islinux else -1
if pid <= 0:
parser = option_parser('''\
%prog [opts] [path_to_ebook]
Launch the main calibre Graphical User Interface and optionally add the ebook at
path_to_ebook to the database.
''')
opts, args = parser.parse_args(args)
app = Application(args)
app.setWindowIcon(QIcon(':/library'))
QCoreApplication.setOrganizationName(ORG_NAME)
@ -1173,7 +1180,7 @@ def main(args=sys.argv):
single_instance = None if SingleApplication is None else SingleApplication('calibre GUI')
if not singleinstance('calibre GUI'):
if single_instance is not None and single_instance.is_running() and \
single_instance.send_message('launched:'+repr(sys.argv)):
single_instance.send_message('launched:'+repr(args)):
return 0
QMessageBox.critical(None, 'Cannot Start '+__appname__,
@ -1181,14 +1188,14 @@ def main(args=sys.argv):
return 1
initialize_file_icon_provider()
try:
main = Main(single_instance)
main = Main(single_instance, opts)
except DatabaseLocked, err:
QMessageBox.critical(None, 'Cannot Start '+__appname__,
'<p>Another program is using the database. <br/>Perhaps %s is already running?<br/>If not try deleting the file %s'%(__appname__, err.lock_file_path))
return 1
sys.excepthook = main.unhandled_exception
if len(sys.argv) > 1:
main.add_filesystem_book(sys.argv[1])
if len(args) > 1:
main.add_filesystem_book(args)
return app.exec_()
return 0
@ -1199,7 +1206,7 @@ if __name__ == '__main__':
except:
if not iswindows: raise
from PyQt4.QtGui import QErrorMessage
logfile = os.path.expanduser('~/calibre.log')
logfile = os.path.join(os.path.expanduser('~'), 'calibre.log')
if os.path.exists(logfile):
log = open(logfile).read()
if log.strip():

View File

@ -3,13 +3,45 @@ __copyright__ = '2008, Kovid Goyal <kovid at kovidgoyal.net>'
import StringIO, traceback, sys
from PyQt4.QtGui import QMainWindow
from PyQt4.Qt import QMainWindow, QString, Qt, QFont
from calibre.gui2.dialogs.conversion_error import ConversionErrorDialog
from calibre import OptionParser
def option_parser(usage='''\
Usage: %prog [options]
Launch the Graphical User Interface
'''):
parser = OptionParser(usage)
parser.add_option('--redirect-console-output', default=False, action='store_true', dest='redirect',
help=_('Redirect console output to a dialog window (both stdout and stderr). Useful on windows where GUI apps do not have a output streams.'))
return parser
class DebugWindow(ConversionErrorDialog):
def __init__(self, parent):
ConversionErrorDialog.__init__(self, parent, 'Console output', '')
self.setModal(Qt.NonModal)
font = QFont()
font.setStyleHint(QFont.TypeWriter)
self.text.setFont(font)
def write(self, msg):
self.text.setPlainText(self.text.toPlainText()+QString(msg))
def flush(self):
pass
class MainWindow(QMainWindow):
def __init__(self, parent=None):
def __init__(self, opts, parent=None):
QMainWindow.__init__(self, parent)
if opts.redirect:
self.__console_redirect = DebugWindow(self)
sys.stdout = sys.stderr = self.__console_redirect
self.__console_redirect.show()
print 'testing 1'
print 'testing 2'
def unhandled_exception(self, type, value, tb):
try:
@ -19,7 +51,7 @@ class MainWindow(QMainWindow):
print >>sys.stderr, fe
msg = '<p><b>' + unicode(str(value), 'utf8', 'replace') + '</b></p>'
msg += '<p>Detailed <b>traceback</b>:<pre>'+fe+'</pre>'
d = ConversionErrorDialog(self, 'ERROR: Unhandled exception', msg)
d = ConversionErrorDialog(self, _('ERROR: Unhandled exception'), msg)
d.exec_()
except:
pass

View File

@ -49,6 +49,7 @@ entry_points = {
'calibre-debug = calibre.debug:main',
'calibredb = calibre.library.cli:main',
'calibre-fontconfig = calibre.utils.fontconfig:main',
'calibre-parallel = calibre.parallel:main',
],
'gui_scripts' : [
__appname__+' = calibre.gui2.main:main',

View File

@ -1,75 +1,231 @@
from __future__ import with_statement
__license__ = 'GPL v3'
__copyright__ = '2008, Kovid Goyal <kovid at kovidgoyal.net>'
__copyright__ = '2008, Kovid Goyal kovid@kovidgoyal.net'
__docformat__ = 'restructuredtext en'
'''
Used to run jobs in parallel in separate processes.
Used to run jobs in parallel in separate processes. Features output streaming,
support for progress notification as well as job killing. The worker processes
are controlled via a simple protocol run over TCP/IP sockets. The control happens
mainly in two class, :class:`Server` and :class:`Overseer`. The worker is
encapsulated in the function :function:`worker`. Every worker process
has the environment variable :envvar:`CALIBRE_WORKER` defined.
The worker control protocol has two modes of operation. In the first mode, the
worker process listens for commands from the controller process. The controller
process can either hand off a job to the worker or tell the worker to die.
Once a job is handed off to the worker, the protocol enters the second mode, where
the controller listens for messages from the worker. The worker can send progress updates
as well as console output (i.e. text that would normally have been written to stdout
or stderr by the job). Once the job completes (or raises an exception) the worker
returns the result (or exception) to the controller adnt he protocol reverts to the first mode.
In the second mode, the controller can also send the worker STOP messages, in which case
the worker interrupts the job and dies. The sending of progress and console output messages
is buffered and asynchronous to prevent the job from being IO bound.
'''
import sys, os, gc, cPickle, traceback, atexit, cStringIO, time, \
subprocess, socket, collections, binascii
import sys, os, gc, cPickle, traceback, atexit, cStringIO, time, signal, \
subprocess, socket, collections, binascii, re, tempfile, thread
from select import select
from functools import partial
from threading import RLock, Thread, Event
from calibre.ebooks.lrf.any.convert_from import main as any2lrf
from calibre.ebooks.lrf.web.convert_from import main as web2lrf
from calibre.ebooks.lrf.feeds.convert_from import main as feeds2lrf
from calibre.gui2.lrf_renderer.main import main as lrfviewer
from calibre.ptempfile import PersistentTemporaryFile
from calibre import iswindows, detect_ncpus, isosx
try:
from calibre.ebooks.lrf.html.table_as_image import do_render as render_table
except: # Dont fail is PyQt4.4 not present
render_table = None
from calibre import iswindows, islinux, detect_ncpus
sa = None
job_id = None
def report_progress(percent, msg=''):
if sa is not None and job_id is not None:
msg = 'progress:%s:%f:%s'%(job_id, percent, msg)
sa.send_message(msg)
_notify = 'fskjhwseiuyweoiu987435935-0342'
#: A mapping from job names to functions that perform the jobs
PARALLEL_FUNCS = {
'any2lrf' : partial(any2lrf, gui_mode=True),
'web2lrf' : web2lrf,
'lrfviewer' : lrfviewer,
'feeds2lrf' : partial(feeds2lrf, notification=_notify),
'render_table': render_table,
}
'any2lrf' :
('calibre.ebooks.lrf.any.convert_from', 'main', dict(gui_mode=True), None),
python = sys.executable
popen = subprocess.Popen
'lrfviewer' :
('calibre.gui2.lrf_renderer.main', 'main', {}, None),
if iswindows:
if hasattr(sys, 'frozen'):
python = os.path.join(os.path.dirname(python), 'parallel.exe')
else:
python = os.path.join(os.path.dirname(python), 'Scripts\\parallel.exe')
open = partial(subprocess.Popen, creationflags=0x08) # CREATE_NO_WINDOW=0x08 so that no ugly console is popped up
'feeds2lrf' :
('calibre.ebooks.lrf.feeds.convert_from', 'main', {}, 'notification'),
if islinux and hasattr(sys, 'frozen_path'):
python = os.path.join(getattr(sys, 'frozen_path'), 'calibre-parallel')
popen = partial(subprocess.Popen, cwd=getattr(sys, 'frozen_path'))
'render_table' :
('calibre.ebooks.lrf.html.table_as_image', 'do_render', {}, None),
}
prefix = 'import sys; sys.in_worker = True; '
if hasattr(sys, 'frameworks_dir'):
isfrozen = hasattr(sys, 'frozen')
win32event = __import__('win32event') if iswindows else None
win32process = __import__('win32process') if iswindows else None
msvcrt = __import__('msvcrt') if iswindows else None
class WorkerStatus(object):
'''
A platform independent class to control child processes. Provides the
methods:
.. method:: WorkerStatus.is_alive()
Return True is the child process is alive (i.e. it hasn't exited and returned a return code).
.. method:: WorkerStatus.returncode()
Wait for the child process to exit and return its return code (blocks until child returns).
.. method:: WorkerStatus.kill()
Forcibly terminates child process using operating system specific semantics.
'''
def __init__(self, obj):
'''
`obj`: On windows a process handle, on unix a subprocess.Popen object.
'''
self.obj = obj
self.win32process = win32process # Needed if kill is called during shutdown of interpreter
self.os = os
self.signal = signal
ext = 'windows' if iswindows else 'unix'
for func in ('is_alive', 'returncode', 'kill'):
setattr(self, func, getattr(self, func+'_'+ext))
def is_alive_unix(self):
return self.obj.poll() == None
def returncode_unix(self):
return self.obj.wait()
def kill_unix(self):
os.kill(self.obj.pid, self.signal.SIGKILL)
def is_alive_windows(self):
return win32event.WaitForSingleObject(self.obj, 0) != win32event.WAIT_OBJECT_0
def returncode_windows(self):
return win32process.GetExitCodeProcess(self.obj)
def kill_windows(self, returncode=-1):
self.win32process.TerminateProcess(self.obj, returncode)
class WorkerMother(object):
'''
Platform independent object for launching child processes. All processes
have the environment variable :envvar:`CALIBRE_WORKER` set.
..method:: WorkerMother.spawn_free_spirit(arg)
Launch a non monitored process with argument `arg`.
..method:: WorkerMother.spawn_worker(arg)
Launch a monitored and controllable process with argument `arg`.
'''
def __init__(self):
ext = 'windows' if iswindows else 'osx' if isosx else 'linux'
self.os = os # Needed incase cleanup called when interpreter is shutting down
if iswindows:
self.executable = os.path.join(os.path.dirname(sys.executable),
'calibre-parallel.exe' if isfrozen else 'Scripts\\calibre-parallel.exe')
elif isosx:
self.executable = sys.executable
self.prefix = ''
if isfrozen:
fd = getattr(sys, 'frameworks_dir')
prefix += 'sys.frameworks_dir = "%s"; sys.frozen = "macosx_app"; '%fd
contents = os.path.dirname(fd)
resources = os.path.join(contents, 'Resources')
sp = os.path.join(resources, 'lib', 'python'+sys.version[:3], 'site-packages.zip')
self.prefix += 'import sys; sys.frameworks_dir = "%s"; sys.frozen = "macosx_app"; '%fd
self.prefix += 'sys.path.insert(0, %s); '%repr(sp)
self.env = {}
if fd not in os.environ['PATH']:
os.environ['PATH'] += ':'+fd
if 'parallel' in python:
executable = [python]
worker_command = '%s:%s'
free_spirit_command = '%s'
else:
executable = [python, '-c']
worker_command = prefix + 'from calibre.parallel import worker; worker(%s, %s)'
free_spirit_command = prefix + 'from calibre.parallel import free_spirit; free_spirit(%s)'
self.env['PATH'] = os.environ['PATH']+':'+fd
self.env['PYTHONHOME'] = resources
else:
self.executable = os.path.join(getattr(sys, 'frozen_path'), 'calibre-parallel') \
if isfrozen else 'calibre-parallel'
self.spawn_worker_windows = lambda arg : self.spawn_free_spirit_windows(arg, type='worker')
self.spawn_worker_linux = lambda arg : self.spawn_free_spirit_linux(arg, type='worker')
self.spawn_worker_osx = lambda arg : self.spawn_free_spirit_osx(arg, type='worker')
for func in ('spawn_free_spirit', 'spawn_worker'):
setattr(self, func, getattr(self, func+'_'+ext))
def cleanup_child_windows(self, child, name=None, fd=None):
try:
child.kill()
except:
pass
try:
if fd is not None:
self.os.close(fd)
except:
pass
try:
if name is not None and os.path.exists(name):
self.os.unlink(name)
except:
pass
def cleanup_child_linux(self, child):
try:
child.kill()
except:
pass
def get_env(self):
env = dict(os.environ)
env['CALIBRE_WORKER'] = '1'
if hasattr(self, 'env'):
env.update(self.env)
return env
def spawn_free_spirit_osx(self, arg, type='free_spirit'):
script = 'from calibre.parallel import main; main(args=["calibre-parallel", %s]);'%repr(arg)
cmdline = [self.executable, '-c', self.prefix+script]
child = WorkerStatus(subprocess.Popen(cmdline, env=self.get_env()))
atexit.register(self.cleanup_child_linux, child)
return child
def spawn_free_spirit_linux(self, arg, type='free_spirit'):
cmdline = [self.executable, arg]
child = WorkerStatus(subprocess.Popen(cmdline, env=self.get_env()))
atexit.register(self.cleanup_child_linux, child)
return child
def spawn_free_spirit_windows(self, arg, type='free_spirit'):
fd, name = tempfile.mkstemp('.log', 'calibre_'+type+'_')
handle = msvcrt.get_osfhandle(fd)
si = win32process.STARTUPINFO()
si.hStdOutput = handle
si.hStdError = handle
cmdline = self.executable + ' ' + str(arg)
hProcess = \
win32process.CreateProcess(
None, # Application Name
cmdline, # Command line
None, # processAttributes
None, # threadAttributes
1, # bInheritHandles
win32process.CREATE_NO_WINDOW, # Dont want ugly console popping up
self.get_env(), # New environment
None, # Current directory
si
)[0]
child = WorkerStatus(hProcess)
atexit.register(self.cleanup_child_windows, child, name, fd)
return child
mother = WorkerMother()
def write(socket, msg, timeout=5):
'''
Write a message on socket. If `msg` is unicode, it is encoded in utf-8.
Raises a `RuntimeError` if the socket is not ready for writing or the writing fails.
`msg` is broken into chunks of size 4096 and sent. The :function:`read` function
automatically re-assembles the chunks into whole message.
'''
if isinstance(msg, unicode):
msg = msg.encode('utf-8')
length = None
@ -88,6 +244,11 @@ def write(socket, msg, timeout=5):
def read(socket, timeout=5):
'''
Read a message from `socket`. The message must have been sent with the :function:`write`
function. Raises a `RuntimeError` if the message is corrpted. Can return an
empty string.
'''
buf = cStringIO.StringIO()
length = None
while select([socket],[],[],timeout)[0]:
@ -108,6 +269,11 @@ def read(socket, timeout=5):
return msg
class RepeatingTimer(Thread):
'''
Calls a specified function repeatedly at a specified interval. Runs in a
daemon thread (i.e. the interpreter can exit while it is still running).
Call :meth:`start()` to start it.
'''
def repeat(self):
while True:
@ -116,25 +282,31 @@ class RepeatingTimer(Thread):
break
self.action()
def __init__(self, interval, func):
def __init__(self, interval, func, name):
self.event = Event()
self.interval = interval
self.action = func
Thread.__init__(self, target=self.repeat)
Thread.__init__(self, target=self.repeat, name=name)
self.setDaemon(True)
class ControlError(Exception):
pass
class Overseer(object):
'''
Responsible for controlling worker processes. The main interface is the
methods, :meth:`initialize_job`, :meth:`control`.
'''
KILL_RESULT = 'Server: job killed by user|||#@#$%&*)*(*$#$%#$@&'
INTERVAL = 0.1
def __init__(self, server, port, timeout=5):
self.cmd = worker_command%(repr('127.0.0.1'), repr(port))
self.process = popen(executable + [self.cmd])
self.worker_status = mother.spawn_worker('127.0.0.1:%d'%port)
self.socket = server.accept()[0]
# Needed if terminate called hwen interpreter is shutting down
self.os = os
self.signal = signal
self.working = False
self.timeout = timeout
@ -152,9 +324,7 @@ class Overseer(object):
raise RuntimeError('Worker sulking')
def terminate(self):
'''
Kill process.
'''
'Kill worker process.'
try:
if self.socket:
self.write('STOP:')
@ -170,9 +340,8 @@ class Overseer(object):
except:
pass
else:
import signal
try:
os.kill(self.worker_pid, signal.SIGKILL)
self.os.kill(self.worker_pid, self.signal.SIGKILL)
time.sleep(0.05)
except:
pass
@ -188,16 +357,19 @@ class Overseer(object):
return hasattr(other, 'process') and hasattr(other, 'worker_pid') and self.worker_pid == other.worker_pid
def __bool__(self):
self.process.poll()
return self.process.returncode is None
def pid(self):
return self.worker_pid
return self.worker_status.is_alive()
def select(self, timeout=0):
return select([self.socket], [self.socket], [self.socket], timeout)
def initialize_job(self, job):
'''
Sends `job` to worker process. Can raise `ControlError` if worker process
does not respond appropriately. In this case, this Overseer is useless
and should be discarded.
`job`: An instance of :class:`Job`.
'''
self.job_id = job.job_id
self.working = True
self.write('JOB:'+cPickle.dumps((job.func, job.args, job.kwdargs), -1))
@ -209,7 +381,13 @@ class Overseer(object):
self.job = job
def control(self):
try:
'''
Listens for messages from the worker process and dispatches them
appropriately. If the worker process dies unexpectedly, returns a result
of None with a ControlError indicating the worker died.
Returns a :class:`Result` instance or None, if the worker is still working.
'''
if select([self.socket],[],[],0)[0]:
msg = self.read()
word, msg = msg.partition(':')[0], msg.partition(':')[-1]
@ -237,12 +415,10 @@ class Overseer(object):
else:
self.terminate()
return Result(None, ControlError('Worker sent invalid msg: %s', repr(msg)), '')
self.process.poll()
if self.process.returncode is not None:
if not self.worker_status.is_alive():
return Result(None, ControlError('Worker process died unexpectedly with returncode: %d'%self.process.returncode), '')
finally:
self.working = False
self.last_job_time = time.time()
class Job(object):
@ -325,13 +501,22 @@ class Server(Thread):
if len(self.jobs) > 0 and len(self.working) < self.number_of_workers:
job = self.jobs.popleft()
with self.pool_lock:
o = self.pool.pop() if self.pool else Overseer(self.server_socket, self.port)
o = None
while self.pool:
o = self.pool.pop()
try:
o.initialize_job(job)
break
except:
o.terminate()
if o is None:
o = Overseer(self.server_socket, self.port)
try:
o.initialize_job(job)
except Exception, err:
o.terminate()
res = Result(None, unicode(err), traceback.format_exc())
job.done(res)
o.terminate()
o = None
if o:
with self.working_lock:
@ -393,8 +578,8 @@ class Server(Thread):
pt = PersistentTemporaryFile('.pickle', '_IPC_')
pt.write(cPickle.dumps((func, args, kwdargs)))
pt.close()
cmd = free_spirit_command%repr(binascii.hexlify(pt.name))
popen(executable + [cmd])
mother.spawn_free_spirit(binascii.hexlify(pt.name))
##########################################################################################
##################################### CLIENT CODE #####################################
@ -406,8 +591,7 @@ class BufferedSender(object):
self.socket = socket
self.wbuf, self.pbuf = [], []
self.wlock, self.plock = RLock(), RLock()
self.timer = RepeatingTimer(0.5, self.send)
self.prefix = prefix
self.timer = RepeatingTimer(0.5, self.send, 'BufferedSender')
self.timer.start()
def write(self, msg):
@ -417,6 +601,15 @@ class BufferedSender(object):
self.wbuf.append(msg)
def send(self):
if select([self.socket], [], [], 0)[0]:
msg = read(self.socket)
if msg == 'PING:':
write(self.socket, 'OK')
elif msg:
self.socket.shutdown(socket.SHUT_RDWR)
thread.interrupt_main()
time.sleep(1)
raise SystemExit
if not select([], [self.socket], [], 30)[1]:
print >>sys.__stderr__, 'Cannot pipe to overseer'
return
@ -442,13 +635,18 @@ class BufferedSender(object):
def flush(self):
pass
def get_func(name):
module, func, kwdargs, notification = PARALLEL_FUNCS[name]
module = __import__(module, fromlist=[1])
func = getattr(module, func)
return func, kwdargs, notification
def work(client_socket, func, args, kwdargs):
func = PARALLEL_FUNCS[func]
if hasattr(func, 'keywords'):
for key, val in func.keywords.items():
if val == _notify and hasattr(sys.stdout, 'notify'):
func.keywords[key] = sys.stdout.notify
res = func(*args, **kwdargs)
func, kargs, notification = get_func(func)
if notification is not None and hasattr(sys.stdout, 'notify'):
kargs[notification] = sys.stdout.notify
kargs.update(kwdargs)
res = func(*args, **kargs)
if hasattr(sys.stdout, 'send'):
sys.stdout.send()
return res
@ -467,6 +665,9 @@ def worker(host, port):
sys.stderr = sys.stdout
while True:
if not select([client_socket], [], [], 60)[0]:
time.sleep(1)
continue
msg = read(client_socket, timeout=60)
if msg.startswith('JOB:'):
func, args, kwdargs = cPickle.loads(msg[4:])
@ -481,7 +682,10 @@ def worker(host, port):
if read(client_socket, 10) != 'OK':
break
gc.collect()
elif msg == 'PING:':
write(client_socket, 'OK')
elif msg == 'STOP:':
client_socket.shutdown(socket.SHUT_RDWR)
return 0
elif not msg:
time.sleep(1)
@ -490,17 +694,19 @@ def worker(host, port):
return 1
def free_spirit(path):
func, args, kwdargs = cPickle.load(open(binascii.unhexlify(path), 'rb'))
func, args, kwdargs = cPickle.load(open(path, 'rb'))
try:
os.unlink(path)
except:
pass
PARALLEL_FUNCS[func](*args, **kwdargs)
func, kargs = get_func(func)[:2]
kargs.update(kwdargs)
func(*args, **kargs)
def main(args=sys.argv):
args = args[1].split(':')
if len(args) == 1:
free_spirit(args[0].replace("'", ''))
free_spirit(binascii.unhexlify(re.sub(r'[^a-f0-9A-F]', '', args[0])))
else:
worker(args[0].replace("'", ''), int(args[1]))
return 0

View File

@ -1,6 +1,6 @@
__license__ = 'GPL v3'
__copyright__ = '2008, Kovid Goyal <kovid at kovidgoyal.net>'
import sys, re
import sys, re, os
""" Get information about the terminal we are running in """
@ -94,7 +94,7 @@ class TerminalController:
except: return
# If the stream isn't a tty, then assume it has no capabilities.
if hasattr(sys, 'in_worker') or not hasattr(term_stream, 'isatty') or not term_stream.isatty(): return
if os.environ.get('CALIBRE_WORKER', None) is not None or not hasattr(term_stream, 'isatty') or not term_stream.isatty(): return
# Check the terminal type. If we fail, then assume that the
# terminal has no capabilities.

View File

@ -63,17 +63,18 @@ def start_vm(vm, ssh_host, build_script, sleep=75):
subprocess.check_call(('scp', t.name, ssh_host+':build-'+PROJECT))
subprocess.check_call('ssh -t %s bash build-%s'%(ssh_host, PROJECT), shell=True)
def build_windows():
def build_windows(shutdown=True):
installer = installer_name('exe')
vm = '/vmware/Windows XP/Windows XP Professional.vmx'
start_vm(vm, 'windows', BUILD_SCRIPT%('python setup.py develop', 'python','windows_installer.py'))
subprocess.check_call(('scp', 'windows:build/%s/dist/*.exe'%PROJECT, 'dist'))
if not os.path.exists(installer):
raise Exception('Failed to build installer '+installer)
if shutdown:
subprocess.Popen(('ssh', 'windows', 'shutdown', '-s', '-t', '0'))
return os.path.basename(installer)
def build_osx():
def build_osx(shutdown=True):
installer = installer_name('dmg')
vm = '/vmware/Mac OSX/Mac OSX.vmx'
python = '/Library/Frameworks/Python.framework/Versions/Current/bin/python'
@ -81,17 +82,19 @@ def build_osx():
subprocess.check_call(('scp', 'osx:build/%s/dist/*.dmg'%PROJECT, 'dist'))
if not os.path.exists(installer):
raise Exception('Failed to build installer '+installer)
if shutdown:
subprocess.Popen(('ssh', 'osx', 'sudo', '/sbin/shutdown', '-h', 'now'))
return os.path.basename(installer)
def build_linux():
def build_linux(shutdown=True):
installer = installer_name('tar.bz2')
vm = '/vmware/linux/libprs500-gentoo.vmx'
start_vm(vm, 'linux', BUILD_SCRIPT%('sudo python setup.py develop', 'python','linux_installer.py'))
subprocess.check_call(('scp', 'linux:/tmp/%s'%os.path.basename(installer), 'dist'))
if not os.path.exists(installer):
raise Exception('Failed to build installer '+installer)
if shutdown:
subprocess.Popen(('ssh', 'linux', 'sudo', '/sbin/poweroff'))
return os.path.basename(installer)

View File

@ -538,7 +538,7 @@ def main():
sys.argv[1:2] = ['py2exe']
console = [dict(dest_base=basenames['console'][i], script=scripts['console'][i])
for i in range(len(scripts['console']))]
for i in range(len(scripts['console']))]# if not 'parallel.py' in scripts['console'][i] ]
sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'src'))
setup(
cmdclass = {'py2exe': BuildEXE},
@ -562,6 +562,8 @@ def main():
'sip', 'pkg_resources', 'PyQt4.QtSvg',
'mechanize', 'ClientForm', 'wmi',
'win32file', 'pythoncom', 'rtf2xml',
'win32process', 'win32api', 'msvcrt',
'win32event',
'lxml', 'lxml._elementpath', 'genshi',
'path', 'pydoc', 'IPython.Extensions.*',
'calibre.web.feeds.recipes.*', 'PyQt4.QtWebKit',