Revert changes to parallel processing infrastructure as OS X didn't seem to like the new code.

This commit is contained in:
Kovid Goyal 2008-03-09 19:31:44 +00:00
parent 795978b017
commit 7ee1d99e5b

View File

@ -15,9 +15,9 @@
''' '''
Used to run jobs in parallel in separate processes. Used to run jobs in parallel in separate processes.
''' '''
import re, sys, tempfile, os, cPickle, cStringIO, traceback, atexit, binascii, time import re, sys, tempfile, os, cPickle, cStringIO, traceback, atexit, binascii, time, subprocess
from functools import partial from functools import partial
from PyQt4.QtCore import QProcess, QIODevice
from libprs500.ebooks.lrf.any.convert_from import main as any2lrf from libprs500.ebooks.lrf.any.convert_from import main as any2lrf
from libprs500.ebooks.lrf.web.convert_from import main as web2lrf from libprs500.ebooks.lrf.web.convert_from import main as web2lrf
@ -53,9 +53,7 @@ class Server(object):
def __init__(self): def __init__(self):
self.tdir = tempfile.mkdtemp('', '%s_IPC_'%__appname__) self.tdir = tempfile.mkdtemp('', '%s_IPC_'%__appname__)
atexit.register(cleanup, self.tdir) atexit.register(cleanup, self.tdir)
self.stdout = {} self.kill_jobs = []
self.kill_jobs = []
self.detached_jobs = []
def kill(self, job_id): def kill(self, job_id):
''' '''
@ -67,13 +65,15 @@ class Server(object):
''' '''
Kill process. Kill process.
''' '''
if iswindows: # Note that QProcess.kill doesn't actually kill the process on windows if iswindows:
process.kill() win32api = __import__('win32api')
process.waitForFinished(20) try:
win32api.TerminateProcess(int(process.pid), -1)
except:
pass
else: else:
# We don't use QProcess.kill as it doesn't seem to work
import signal import signal
os.kill(process.pid(), signal.SIGKILL) os.kill(process.pid, signal.SIGKILL)
time.sleep(0.05) time.sleep(0.05)
@ -95,7 +95,6 @@ class Server(object):
if os.path.exists(job_dir): if os.path.exists(job_dir):
raise ValueError('Cannot run job. The job_id %s has already been used.'%job_id) raise ValueError('Cannot run job. The job_id %s has already been used.'%job_id)
os.mkdir(job_dir) os.mkdir(job_dir)
self.stdout[job_id] = cStringIO.StringIO()
job_data = os.path.join(job_dir, 'job_data.pickle') job_data = os.path.join(job_dir, 'job_data.pickle')
cPickle.dump((func, args, kwdargs), open(job_data, 'wb'), -1) cPickle.dump((func, args, kwdargs), open(job_data, 'wb'), -1)
@ -107,46 +106,36 @@ class Server(object):
os.environ['PATH'] += ':'+fd os.environ['PATH'] += ':'+fd
cmd = prefix + 'from libprs500.parallel import run_job; run_job(\'%s\')'%binascii.hexlify(job_data) cmd = prefix + 'from libprs500.parallel import run_job; run_job(\'%s\')'%binascii.hexlify(job_data)
p = QProcess() if not monitor:
subprocess.Popen([python, '-c', cmd])
return
if monitor: output = open(os.path.join(job_dir, 'output.txt'), 'wb')
p.setProcessChannelMode(QProcess.MergedChannels) p = subprocess.Popen([python, '-c', cmd], stdout=output, stderr=output)
p.start(python, ['-c', cmd], QIODevice.ReadWrite|QIODevice.Text) while p.returncode is None:
else:
p.setProcessChannelMode(QProcess.ForwardedChannels)
p.start(python, ['-c', cmd])
self.detached_jobs.append(p)
return p.waitForStarted(10000)
started = p.waitForStarted(30000) # Wait at most 30 secs
if not started:
raise RuntimeError('Could not run job: %s %s\nError: '%(func, args, p.errorString()))
while p.state() == QProcess.Running:
if job_id in self.kill_jobs: if job_id in self.kill_jobs:
self._terminate(p) self._terminate(p)
return self.KILL_RESULT, None, None, _('Job killed by user') return self.KILL_RESULT, None, None, _('Job killed by user')
time.sleep(0.1)
if p.waitForReadyRead(100): # Wait 0.1 secs for data to become available p.poll()
self.stdout[job_id].write(str(p.read(10))) # Read atmost 10 bytes of data
self.stdout[job_id].write(p.readAll())
output.close()
job_result = os.path.join(job_dir, 'job_result.pickle') job_result = os.path.join(job_dir, 'job_result.pickle')
if not os.path.exists(job_result): if not os.path.exists(job_result):
result, exception, traceback = None, ('ParallelRuntimeError', result, exception, traceback = None, ('ParallelRuntimeError',
'The worker process died unexpectedly.'), '' 'The worker process died unexpectedly.'), ''
else: else:
result, exception, traceback = cPickle.load(open(job_result, 'rb')) result, exception, traceback = cPickle.load(open(job_result, 'rb'))
log = self.stdout[job_id].getvalue() log = open(output.name, 'rb').read()
self.stdout.pop(job_id)
return result, exception, traceback, log return result, exception, traceback, log
def run_job(job_data): def run_job(job_data):
job_data = binascii.unhexlify(job_data) job_data = binascii.unhexlify(job_data)
job_result = os.path.join(os.path.dirname(job_data), 'job_result.pickle') base = os.path.dirname(job_data)
job_result = os.path.join(base, 'job_result.pickle')
func, args, kwdargs = cPickle.load(open(job_data, 'rb')) func, args, kwdargs = cPickle.load(open(job_data, 'rb'))
func = PARALLEL_FUNCS[func] func = PARALLEL_FUNCS[func]
exception, tb = None, None exception, tb = None, None