Refactor the multi-processing module to use QProcess, should make it more robust.

This commit is contained in:
Kovid Goyal 2008-03-06 07:15:59 +00:00
parent d411a0d3fc
commit df1b369f34

View File

@ -15,8 +15,10 @@
''' '''
Used to run jobs in parallel in separate processes. Used to run jobs in parallel in separate processes.
''' '''
import re, sys, tempfile, os, subprocess, cPickle, cStringIO, traceback, atexit, time, binascii import re, sys, tempfile, os, cPickle, cStringIO, traceback, atexit, binascii, time
from functools import partial from functools import partial
from PyQt4.QtCore import QProcess, QIODevice
from libprs500.ebooks.lrf.any.convert_from import main as any2lrf from libprs500.ebooks.lrf.any.convert_from import main as any2lrf
from libprs500.ebooks.lrf.web.convert_from import main as web2lrf from libprs500.ebooks.lrf.web.convert_from import main as web2lrf
from libprs500.gui2.lrf_renderer.main import main as lrfviewer from libprs500.gui2.lrf_renderer.main import main as lrfviewer
@ -27,12 +29,9 @@ PARALLEL_FUNCS = {
'web2lrf' : web2lrf, 'web2lrf' : web2lrf,
'lrfviewer' : lrfviewer, 'lrfviewer' : lrfviewer,
} }
Popen = subprocess.Popen
python = sys.executable python = sys.executable
if iswindows: if iswindows:
import win32con
Popen = partial(Popen, creationflags=win32con.CREATE_NO_WINDOW)
if hasattr(sys, 'frozen'): if hasattr(sys, 'frozen'):
python = os.path.join(os.path.dirname(python), 'parallel.exe') python = os.path.join(os.path.dirname(python), 'parallel.exe')
else: else:
@ -56,6 +55,7 @@ class Server(object):
atexit.register(cleanup, self.tdir) atexit.register(cleanup, self.tdir)
self.stdout = {} self.stdout = {}
self.kill_jobs = [] self.kill_jobs = []
self.detached_jobs = []
def kill(self, job_id): def kill(self, job_id):
''' '''
@ -63,27 +63,20 @@ class Server(object):
''' '''
self.kill_jobs.append(str(job_id)) self.kill_jobs.append(str(job_id))
def _terminate(self, pid): def _terminate(self, process):
''' '''
Kill process identified by C{pid}. Kill process.
@param pid: On unix a process number, on windows a process handle.
''' '''
if iswindows: if iswindows: # Note that QProcess.kill doesn't actually kill the process on windows
import win32api process.kill()
try: process.waitForFinished(20)
win32api.TerminateProcess(int(pid), -1)
except:
pass
else: else:
# We don't use QProcess.kill as it doesn't seem to work
import signal import signal
try: os.kill(process.pid(), signal.SIGKILL)
try: time.sleep(0.05)
os.kill(pid, signal.SIGTERM)
finally:
time.sleep(2)
os.kill(pid, signal.SIGKILL)
except:
pass
def run(self, job_id, func, args=[], kwdargs={}, monitor=True): def run(self, job_id, func, args=[], kwdargs={}, monitor=True):
''' '''
@ -114,18 +107,31 @@ class Server(object):
os.environ['PATH'] += ':'+fd os.environ['PATH'] += ':'+fd
cmd = prefix + 'from libprs500.parallel import run_job; run_job(\'%s\')'%binascii.hexlify(job_data) cmd = prefix + 'from libprs500.parallel import run_job; run_job(\'%s\')'%binascii.hexlify(job_data)
p = QProcess()
if monitor: if monitor:
p = Popen((python, '-c', cmd), stdout=subprocess.PIPE, stderr=subprocess.STDOUT) p.setProcessChannelMode(QProcess.MergedChannels)
p.start(python, ['-c', cmd], QIODevice.ReadWrite|QIODevice.Text)
else: else:
Popen((python, '-c', cmd)) p.setProcessChannelMode(QProcess.ForwardedChannels)
return p.start(python, ['-c', cmd])
while p.returncode is None: self.detached_jobs.append(p)
return p.waitForStarted(10000)
started = p.waitForStarted(30000) # Wait at most 30 secs
if not started:
raise RuntimeError('Could not run job: %s %s'%(func, args))
while p.state() == QProcess.Running:
if job_id in self.kill_jobs: if job_id in self.kill_jobs:
self._terminate(p._handle if iswindows else p.pid) self._terminate(p)
return self.KILL_RESULT, None, None, _('Job killed by user') return self.KILL_RESULT, None, None, _('Job killed by user')
p.poll()
time.sleep(0.5) # Wait for half a second if p.waitForReadyRead(100): # Wait 0.1 secs for data to become available
self.stdout[job_id].write(p.stdout.read()) self.stdout[job_id].write(str(p.read(10))) # Read atmost 10 bytes of data
self.stdout[job_id].write(p.readAll())
job_result = os.path.join(job_dir, 'job_result.pickle') job_result = os.path.join(job_dir, 'job_result.pickle')
if not os.path.exists(job_result): if not os.path.exists(job_result):