From 15305e3bea074f765a209d893d204986833d1e25 Mon Sep 17 00:00:00 2001 From: Kovid Goyal Date: Fri, 20 Jun 2008 11:42:33 -0700 Subject: [PATCH] IGN:Fix new IPC framework on windows --- linux_installer.py | 2 +- src/calibre/parallel.py | 74 +++++++++++++++++++++++++++++------------ 2 files changed, 54 insertions(+), 22 deletions(-) diff --git a/linux_installer.py b/linux_installer.py index 315b2d8486..7556f438fb 100644 --- a/linux_installer.py +++ b/linux_installer.py @@ -74,7 +74,7 @@ f.write(hook_script) sys.path.insert(0, CALIBRESRC) from calibre.linux import entry_points -executables, scripts = ['calibre_postinstall', 'parallel'], \ +executables, scripts = ['calibre_postinstall', 'calibre-parallel'], \ [os.path.join(CALIBRESRC, 'calibre', 'linux.py'), os.path.join(CALIBRESRC, 'calibre', 'parallel.py')] for entry in entry_points['console_scripts'] + entry_points['gui_scripts']: diff --git a/src/calibre/parallel.py b/src/calibre/parallel.py index 1c9593b94f..5ba20d28aa 100644 --- a/src/calibre/parallel.py +++ b/src/calibre/parallel.py @@ -47,10 +47,10 @@ if iswindows: python = os.path.join(os.path.dirname(python), 'parallel.exe') else: python = os.path.join(os.path.dirname(python), 'Scripts\\parallel.exe') - popen = partial(subprocess.Popen, creationflags=0x08) # CREATE_NO_WINDOW=0x08 so that no ugly console is popped up + open = partial(subprocess.Popen, creationflags=0x08) # CREATE_NO_WINDOW=0x08 so that no ugly console is popped up if islinux and hasattr(sys, 'frozen_path'): - python = os.path.join(getattr(sys, 'frozen_path'), 'parallel') + python = os.path.join(getattr(sys, 'frozen_path'), 'calibre-parallel') popen = partial(subprocess.Popen, cwd=getattr(sys, 'frozen_path')) prefix = 'import sys; sys.in_worker = True; ' @@ -59,7 +59,14 @@ if hasattr(sys, 'frameworks_dir'): prefix += 'sys.frameworks_dir = "%s"; sys.frozen = "macosx_app"; '%fd if fd not in os.environ['PATH']: os.environ['PATH'] += ':'+fd - +if 'parallel' in python: + executable = [python] + worker_command = '%s:%s' + free_spirit_command = '%s' +else: + executable = [python, '-c'] + worker_command = prefix + 'from calibre.parallel import worker; worker(%s, %s)' + free_spirit_command = prefix + 'from calibre.parallel import free_spirit; free_spirit(%s)' def write(socket, msg, timeout=5): if isinstance(msg, unicode): @@ -124,8 +131,8 @@ class Overseer(object): INTERVAL = 0.1 def __init__(self, server, port, timeout=5): - self.cmd = prefix + 'from calibre.parallel import worker; worker(%s, %s)'%(repr('localhost'), repr(port)) - self.process = popen([python, '-c', self.cmd]) + self.cmd = worker_command%(repr('127.0.0.1'), repr(port)) + self.process = popen(executable + [self.cmd]) self.socket = server.accept()[0] self.working = False @@ -135,8 +142,10 @@ class Overseer(object): self._stop = False if not select([self.socket], [], [], 120)[0]: raise RuntimeError(_('Could not launch worker process.')) - if int(self.read()) != self.process.pid: - raise RuntimeError('PID mismatch') + ID = self.read().split(':') + if ID[0] != 'CALIBRE_WORKER': + raise RuntimeError('Impostor') + self.worker_pid = int(ID[1]) self.write('OK') if self.read() != 'WAITING': raise RuntimeError('Worker sulking') @@ -147,19 +156,22 @@ class Overseer(object): ''' try: if self.socket: - self.socket.close() + self.write('STOP:') + time.sleep(1) + self.socket.shutdown(socket.SHUT_RDWR) except: pass if iswindows: win32api = __import__('win32api') try: - win32api.TerminateProcess(int(self.process.pid), -1) + handle = win32api.OpenProcess(1, False, self.worker_pid) + win32api.TerminateProcess(handle, -1) except: pass else: import signal try: - os.kill(self.process.pid, signal.SIGKILL) + os.kill(self.worker_pid, signal.SIGKILL) time.sleep(0.05) except: pass @@ -172,14 +184,14 @@ class Overseer(object): return read(self.socket, timeout=self.timeout if timeout is None else timeout) def __eq__(self, other): - return hasattr(other, 'process') and hasattr(other.process, 'pid') and self.process.pid == other.process.pid + return hasattr(other, 'process') and hasattr(other, 'worker_pid') and self.worker_pid == other.worker_pid def __bool__(self): self.process.poll() return self.process.returncode is None def pid(self): - return self.process.pid + return self.worker_pid def select(self, timeout=0): return select([self.socket], [self.socket], [self.socket], timeout) @@ -190,7 +202,7 @@ class Overseer(object): self.write('JOB:'+cPickle.dumps((job.func, job.args, job.kwdargs), -1)) msg = self.read() if msg != 'OK': - raise ControlError('Failed to initialize job on worker %d:%s'%(self.process.pid, msg)) + raise ControlError('Failed to initialize job on worker %d:%s'%(self.worker_pid, msg)) self.output = job.output if callable(job.output) else sys.stdout.write self.progress = job.progress if callable(job.progress) else None self.job = job @@ -278,6 +290,7 @@ class Server(Thread): self.number_of_workers = number_of_workers self.pool, self.jobs, self.working, self.results = [], collections.deque(), [], {} atexit.register(self.killall) + atexit.register(self.close) self.job_lock = RLock() self.overseer_lock = RLock() self.working_lock = RLock() @@ -285,6 +298,12 @@ class Server(Thread): self.pool_lock = RLock() self.start() + def close(self): + try: + self.server_socket.shutdown(socket.SHUT_RDWR) + except: + pass + def add_job(self, job): with self.job_lock: self.jobs.append(job) @@ -373,8 +392,8 @@ class Server(Thread): pt = PersistentTemporaryFile('.pickle', '_IPC_') pt.write(cPickle.dumps((func, args, kwdargs))) pt.close() - cmd = prefix + 'from calibre.parallel import free_spirit; free_spirit(%s)'%repr(pt.name) - popen([python, '-c', cmd]) + cmd = free_spirit_command%repr(pt.name) + popen(executable + [cmd]) ########################################################################################## ##################################### CLIENT CODE ##################################### @@ -426,26 +445,26 @@ def work(client_socket, func, args, kwdargs): func = PARALLEL_FUNCS[func] if hasattr(func, 'keywords'): for key, val in func.keywords.items(): - if val == _notify: + if val == _notify and hasattr(sys.stdout, 'notify'): func.keywords[key] = sys.stdout.notify res = func(*args, **kwdargs) - sys.stdout.send() + if hasattr(sys.stdout, 'send'): + sys.stdout.send() return res - - def worker(host, port): client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) client_socket.connect((host, port)) - write(client_socket, str(os.getpid())) + write(client_socket, 'CALIBRE_WORKER:%d'%os.getpid()) msg = read(client_socket, timeout=10) if msg != 'OK': return 1 write(client_socket, 'WAITING') + sys.stdout = BufferedSender(client_socket) sys.stderr = sys.stdout - + while True: msg = read(client_socket, timeout=60) if msg.startswith('JOB:'): @@ -463,6 +482,8 @@ def worker(host, port): gc.collect() elif msg == 'STOP:': return 0 + elif not msg: + time.sleep(1) def free_spirit(path): func, args, kwdargs = cPickle.load(open(path, 'rb')) @@ -471,4 +492,15 @@ def free_spirit(path): except: pass PARALLEL_FUNCS[func](*args, **kwdargs) + +def main(args=sys.argv): + args = args[1].split(':') + if len(args) == 1: + free_spirit(args[0]) + else: + worker(args[0].replace("'", ''), int(args[1])) + return 0 + +if __name__ == '__main__': + sys.exit(main()) \ No newline at end of file