From e73e782b56d39769fcae731c9b659640b926a7f4 Mon Sep 17 00:00:00 2001 From: Kovid Goyal Date: Fri, 15 Mar 2019 21:33:26 +0530 Subject: [PATCH] Remove use of pickle for passing listen address to calibre workers --- src/calibre/utils/ipc/server.py | 36 ++++++++++++++++---------- src/calibre/utils/ipc/simple_worker.py | 9 ++++--- src/calibre/utils/ipc/worker.py | 8 +++--- 3 files changed, 32 insertions(+), 21 deletions(-) diff --git a/src/calibre/utils/ipc/server.py b/src/calibre/utils/ipc/server.py index 4452d9809c..f96d9d11ed 100644 --- a/src/calibre/utils/ipc/server.py +++ b/src/calibre/utils/ipc/server.py @@ -1,28 +1,35 @@ #!/usr/bin/env python2 # vim:fileencoding=UTF-8:ts=4:sw=4:sta:et:sts=4:ai -from __future__ import with_statement -from __future__ import print_function +from __future__ import print_function, with_statement __license__ = 'GPL v3' __copyright__ = '2009, Kovid Goyal ' __docformat__ = 'restructuredtext en' -import sys, os, cPickle, time, tempfile, errno, itertools -from math import ceil -from threading import Thread, RLock -from Queue import Queue, Empty -from multiprocessing.connection import Listener, arbitrary_address -from collections import deque -from binascii import hexlify +import errno +import itertools +import os +import sys +import tempfile +import time +from binascii import hexlify +from collections import deque +from math import ceil +from multiprocessing.connection import Listener, arbitrary_address +from Queue import Empty, Queue +from threading import RLock, Thread + +from calibre import detect_ncpus as cpu_count +from calibre.constants import DEBUG, islinux, iswindows +from calibre.ptempfile import base_dir from calibre.utils.ipc import eintr_retry_call from calibre.utils.ipc.launch import Worker from calibre.utils.ipc.worker import PARALLEL_FUNCS -from calibre import detect_ncpus as cpu_count -from calibre.constants import iswindows, DEBUG, islinux -from calibre.ptempfile import base_dir +from calibre.utils.serialize import msgpack_dumps, pickle_loads from polyglot.builtins import string_or_bytes + _counter = 0 @@ -212,7 +219,7 @@ class Server(Thread): redirect_output = not gui env = { - 'CALIBRE_WORKER_ADDRESS' : hexlify(cPickle.dumps(self.listener.address, -1)), + 'CALIBRE_WORKER_ADDRESS' : hexlify(msgpack_dumps(self.listener.address)), 'CALIBRE_WORKER_KEY' : hexlify(self.auth_key), 'CALIBRE_WORKER_RESULT' : hexlify(rfile.encode('utf-8')), } @@ -281,7 +288,8 @@ class Server(Thread): job.returncode = worker.returncode elif os.path.exists(worker.rfile): try: - job.result = cPickle.load(open(worker.rfile, 'rb')) + with lopen(worker.rfile, 'rb') as f: + job.result = pickle_loads(f.read()) os.remove(worker.rfile) except: pass diff --git a/src/calibre/utils/ipc/simple_worker.py b/src/calibre/utils/ipc/simple_worker.py index 78c280ce60..880ba69665 100644 --- a/src/calibre/utils/ipc/simple_worker.py +++ b/src/calibre/utils/ipc/simple_worker.py @@ -7,7 +7,7 @@ __license__ = 'GPL v3' __copyright__ = '2012, Kovid Goyal ' __docformat__ = 'restructuredtext en' -import os, cPickle, traceback, time, importlib +import os, traceback, time, importlib from binascii import hexlify, unhexlify from multiprocessing.connection import Client from threading import Thread @@ -16,6 +16,7 @@ from contextlib import closing from calibre.constants import iswindows from calibre.utils.ipc import eintr_retry_call from calibre.utils.ipc.launch import Worker +from calibre.utils.serialize import msgpack_loads, msgpack_dumps from polyglot.builtins import unicode_type, string_or_bytes @@ -130,7 +131,7 @@ def create_worker(env, priority='normal', cwd=None, func='main'): env = dict(env) env.update({ - 'CALIBRE_WORKER_ADDRESS': hexlify(cPickle.dumps(listener.address, -1)), + 'CALIBRE_WORKER_ADDRESS': hexlify(msgpack_dumps(listener.address)), 'CALIBRE_WORKER_KEY': hexlify(auth_key), 'CALIBRE_SIMPLE_WORKER': 'calibre.utils.ipc.simple_worker:%s' % func, }) @@ -270,7 +271,7 @@ def compile_code(src): def main(): # The entry point for the simple worker process - address = cPickle.loads(unhexlify(os.environ['CALIBRE_WORKER_ADDRESS'])) + address = msgpack_loads(unhexlify(os.environ['CALIBRE_WORKER_ADDRESS'])) key = unhexlify(os.environ['CALIBRE_WORKER_KEY']) with closing(Client(address, authkey=key)) as conn: args = eintr_retry_call(conn.recv) @@ -300,7 +301,7 @@ def main(): def offload(): # The entry point for the offload worker process - address = cPickle.loads(unhexlify(os.environ['CALIBRE_WORKER_ADDRESS'])) + address = msgpack_loads(unhexlify(os.environ['CALIBRE_WORKER_ADDRESS'])) key = unhexlify(os.environ['CALIBRE_WORKER_KEY']) func_cache = {} with closing(Client(address, authkey=key)) as conn: diff --git a/src/calibre/utils/ipc/worker.py b/src/calibre/utils/ipc/worker.py index e48d94e860..c4ca19cd13 100644 --- a/src/calibre/utils/ipc/worker.py +++ b/src/calibre/utils/ipc/worker.py @@ -7,7 +7,7 @@ __license__ = 'GPL v3' __copyright__ = '2009, Kovid Goyal ' __docformat__ = 'restructuredtext en' -import os, cPickle, sys, importlib +import os, sys, importlib from multiprocessing.connection import Client from threading import Thread from Queue import Queue @@ -18,6 +18,7 @@ from zipimport import ZipImportError from calibre import prints from calibre.constants import iswindows, isosx from calibre.utils.ipc import eintr_retry_call +from calibre.utils.serialize import msgpack_loads, pickle_dumps PARALLEL_FUNCS = { 'lrfviewer' : @@ -182,7 +183,7 @@ def main(): print('Failed to run pipe worker with command:', sys.argv[-1]) raise return - address = cPickle.loads(unhexlify(os.environ['CALIBRE_WORKER_ADDRESS'])) + address = msgpack_loads(unhexlify(os.environ['CALIBRE_WORKER_ADDRESS'])) key = unhexlify(os.environ['CALIBRE_WORKER_KEY']) resultf = unhexlify(os.environ['CALIBRE_WORKER_RESULT']).decode('utf-8') with closing(Client(address, authkey=key)) as conn: @@ -198,7 +199,8 @@ def main(): result = func(*args, **kwargs) if result is not None and os.path.exists(os.path.dirname(resultf)): - cPickle.dump(result, open(resultf, 'wb'), -1) + with lopen(resultf, 'wb') as f: + f.write(pickle_dumps(result)) notifier.queue.put(None)