Start work on making IPC more efficient

Use multiprocessing.Pipe instead of creating named pipes and establishing
authenticated communication over them. This also means workers no longer
depend on a named object of any kind, which is particularly relevant on
macOS, where the OS tends to delete things out from under us.
This commit is contained in:
Kovid Goyal 2020-11-30 11:40:28 +05:30
parent 9c1f6974f4
commit 6ad91a2ee4
No known key found for this signature in database
GPG Key ID: 06BC317B515ACE7C
3 changed files with 34 additions and 131 deletions

View File

@ -30,7 +30,7 @@ def renice(niceness):
pass pass
class Worker(object): class Worker:
''' '''
Platform independent object for launching child processes. All processes Platform independent object for launching child processes. All processes
have the environment variable :envvar:`CALIBRE_WORKER` set. have the environment variable :envvar:`CALIBRE_WORKER` set.
@ -136,7 +136,7 @@ class Worker(object):
self.job_name = job_name self.job_name = job_name
self._env = env.copy() self._env = env.copy()
def __call__(self, redirect_output=True, cwd=None, priority=None): def __call__(self, redirect_output=True, cwd=None, priority=None, pass_fds=()):
''' '''
If redirect_output is True, output from the child is redirected If redirect_output is True, output from the child is redirected
to a file on disk and this method returns the path to that file. to a file on disk and this method returns the path to that file.
@ -186,6 +186,9 @@ class Worker(object):
args['stdout'] = windows_null_file args['stdout'] = windows_null_file
args['stderr'] = subprocess.STDOUT args['stderr'] = subprocess.STDOUT
args['close_fds'] = True
if pass_fds:
args['pass_fds'] = pass_fds
self.child = subprocess.Popen(cmd, **args) self.child = subprocess.Popen(cmd, **args)
if 'stdin' in args: if 'stdin' in args:
self.child.stdin.close() self.child.stdin.close()

View File

