From 3d7fb47592bd1f530a2767c4edc8c893060e1fe0 Mon Sep 17 00:00:00 2001 From: Kovid Goyal Date: Mon, 30 Nov 2020 14:45:49 +0530 Subject: [PATCH] Port simple and completion workers to use Pipe --- .../gui2/tweak_book/completion/worker.py | 57 ++++++------- src/calibre/utils/ipc/pool.py | 6 +- src/calibre/utils/ipc/simple_worker.py | 83 +++++++++---------- 3 files changed, 69 insertions(+), 77 deletions(-) diff --git a/src/calibre/gui2/tweak_book/completion/worker.py b/src/calibre/gui2/tweak_book/completion/worker.py index 12a5c45ace..90619f4d81 100644 --- a/src/calibre/gui2/tweak_book/completion/worker.py +++ b/src/calibre/gui2/tweak_book/completion/worker.py @@ -5,16 +5,14 @@ __license__ = 'GPL v3' __copyright__ = '2014, Kovid Goyal ' -import os, sys -from threading import Thread, Event, RLock -from contextlib import closing from collections import namedtuple +from multiprocessing.connection import Pipe +from threading import Event, RLock, Thread from calibre.constants import iswindows from calibre.gui2.tweak_book.completion.basic import Request from calibre.gui2.tweak_book.completion.utils import DataError from calibre.utils.ipc import eintr_retry_call -from calibre.utils.serialize import msgpack_loads, msgpack_dumps from polyglot.queue import Queue COMPLETION_REQUEST = 'completion request' @@ -34,22 +32,24 @@ class CompletionWorker(Thread): self.reap_thread = None self.shutting_down = False self.connected = Event() - self.current_completion_request = None self.latest_completion_request_id = None self.request_count = 0 self.lock = RLock() def launch_worker_process(self): - from calibre.utils.ipc.server import create_listener from calibre.utils.ipc.pool import start_worker - self.worker_process = p = start_worker( - 'from {0} import run_main, {1}; run_main({1})'.format(self.__class__.__module__, self.worker_entry_point)) - auth_key = os.urandom(32) - address, self.listener = create_listener(auth_key) - eintr_retry_call(p.stdin.write, msgpack_dumps((address, auth_key))) - p.stdin.flush(), p.stdin.close() - self.control_conn = eintr_retry_call(self.listener.accept) - self.data_conn = eintr_retry_call(self.listener.accept) + control_a, control_b = Pipe() + data_a, data_b = Pipe() + with control_a, data_a: + pass_fds = control_a.fileno(), data_a.fileno() + self.worker_process = p = start_worker( + 'from {0} import run_main, {1}; run_main({2}, {3}, {1})'.format( + self.__class__.__module__, self.worker_entry_point, *pass_fds), + pass_fds + ) + p.stdin.close() + self.control_conn = control_b + self.data_conn = data_b self.data_thread = t = Thread(name='CWData', target=self.handle_data_requests) t.daemon = True t.start() @@ -106,10 +106,7 @@ class CompletionWorker(Thread): req_type, req_data = obj try: if req_type is COMPLETION_REQUEST: - with self.lock: - if self.current_completion_request is not None: - ccr, self.current_completion_request = self.current_completion_request, None - self.send_completion_request(ccr) + self.send_completion_request(req_data) elif req_type is CLEAR_REQUEST: self.send(req_data) except EOFError: @@ -121,7 +118,8 @@ class CompletionWorker(Thread): def send_completion_request(self, request): self.send(request) result = self.recv() - if result.request_id == self.latest_completion_request_id: + latest_completion_request_id = self.latest_completion_request_id + if result is not None and result.request_id == latest_completion_request_id: try: self.result_callback(result) except Exception: @@ -133,9 +131,9 @@ class CompletionWorker(Thread): def queue_completion(self, request_id, completion_type, completion_data, query=None): with self.lock: - self.current_completion_request = Request(request_id, completion_type, completion_data, query) - self.latest_completion_request_id = self.current_completion_request.id - self.main_queue.put((COMPLETION_REQUEST, None)) + ccr = Request(request_id, completion_type, completion_data, query) + self.latest_completion_request_id = ccr.id + self.main_queue.put((COMPLETION_REQUEST, ccr)) def shutdown(self): self.shutting_down = True @@ -170,11 +168,12 @@ def completion_worker(): return _completion_worker -def run_main(func): - from multiprocessing.connection import Client - stdin = getattr(sys.stdin, 'buffer', sys.stdin) - address, key = msgpack_loads(eintr_retry_call(stdin.read)) - with closing(Client(address, authkey=key)) as control_conn, closing(Client(address, authkey=key)) as data_conn: +def run_main(control_fd, data_fd, func): + if iswindows: + from multiprocessing.connection import PipeConnection as Connection + else: + from multiprocessing.connection import Connection + with Connection(control_fd) as control_conn, Connection(data_fd) as data_conn: func(control_conn, data_conn) @@ -207,12 +206,14 @@ def main(control_conn, data_conn): def test_main(control_conn, data_conn): obj = control_conn.recv() - control_conn.send(obj) + dobj = data_conn.recv() + control_conn.send((obj, dobj)) def test(): w = CompletionWorker(worker_entry_point='test_main') w.wait_for_connection() + w.data_conn.send('got the data') w.send('Hello World!') print(w.recv()) w.shutdown(), w.join() diff --git a/src/calibre/utils/ipc/pool.py b/src/calibre/utils/ipc/pool.py index d88148a8f1..bc69daa266 100644 --- a/src/calibre/utils/ipc/pool.py +++ b/src/calibre/utils/ipc/pool.py @@ -68,11 +68,11 @@ def get_stdout(process): break -def start_worker(code, pipe_fd, name=''): +def start_worker(code, pass_fds, name=''): from calibre.utils.ipc.simple_worker import start_pipe_worker if name: name = '-' + name - p = start_pipe_worker(code, pass_fds=(pipe_fd,), **worker_kwargs) + p = start_pipe_worker(code, pass_fds=pass_fds, **worker_kwargs) if get_stdout_from_child: t = Thread(target=get_stdout, name='PoolWorkerGetStdout' + name, args=(p,)) t.daemon = True @@ -191,7 +191,7 @@ class Pool(Thread): with a: cmd = 'from {0} import run_main, {1}; run_main({2!r}, {1})'.format( self.__class__.__module__, 'worker_main', a.fileno()) - p = start_worker(cmd, a.fileno()) + p = start_worker(cmd, (a.fileno(),)) sys.stdout.flush() p.stdin.close() w = Worker(p, b, self.events, self.name) diff --git a/src/calibre/utils/ipc/simple_worker.py b/src/calibre/utils/ipc/simple_worker.py index 7710cdb268..c44facfa0e 100644 --- a/src/calibre/utils/ipc/simple_worker.py +++ b/src/calibre/utils/ipc/simple_worker.py @@ -6,18 +6,23 @@ __license__ = 'GPL v3' __copyright__ = '2012, Kovid Goyal ' __docformat__ = 'restructuredtext en' -import os, time, traceback, importlib -from multiprocessing.connection import Client +import importlib +import os +import time +import traceback +from multiprocessing import Pipe from threading import Thread -from contextlib import closing from calibre.constants import iswindows from calibre.utils.ipc import eintr_retry_call from calibre.utils.ipc.launch import Worker -from calibre.utils.serialize import msgpack_loads, msgpack_dumps from calibre.utils.monotonic import monotonic -from polyglot.builtins import unicode_type, string_or_bytes, environ_item -from polyglot.binary import as_hex_unicode, from_hex_bytes +from polyglot.builtins import environ_item, string_or_bytes, unicode_type + +if iswindows: + from multiprocessing.connection import PipeConnection as Connection +else: + from multiprocessing.connection import Connection class WorkerError(Exception): @@ -30,25 +35,20 @@ class WorkerError(Exception): class ConnectedWorker(Thread): - def __init__(self, listener, args): + def __init__(self, conn, args): Thread.__init__(self) self.daemon = True - self.listener = listener + self.conn = conn self.args = args self.accepted = False self.tb = None self.res = None def run(self): - conn = None - try: - conn = eintr_retry_call(self.listener.accept) - except BaseException: - self.tb = traceback.format_exc() - return self.accepted = True - with closing(conn): + conn = self.conn + with conn: try: eintr_retry_call(conn.send, self.args) self.res = eintr_retry_call(conn.recv) @@ -56,18 +56,15 @@ class ConnectedWorker(Thread): self.tb = traceback.format_exc() -class OffloadWorker(object): +class OffloadWorker: - def __init__(self, listener, worker): - self.listener = listener + def __init__(self, conn, worker): + self.conn = conn self.worker = worker - self.conn = None self.kill_thread = t = Thread(target=self.worker.kill) t.daemon = True def __call__(self, module, func, *args, **kwargs): - if self.conn is None: - self.conn = eintr_retry_call(self.listener.accept) eintr_retry_call(self.conn.send, (module, func, args, kwargs)) return eintr_retry_call(self.conn.recv) @@ -91,9 +88,9 @@ class OffloadWorker(object): return self.worker.is_alive or self.kill_thread.is_alive() -def communicate(ans, worker, listener, args, timeout=300, heartbeat=None, +def communicate(ans, worker, conn, args, timeout=300, heartbeat=None, abort=None): - cw = ConnectedWorker(listener, args) + cw = ConnectedWorker(conn, args) cw.start() st = monotonic() check_heartbeat = callable(heartbeat) @@ -125,20 +122,17 @@ def communicate(ans, worker, listener, args, timeout=300, heartbeat=None, def create_worker(env, priority='normal', cwd=None, func='main'): - from calibre.utils.ipc.server import create_listener - auth_key = os.urandom(32) - address, listener = create_listener(auth_key) - env = dict(env) - env.update({ - 'CALIBRE_WORKER_ADDRESS': environ_item(as_hex_unicode(msgpack_dumps(address))), - 'CALIBRE_WORKER_KEY': environ_item(as_hex_unicode(auth_key)), - 'CALIBRE_SIMPLE_WORKER': environ_item('calibre.utils.ipc.simple_worker:%s' % func), - }) + a, b = Pipe() + with a: + env.update({ + 'CALIBRE_WORKER_FD': str(a.fileno()), + 'CALIBRE_SIMPLE_WORKER': environ_item('calibre.utils.ipc.simple_worker:%s' % func), + }) - w = Worker(env) - w(cwd=cwd, priority=priority) - return listener, w + w = Worker(env) + w(cwd=cwd, priority=priority, pass_fds=(a.fileno(),)) + return b, w def start_pipe_worker(command, env=None, priority='normal', **process_args): @@ -175,7 +169,7 @@ def start_pipe_worker(command, env=None, priority='normal', **process_args): def two_part_fork_job(env=None, priority='normal', cwd=None): env = env or {} - listener, w = create_worker(env, priority, cwd) + conn, w = create_worker(env, priority, cwd) def run_job( mod_name, func_name, args=(), kwargs=None, timeout=300, # seconds @@ -184,7 +178,7 @@ def two_part_fork_job(env=None, priority='normal', cwd=None): ans = {'result':None, 'stdout_stderr':None} kwargs = kwargs or {} try: - communicate(ans, w, listener, (mod_name, func_name, args, kwargs, + communicate(ans, w, conn, (mod_name, func_name, args, kwargs, module_is_source_code), timeout=timeout, heartbeat=heartbeat, abort=abort) except WorkerError as e: @@ -267,12 +261,13 @@ def fork_job(mod_name, func_name, args=(), kwargs=None, timeout=300, # seconds def offload_worker(env={}, priority='normal', cwd=None): - listener, w = create_worker(env=env, priority=priority, cwd=cwd, func='offload') - return OffloadWorker(listener, w) + conn, w = create_worker(env=env, priority=priority, cwd=cwd, func='offload') + return OffloadWorker(conn, w) def compile_code(src): - import re, io + import io + import re if not isinstance(src, unicode_type): match = re.search(br'coding[:=]\s*([-\w.]+)', src[:200]) enc = match.group(1).decode('utf-8') if match else 'utf-8' @@ -291,9 +286,7 @@ def compile_code(src): def main(): # The entry point for the simple worker process - address = msgpack_loads(from_hex_bytes(os.environ['CALIBRE_WORKER_ADDRESS'])) - key = from_hex_bytes(os.environ['CALIBRE_WORKER_KEY']) - with closing(Client(address, authkey=key)) as conn: + with Connection(int(os.environ['CALIBRE_WORKER_FD'])) as conn: args = eintr_retry_call(conn.recv) try: mod, func, args, kwargs, module_is_source_code = args @@ -321,10 +314,8 @@ def main(): def offload(): # The entry point for the offload worker process - address = msgpack_loads(from_hex_bytes(os.environ['CALIBRE_WORKER_ADDRESS'])) - key = from_hex_bytes(os.environ['CALIBRE_WORKER_KEY']) func_cache = {} - with closing(Client(address, authkey=key)) as conn: + with Connection(int(os.environ['CALIBRE_WORKER_FD'])) as conn: while True: args = eintr_retry_call(conn.recv) if args is None: