Replace another use of temp files for named sockets on linux

Also prevent the multiprocessing module from calling unlink() on
abstract named sockets.
This commit is contained in:
Kovid Goyal 2014-03-25 13:45:17 +05:30
parent 14ddd035b9
commit 2989b9b819
2 changed files with 30 additions and 25 deletions

View File

@ -86,19 +86,32 @@ class CriticalError(Exception):
_name_counter = 0 _name_counter = 0
def create_linux_listener(authkey, backlog=4): if islinux:
# Use abstract named sockets on linux to avoid creating unnecessary temp files def create_listener(authkey, backlog=4):
global _name_counter # Use abstract named sockets on linux to avoid creating unnecessary temp files
prefix = u'\0calibre-ipc-listener-%d-%%d' % os.getpid() global _name_counter
while True: prefix = u'\0calibre-ipc-listener-%d-%%d' % os.getpid()
_name_counter += 1 while True:
address = (prefix % _name_counter).encode('ascii') _name_counter += 1
try: address = (prefix % _name_counter).encode('ascii')
return address, Listener(address=address, authkey=authkey, backlog=backlog) try:
except EnvironmentError as err: l = Listener(address=address, authkey=authkey, backlog=backlog)
if err.errno == errno.EADDRINUSE: if hasattr(l._listener._unlink, 'cancel'):
continue # multiprocessing tries to call unlink even on abstract
raise # named sockets, prevent it from doing so.
l._listener._unlink.cancel()
return address, l
except EnvironmentError as err:
if err.errno == errno.EADDRINUSE:
continue
raise
else:
def create_listener(authkey, backlog=4):
address = arbitrary_address('AF_PIPE' if iswindows else 'AF_UNIX')
if iswindows and address[1] == ':':
address = address[2:]
listener = Listener(address=address, authkey=authkey, backlog=backlog)
return address, listener
class Server(Thread): class Server(Thread):
@ -115,13 +128,7 @@ class Server(Thread):
self.pool_size = limit if pool_size is None else pool_size self.pool_size = limit if pool_size is None else pool_size
self.notify_on_job_done = notify_on_job_done self.notify_on_job_done = notify_on_job_done
self.auth_key = os.urandom(32) self.auth_key = os.urandom(32)
if islinux: self.address, self.listener = create_listener(self.auth_key, backlog=4)
self.address, self.listener = create_linux_listener(self.auth_key, backlog=4)
else:
self.address = arbitrary_address('AF_PIPE' if iswindows else 'AF_UNIX')
if iswindows and self.address[1] == ':':
self.address = self.address[2:]
self.listener = Listener(address=self.address, authkey=self.auth_key, backlog=4)
self.add_jobs_queue, self.changed_jobs_queue = Queue(), Queue() self.add_jobs_queue, self.changed_jobs_queue = Queue(), Queue()
self.kill_queue = Queue() self.kill_queue = Queue()
self.waiting_jobs = [] self.waiting_jobs = []

View File

@ -9,7 +9,7 @@ __docformat__ = 'restructuredtext en'
import os, cPickle, traceback, time, importlib import os, cPickle, traceback, time, importlib
from binascii import hexlify, unhexlify from binascii import hexlify, unhexlify
from multiprocessing.connection import Listener, arbitrary_address, Client from multiprocessing.connection import Client
from threading import Thread from threading import Thread
from contextlib import closing from contextlib import closing
@ -117,11 +117,9 @@ def communicate(ans, worker, listener, args, timeout=300, heartbeat=None,
ans['result'] = cw.res['result'] ans['result'] = cw.res['result']
def create_worker(env, priority='normal', cwd=None, func='main'): def create_worker(env, priority='normal', cwd=None, func='main'):
address = arbitrary_address('AF_PIPE' if iswindows else 'AF_UNIX') from calibre.utils.ipc.server import create_listener
if iswindows and address[1] == ':':
address = address[2:]
auth_key = os.urandom(32) auth_key = os.urandom(32)
listener = Listener(address=address, authkey=auth_key) address, listener = create_listener(auth_key)
env = dict(env) env = dict(env)
env.update({ env.update({