@ -7,8 +7,6 @@ __copyright__ = '2009, Kovid Goyal <kovid@kovidgoyal.net>'
__docformat__ = 'restructuredtext en' __docformat__ = 'restructuredtext en'
import errno
import itertools
import os import os
import sys import sys
import tempfile import tempfile
@ -16,22 +14,23 @@ import time
from collections import deque from collections import deque
from itertools import count from itertools import count
from math import ceil from math import ceil
from multiprocessing.connection import Listener, arbitrary_address from multiprocessing import Pipe
from threading import Thread from threading import Thread
from calibre import detect_ncpus as cpu_count, force_unicode from calibre import detect_ncpus as cpu_count, force_unicode
from calibre.constants import DEBUG, cache_dir, islinux, iswindows from calibre.constants import DEBUG
from calibre.ptempfile import base_dir from calibre.ptempfile import base_dir
from calibre.utils.ipc import eintr_retry_call from calibre.utils.ipc import eintr_retry_call
from calibre.utils.ipc.launch import Worker from calibre.utils.ipc.launch import Worker
from calibre.utils.ipc.worker import PARALLEL_FUNCS from calibre.utils.ipc.worker import PARALLEL_FUNCS
from calibre.utils.serialize import msgpack_dumps, pickle_loads from calibre.utils.serialize import pickle_loads
from polyglot.binary import as_hex_unicode from polyglot.binary import as_hex_unicode
from polyglot.builtins import environ_item, string_or_bytes from polyglot.builtins import environ_item, string_or_bytes
from polyglot.queue import Empty, Queue from polyglot.queue import Empty, Queue
server_counter = count() server_counter = count()
_name_counter = count()
class ConnectedWorker(Thread): class ConnectedWorker(Thread):
@ -97,97 +96,6 @@ class CriticalError(Exception):
pass pass
_name_counter = itertools.count()
if islinux:
import fcntl
class LinuxListener(Listener):
def __init__(self, *args, **kwargs):
Listener.__init__(self, *args, **kwargs)
# multiprocessing tries to call unlink even on abstract
# named sockets, prevent it from doing so.
if self._listener._unlink is not None:
self._listener._unlink.cancel()
# Prevent child processes from inheriting this socket
# If we dont do this child processes not created by calibre, will
# inherit this socket, preventing the calibre GUI from being restarted.
# Examples of such processes are external viewers launched by Qt
# using openUrl().
fd = self._listener._socket.fileno()
old_flags = fcntl.fcntl(fd, fcntl.F_GETFD)
fcntl.fcntl(fd, fcntl.F_SETFD, old_flags | fcntl.FD_CLOEXEC)
def close(self):
# To ensure that the socket is released, we have to call
# shutdown() not close(). This is needed to allow calibre to
# restart using the same socket address.
import socket
listener = self._listener
if listener is not None:
self._listener = None
listener._socket.shutdown(socket.SHUT_RDWR)
listener._socket.close()
def accept(self, *args, **kwargs):
ans = Listener.accept(self, *args, **kwargs)
fd = ans.fileno()
old_flags = fcntl.fcntl(fd, fcntl.F_GETFD)
fcntl.fcntl(fd, fcntl.F_SETFD, old_flags | fcntl.FD_CLOEXEC)
return ans
def create_listener(authkey, backlog=4):
# Use abstract named sockets on linux to avoid creating unnecessary temp files
prefix = '\0calibre-ipc-listener-%d-%%d' % os.getpid()
while True:
address = (prefix % next(_name_counter))
try:
l = LinuxListener(address=address, authkey=authkey, backlog=backlog)
return address, l
except EnvironmentError as err:
if err.errno == errno.EADDRINUSE:
continue
raise
elif iswindows:
def create_listener(authkey, backlog=4):
address = arbitrary_address('AF_PIPE')
if address[1] == ':':
address = address[2:]
return address, Listener(address=address, authkey=authkey, backlog=backlog)
else:
def create_listener(authkey, backlog=4):
# We use the cache dir rather than the temp dir because
# on macOS, there is software that deletes the temp dir after
# periods of inactivity
prefix = os.path.join(cache_dir(), 'ipc-socket-%d-%%d' % os.getpid())
max_tries = 20
while max_tries > 0:
max_tries -= 1
address = prefix % next(_name_counter)
try:
return address, Listener(address=address, authkey=authkey, backlog=backlog)
except EnvironmentError as err:
if max_tries < 1:
raise
if err.errno == errno.ENOENT:
# Some OS X machines have software that deletes temp
# files/dirs after prolonged inactivity. See for
# example, https://bugs.launchpad.net/bugs/1541356
try:
os.makedirs(os.path.dirname(prefix))
except EnvironmentError as e:
if e.errno != errno.EEXIST:
raise
continue
if err.errno != errno.EADDRINUSE:
raise
class Server(Thread): class Server(Thread):
def __init__(self, notify_on_job_done=lambda x: x, pool_size=None, def __init__(self, notify_on_job_done=lambda x: x, pool_size=None,
@ -200,8 +108,6 @@ class Server(Thread):
limit = min(limit, cpu_count()) limit = min(limit, cpu_count())
self.pool_size = limit if pool_size is None else pool_size self.pool_size = limit if pool_size is None else pool_size
self.notify_on_job_done = notify_on_job_done self.notify_on_job_done = notify_on_job_done
self.auth_key = os.urandom(32)
self.address, self.listener = create_listener(self.auth_key, backlog=4)
self.add_jobs_queue, self.changed_jobs_queue = Queue(), Queue() self.add_jobs_queue, self.changed_jobs_queue = Queue(), Queue()
self.kill_queue = Queue() self.kill_queue = Queue()
self.waiting_jobs = [] self.waiting_jobs = []
@ -219,34 +125,33 @@ class Server(Thread):
if redirect_output is None: if redirect_output is None:
redirect_output = not gui redirect_output = not gui
env = { cw = self.do_launch(gui, redirect_output, rfile, job_name=job_name)
'CALIBRE_WORKER_ADDRESS' : environ_item(as_hex_unicode(msgpack_dumps(self.address))),
'CALIBRE_WORKER_KEY' : environ_item(as_hex_unicode(self.auth_key)),
'CALIBRE_WORKER_RESULT' : environ_item(as_hex_unicode(rfile)),
}
cw = self.do_launch(env, gui, redirect_output, rfile, job_name=job_name)
if isinstance(cw, string_or_bytes): if isinstance(cw, string_or_bytes):
raise CriticalError('Failed to launch worker process:\n'+force_unicode(cw)) raise CriticalError('Failed to launch worker process:\n'+force_unicode(cw))
if DEBUG: if DEBUG:
print('Worker Launch took: {:.2f} seconds'.format(time.monotonic() - start)) print('Worker Launch took: {:.2f} seconds'.format(time.monotonic() - start))
return cw return cw
def do_launch(self, env, gui, redirect_output, rfile, job_name=None): def do_launch(self, gui, redirect_output, rfile, job_name=None):
w = Worker(env, gui=gui, job_name=job_name) a, b = Pipe()
with a:
env = {
'CALIBRE_WORKER_FD': str(a.fileno()),
'CALIBRE_WORKER_RESULT' : environ_item(as_hex_unicode(rfile))
}
w = Worker(env, gui=gui, job_name=job_name)
try:
w(redirect_output=redirect_output)
conn = eintr_retry_call(self.listener.accept)
if conn is None:
raise Exception('Failed to launch worker process')
except BaseException:
try: try:
w.kill() w(pass_fds=(a.fileno(),), redirect_output=redirect_output)
except: except BaseException:
pass try:
import traceback w.kill()
return traceback.format_exc() except:
return ConnectedWorker(w, conn, rfile) pass
b.close()
import traceback
return traceback.format_exc()
return ConnectedWorker(w, b, rfile)
def add_job(self, job): def add_job(self, job):
job.done2 = self.notify_on_job_done job.done2 = self.notify_on_job_done

View File

@ -7,17 +7,16 @@ __copyright__ = '2009, Kovid Goyal <kovid@kovidgoyal.net>'
__docformat__ = 'restructuredtext en' __docformat__ = 'restructuredtext en'
import os, sys, importlib import os, sys, importlib
from multiprocessing.connection import Client from multiprocessing.connection import Connection
from threading import Thread from threading import Thread
from contextlib import closing
from zipimport import ZipImportError from zipimport import ZipImportError
from calibre import prints from calibre import prints
from calibre.constants import iswindows, ismacos from calibre.constants import iswindows, ismacos
from calibre.utils.ipc import eintr_retry_call from calibre.utils.ipc import eintr_retry_call
from calibre.utils.serialize import msgpack_loads, pickle_dumps from calibre.utils.serialize import pickle_dumps
from polyglot.queue import Queue from polyglot.queue import Queue
from polyglot.binary import from_hex_bytes, from_hex_unicode from polyglot.binary import from_hex_unicode
PARALLEL_FUNCS = { PARALLEL_FUNCS = {
'lrfviewer' : 'lrfviewer' :
@ -168,10 +167,7 @@ def main():
from multiprocessing import freeze_support from multiprocessing import freeze_support
freeze_support() freeze_support()
return 0 return 0
# Close open file descriptors inherited from parent if ismacos and 'CALIBRE_WORKER_FD' not in os.environ and 'CALIBRE_SIMPLE_WORKER' not in os.environ and '--pipe-worker' not in sys.argv:
# On Unix this is done by the subprocess module
os.closerange(3, 256)
if ismacos and 'CALIBRE_WORKER_ADDRESS' not in os.environ and 'CALIBRE_SIMPLE_WORKER' not in os.environ and '--pipe-worker' not in sys.argv:
# On some OS X computers launchd apparently tries to # On some OS X computers launchd apparently tries to
# launch the last run process from the bundle # launch the last run process from the bundle
# so launch the gui as usual # so launch the gui as usual
@ -198,10 +194,9 @@ def main():
sys.stdout.flush() sys.stdout.flush()
raise raise
return return
address = msgpack_loads(from_hex_bytes(os.environ['CALIBRE_WORKER_ADDRESS'])) fd = int(os.environ['CALIBRE_WORKER_FD'])
key = from_hex_bytes(os.environ['CALIBRE_WORKER_KEY'])
resultf = from_hex_unicode(os.environ['CALIBRE_WORKER_RESULT']) resultf = from_hex_unicode(os.environ['CALIBRE_WORKER_RESULT'])
with closing(Client(address, authkey=key)) as conn: with Connection(fd) as conn:
name, args, kwargs, desc = eintr_retry_call(conn.recv) name, args, kwargs, desc = eintr_retry_call(conn.recv)
if desc: if desc:
prints(desc) prints(desc)