diff --git a/src/calibre/utils/ipc/server.py b/src/calibre/utils/ipc/server.py index 20a2b0ed64..9350163be6 100644 --- a/src/calibre/utils/ipc/server.py +++ b/src/calibre/utils/ipc/server.py @@ -86,19 +86,32 @@ class CriticalError(Exception): _name_counter = 0 -def create_linux_listener(authkey, backlog=4): - # Use abstract named sockets on linux to avoid creating unnecessary temp files - global _name_counter - prefix = u'\0calibre-ipc-listener-%d-%%d' % os.getpid() - while True: - _name_counter += 1 - address = (prefix % _name_counter).encode('ascii') - try: - return address, Listener(address=address, authkey=authkey, backlog=backlog) - except EnvironmentError as err: - if err.errno == errno.EADDRINUSE: - continue - raise +if islinux: + def create_listener(authkey, backlog=4): + # Use abstract named sockets on linux to avoid creating unnecessary temp files + global _name_counter + prefix = u'\0calibre-ipc-listener-%d-%%d' % os.getpid() + while True: + _name_counter += 1 + address = (prefix % _name_counter).encode('ascii') + try: + l = Listener(address=address, authkey=authkey, backlog=backlog) + if hasattr(l._listener._unlink, 'cancel'): + # multiprocessing tries to call unlink even on abstract + # named sockets, prevent it from doing so. + l._listener._unlink.cancel() + return address, l + except EnvironmentError as err: + if err.errno == errno.EADDRINUSE: + continue + raise +else: + def create_listener(authkey, backlog=4): + address = arbitrary_address('AF_PIPE' if iswindows else 'AF_UNIX') + if iswindows and address[1] == ':': + address = address[2:] + listener = Listener(address=address, authkey=authkey, backlog=backlog) + return address, listener class Server(Thread): @@ -115,13 +128,7 @@ class Server(Thread): self.pool_size = limit if pool_size is None else pool_size self.notify_on_job_done = notify_on_job_done self.auth_key = os.urandom(32) - if islinux: - self.address, self.listener = create_linux_listener(self.auth_key, backlog=4) - else: - self.address = arbitrary_address('AF_PIPE' if iswindows else 'AF_UNIX') - if iswindows and self.address[1] == ':': - self.address = self.address[2:] - self.listener = Listener(address=self.address, authkey=self.auth_key, backlog=4) + self.address, self.listener = create_listener(self.auth_key, backlog=4) self.add_jobs_queue, self.changed_jobs_queue = Queue(), Queue() self.kill_queue = Queue() self.waiting_jobs = [] diff --git a/src/calibre/utils/ipc/simple_worker.py b/src/calibre/utils/ipc/simple_worker.py index 2d24fec22b..d06550cdce 100644 --- a/src/calibre/utils/ipc/simple_worker.py +++ b/src/calibre/utils/ipc/simple_worker.py @@ -9,7 +9,7 @@ __docformat__ = 'restructuredtext en' import os, cPickle, traceback, time, importlib from binascii import hexlify, unhexlify -from multiprocessing.connection import Listener, arbitrary_address, Client +from multiprocessing.connection import Client from threading import Thread from contextlib import closing @@ -117,11 +117,9 @@ def communicate(ans, worker, listener, args, timeout=300, heartbeat=None, ans['result'] = cw.res['result'] def create_worker(env, priority='normal', cwd=None, func='main'): - address = arbitrary_address('AF_PIPE' if iswindows else 'AF_UNIX') - if iswindows and address[1] == ':': - address = address[2:] + from calibre.utils.ipc.server import create_listener auth_key = os.urandom(32) - listener = Listener(address=address, authkey=auth_key) + address, listener = create_listener(auth_key) env = dict(env) env.update({