From 6ad91a2ee401c16762b8ccb63466fd3824be85f7 Mon Sep 17 00:00:00 2001 From: Kovid Goyal Date: Mon, 30 Nov 2020 11:40:28 +0530 Subject: [PATCH] Start work on making IPC more efficient Use multiprocessing.Pipe instead of using named pipes and establishing authenticated communication. Also avoids workers depending on a named object of any kind. Particularly relevant on macOS where the OS tends to delete things from under us. --- src/calibre/utils/ipc/launch.py | 7 +- src/calibre/utils/ipc/server.py | 141 ++++++-------------------------- src/calibre/utils/ipc/worker.py | 17 ++-- 3 files changed, 34 insertions(+), 131 deletions(-) diff --git a/src/calibre/utils/ipc/launch.py b/src/calibre/utils/ipc/launch.py index 78168ecd5b..eb020e7222 100644 --- a/src/calibre/utils/ipc/launch.py +++ b/src/calibre/utils/ipc/launch.py @@ -30,7 +30,7 @@ def renice(niceness): pass -class Worker(object): +class Worker: ''' Platform independent object for launching child processes. All processes have the environment variable :envvar:`CALIBRE_WORKER` set. @@ -136,7 +136,7 @@ class Worker(object): self.job_name = job_name self._env = env.copy() - def __call__(self, redirect_output=True, cwd=None, priority=None): + def __call__(self, redirect_output=True, cwd=None, priority=None, pass_fds=()): ''' If redirect_output is True, output from the child is redirected to a file on disk and this method returns the path to that file. @@ -186,6 +186,9 @@ class Worker(object): args['stdout'] = windows_null_file args['stderr'] = subprocess.STDOUT + args['close_fds'] = True + if pass_fds: + args['pass_fds'] = pass_fds self.child = subprocess.Popen(cmd, **args) if 'stdin' in args: self.child.stdin.close() diff --git a/src/calibre/utils/ipc/server.py b/src/calibre/utils/ipc/server.py index 1292b47fc6..3d82921785 100644 --- a/src/calibre/utils/ipc/server.py +++ b/src/calibre/utils/ipc/server.py @@ -7,8 +7,6 @@ __copyright__ = '2009, Kovid Goyal ' __docformat__ = 'restructuredtext en' -import errno -import itertools import os import sys import tempfile @@ -16,22 +14,23 @@ import time from collections import deque from itertools import count from math import ceil -from multiprocessing.connection import Listener, arbitrary_address +from multiprocessing import Pipe from threading import Thread from calibre import detect_ncpus as cpu_count, force_unicode -from calibre.constants import DEBUG, cache_dir, islinux, iswindows +from calibre.constants import DEBUG from calibre.ptempfile import base_dir from calibre.utils.ipc import eintr_retry_call from calibre.utils.ipc.launch import Worker from calibre.utils.ipc.worker import PARALLEL_FUNCS -from calibre.utils.serialize import msgpack_dumps, pickle_loads +from calibre.utils.serialize import pickle_loads from polyglot.binary import as_hex_unicode from polyglot.builtins import environ_item, string_or_bytes from polyglot.queue import Empty, Queue server_counter = count() +_name_counter = count() class ConnectedWorker(Thread): @@ -97,97 +96,6 @@ class CriticalError(Exception): pass -_name_counter = itertools.count() - -if islinux: - import fcntl - - class LinuxListener(Listener): - - def __init__(self, *args, **kwargs): - Listener.__init__(self, *args, **kwargs) - # multiprocessing tries to call unlink even on abstract - # named sockets, prevent it from doing so. - if self._listener._unlink is not None: - self._listener._unlink.cancel() - # Prevent child processes from inheriting this socket - # If we dont do this child processes not created by calibre, will - # inherit this socket, preventing the calibre GUI from being restarted. - # Examples of such processes are external viewers launched by Qt - # using openUrl(). - fd = self._listener._socket.fileno() - old_flags = fcntl.fcntl(fd, fcntl.F_GETFD) - fcntl.fcntl(fd, fcntl.F_SETFD, old_flags | fcntl.FD_CLOEXEC) - - def close(self): - # To ensure that the socket is released, we have to call - # shutdown() not close(). This is needed to allow calibre to - # restart using the same socket address. - import socket - listener = self._listener - if listener is not None: - self._listener = None - listener._socket.shutdown(socket.SHUT_RDWR) - listener._socket.close() - - def accept(self, *args, **kwargs): - ans = Listener.accept(self, *args, **kwargs) - fd = ans.fileno() - old_flags = fcntl.fcntl(fd, fcntl.F_GETFD) - fcntl.fcntl(fd, fcntl.F_SETFD, old_flags | fcntl.FD_CLOEXEC) - return ans - - def create_listener(authkey, backlog=4): - # Use abstract named sockets on linux to avoid creating unnecessary temp files - prefix = '\0calibre-ipc-listener-%d-%%d' % os.getpid() - while True: - address = (prefix % next(_name_counter)) - try: - l = LinuxListener(address=address, authkey=authkey, backlog=backlog) - return address, l - except EnvironmentError as err: - if err.errno == errno.EADDRINUSE: - continue - raise -elif iswindows: - - def create_listener(authkey, backlog=4): - address = arbitrary_address('AF_PIPE') - if address[1] == ':': - address = address[2:] - return address, Listener(address=address, authkey=authkey, backlog=backlog) -else: - - def create_listener(authkey, backlog=4): - # We use the cache dir rather than the temp dir because - # on macOS, there is software that deletes the temp dir after - # periods of inactivity - prefix = os.path.join(cache_dir(), 'ipc-socket-%d-%%d' % os.getpid()) - max_tries = 20 - while max_tries > 0: - max_tries -= 1 - address = prefix % next(_name_counter) - try: - return address, Listener(address=address, authkey=authkey, backlog=backlog) - except EnvironmentError as err: - if max_tries < 1: - raise - - if err.errno == errno.ENOENT: - # Some OS X machines have software that deletes temp - # files/dirs after prolonged inactivity. See for - # example, https://bugs.launchpad.net/bugs/1541356 - try: - os.makedirs(os.path.dirname(prefix)) - except EnvironmentError as e: - if e.errno != errno.EEXIST: - raise - continue - - if err.errno != errno.EADDRINUSE: - raise - - class Server(Thread): def __init__(self, notify_on_job_done=lambda x: x, pool_size=None, @@ -200,8 +108,6 @@ class Server(Thread): limit = min(limit, cpu_count()) self.pool_size = limit if pool_size is None else pool_size self.notify_on_job_done = notify_on_job_done - self.auth_key = os.urandom(32) - self.address, self.listener = create_listener(self.auth_key, backlog=4) self.add_jobs_queue, self.changed_jobs_queue = Queue(), Queue() self.kill_queue = Queue() self.waiting_jobs = [] @@ -219,34 +125,33 @@ class Server(Thread): if redirect_output is None: redirect_output = not gui - env = { - 'CALIBRE_WORKER_ADDRESS' : environ_item(as_hex_unicode(msgpack_dumps(self.address))), - 'CALIBRE_WORKER_KEY' : environ_item(as_hex_unicode(self.auth_key)), - 'CALIBRE_WORKER_RESULT' : environ_item(as_hex_unicode(rfile)), - } - cw = self.do_launch(env, gui, redirect_output, rfile, job_name=job_name) + cw = self.do_launch(gui, redirect_output, rfile, job_name=job_name) if isinstance(cw, string_or_bytes): raise CriticalError('Failed to launch worker process:\n'+force_unicode(cw)) if DEBUG: print('Worker Launch took: {:.2f} seconds'.format(time.monotonic() - start)) return cw - def do_launch(self, env, gui, redirect_output, rfile, job_name=None): - w = Worker(env, gui=gui, job_name=job_name) + def do_launch(self, gui, redirect_output, rfile, job_name=None): + a, b = Pipe() + with a: + env = { + 'CALIBRE_WORKER_FD': str(a.fileno()), + 'CALIBRE_WORKER_RESULT' : environ_item(as_hex_unicode(rfile)) + } + w = Worker(env, gui=gui, job_name=job_name) - try: - w(redirect_output=redirect_output) - conn = eintr_retry_call(self.listener.accept) - if conn is None: - raise Exception('Failed to launch worker process') - except BaseException: try: - w.kill() - except: - pass - import traceback - return traceback.format_exc() - return ConnectedWorker(w, conn, rfile) + w(pass_fds=(a.fileno(),), redirect_output=redirect_output) + except BaseException: + try: + w.kill() + except: + pass + b.close() + import traceback + return traceback.format_exc() + return ConnectedWorker(w, b, rfile) def add_job(self, job): job.done2 = self.notify_on_job_done diff --git a/src/calibre/utils/ipc/worker.py b/src/calibre/utils/ipc/worker.py index 2c7bc43038..389e582374 100644 --- a/src/calibre/utils/ipc/worker.py +++ b/src/calibre/utils/ipc/worker.py @@ -7,17 +7,16 @@ __copyright__ = '2009, Kovid Goyal ' __docformat__ = 'restructuredtext en' import os, sys, importlib -from multiprocessing.connection import Client +from multiprocessing.connection import Connection from threading import Thread -from contextlib import closing from zipimport import ZipImportError from calibre import prints from calibre.constants import iswindows, ismacos from calibre.utils.ipc import eintr_retry_call -from calibre.utils.serialize import msgpack_loads, pickle_dumps +from calibre.utils.serialize import pickle_dumps from polyglot.queue import Queue -from polyglot.binary import from_hex_bytes, from_hex_unicode +from polyglot.binary import from_hex_unicode PARALLEL_FUNCS = { 'lrfviewer' : @@ -168,10 +167,7 @@ def main(): from multiprocessing import freeze_support freeze_support() return 0 - # Close open file descriptors inherited from parent - # On Unix this is done by the subprocess module - os.closerange(3, 256) - if ismacos and 'CALIBRE_WORKER_ADDRESS' not in os.environ and 'CALIBRE_SIMPLE_WORKER' not in os.environ and '--pipe-worker' not in sys.argv: + if ismacos and 'CALIBRE_WORKER_FD' not in os.environ and 'CALIBRE_SIMPLE_WORKER' not in os.environ and '--pipe-worker' not in sys.argv: # On some OS X computers launchd apparently tries to # launch the last run process from the bundle # so launch the gui as usual @@ -198,10 +194,9 @@ def main(): sys.stdout.flush() raise return - address = msgpack_loads(from_hex_bytes(os.environ['CALIBRE_WORKER_ADDRESS'])) - key = from_hex_bytes(os.environ['CALIBRE_WORKER_KEY']) + fd = int(os.environ['CALIBRE_WORKER_FD']) resultf = from_hex_unicode(os.environ['CALIBRE_WORKER_RESULT']) - with closing(Client(address, authkey=key)) as conn: + with Connection(fd) as conn: name, args, kwargs, desc = eintr_retry_call(conn.recv) if desc: prints(desc)