IGN:Fix new IPC framework on windows

This commit is contained in:
Kovid Goyal 2008-06-20 11:42:33 -07:00
parent d0106bd4a0
commit 15305e3bea
2 changed files with 54 additions and 22 deletions

View File

@ -74,7 +74,7 @@ f.write(hook_script)
sys.path.insert(0, CALIBRESRC)
from calibre.linux import entry_points
executables, scripts = ['calibre_postinstall', 'parallel'], \
executables, scripts = ['calibre_postinstall', 'calibre-parallel'], \
[os.path.join(CALIBRESRC, 'calibre', 'linux.py'), os.path.join(CALIBRESRC, 'calibre', 'parallel.py')]
for entry in entry_points['console_scripts'] + entry_points['gui_scripts']:

View File

@ -47,10 +47,10 @@ if iswindows:
python = os.path.join(os.path.dirname(python), 'parallel.exe')
else:
python = os.path.join(os.path.dirname(python), 'Scripts\\parallel.exe')
popen = partial(subprocess.Popen, creationflags=0x08) # CREATE_NO_WINDOW=0x08 so that no ugly console is popped up
open = partial(subprocess.Popen, creationflags=0x08) # CREATE_NO_WINDOW=0x08 so that no ugly console is popped up
if islinux and hasattr(sys, 'frozen_path'):
python = os.path.join(getattr(sys, 'frozen_path'), 'parallel')
python = os.path.join(getattr(sys, 'frozen_path'), 'calibre-parallel')
popen = partial(subprocess.Popen, cwd=getattr(sys, 'frozen_path'))
prefix = 'import sys; sys.in_worker = True; '
@ -59,7 +59,14 @@ if hasattr(sys, 'frameworks_dir'):
prefix += 'sys.frameworks_dir = "%s"; sys.frozen = "macosx_app"; '%fd
if fd not in os.environ['PATH']:
os.environ['PATH'] += ':'+fd
if 'parallel' in python:
executable = [python]
worker_command = '%s:%s'
free_spirit_command = '%s'
else:
executable = [python, '-c']
worker_command = prefix + 'from calibre.parallel import worker; worker(%s, %s)'
free_spirit_command = prefix + 'from calibre.parallel import free_spirit; free_spirit(%s)'
def write(socket, msg, timeout=5):
if isinstance(msg, unicode):
@ -124,8 +131,8 @@ class Overseer(object):
INTERVAL = 0.1
def __init__(self, server, port, timeout=5):
self.cmd = prefix + 'from calibre.parallel import worker; worker(%s, %s)'%(repr('localhost'), repr(port))
self.process = popen([python, '-c', self.cmd])
self.cmd = worker_command%(repr('127.0.0.1'), repr(port))
self.process = popen(executable + [self.cmd])
self.socket = server.accept()[0]
self.working = False
@ -135,8 +142,10 @@ class Overseer(object):
self._stop = False
if not select([self.socket], [], [], 120)[0]:
raise RuntimeError(_('Could not launch worker process.'))
if int(self.read()) != self.process.pid:
raise RuntimeError('PID mismatch')
ID = self.read().split(':')
if ID[0] != 'CALIBRE_WORKER':
raise RuntimeError('Impostor')
self.worker_pid = int(ID[1])
self.write('OK')
if self.read() != 'WAITING':
raise RuntimeError('Worker sulking')
@ -147,19 +156,22 @@ class Overseer(object):
'''
try:
if self.socket:
self.socket.close()
self.write('STOP:')
time.sleep(1)
self.socket.shutdown(socket.SHUT_RDWR)
except:
pass
if iswindows:
win32api = __import__('win32api')
try:
win32api.TerminateProcess(int(self.process.pid), -1)
handle = win32api.OpenProcess(1, False, self.worker_pid)
win32api.TerminateProcess(handle, -1)
except:
pass
else:
import signal
try:
os.kill(self.process.pid, signal.SIGKILL)
os.kill(self.worker_pid, signal.SIGKILL)
time.sleep(0.05)
except:
pass
@ -172,14 +184,14 @@ class Overseer(object):
return read(self.socket, timeout=self.timeout if timeout is None else timeout)
def __eq__(self, other):
return hasattr(other, 'process') and hasattr(other.process, 'pid') and self.process.pid == other.process.pid
return hasattr(other, 'process') and hasattr(other, 'worker_pid') and self.worker_pid == other.worker_pid
def __bool__(self):
self.process.poll()
return self.process.returncode is None
def pid(self):
return self.process.pid
return self.worker_pid
def select(self, timeout=0):
return select([self.socket], [self.socket], [self.socket], timeout)
@ -190,7 +202,7 @@ class Overseer(object):
self.write('JOB:'+cPickle.dumps((job.func, job.args, job.kwdargs), -1))
msg = self.read()
if msg != 'OK':
raise ControlError('Failed to initialize job on worker %d:%s'%(self.process.pid, msg))
raise ControlError('Failed to initialize job on worker %d:%s'%(self.worker_pid, msg))
self.output = job.output if callable(job.output) else sys.stdout.write
self.progress = job.progress if callable(job.progress) else None
self.job = job
@ -278,6 +290,7 @@ class Server(Thread):
self.number_of_workers = number_of_workers
self.pool, self.jobs, self.working, self.results = [], collections.deque(), [], {}
atexit.register(self.killall)
atexit.register(self.close)
self.job_lock = RLock()
self.overseer_lock = RLock()
self.working_lock = RLock()
@ -285,6 +298,12 @@ class Server(Thread):
self.pool_lock = RLock()
self.start()
def close(self):
try:
self.server_socket.shutdown(socket.SHUT_RDWR)
except:
pass
def add_job(self, job):
with self.job_lock:
self.jobs.append(job)
@ -373,8 +392,8 @@ class Server(Thread):
pt = PersistentTemporaryFile('.pickle', '_IPC_')
pt.write(cPickle.dumps((func, args, kwdargs)))
pt.close()
cmd = prefix + 'from calibre.parallel import free_spirit; free_spirit(%s)'%repr(pt.name)
popen([python, '-c', cmd])
cmd = free_spirit_command%repr(pt.name)
popen(executable + [cmd])
##########################################################################################
##################################### CLIENT CODE #####################################
@ -426,26 +445,26 @@ def work(client_socket, func, args, kwdargs):
func = PARALLEL_FUNCS[func]
if hasattr(func, 'keywords'):
for key, val in func.keywords.items():
if val == _notify:
if val == _notify and hasattr(sys.stdout, 'notify'):
func.keywords[key] = sys.stdout.notify
res = func(*args, **kwdargs)
sys.stdout.send()
if hasattr(sys.stdout, 'send'):
sys.stdout.send()
return res
def worker(host, port):
client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client_socket.connect((host, port))
write(client_socket, str(os.getpid()))
write(client_socket, 'CALIBRE_WORKER:%d'%os.getpid())
msg = read(client_socket, timeout=10)
if msg != 'OK':
return 1
write(client_socket, 'WAITING')
sys.stdout = BufferedSender(client_socket)
sys.stderr = sys.stdout
while True:
msg = read(client_socket, timeout=60)
if msg.startswith('JOB:'):
@ -463,6 +482,8 @@ def worker(host, port):
gc.collect()
elif msg == 'STOP:':
return 0
elif not msg:
time.sleep(1)
def free_spirit(path):
func, args, kwdargs = cPickle.load(open(path, 'rb'))
@ -471,4 +492,15 @@ def free_spirit(path):
except:
pass
PARALLEL_FUNCS[func](*args, **kwdargs)
def main(args=sys.argv):
args = args[1].split(':')
if len(args) == 1:
free_spirit(args[0])
else:
worker(args[0].replace("'", ''), int(args[1]))
return 0
if __name__ == '__main__':
sys.exit(main())