From df1b369f344e71920747fb9386d4a24ac84b6c9a Mon Sep 17 00:00:00 2001
From: Kovid Goyal
Date: Thu, 6 Mar 2008 07:15:59 +0000
Subject: [PATCH] Refactor the multi-processing module to use QProcess, should
 make it more robust.

---
 src/libprs500/parallel.py | 66 +++++++++++++++++++++------------------
 1 file changed, 36 insertions(+), 30 deletions(-)

diff --git a/src/libprs500/parallel.py b/src/libprs500/parallel.py
index 5d800cc3fb..5b6ee6eead 100644
--- a/src/libprs500/parallel.py
+++ b/src/libprs500/parallel.py
@@ -15,8 +15,10 @@
 '''
 Used to run jobs in parallel in separate processes.
 '''
-import re, sys, tempfile, os, subprocess, cPickle, cStringIO, traceback, atexit, time, binascii
+import re, sys, tempfile, os, cPickle, cStringIO, traceback, atexit, binascii, time
 from functools import partial
+from PyQt4.QtCore import QProcess, QIODevice
+
 from libprs500.ebooks.lrf.any.convert_from import main as any2lrf
 from libprs500.ebooks.lrf.web.convert_from import main as web2lrf
 from libprs500.gui2.lrf_renderer.main import main as lrfviewer
@@ -27,12 +29,9 @@ PARALLEL_FUNCS = {
                   'web2lrf' : web2lrf,
                   'lrfviewer' : lrfviewer,
                   }
-Popen = subprocess.Popen
 
 python = sys.executable
 if iswindows:
-    import win32con
-    Popen = partial(Popen, creationflags=win32con.CREATE_NO_WINDOW)
     if hasattr(sys, 'frozen'):
         python = os.path.join(os.path.dirname(python), 'parallel.exe')
 else:
@@ -56,6 +55,7 @@ class Server(object):
         atexit.register(cleanup, self.tdir)
         self.stdout = {}
         self.kill_jobs = []
+        self.detached_jobs = []
 
     def kill(self, job_id):
         '''
@@ -63,27 +63,20 @@
         '''
         self.kill_jobs.append(str(job_id))
 
-    def _terminate(self, pid):
+    def _terminate(self, process):
         '''
-        Kill process identified by C{pid}.
-        @param pid: On unix a process number, on windows a process handle.
+        Kill process.
         '''
-        if iswindows:
-            import win32api
-            try:
-                win32api.TerminateProcess(int(pid), -1)
-            except:
-                pass
+        if iswindows: # Note that QProcess.kill doesn't actually kill the process on windows
+            process.kill()
+            process.waitForFinished(20)
         else:
+            # We don't use QProcess.kill as it doesn't seem to work
            import signal
-            try:
-                try:
-                    os.kill(pid, signal.SIGTERM)
-                finally:
-                    time.sleep(2)
-                    os.kill(pid, signal.SIGKILL)
-            except:
-                pass
+            os.kill(process.pid(), signal.SIGKILL)
+            time.sleep(0.05)
+
+
 
     def run(self, job_id, func, args=[], kwdargs={}, monitor=True):
         '''
@@ -114,18 +107,31 @@
         os.environ['PATH'] += ':'+fd
         cmd = prefix + 'from libprs500.parallel import run_job; run_job(\'%s\')'%binascii.hexlify(job_data)
 
+        p = QProcess()
+
         if monitor:
-            p = Popen((python, '-c', cmd), stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
+            p.setProcessChannelMode(QProcess.MergedChannels)
+            p.start(python, ['-c', cmd], QIODevice.ReadWrite|QIODevice.Text)
         else:
-            Popen((python, '-c', cmd))
-            return
-        while p.returncode is None:
+            p.setProcessChannelMode(QProcess.ForwardedChannels)
+            p.start(python, ['-c', cmd])
+            self.detached_jobs.append(p)
+            return p.waitForStarted(10000)
+
+        started = p.waitForStarted(30000) # Wait at most 30 secs
+        if not started:
+            raise RuntimeError('Could not run job: %s %s'%(func, args))
+
+        while p.state() == QProcess.Running:
             if job_id in self.kill_jobs:
-                self._terminate(p._handle if iswindows else p.pid)
-                return self.KILL_RESULT, None, None, _('Job killed by user')
-            p.poll()
-            time.sleep(0.5) # Wait for half a second
-        self.stdout[job_id].write(p.stdout.read())
+                self._terminate(p)
+                return self.KILL_RESULT, None, None, _('Job killed by user')
+
+            if p.waitForReadyRead(100): # Wait 0.1 secs for data to become available
+                self.stdout[job_id].write(str(p.read(10))) # Read atmost 10 bytes of data
+
+
+        self.stdout[job_id].write(p.readAll())
 
         job_result = os.path.join(job_dir, 'job_result.pickle')
         if not os.path.exists(job_result):
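
For readers skimming the diff, the following is a minimal standalone sketch (not part of the patch) of the QProcess pattern the new run() method adopts: launch a child Python interpreter with stderr merged into stdout, poll the child with short waitForReadyRead() timeouts so a kill request can be honoured promptly, then drain any remaining output after the child exits. It assumes Python 2 with PyQt4 installed, to match the era of this codebase; the names monitor_child and kill_requested are illustrative only and do not exist in libprs500. The Qt calls (setProcessChannelMode, start, waitForStarted, waitForReadyRead, readAll, kill) are the same ones the patch introduces.

# Sketch only: Python 2 + PyQt4 assumed; monitor_child/kill_requested are
# hypothetical names chosen for illustration, not part of libprs500.
import sys
from PyQt4.QtCore import QCoreApplication, QProcess, QIODevice

def monitor_child(cmd, kill_requested=lambda: False):
    # Qt prefers an application object to exist before QProcess is used
    app = QCoreApplication.instance() or QCoreApplication(sys.argv)
    p = QProcess()
    p.setProcessChannelMode(QProcess.MergedChannels)   # fold stderr into stdout
    p.start(sys.executable, ['-c', cmd], QIODevice.ReadWrite|QIODevice.Text)
    if not p.waitForStarted(30000):                    # give the child 30s to launch
        raise RuntimeError('Could not start child process')
    output = []
    while p.state() == QProcess.Running:
        if kill_requested():
            p.kill()                                   # force-terminate; see _terminate() in the patch for platform quirks
            p.waitForFinished(1000)
            return None, ''.join(output)
        if p.waitForReadyRead(100):                    # block at most 0.1s waiting for new data
            output.append(str(p.readAll()))
    output.append(str(p.readAll()))                    # drain anything written just before exit
    return p.exitCode(), ''.join(output)

if __name__ == '__main__':
    code, out = monitor_child("print 'hello from the worker'")
    print code, out

Compared with the old subprocess-based loop, which slept for half a second between poll() calls and only read the pipe once at the end, waitForReadyRead(100) blocks only while waiting for data, so output is captured incrementally and kill requests are checked roughly ten times a second.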