Port simple and completion workers to use Pipe

This commit is contained in:
Kovid Goyal 2020-11-30 14:45:49 +05:30
parent 064c66804b
commit 3d7fb47592
No known key found for this signature in database
GPG Key ID: 06BC317B515ACE7C
3 changed files with 69 additions and 77 deletions

View File

@ -5,16 +5,14 @@
__license__ = 'GPL v3'
__copyright__ = '2014, Kovid Goyal <kovid at kovidgoyal.net>'
import os, sys
from threading import Thread, Event, RLock
from contextlib import closing
from collections import namedtuple
from multiprocessing.connection import Pipe
from threading import Event, RLock, Thread
from calibre.constants import iswindows
from calibre.gui2.tweak_book.completion.basic import Request
from calibre.gui2.tweak_book.completion.utils import DataError
from calibre.utils.ipc import eintr_retry_call
from calibre.utils.serialize import msgpack_loads, msgpack_dumps
from polyglot.queue import Queue
COMPLETION_REQUEST = 'completion request'
@ -34,22 +32,24 @@ class CompletionWorker(Thread):
self.reap_thread = None
self.shutting_down = False
self.connected = Event()
self.current_completion_request = None
self.latest_completion_request_id = None
self.request_count = 0
self.lock = RLock()
def launch_worker_process(self):
from calibre.utils.ipc.server import create_listener
from calibre.utils.ipc.pool import start_worker
self.worker_process = p = start_worker(
'from {0} import run_main, {1}; run_main({1})'.format(self.__class__.__module__, self.worker_entry_point))
auth_key = os.urandom(32)
address, self.listener = create_listener(auth_key)
eintr_retry_call(p.stdin.write, msgpack_dumps((address, auth_key)))
p.stdin.flush(), p.stdin.close()
self.control_conn = eintr_retry_call(self.listener.accept)
self.data_conn = eintr_retry_call(self.listener.accept)
control_a, control_b = Pipe()
data_a, data_b = Pipe()
with control_a, data_a:
pass_fds = control_a.fileno(), data_a.fileno()
self.worker_process = p = start_worker(
'from {0} import run_main, {1}; run_main({2}, {3}, {1})'.format(
self.__class__.__module__, self.worker_entry_point, *pass_fds),
pass_fds
)
p.stdin.close()
self.control_conn = control_b
self.data_conn = data_b
self.data_thread = t = Thread(name='CWData', target=self.handle_data_requests)
t.daemon = True
t.start()
@ -106,10 +106,7 @@ class CompletionWorker(Thread):
req_type, req_data = obj
try:
if req_type is COMPLETION_REQUEST:
with self.lock:
if self.current_completion_request is not None:
ccr, self.current_completion_request = self.current_completion_request, None
self.send_completion_request(ccr)
self.send_completion_request(req_data)
elif req_type is CLEAR_REQUEST:
self.send(req_data)
except EOFError:
@ -121,7 +118,8 @@ class CompletionWorker(Thread):
def send_completion_request(self, request):
self.send(request)
result = self.recv()
if result.request_id == self.latest_completion_request_id:
latest_completion_request_id = self.latest_completion_request_id
if result is not None and result.request_id == latest_completion_request_id:
try:
self.result_callback(result)
except Exception:
@ -133,9 +131,9 @@ class CompletionWorker(Thread):
# Build a completion Request from the arguments, record its id as
# latest_completion_request_id, and put a COMPLETION_REQUEST item on main_queue.
#
# NOTE(review): this span reads like a flattened diff hunk (no indentation,
# no +/- markers; the hunk header reports 9 lines on each side). The first
# three statements after `with self.lock:` look like the superseded version;
# the last three look like the replacement. Confirm against the repository
# before treating both groups as live code.
def queue_completion(self, request_id, completion_type, completion_data, query=None):
with self.lock:
# Variant 1: the Request is stored on self and the queue item carries None.
self.current_completion_request = Request(request_id, completion_type, completion_data, query)
self.latest_completion_request_id = self.current_completion_request.id
self.main_queue.put((COMPLETION_REQUEST, None))
# Variant 2: the Request is held in a local and travels inside the queue item.
ccr = Request(request_id, completion_type, completion_data, query)
self.latest_completion_request_id = ccr.id
self.main_queue.put((COMPLETION_REQUEST, ccr))
def shutdown(self):
self.shutting_down = True
@ -170,11 +168,12 @@ def completion_worker():
return _completion_worker
# Entry point that obtains a control connection and a data connection and then
# calls func(control_conn, data_conn).
#
# NOTE(review): two `def run_main` headers appear in this span, which looks
# like a flattened diff hunk (no indentation, no +/- markers). Presumably only
# the second signature is current - confirm against the repository.
#
# Variant 1: read an (address, key) pair as msgpack from stdin and open two
# Client connections to that address, each wrapped in closing().
def run_main(func):
from multiprocessing.connection import Client
# Use the binary buffer of sys.stdin when it has one, else sys.stdin itself.
stdin = getattr(sys.stdin, 'buffer', sys.stdin)
address, key = msgpack_loads(eintr_retry_call(stdin.read))
with closing(Client(address, authkey=key)) as control_conn, closing(Client(address, authkey=key)) as data_conn:
# Variant 2: control_fd and data_fd are passed in as arguments and each is
# wrapped in a Connection object used as a context manager.
def run_main(control_fd, data_fd, func):
# The connection class is PipeConnection when iswindows is true, Connection otherwise.
if iswindows:
from multiprocessing.connection import PipeConnection as Connection
else:
from multiprocessing.connection import Connection
with Connection(control_fd) as control_conn, Connection(data_fd) as data_conn:
func(control_conn, data_conn)
@ -207,12 +206,14 @@ def main(control_conn, data_conn):
# Entry point named by test() below (worker_entry_point='test_main'): receives
# one object from each connection and replies on the control connection.
#
# NOTE(review): this span looks like a flattened diff hunk. The bare
# `control_conn.send(obj)` echo is presumably the superseded reply and the
# `(obj, dobj)` tuple the current one - confirm against the repository.
def test_main(control_conn, data_conn):
obj = control_conn.recv()
control_conn.send(obj)
# Receive from the data connection as well, then report both objects together.
dobj = data_conn.recv()
control_conn.send((obj, dobj))
# Manual smoke test: create a CompletionWorker whose entry point is test_main,
# wait for its connection, send one object on the data connection and one via
# w.send(), print whatever w.recv() returns, then shut the worker down and
# join it.
def test():
w = CompletionWorker(worker_entry_point='test_main')
w.wait_for_connection()
# Written directly to the data connection object held by the worker.
w.data_conn.send('got the data')
w.send('Hello World!')
print(w.recv())
w.shutdown(), w.join()

