Port ipc.pool to use Pipe

This commit is contained in:
Kovid Goyal 2020-11-30 13:21:44 +05:30
parent 6ad91a2ee4
commit 35cbca3dad
No known key found for this signature in database
GPG Key ID: 06BC317B515ACE7C
2 changed files with 15 additions and 20 deletions

View File

@ -8,13 +8,14 @@ __copyright__ = '2014, Kovid Goyal <kovid at kovidgoyal.net>'
import os, sys
from threading import Thread
from collections import namedtuple
from multiprocessing.connection import Pipe
from calibre import detect_ncpus, as_unicode, prints
from calibre.constants import iswindows, DEBUG
from calibre.ptempfile import PersistentTemporaryFile
from calibre.utils import join_with_timeout
from calibre.utils.ipc import eintr_retry_call
from calibre.utils.serialize import msgpack_dumps, msgpack_loads, pickle_dumps, pickle_loads
from calibre.utils.serialize import pickle_dumps, pickle_loads
from polyglot.builtins import iteritems, itervalues
from polyglot.queue import Queue
@ -67,11 +68,11 @@ def get_stdout(process):
break
def start_worker(code, name=''):
def start_worker(code, pipe_fd, name=''):
from calibre.utils.ipc.simple_worker import start_pipe_worker
if name:
name = '-' + name
p = start_pipe_worker(code, **worker_kwargs)
p = start_pipe_worker(code, pass_fds=(pipe_fd,), **worker_kwargs)
if get_stdout_from_child:
t = Thread(target=get_stdout, name='PoolWorkerGetStdout' + name, args=(p,))
t.daemon = True
@ -132,7 +133,6 @@ class Pool(Thread):
self.tracker = Queue()
self.terminal_failure = None
self.common_data = pickle_dumps(None)
self.worker_data = None
self.shutting_down = False
self.start()
@ -187,12 +187,14 @@ class Pool(Thread):
self.shutdown_workers(wait_time=wait_time)
def create_worker(self):
p = start_worker('from {0} import run_main, {1}; run_main({1})'.format(self.__class__.__module__, 'worker_main'))
a, b = Pipe()
with a:
cmd = 'from {0} import run_main, {1}; run_main({2!r}, {1})'.format(
self.__class__.__module__, 'worker_main', a.fileno())
p = start_worker(cmd, a.fileno())
sys.stdout.flush()
eintr_retry_call(p.stdin.write, self.worker_data)
p.stdin.flush(), p.stdin.close()
conn = eintr_retry_call(self.listener.accept)
w = Worker(p, conn, self.events, self.name)
p.stdin.close()
w = Worker(p, b, self.events, self.name)
if self.common_data != pickle_dumps(None):
w.set_common_data(self.common_data)
return w
@ -209,10 +211,6 @@ class Pool(Thread):
return False
def run(self):
from calibre.utils.ipc.server import create_listener
self.auth_key = os.urandom(32)
self.address, self.listener = create_listener(self.auth_key)
self.worker_data = msgpack_dumps((self.address, self.auth_key))
if self.start_worker() is False:
return
@ -388,12 +386,9 @@ def worker_main(conn):
return 0
def run_main(func):
from multiprocessing.connection import Client
from contextlib import closing
stdin = getattr(sys.stdin, 'buffer', sys.stdin)
address, key = msgpack_loads(eintr_retry_call(stdin.read))
with closing(Client(address, authkey=key)) as conn:
def run_main(client_fd, func):
from multiprocessing.connection import Connection
with Connection(client_fd) as conn:
raise SystemExit(func(conn))

View File

@ -144,7 +144,7 @@ def create_worker(env, priority='normal', cwd=None, func='main'):
def start_pipe_worker(command, env=None, priority='normal', **process_args):
import subprocess
w = Worker(env or {})
args = {'stdout':subprocess.PIPE, 'stdin':subprocess.PIPE, 'env':w.env}
args = {'stdout':subprocess.PIPE, 'stdin':subprocess.PIPE, 'env':w.env, 'close_fds': True}
args.update(process_args)
if iswindows:
priority = {