'
-import sys, logging, os, traceback, time, cPickle
+import sys, logging, os, traceback, time
from PyQt4.QtGui import QKeySequence, QPainter, QDialog, QSpinBox, QSlider
-from PyQt4.QtCore import Qt, QObject, SIGNAL, QCoreApplication, QThread, \
- QVariant
+from PyQt4.QtCore import Qt, QObject, SIGNAL, QCoreApplication, QThread
-from calibre import __appname__, __version__, __author__, setup_cli_handlers, islinux, Settings
+from calibre import __appname__, setup_cli_handlers, islinux, Settings
from calibre.ebooks.lrf.lrfparser import LRFDocument
from calibre.gui2 import ORG_NAME, APP_UID, error_dialog, choose_files, Application
@@ -57,7 +56,7 @@ class Config(QDialog, Ui_ViewerConfig):
class Main(MainWindow, Ui_MainWindow):
def __init__(self, logger, opts, parent=None):
- MainWindow.__init__(self, parent)
+ MainWindow.__init__(self, opts, parent)
Ui_MainWindow.__init__(self)
self.setupUi(self)
self.setAttribute(Qt.WA_DeleteOnClose)
@@ -263,9 +262,12 @@ def file_renderer(stream, opts, parent=None, logger=None):
def option_parser():
- from optparse import OptionParser
- parser = OptionParser(usage='%prog book.lrf', version=__appname__+' '+__version__,
- epilog='Created by ' + __author__)
+ from calibre.gui2.main_window import option_parser
+ parser = option_parser('''\
+%prog [options] book.lrf
+
+Read the LRF ebook book.lrf
+''')
parser.add_option('--verbose', default=False, action='store_true', dest='verbose',
help='Print more information about the rendering process')
parser.add_option('--visual-debug', help='Turn on visual aids to debugging the rendering engine',
diff --git a/src/calibre/gui2/main.py b/src/calibre/gui2/main.py
index e856d748f0..6ba8187597 100644
--- a/src/calibre/gui2/main.py
+++ b/src/calibre/gui2/main.py
@@ -23,7 +23,7 @@ from calibre.gui2 import APP_UID, warning_dialog, choose_files, error_dialog, \
from calibre.gui2.cover_flow import CoverFlow, DatabaseImages
from calibre.library.database import LibraryDatabase
from calibre.gui2.update import CheckForUpdates
-from calibre.gui2.main_window import MainWindow
+from calibre.gui2.main_window import MainWindow, option_parser
from calibre.gui2.main_ui import Ui_MainWindow
from calibre.gui2.device import DeviceDetector, DeviceManager
from calibre.gui2.status import StatusBar
@@ -58,8 +58,8 @@ class Main(MainWindow, Ui_MainWindow):
p.end()
self.default_thumbnail = (pixmap.width(), pixmap.height(), pixmap_to_data(pixmap))
- def __init__(self, single_instance, parent=None):
- MainWindow.__init__(self, parent)
+ def __init__(self, single_instance, opts, parent=None):
+ MainWindow.__init__(self, opts, parent)
self.single_instance = single_instance
if self.single_instance is not None:
self.connect(self.single_instance, SIGNAL('message_received(PyQt_PyObject)'),
@@ -1079,7 +1079,7 @@ class Main(MainWindow, Ui_MainWindow):
if getattr(exception, 'only_msg', False):
error_dialog(self, _('Conversion Error'), unicode(exception)).exec_()
return
- msg = u'%s:
'%exception
+ msg = u'%s:'%exception
msg += u'
Failed to perform job: '+description
msg += u'
Detailed traceback:
'
msg += formatted_traceback + '
'
@@ -1166,6 +1166,13 @@ def main(args=sys.argv):
pid = os.fork() if islinux else -1
if pid <= 0:
+ parser = option_parser('''\
+%prog [opts] [path_to_ebook]
+
+Launch the main calibre Graphical User Interface and optionally add the ebook at
+path_to_ebook to the database.
+''')
+ opts, args = parser.parse_args(args)
app = Application(args)
app.setWindowIcon(QIcon(':/library'))
QCoreApplication.setOrganizationName(ORG_NAME)
@@ -1173,7 +1180,7 @@ def main(args=sys.argv):
single_instance = None if SingleApplication is None else SingleApplication('calibre GUI')
if not singleinstance('calibre GUI'):
if single_instance is not None and single_instance.is_running() and \
- single_instance.send_message('launched:'+repr(sys.argv)):
+ single_instance.send_message('launched:'+repr(args)):
return 0
QMessageBox.critical(None, 'Cannot Start '+__appname__,
@@ -1181,14 +1188,14 @@ def main(args=sys.argv):
return 1
initialize_file_icon_provider()
try:
- main = Main(single_instance)
+ main = Main(single_instance, opts)
except DatabaseLocked, err:
QMessageBox.critical(None, 'Cannot Start '+__appname__,
'Another program is using the database.
Perhaps %s is already running?
If not try deleting the file %s'%(__appname__, err.lock_file_path))
return 1
sys.excepthook = main.unhandled_exception
- if len(sys.argv) > 1:
- main.add_filesystem_book(sys.argv[1])
+ if len(args) > 1:
+ main.add_filesystem_book(args)
return app.exec_()
return 0
@@ -1199,7 +1206,7 @@ if __name__ == '__main__':
except:
if not iswindows: raise
from PyQt4.QtGui import QErrorMessage
- logfile = os.path.expanduser('~/calibre.log')
+ logfile = os.path.join(os.path.expanduser('~'), 'calibre.log')
if os.path.exists(logfile):
log = open(logfile).read()
if log.strip():
diff --git a/src/calibre/gui2/main_window.py b/src/calibre/gui2/main_window.py
index c2b5efc0ed..66987228d1 100644
--- a/src/calibre/gui2/main_window.py
+++ b/src/calibre/gui2/main_window.py
@@ -3,13 +3,45 @@ __copyright__ = '2008, Kovid Goyal '
import StringIO, traceback, sys
-from PyQt4.QtGui import QMainWindow
+from PyQt4.Qt import QMainWindow, QString, Qt, QFont
from calibre.gui2.dialogs.conversion_error import ConversionErrorDialog
+from calibre import OptionParser
+
+def option_parser(usage='''\
+Usage: %prog [options]
+
+Launch the Graphical User Interface
+'''):
+ parser = OptionParser(usage)
+ parser.add_option('--redirect-console-output', default=False, action='store_true', dest='redirect',
+ help=_('Redirect console output to a dialog window (both stdout and stderr). Useful on windows where GUI apps do not have a output streams.'))
+ return parser
+
+class DebugWindow(ConversionErrorDialog):
+
+ def __init__(self, parent):
+ ConversionErrorDialog.__init__(self, parent, 'Console output', '')
+ self.setModal(Qt.NonModal)
+ font = QFont()
+ font.setStyleHint(QFont.TypeWriter)
+ self.text.setFont(font)
+
+ def write(self, msg):
+ self.text.setPlainText(self.text.toPlainText()+QString(msg))
+
+ def flush(self):
+ pass
class MainWindow(QMainWindow):
- def __init__(self, parent=None):
+ def __init__(self, opts, parent=None):
QMainWindow.__init__(self, parent)
+ if opts.redirect:
+ self.__console_redirect = DebugWindow(self)
+ sys.stdout = sys.stderr = self.__console_redirect
+ self.__console_redirect.show()
+ print 'testing 1'
+ print 'testing 2'
def unhandled_exception(self, type, value, tb):
try:
@@ -19,7 +51,7 @@ class MainWindow(QMainWindow):
print >>sys.stderr, fe
msg = '' + unicode(str(value), 'utf8', 'replace') + '
'
msg += 'Detailed traceback:
'+fe+'
'
- d = ConversionErrorDialog(self, 'ERROR: Unhandled exception', msg)
+ d = ConversionErrorDialog(self, _('ERROR: Unhandled exception'), msg)
d.exec_()
except:
pass
\ No newline at end of file
diff --git a/src/calibre/linux.py b/src/calibre/linux.py
index 3bc5cd9467..fc35a54b78 100644
--- a/src/calibre/linux.py
+++ b/src/calibre/linux.py
@@ -46,9 +46,10 @@ entry_points = {
'librarything = calibre.ebooks.metadata.library_thing:main',
'mobi2oeb = calibre.ebooks.mobi.reader:main',
'lrf2html = calibre.ebooks.lrf.html.convert_to:main',
- 'calibre-debug = calibre.debug:main',
- 'calibredb = calibre.library.cli:main',
+ 'calibre-debug = calibre.debug:main',
+ 'calibredb = calibre.library.cli:main',
'calibre-fontconfig = calibre.utils.fontconfig:main',
+ 'calibre-parallel = calibre.parallel:main',
],
'gui_scripts' : [
__appname__+' = calibre.gui2.main:main',
diff --git a/src/calibre/parallel.py b/src/calibre/parallel.py
index 7c0c997def..51a9cf97b8 100644
--- a/src/calibre/parallel.py
+++ b/src/calibre/parallel.py
@@ -1,75 +1,231 @@
from __future__ import with_statement
__license__ = 'GPL v3'
-__copyright__ = '2008, Kovid Goyal '
+__copyright__ = '2008, Kovid Goyal kovid@kovidgoyal.net'
+__docformat__ = 'restructuredtext en'
+
'''
-Used to run jobs in parallel in separate processes.
+Used to run jobs in parallel in separate processes. Features output streaming,
+support for progress notification as well as job killing. The worker processes
+are controlled via a simple protocol run over TCP/IP sockets. The control happens
+mainly in two class, :class:`Server` and :class:`Overseer`. The worker is
+encapsulated in the function :function:`worker`. Every worker process
+has the environment variable :envvar:`CALIBRE_WORKER` defined.
+
+The worker control protocol has two modes of operation. In the first mode, the
+worker process listens for commands from the controller process. The controller
+process can either hand off a job to the worker or tell the worker to die.
+Once a job is handed off to the worker, the protocol enters the second mode, where
+the controller listens for messages from the worker. The worker can send progress updates
+as well as console output (i.e. text that would normally have been written to stdout
+or stderr by the job). Once the job completes (or raises an exception) the worker
+returns the result (or exception) to the controller adnt he protocol reverts to the first mode.
+
+In the second mode, the controller can also send the worker STOP messages, in which case
+the worker interrupts the job and dies. The sending of progress and console output messages
+is buffered and asynchronous to prevent the job from being IO bound.
'''
-import sys, os, gc, cPickle, traceback, atexit, cStringIO, time, \
- subprocess, socket, collections, binascii
+import sys, os, gc, cPickle, traceback, atexit, cStringIO, time, signal, \
+ subprocess, socket, collections, binascii, re, tempfile, thread
from select import select
from functools import partial
from threading import RLock, Thread, Event
-from calibre.ebooks.lrf.any.convert_from import main as any2lrf
-from calibre.ebooks.lrf.web.convert_from import main as web2lrf
-from calibre.ebooks.lrf.feeds.convert_from import main as feeds2lrf
-from calibre.gui2.lrf_renderer.main import main as lrfviewer
from calibre.ptempfile import PersistentTemporaryFile
+from calibre import iswindows, detect_ncpus, isosx
-try:
- from calibre.ebooks.lrf.html.table_as_image import do_render as render_table
-except: # Dont fail is PyQt4.4 not present
- render_table = None
-from calibre import iswindows, islinux, detect_ncpus
-
-sa = None
-job_id = None
-
-def report_progress(percent, msg=''):
- if sa is not None and job_id is not None:
- msg = 'progress:%s:%f:%s'%(job_id, percent, msg)
- sa.send_message(msg)
-
-_notify = 'fskjhwseiuyweoiu987435935-0342'
+#: A mapping from job names to functions that perform the jobs
PARALLEL_FUNCS = {
- 'any2lrf' : partial(any2lrf, gui_mode=True),
- 'web2lrf' : web2lrf,
- 'lrfviewer' : lrfviewer,
- 'feeds2lrf' : partial(feeds2lrf, notification=_notify),
- 'render_table': render_table,
- }
+ 'any2lrf' :
+ ('calibre.ebooks.lrf.any.convert_from', 'main', dict(gui_mode=True), None),
+
+ 'lrfviewer' :
+ ('calibre.gui2.lrf_renderer.main', 'main', {}, None),
+
+ 'feeds2lrf' :
+ ('calibre.ebooks.lrf.feeds.convert_from', 'main', {}, 'notification'),
+
+ 'render_table' :
+ ('calibre.ebooks.lrf.html.table_as_image', 'do_render', {}, None),
+}
-python = sys.executable
-popen = subprocess.Popen
-if iswindows:
- if hasattr(sys, 'frozen'):
- python = os.path.join(os.path.dirname(python), 'parallel.exe')
- else:
- python = os.path.join(os.path.dirname(python), 'Scripts\\parallel.exe')
- open = partial(subprocess.Popen, creationflags=0x08) # CREATE_NO_WINDOW=0x08 so that no ugly console is popped up
+isfrozen = hasattr(sys, 'frozen')
-if islinux and hasattr(sys, 'frozen_path'):
- python = os.path.join(getattr(sys, 'frozen_path'), 'calibre-parallel')
- popen = partial(subprocess.Popen, cwd=getattr(sys, 'frozen_path'))
+win32event = __import__('win32event') if iswindows else None
+win32process = __import__('win32process') if iswindows else None
+msvcrt = __import__('msvcrt') if iswindows else None
-prefix = 'import sys; sys.in_worker = True; '
-if hasattr(sys, 'frameworks_dir'):
- fd = getattr(sys, 'frameworks_dir')
- prefix += 'sys.frameworks_dir = "%s"; sys.frozen = "macosx_app"; '%fd
- if fd not in os.environ['PATH']:
- os.environ['PATH'] += ':'+fd
-if 'parallel' in python:
- executable = [python]
- worker_command = '%s:%s'
- free_spirit_command = '%s'
-else:
- executable = [python, '-c']
- worker_command = prefix + 'from calibre.parallel import worker; worker(%s, %s)'
- free_spirit_command = prefix + 'from calibre.parallel import free_spirit; free_spirit(%s)'
+class WorkerStatus(object):
+ '''
+ A platform independent class to control child processes. Provides the
+ methods:
+
+ .. method:: WorkerStatus.is_alive()
+
+ Return True is the child process is alive (i.e. it hasn't exited and returned a return code).
+
+ .. method:: WorkerStatus.returncode()
+
+ Wait for the child process to exit and return its return code (blocks until child returns).
+
+ .. method:: WorkerStatus.kill()
+
+ Forcibly terminates child process using operating system specific semantics.
+ '''
+
+ def __init__(self, obj):
+ '''
+ `obj`: On windows a process handle, on unix a subprocess.Popen object.
+ '''
+ self.obj = obj
+ self.win32process = win32process # Needed if kill is called during shutdown of interpreter
+ self.os = os
+ self.signal = signal
+ ext = 'windows' if iswindows else 'unix'
+ for func in ('is_alive', 'returncode', 'kill'):
+ setattr(self, func, getattr(self, func+'_'+ext))
+
+ def is_alive_unix(self):
+ return self.obj.poll() == None
+
+ def returncode_unix(self):
+ return self.obj.wait()
+
+ def kill_unix(self):
+ os.kill(self.obj.pid, self.signal.SIGKILL)
+
+ def is_alive_windows(self):
+ return win32event.WaitForSingleObject(self.obj, 0) != win32event.WAIT_OBJECT_0
+
+ def returncode_windows(self):
+ return win32process.GetExitCodeProcess(self.obj)
+
+ def kill_windows(self, returncode=-1):
+ self.win32process.TerminateProcess(self.obj, returncode)
+
+class WorkerMother(object):
+ '''
+ Platform independent object for launching child processes. All processes
+ have the environment variable :envvar:`CALIBRE_WORKER` set.
+
+ ..method:: WorkerMother.spawn_free_spirit(arg)
+
+ Launch a non monitored process with argument `arg`.
+
+ ..method:: WorkerMother.spawn_worker(arg)
+
+ Launch a monitored and controllable process with argument `arg`.
+ '''
+
+ def __init__(self):
+ ext = 'windows' if iswindows else 'osx' if isosx else 'linux'
+ self.os = os # Needed incase cleanup called when interpreter is shutting down
+ if iswindows:
+ self.executable = os.path.join(os.path.dirname(sys.executable),
+ 'calibre-parallel.exe' if isfrozen else 'Scripts\\calibre-parallel.exe')
+ elif isosx:
+ self.executable = sys.executable
+ self.prefix = ''
+ if isfrozen:
+ fd = getattr(sys, 'frameworks_dir')
+ contents = os.path.dirname(fd)
+ resources = os.path.join(contents, 'Resources')
+ sp = os.path.join(resources, 'lib', 'python'+sys.version[:3], 'site-packages.zip')
+
+ self.prefix += 'import sys; sys.frameworks_dir = "%s"; sys.frozen = "macosx_app"; '%fd
+ self.prefix += 'sys.path.insert(0, %s); '%repr(sp)
+ self.env = {}
+ if fd not in os.environ['PATH']:
+ self.env['PATH'] = os.environ['PATH']+':'+fd
+ self.env['PYTHONHOME'] = resources
+ else:
+ self.executable = os.path.join(getattr(sys, 'frozen_path'), 'calibre-parallel') \
+ if isfrozen else 'calibre-parallel'
+
+ self.spawn_worker_windows = lambda arg : self.spawn_free_spirit_windows(arg, type='worker')
+ self.spawn_worker_linux = lambda arg : self.spawn_free_spirit_linux(arg, type='worker')
+ self.spawn_worker_osx = lambda arg : self.spawn_free_spirit_osx(arg, type='worker')
+
+ for func in ('spawn_free_spirit', 'spawn_worker'):
+ setattr(self, func, getattr(self, func+'_'+ext))
+
+
+ def cleanup_child_windows(self, child, name=None, fd=None):
+ try:
+ child.kill()
+ except:
+ pass
+ try:
+ if fd is not None:
+ self.os.close(fd)
+ except:
+ pass
+ try:
+ if name is not None and os.path.exists(name):
+ self.os.unlink(name)
+ except:
+ pass
+
+ def cleanup_child_linux(self, child):
+ try:
+ child.kill()
+ except:
+ pass
+
+ def get_env(self):
+ env = dict(os.environ)
+ env['CALIBRE_WORKER'] = '1'
+ if hasattr(self, 'env'):
+ env.update(self.env)
+ return env
+
+ def spawn_free_spirit_osx(self, arg, type='free_spirit'):
+ script = 'from calibre.parallel import main; main(args=["calibre-parallel", %s]);'%repr(arg)
+ cmdline = [self.executable, '-c', self.prefix+script]
+ child = WorkerStatus(subprocess.Popen(cmdline, env=self.get_env()))
+ atexit.register(self.cleanup_child_linux, child)
+ return child
+
+ def spawn_free_spirit_linux(self, arg, type='free_spirit'):
+ cmdline = [self.executable, arg]
+ child = WorkerStatus(subprocess.Popen(cmdline, env=self.get_env()))
+ atexit.register(self.cleanup_child_linux, child)
+ return child
+
+ def spawn_free_spirit_windows(self, arg, type='free_spirit'):
+ fd, name = tempfile.mkstemp('.log', 'calibre_'+type+'_')
+ handle = msvcrt.get_osfhandle(fd)
+ si = win32process.STARTUPINFO()
+ si.hStdOutput = handle
+ si.hStdError = handle
+ cmdline = self.executable + ' ' + str(arg)
+ hProcess = \
+ win32process.CreateProcess(
+ None, # Application Name
+ cmdline, # Command line
+ None, # processAttributes
+ None, # threadAttributes
+ 1, # bInheritHandles
+ win32process.CREATE_NO_WINDOW, # Dont want ugly console popping up
+ self.get_env(), # New environment
+ None, # Current directory
+ si
+ )[0]
+ child = WorkerStatus(hProcess)
+ atexit.register(self.cleanup_child_windows, child, name, fd)
+ return child
+
+
+mother = WorkerMother()
def write(socket, msg, timeout=5):
+ '''
+ Write a message on socket. If `msg` is unicode, it is encoded in utf-8.
+ Raises a `RuntimeError` if the socket is not ready for writing or the writing fails.
+ `msg` is broken into chunks of size 4096 and sent. The :function:`read` function
+ automatically re-assembles the chunks into whole message.
+ '''
if isinstance(msg, unicode):
msg = msg.encode('utf-8')
length = None
@@ -88,6 +244,11 @@ def write(socket, msg, timeout=5):
def read(socket, timeout=5):
+ '''
+ Read a message from `socket`. The message must have been sent with the :function:`write`
+ function. Raises a `RuntimeError` if the message is corrpted. Can return an
+ empty string.
+ '''
buf = cStringIO.StringIO()
length = None
while select([socket],[],[],timeout)[0]:
@@ -108,6 +269,11 @@ def read(socket, timeout=5):
return msg
class RepeatingTimer(Thread):
+ '''
+ Calls a specified function repeatedly at a specified interval. Runs in a
+ daemon thread (i.e. the interpreter can exit while it is still running).
+ Call :meth:`start()` to start it.
+ '''
def repeat(self):
while True:
@@ -116,25 +282,31 @@ class RepeatingTimer(Thread):
break
self.action()
- def __init__(self, interval, func):
+ def __init__(self, interval, func, name):
self.event = Event()
self.interval = interval
self.action = func
- Thread.__init__(self, target=self.repeat)
+ Thread.__init__(self, target=self.repeat, name=name)
self.setDaemon(True)
class ControlError(Exception):
pass
class Overseer(object):
+ '''
+ Responsible for controlling worker processes. The main interface is the
+ methods, :meth:`initialize_job`, :meth:`control`.
+ '''
KILL_RESULT = 'Server: job killed by user|||#@#$%&*)*(*$#$%#$@&'
INTERVAL = 0.1
def __init__(self, server, port, timeout=5):
- self.cmd = worker_command%(repr('127.0.0.1'), repr(port))
- self.process = popen(executable + [self.cmd])
+ self.worker_status = mother.spawn_worker('127.0.0.1:%d'%port)
self.socket = server.accept()[0]
+ # Needed if terminate called hwen interpreter is shutting down
+ self.os = os
+ self.signal = signal
self.working = False
self.timeout = timeout
@@ -152,9 +324,7 @@ class Overseer(object):
raise RuntimeError('Worker sulking')
def terminate(self):
- '''
- Kill process.
- '''
+ 'Kill worker process.'
try:
if self.socket:
self.write('STOP:')
@@ -170,9 +340,8 @@ class Overseer(object):
except:
pass
else:
- import signal
try:
- os.kill(self.worker_pid, signal.SIGKILL)
+ self.os.kill(self.worker_pid, self.signal.SIGKILL)
time.sleep(0.05)
except:
pass
@@ -188,16 +357,19 @@ class Overseer(object):
return hasattr(other, 'process') and hasattr(other, 'worker_pid') and self.worker_pid == other.worker_pid
def __bool__(self):
- self.process.poll()
- return self.process.returncode is None
-
- def pid(self):
- return self.worker_pid
+ return self.worker_status.is_alive()
def select(self, timeout=0):
return select([self.socket], [self.socket], [self.socket], timeout)
def initialize_job(self, job):
+ '''
+ Sends `job` to worker process. Can raise `ControlError` if worker process
+ does not respond appropriately. In this case, this Overseer is useless
+ and should be discarded.
+
+ `job`: An instance of :class:`Job`.
+ '''
self.job_id = job.job_id
self.working = True
self.write('JOB:'+cPickle.dumps((job.func, job.args, job.kwdargs), -1))
@@ -209,40 +381,44 @@ class Overseer(object):
self.job = job
def control(self):
- try:
- if select([self.socket],[],[],0)[0]:
- msg = self.read()
- word, msg = msg.partition(':')[0], msg.partition(':')[-1]
- if word == 'RESULT':
- self.write('OK')
- return Result(cPickle.loads(msg), None, None)
- elif word == 'OUTPUT':
- self.write('OK')
- try:
- self.output(''.join(cPickle.loads(msg)))
- except:
- self.output('Bad output message: '+ repr(msg))
- elif word == 'PROGRESS':
- self.write('OK')
- percent = None
- try:
- percent, msg = cPickle.loads(msg)[-1]
- except:
- print 'Bad progress update:', repr(msg)
- if self.progress and percent is not None:
- self.progress(percent, msg)
- elif word == 'ERROR':
- self.write('OK')
- return Result(None, *cPickle.loads(msg))
- else:
- self.terminate()
- return Result(None, ControlError('Worker sent invalid msg: %s', repr(msg)), '')
- self.process.poll()
- if self.process.returncode is not None:
- return Result(None, ControlError('Worker process died unexpectedly with returncode: %d'%self.process.returncode), '')
- finally:
- self.working = False
- self.last_job_time = time.time()
+ '''
+ Listens for messages from the worker process and dispatches them
+ appropriately. If the worker process dies unexpectedly, returns a result
+ of None with a ControlError indicating the worker died.
+
+ Returns a :class:`Result` instance or None, if the worker is still working.
+ '''
+ if select([self.socket],[],[],0)[0]:
+ msg = self.read()
+ word, msg = msg.partition(':')[0], msg.partition(':')[-1]
+ if word == 'RESULT':
+ self.write('OK')
+ return Result(cPickle.loads(msg), None, None)
+ elif word == 'OUTPUT':
+ self.write('OK')
+ try:
+ self.output(''.join(cPickle.loads(msg)))
+ except:
+ self.output('Bad output message: '+ repr(msg))
+ elif word == 'PROGRESS':
+ self.write('OK')
+ percent = None
+ try:
+ percent, msg = cPickle.loads(msg)[-1]
+ except:
+ print 'Bad progress update:', repr(msg)
+ if self.progress and percent is not None:
+ self.progress(percent, msg)
+ elif word == 'ERROR':
+ self.write('OK')
+ return Result(None, *cPickle.loads(msg))
+ else:
+ self.terminate()
+ return Result(None, ControlError('Worker sent invalid msg: %s', repr(msg)), '')
+ if not self.worker_status.is_alive():
+ return Result(None, ControlError('Worker process died unexpectedly with returncode: %d'%self.process.returncode), '')
+
+
class Job(object):
@@ -325,14 +501,23 @@ class Server(Thread):
if len(self.jobs) > 0 and len(self.working) < self.number_of_workers:
job = self.jobs.popleft()
with self.pool_lock:
- o = self.pool.pop() if self.pool else Overseer(self.server_socket, self.port)
- try:
- o.initialize_job(job)
- except Exception, err:
- res = Result(None, unicode(err), traceback.format_exc())
- job.done(res)
- o.terminate()
o = None
+ while self.pool:
+ o = self.pool.pop()
+ try:
+ o.initialize_job(job)
+ break
+ except:
+ o.terminate()
+ if o is None:
+ o = Overseer(self.server_socket, self.port)
+ try:
+ o.initialize_job(job)
+ except Exception, err:
+ o.terminate()
+ res = Result(None, unicode(err), traceback.format_exc())
+ job.done(res)
+ o = None
if o:
with self.working_lock:
self.working.append(o)
@@ -393,8 +578,8 @@ class Server(Thread):
pt = PersistentTemporaryFile('.pickle', '_IPC_')
pt.write(cPickle.dumps((func, args, kwdargs)))
pt.close()
- cmd = free_spirit_command%repr(binascii.hexlify(pt.name))
- popen(executable + [cmd])
+ mother.spawn_free_spirit(binascii.hexlify(pt.name))
+
##########################################################################################
##################################### CLIENT CODE #####################################
@@ -406,8 +591,7 @@ class BufferedSender(object):
self.socket = socket
self.wbuf, self.pbuf = [], []
self.wlock, self.plock = RLock(), RLock()
- self.timer = RepeatingTimer(0.5, self.send)
- self.prefix = prefix
+ self.timer = RepeatingTimer(0.5, self.send, 'BufferedSender')
self.timer.start()
def write(self, msg):
@@ -417,6 +601,15 @@ class BufferedSender(object):
self.wbuf.append(msg)
def send(self):
+ if select([self.socket], [], [], 0)[0]:
+ msg = read(self.socket)
+ if msg == 'PING:':
+ write(self.socket, 'OK')
+ elif msg:
+ self.socket.shutdown(socket.SHUT_RDWR)
+ thread.interrupt_main()
+ time.sleep(1)
+ raise SystemExit
if not select([], [self.socket], [], 30)[1]:
print >>sys.__stderr__, 'Cannot pipe to overseer'
return
@@ -442,13 +635,18 @@ class BufferedSender(object):
def flush(self):
pass
+def get_func(name):
+ module, func, kwdargs, notification = PARALLEL_FUNCS[name]
+ module = __import__(module, fromlist=[1])
+ func = getattr(module, func)
+ return func, kwdargs, notification
+
def work(client_socket, func, args, kwdargs):
- func = PARALLEL_FUNCS[func]
- if hasattr(func, 'keywords'):
- for key, val in func.keywords.items():
- if val == _notify and hasattr(sys.stdout, 'notify'):
- func.keywords[key] = sys.stdout.notify
- res = func(*args, **kwdargs)
+ func, kargs, notification = get_func(func)
+ if notification is not None and hasattr(sys.stdout, 'notify'):
+ kargs[notification] = sys.stdout.notify
+ kargs.update(kwdargs)
+ res = func(*args, **kargs)
if hasattr(sys.stdout, 'send'):
sys.stdout.send()
return res
@@ -467,6 +665,9 @@ def worker(host, port):
sys.stderr = sys.stdout
while True:
+ if not select([client_socket], [], [], 60)[0]:
+ time.sleep(1)
+ continue
msg = read(client_socket, timeout=60)
if msg.startswith('JOB:'):
func, args, kwdargs = cPickle.loads(msg[4:])
@@ -481,7 +682,10 @@ def worker(host, port):
if read(client_socket, 10) != 'OK':
break
gc.collect()
+ elif msg == 'PING:':
+ write(client_socket, 'OK')
elif msg == 'STOP:':
+ client_socket.shutdown(socket.SHUT_RDWR)
return 0
elif not msg:
time.sleep(1)
@@ -490,21 +694,23 @@ def worker(host, port):
return 1
def free_spirit(path):
- func, args, kwdargs = cPickle.load(open(binascii.unhexlify(path), 'rb'))
+ func, args, kwdargs = cPickle.load(open(path, 'rb'))
try:
os.unlink(path)
except:
pass
- PARALLEL_FUNCS[func](*args, **kwdargs)
+ func, kargs = get_func(func)[:2]
+ kargs.update(kwdargs)
+ func(*args, **kargs)
def main(args=sys.argv):
args = args[1].split(':')
if len(args) == 1:
- free_spirit(args[0].replace("'", ''))
+ free_spirit(binascii.unhexlify(re.sub(r'[^a-f0-9A-F]', '', args[0])))
else:
worker(args[0].replace("'", ''), int(args[1]))
return 0
if __name__ == '__main__':
sys.exit(main())
-
\ No newline at end of file
+
diff --git a/src/calibre/terminfo.py b/src/calibre/terminfo.py
index 2ed03a3077..121f5fcfd1 100644
--- a/src/calibre/terminfo.py
+++ b/src/calibre/terminfo.py
@@ -1,6 +1,6 @@
__license__ = 'GPL v3'
__copyright__ = '2008, Kovid Goyal '
-import sys, re
+import sys, re, os
""" Get information about the terminal we are running in """
@@ -94,7 +94,7 @@ class TerminalController:
except: return
# If the stream isn't a tty, then assume it has no capabilities.
- if hasattr(sys, 'in_worker') or not hasattr(term_stream, 'isatty') or not term_stream.isatty(): return
+ if os.environ.get('CALIBRE_WORKER', None) is not None or not hasattr(term_stream, 'isatty') or not term_stream.isatty(): return
# Check the terminal type. If we fail, then assume that the
# terminal has no capabilities.
diff --git a/upload.py b/upload.py
index 8ec158c1db..da2bd790dd 100644
--- a/upload.py
+++ b/upload.py
@@ -63,17 +63,18 @@ def start_vm(vm, ssh_host, build_script, sleep=75):
subprocess.check_call(('scp', t.name, ssh_host+':build-'+PROJECT))
subprocess.check_call('ssh -t %s bash build-%s'%(ssh_host, PROJECT), shell=True)
-def build_windows():
+def build_windows(shutdown=True):
installer = installer_name('exe')
vm = '/vmware/Windows XP/Windows XP Professional.vmx'
start_vm(vm, 'windows', BUILD_SCRIPT%('python setup.py develop', 'python','windows_installer.py'))
subprocess.check_call(('scp', 'windows:build/%s/dist/*.exe'%PROJECT, 'dist'))
if not os.path.exists(installer):
raise Exception('Failed to build installer '+installer)
- subprocess.Popen(('ssh', 'windows', 'shutdown', '-s', '-t', '0'))
+ if shutdown:
+ subprocess.Popen(('ssh', 'windows', 'shutdown', '-s', '-t', '0'))
return os.path.basename(installer)
-def build_osx():
+def build_osx(shutdown=True):
installer = installer_name('dmg')
vm = '/vmware/Mac OSX/Mac OSX.vmx'
python = '/Library/Frameworks/Python.framework/Versions/Current/bin/python'
@@ -81,18 +82,20 @@ def build_osx():
subprocess.check_call(('scp', 'osx:build/%s/dist/*.dmg'%PROJECT, 'dist'))
if not os.path.exists(installer):
raise Exception('Failed to build installer '+installer)
- subprocess.Popen(('ssh', 'osx', 'sudo', '/sbin/shutdown', '-h', 'now'))
+ if shutdown:
+ subprocess.Popen(('ssh', 'osx', 'sudo', '/sbin/shutdown', '-h', 'now'))
return os.path.basename(installer)
-def build_linux():
+def build_linux(shutdown=True):
installer = installer_name('tar.bz2')
vm = '/vmware/linux/libprs500-gentoo.vmx'
start_vm(vm, 'linux', BUILD_SCRIPT%('sudo python setup.py develop', 'python','linux_installer.py'))
subprocess.check_call(('scp', 'linux:/tmp/%s'%os.path.basename(installer), 'dist'))
if not os.path.exists(installer):
raise Exception('Failed to build installer '+installer)
- subprocess.Popen(('ssh', 'linux', 'sudo', '/sbin/poweroff'))
+ if shutdown:
+ subprocess.Popen(('ssh', 'linux', 'sudo', '/sbin/poweroff'))
return os.path.basename(installer)
def build_installers():
diff --git a/windows_installer.py b/windows_installer.py
index afabe531f8..38d90f42f4 100644
--- a/windows_installer.py
+++ b/windows_installer.py
@@ -412,7 +412,7 @@ SectionEnd
version=VERSION,
outpath=os.path.abspath(output_dir))
- def build(self):
+ def build(self):
f = open('installer.nsi', 'w')
path = f.name
f.write(self.installer)
@@ -420,7 +420,7 @@ SectionEnd
try:
subprocess.check_call('"C:\Program Files\NSIS\makensis.exe" /V2 ' + path, shell=True)
except:
- print path
+ print path
else:
os.remove(path)
@@ -537,18 +537,18 @@ class BuildEXE(build_exe):
def main():
sys.argv[1:2] = ['py2exe']
- console = [dict(dest_base=basenames['console'][i], script=scripts['console'][i])
- for i in range(len(scripts['console']))]
+ console = [dict(dest_base=basenames['console'][i], script=scripts['console'][i])
+ for i in range(len(scripts['console']))]# if not 'parallel.py' in scripts['console'][i] ]
sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'src'))
setup(
cmdclass = {'py2exe': BuildEXE},
windows = [
- {'script' : scripts['gui'][0],
+ {'script' : scripts['gui'][0],
'dest_base' : APPNAME,
'icon_resources' : [(1, 'icons/library.ico')],
'other_resources' : [BuildEXE.manifest(APPNAME)],
},
- {'script' : scripts['gui'][1],
+ {'script' : scripts['gui'][1],
'dest_base' : 'lrfviewer',
'icon_resources' : [(1, 'icons/viewer.ico')],
'other_resources' : [BuildEXE.manifest('lrfviewer')],
@@ -561,11 +561,13 @@ def main():
'includes' : [
'sip', 'pkg_resources', 'PyQt4.QtSvg',
'mechanize', 'ClientForm', 'wmi',
- 'win32file', 'pythoncom', 'rtf2xml',
+ 'win32file', 'pythoncom', 'rtf2xml',
+ 'win32process', 'win32api', 'msvcrt',
+ 'win32event',
'lxml', 'lxml._elementpath', 'genshi',
'path', 'pydoc', 'IPython.Extensions.*',
'calibre.web.feeds.recipes.*', 'PyQt4.QtWebKit',
- ],
+ ],
'packages' : ['PIL'],
'excludes' : ["Tkconstants", "Tkinter", "tcl",
"_imagingtk", "ImageTk", "FixTk"