View File

@ -68,11 +68,11 @@ def get_stdout(process):
break
def start_worker(code, pipe_fd, name=''):
def start_worker(code, pass_fds, name=''):
from calibre.utils.ipc.simple_worker import start_pipe_worker
if name:
name = '-' + name
p = start_pipe_worker(code, pass_fds=(pipe_fd,), **worker_kwargs)
p = start_pipe_worker(code, pass_fds=pass_fds, **worker_kwargs)
if get_stdout_from_child:
t = Thread(target=get_stdout, name='PoolWorkerGetStdout' + name, args=(p,))
t.daemon = True
@ -191,7 +191,7 @@ class Pool(Thread):
with a:
cmd = 'from {0} import run_main, {1}; run_main({2!r}, {1})'.format(
self.__class__.__module__, 'worker_main', a.fileno())
p = start_worker(cmd, a.fileno())
p = start_worker(cmd, (a.fileno(),))
sys.stdout.flush()
p.stdin.close()
w = Worker(p, b, self.events, self.name)

View File

@ -6,18 +6,23 @@ __license__ = 'GPL v3'
__copyright__ = '2012, Kovid Goyal <kovid@kovidgoyal.net>'
__docformat__ = 'restructuredtext en'
import os, time, traceback, importlib
from multiprocessing.connection import Client
import importlib
import os
import time
import traceback
from multiprocessing import Pipe
from threading import Thread
from contextlib import closing
from calibre.constants import iswindows
from calibre.utils.ipc import eintr_retry_call
from calibre.utils.ipc.launch import Worker
from calibre.utils.serialize import msgpack_loads, msgpack_dumps
from calibre.utils.monotonic import monotonic
from polyglot.builtins import unicode_type, string_or_bytes, environ_item
from polyglot.binary import as_hex_unicode, from_hex_bytes
from polyglot.builtins import environ_item, string_or_bytes, unicode_type
if iswindows:
from multiprocessing.connection import PipeConnection as Connection
else:
from multiprocessing.connection import Connection
class WorkerError(Exception):
@ -30,25 +35,20 @@ class WorkerError(Exception):
class ConnectedWorker(Thread):
def __init__(self, listener, args):
def __init__(self, conn, args):
Thread.__init__(self)
self.daemon = True
self.listener = listener
self.conn = conn
self.args = args
self.accepted = False
self.tb = None
self.res = None
def run(self):
conn = None
try:
conn = eintr_retry_call(self.listener.accept)
except BaseException:
self.tb = traceback.format_exc()
return
self.accepted = True
with closing(conn):
conn = self.conn
with conn:
try:
eintr_retry_call(conn.send, self.args)
self.res = eintr_retry_call(conn.recv)
@ -56,18 +56,15 @@ class ConnectedWorker(Thread):
self.tb = traceback.format_exc()
class OffloadWorker(object):
class OffloadWorker:
def __init__(self, listener, worker):
self.listener = listener
def __init__(self, conn, worker):
self.conn = conn
self.worker = worker
self.conn = None
self.kill_thread = t = Thread(target=self.worker.kill)
t.daemon = True
def __call__(self, module, func, *args, **kwargs):
if self.conn is None:
self.conn = eintr_retry_call(self.listener.accept)
eintr_retry_call(self.conn.send, (module, func, args, kwargs))
return eintr_retry_call(self.conn.recv)
@ -91,9 +88,9 @@ class OffloadWorker(object):
return self.worker.is_alive or self.kill_thread.is_alive()
def communicate(ans, worker, listener, args, timeout=300, heartbeat=None,
def communicate(ans, worker, conn, args, timeout=300, heartbeat=None,
abort=None):
cw = ConnectedWorker(listener, args)
cw = ConnectedWorker(conn, args)
cw.start()
st = monotonic()
check_heartbeat = callable(heartbeat)
@ -125,20 +122,17 @@ def communicate(ans, worker, listener, args, timeout=300, heartbeat=None,
# Launch a Worker with an environment derived from env and return a
# (channel, worker) pair for talking to it.
#
# env is copied with dict(env) before being updated, so the caller's mapping
# is not modified. func is interpolated into the CALIBRE_SIMPLE_WORKER value
# 'calibre.utils.ipc.simple_worker:%s'.
#
# NOTE(review): this span looks like a flattened diff hunk (no indentation,
# no +/- markers) with two variants interleaved; note the two `return`
# statements. Presumably only the Pipe-based variant is current, and whether
# the Worker is launched inside `with a:` cannot be seen here - confirm both
# against the repository.
def create_worker(env, priority='normal', cwd=None, func='main'):
# Variant 1: create a listener protected by a random 32-byte auth key and pass
# the hex-encoded address and key to the child through environment variables.
from calibre.utils.ipc.server import create_listener
auth_key = os.urandom(32)
address, listener = create_listener(auth_key)
env = dict(env)
env.update({
'CALIBRE_WORKER_ADDRESS': environ_item(as_hex_unicode(msgpack_dumps(address))),
'CALIBRE_WORKER_KEY': environ_item(as_hex_unicode(auth_key)),
'CALIBRE_SIMPLE_WORKER': environ_item('calibre.utils.ipc.simple_worker:%s' % func),
})
# Variant 2: create a Pipe and pass the file number of one end to the child
# through CALIBRE_WORKER_FD.
a, b = Pipe()
with a:
env.update({
'CALIBRE_WORKER_FD': str(a.fileno()),
'CALIBRE_SIMPLE_WORKER': environ_item('calibre.utils.ipc.simple_worker:%s' % func),
})
# Variant 1 launch: no pass_fds; returns the listener.
w = Worker(env)
w(cwd=cwd, priority=priority)
return listener, w
# Variant 2 launch: the same file number is also handed over via pass_fds;
# returns the other end of the pipe (b).
w = Worker(env)
w(cwd=cwd, priority=priority, pass_fds=(a.fileno(),))
return b, w
def start_pipe_worker(command, env=None, priority='normal', **process_args):
@ -175,7 +169,7 @@ def start_pipe_worker(command, env=None, priority='normal', **process_args):
def two_part_fork_job(env=None, priority='normal', cwd=None):
env = env or {}
listener, w = create_worker(env, priority, cwd)
conn, w = create_worker(env, priority, cwd)
def run_job(
mod_name, func_name, args=(), kwargs=None, timeout=300, # seconds
@ -184,7 +178,7 @@ def two_part_fork_job(env=None, priority='normal', cwd=None):
ans = {'result':None, 'stdout_stderr':None}
kwargs = kwargs or {}
try:
communicate(ans, w, listener, (mod_name, func_name, args, kwargs,
communicate(ans, w, conn, (mod_name, func_name, args, kwargs,
module_is_source_code), timeout=timeout, heartbeat=heartbeat,
abort=abort)
except WorkerError as e:
@ -267,12 +261,13 @@ def fork_job(mod_name, func_name, args=(), kwargs=None, timeout=300, # seconds
# Create a worker via create_worker(..., func='offload') and wrap the returned
# channel and worker in an OffloadWorker.
#
# The mutable default env={} is passed straight to create_worker, which copies
# it with dict(env) before updating, so the shared default dict is not
# modified by this path.
#
# NOTE(review): this span looks like a flattened diff hunk. The
# `listener` pair is presumably the superseded version and the `conn` pair the
# current one - confirm against the repository.
def offload_worker(env={}, priority='normal', cwd=None):
listener, w = create_worker(env=env, priority=priority, cwd=cwd, func='offload')
return OffloadWorker(listener, w)
conn, w = create_worker(env=env, priority=priority, cwd=cwd, func='offload')
return OffloadWorker(conn, w)
def compile_code(src):
import re, io
import io
import re
if not isinstance(src, unicode_type):
match = re.search(br'coding[:=]\s*([-\w.]+)', src[:200])
enc = match.group(1).decode('utf-8') if match else 'utf-8'
@ -291,9 +286,7 @@ def compile_code(src):
def main():
# The entry point for the simple worker process
address = msgpack_loads(from_hex_bytes(os.environ['CALIBRE_WORKER_ADDRESS']))
key = from_hex_bytes(os.environ['CALIBRE_WORKER_KEY'])
with closing(Client(address, authkey=key)) as conn:
with Connection(int(os.environ['CALIBRE_WORKER_FD'])) as conn:
args = eintr_retry_call(conn.recv)
try:
mod, func, args, kwargs, module_is_source_code = args
@ -321,10 +314,8 @@ def main():
def offload():
# The entry point for the offload worker process
address = msgpack_loads(from_hex_bytes(os.environ['CALIBRE_WORKER_ADDRESS']))
key = from_hex_bytes(os.environ['CALIBRE_WORKER_KEY'])
func_cache = {}
with closing(Client(address, authkey=key)) as conn:
with Connection(int(os.environ['CALIBRE_WORKER_FD'])) as conn:
while True:
args = eintr_retry_call(conn.recv)
if args is None: