From 7ee1d99e5b71b5324e1145ea93648d7d86e9b82f Mon Sep 17 00:00:00 2001 From: Kovid Goyal Date: Sun, 9 Mar 2008 19:31:44 +0000 Subject: [PATCH] Revert changes to parallel processing infrastructure as OS X didn't seem to like the new code. --- src/libprs500/parallel.py | 59 ++++++++++++++++----------------------- 1 file changed, 24 insertions(+), 35 deletions(-) diff --git a/src/libprs500/parallel.py b/src/libprs500/parallel.py index fc2e3f6332..56724220ea 100644 --- a/src/libprs500/parallel.py +++ b/src/libprs500/parallel.py @@ -15,9 +15,9 @@ ''' Used to run jobs in parallel in separate processes. ''' -import re, sys, tempfile, os, cPickle, cStringIO, traceback, atexit, binascii, time +import re, sys, tempfile, os, cPickle, cStringIO, traceback, atexit, binascii, time, subprocess from functools import partial -from PyQt4.QtCore import QProcess, QIODevice + from libprs500.ebooks.lrf.any.convert_from import main as any2lrf from libprs500.ebooks.lrf.web.convert_from import main as web2lrf @@ -53,9 +53,7 @@ class Server(object): def __init__(self): self.tdir = tempfile.mkdtemp('', '%s_IPC_'%__appname__) atexit.register(cleanup, self.tdir) - self.stdout = {} - self.kill_jobs = [] - self.detached_jobs = [] + self.kill_jobs = [] def kill(self, job_id): ''' @@ -67,13 +65,15 @@ class Server(object): ''' Kill process. ''' - if iswindows: # Note that QProcess.kill doesn't actually kill the process on windows - process.kill() - process.waitForFinished(20) + if iswindows: + win32api = __import__('win32api') + try: + win32api.TerminateProcess(int(process.pid), -1) + except: + pass else: - # We don't use QProcess.kill as it doesn't seem to work import signal - os.kill(process.pid(), signal.SIGKILL) + os.kill(process.pid, signal.SIGKILL) time.sleep(0.05) @@ -95,7 +95,6 @@ class Server(object): if os.path.exists(job_dir): raise ValueError('Cannot run job. The job_id %s has already been used.'%job_id) os.mkdir(job_dir) - self.stdout[job_id] = cStringIO.StringIO() job_data = os.path.join(job_dir, 'job_data.pickle') cPickle.dump((func, args, kwdargs), open(job_data, 'wb'), -1) @@ -107,46 +106,36 @@ class Server(object): os.environ['PATH'] += ':'+fd cmd = prefix + 'from libprs500.parallel import run_job; run_job(\'%s\')'%binascii.hexlify(job_data) - p = QProcess() + if not monitor: + subprocess.Popen([python, '-c', cmd]) + return - if monitor: - p.setProcessChannelMode(QProcess.MergedChannels) - p.start(python, ['-c', cmd], QIODevice.ReadWrite|QIODevice.Text) - else: - p.setProcessChannelMode(QProcess.ForwardedChannels) - p.start(python, ['-c', cmd]) - self.detached_jobs.append(p) - return p.waitForStarted(10000) - - started = p.waitForStarted(30000) # Wait at most 30 secs - if not started: - raise RuntimeError('Could not run job: %s %s\nError: '%(func, args, p.errorString())) - - while p.state() == QProcess.Running: + output = open(os.path.join(job_dir, 'output.txt'), 'wb') + p = subprocess.Popen([python, '-c', cmd], stdout=output, stderr=output) + while p.returncode is None: if job_id in self.kill_jobs: self._terminate(p) return self.KILL_RESULT, None, None, _('Job killed by user') - - if p.waitForReadyRead(100): # Wait 0.1 secs for data to become available - self.stdout[job_id].write(str(p.read(10))) # Read atmost 10 bytes of data - - - self.stdout[job_id].write(p.readAll()) + time.sleep(0.1) + p.poll() + + output.close() job_result = os.path.join(job_dir, 'job_result.pickle') if not os.path.exists(job_result): result, exception, traceback = None, ('ParallelRuntimeError', 'The worker process died unexpectedly.'), '' else: result, exception, traceback = cPickle.load(open(job_result, 'rb')) - log = self.stdout[job_id].getvalue() - self.stdout.pop(job_id) + log = open(output.name, 'rb').read() + return result, exception, traceback, log def run_job(job_data): job_data = binascii.unhexlify(job_data) - job_result = os.path.join(os.path.dirname(job_data), 'job_result.pickle') + base = os.path.dirname(job_data) + job_result = os.path.join(base, 'job_result.pickle') func, args, kwdargs = cPickle.load(open(job_data, 'rb')) func = PARALLEL_FUNCS[func] exception, tb = None, None