Use abstract named sockets on linux for IPC, to avoid use of temp files

This commit is contained in:
Kovid Goyal 2014-03-25 13:10:28 +05:30
parent 587e5aba65
commit 14ddd035b9
3 changed files with 34 additions and 16 deletions

View File

@ -365,7 +365,7 @@ def cant_start(msg=_('If you are sure it is not running')+', ',
else: else:
where += _('lower right region of the screen.') where += _('lower right region of the screen.')
if what is None: if what is None:
if iswindows: if iswindows or islinux:
what = _('try rebooting your computer.') what = _('try rebooting your computer.')
else: else:
what = _('try deleting the file')+': '+ gui_socket_address() what = _('try deleting the file')+': '+ gui_socket_address()
@ -436,7 +436,7 @@ def main(args=sys.argv):
try: try:
listener = Listener(address=gui_socket_address()) listener = Listener(address=gui_socket_address())
except socket.error: except socket.error:
if iswindows: if iswindows or islinux:
cant_start() cant_start()
if os.path.exists(gui_socket_address()): if os.path.exists(gui_socket_address()):
os.remove(gui_socket_address()) os.remove(gui_socket_address())

View File

@ -9,7 +9,7 @@ __docformat__ = 'restructuredtext en'
import os, errno import os, errno
from threading import Thread from threading import Thread
from calibre.constants import iswindows, get_windows_username from calibre.constants import iswindows, get_windows_username, islinux
ADDRESS = None ADDRESS = None
@ -37,12 +37,15 @@ def gui_socket_address():
if user: if user:
ADDRESS += '-' + user[:100] + 'x' ADDRESS += '-' + user[:100] + 'x'
else: else:
from tempfile import gettempdir
tmp = gettempdir()
user = os.environ.get('USER', '') user = os.environ.get('USER', '')
if not user: if not user:
user = os.path.basename(os.path.expanduser('~')) user = os.path.basename(os.path.expanduser('~'))
ADDRESS = os.path.join(tmp, user+'-calibre-gui.socket') if islinux:
ADDRESS = (u'\0%s-calibre-gui.socket' % user).encode('ascii')
else:
from tempfile import gettempdir
tmp = gettempdir()
ADDRESS = os.path.join(tmp, user+'-calibre-gui.socket')
return ADDRESS return ADDRESS
class RC(Thread): class RC(Thread):

View File

@ -6,7 +6,7 @@ __license__ = 'GPL v3'
__copyright__ = '2009, Kovid Goyal <kovid@kovidgoyal.net>' __copyright__ = '2009, Kovid Goyal <kovid@kovidgoyal.net>'
__docformat__ = 'restructuredtext en' __docformat__ = 'restructuredtext en'
import sys, os, cPickle, time, tempfile import sys, os, cPickle, time, tempfile, errno
from math import ceil from math import ceil
from threading import Thread, RLock from threading import Thread, RLock
from Queue import Queue, Empty from Queue import Queue, Empty
@ -18,7 +18,7 @@ from calibre.utils.ipc import eintr_retry_call
from calibre.utils.ipc.launch import Worker from calibre.utils.ipc.launch import Worker
from calibre.utils.ipc.worker import PARALLEL_FUNCS from calibre.utils.ipc.worker import PARALLEL_FUNCS
from calibre import detect_ncpus as cpu_count from calibre import detect_ncpus as cpu_count
from calibre.constants import iswindows, DEBUG from calibre.constants import iswindows, DEBUG, islinux
from calibre.ptempfile import base_dir from calibre.ptempfile import base_dir
_counter = 0 _counter = 0
@ -84,6 +84,22 @@ class ConnectedWorker(Thread):
class CriticalError(Exception): class CriticalError(Exception):
pass pass
_name_counter = 0
def create_linux_listener(authkey, backlog=4):
# Use abstract named sockets on linux to avoid creating unnecessary temp files
global _name_counter
prefix = u'\0calibre-ipc-listener-%d-%%d' % os.getpid()
while True:
_name_counter += 1
address = (prefix % _name_counter).encode('ascii')
try:
return address, Listener(address=address, authkey=authkey, backlog=backlog)
except EnvironmentError as err:
if err.errno == errno.EADDRINUSE:
continue
raise
class Server(Thread): class Server(Thread):
def __init__(self, notify_on_job_done=lambda x: x, pool_size=None, def __init__(self, notify_on_job_done=lambda x: x, pool_size=None,
@ -99,11 +115,13 @@ class Server(Thread):
self.pool_size = limit if pool_size is None else pool_size self.pool_size = limit if pool_size is None else pool_size
self.notify_on_job_done = notify_on_job_done self.notify_on_job_done = notify_on_job_done
self.auth_key = os.urandom(32) self.auth_key = os.urandom(32)
self.address = arbitrary_address('AF_PIPE' if iswindows else 'AF_UNIX') if islinux:
if iswindows and self.address[1] == ':': self.address, self.listener = create_linux_listener(self.auth_key, backlog=4)
self.address = self.address[2:] else:
self.listener = Listener(address=self.address, self.address = arbitrary_address('AF_PIPE' if iswindows else 'AF_UNIX')
authkey=self.auth_key, backlog=4) if iswindows and self.address[1] == ':':
self.address = self.address[2:]
self.listener = Listener(address=self.address, authkey=self.auth_key, backlog=4)
self.add_jobs_queue, self.changed_jobs_queue = Queue(), Queue() self.add_jobs_queue, self.changed_jobs_queue = Queue(), Queue()
self.kill_queue = Queue() self.kill_queue = Queue()
self.waiting_jobs = [] self.waiting_jobs = []
@ -162,7 +180,6 @@ class Server(Thread):
w = self.launch_worker(gui=gui, redirect_output=redirect_output) w = self.launch_worker(gui=gui, redirect_output=redirect_output)
w.start_job(job) w.start_job(job)
def run(self): def run(self):
while True: while True:
try: try:
@ -280,8 +297,6 @@ class Server(Thread):
pos += delta pos += delta
return ans return ans
def close(self): def close(self):
try: try:
self.add_jobs_queue.put(None) self.add_jobs_queue.put(None)