From 35cbca3dadc9bcac8a9a0c4f2af3aa398c230b24 Mon Sep 17 00:00:00 2001 From: Kovid Goyal Date: Mon, 30 Nov 2020 13:21:44 +0530 Subject: [PATCH] Port ipc.pool to use Pipe --- src/calibre/utils/ipc/pool.py | 33 +++++++++++--------------- src/calibre/utils/ipc/simple_worker.py | 2 +- 2 files changed, 15 insertions(+), 20 deletions(-) diff --git a/src/calibre/utils/ipc/pool.py b/src/calibre/utils/ipc/pool.py index 4ab4c7d09f..8a7a4265b4 100644 --- a/src/calibre/utils/ipc/pool.py +++ b/src/calibre/utils/ipc/pool.py @@ -8,13 +8,14 @@ __copyright__ = '2014, Kovid Goyal ' import os, sys from threading import Thread from collections import namedtuple +from multiprocessing.connection import Pipe from calibre import detect_ncpus, as_unicode, prints from calibre.constants import iswindows, DEBUG from calibre.ptempfile import PersistentTemporaryFile from calibre.utils import join_with_timeout from calibre.utils.ipc import eintr_retry_call -from calibre.utils.serialize import msgpack_dumps, msgpack_loads, pickle_dumps, pickle_loads +from calibre.utils.serialize import pickle_dumps, pickle_loads from polyglot.builtins import iteritems, itervalues from polyglot.queue import Queue @@ -67,11 +68,11 @@ def get_stdout(process): break -def start_worker(code, name=''): +def start_worker(code, pipe_fd, name=''): from calibre.utils.ipc.simple_worker import start_pipe_worker if name: name = '-' + name - p = start_pipe_worker(code, **worker_kwargs) + p = start_pipe_worker(code, pass_fds=(pipe_fd,), **worker_kwargs) if get_stdout_from_child: t = Thread(target=get_stdout, name='PoolWorkerGetStdout' + name, args=(p,)) t.daemon = True @@ -132,7 +133,6 @@ class Pool(Thread): self.tracker = Queue() self.terminal_failure = None self.common_data = pickle_dumps(None) - self.worker_data = None self.shutting_down = False self.start() @@ -187,12 +187,14 @@ class Pool(Thread): self.shutdown_workers(wait_time=wait_time) def create_worker(self): - p = start_worker('from {0} import run_main, {1}; run_main({1})'.format(self.__class__.__module__, 'worker_main')) + a, b = Pipe() + with a: + cmd = 'from {0} import run_main, {1}; run_main({2!r}, {1})'.format( + self.__class__.__module__, 'worker_main', a.fileno()) + p = start_worker(cmd, a.fileno()) sys.stdout.flush() - eintr_retry_call(p.stdin.write, self.worker_data) - p.stdin.flush(), p.stdin.close() - conn = eintr_retry_call(self.listener.accept) - w = Worker(p, conn, self.events, self.name) + p.stdin.close() + w = Worker(p, b, self.events, self.name) if self.common_data != pickle_dumps(None): w.set_common_data(self.common_data) return w @@ -209,10 +211,6 @@ class Pool(Thread): return False def run(self): - from calibre.utils.ipc.server import create_listener - self.auth_key = os.urandom(32) - self.address, self.listener = create_listener(self.auth_key) - self.worker_data = msgpack_dumps((self.address, self.auth_key)) if self.start_worker() is False: return @@ -388,12 +386,9 @@ def worker_main(conn): return 0 -def run_main(func): - from multiprocessing.connection import Client - from contextlib import closing - stdin = getattr(sys.stdin, 'buffer', sys.stdin) - address, key = msgpack_loads(eintr_retry_call(stdin.read)) - with closing(Client(address, authkey=key)) as conn: +def run_main(client_fd, func): + from multiprocessing.connection import Connection + with Connection(client_fd) as conn: raise SystemExit(func(conn)) diff --git a/src/calibre/utils/ipc/simple_worker.py b/src/calibre/utils/ipc/simple_worker.py index 642ee9fb9d..fc0af076ae 100644 --- a/src/calibre/utils/ipc/simple_worker.py +++ b/src/calibre/utils/ipc/simple_worker.py @@ -144,7 +144,7 @@ def create_worker(env, priority='normal', cwd=None, func='main'): def start_pipe_worker(command, env=None, priority='normal', **process_args): import subprocess w = Worker(env or {}) - args = {'stdout':subprocess.PIPE, 'stdin':subprocess.PIPE, 'env':w.env} + args = {'stdout':subprocess.PIPE, 'stdin':subprocess.PIPE, 'env':w.env, 'close_fds': True} args.update(process_args) if iswindows: priority = {