Remove use of pickle for passing listen address to calibre workers

This commit is contained in:
Kovid Goyal 2019-03-15 21:33:26 +05:30
parent 313ce6b0c9
commit e73e782b56
No known key found for this signature in database
GPG Key ID: 06BC317B515ACE7C
3 changed files with 32 additions and 21 deletions

View File

@ -1,28 +1,35 @@
#!/usr/bin/env python2 #!/usr/bin/env python2
# vim:fileencoding=UTF-8:ts=4:sw=4:sta:et:sts=4:ai # vim:fileencoding=UTF-8:ts=4:sw=4:sta:et:sts=4:ai
from __future__ import with_statement from __future__ import print_function, with_statement
from __future__ import print_function
__license__ = 'GPL v3' __license__ = 'GPL v3'
__copyright__ = '2009, Kovid Goyal <kovid@kovidgoyal.net>' __copyright__ = '2009, Kovid Goyal <kovid@kovidgoyal.net>'
__docformat__ = 'restructuredtext en' __docformat__ = 'restructuredtext en'
import sys, os, cPickle, time, tempfile, errno, itertools
from math import ceil
from threading import Thread, RLock
from Queue import Queue, Empty
from multiprocessing.connection import Listener, arbitrary_address
from collections import deque
from binascii import hexlify
import errno
import itertools
import os
import sys
import tempfile
import time
from binascii import hexlify
from collections import deque
from math import ceil
from multiprocessing.connection import Listener, arbitrary_address
from Queue import Empty, Queue
from threading import RLock, Thread
from calibre import detect_ncpus as cpu_count
from calibre.constants import DEBUG, islinux, iswindows
from calibre.ptempfile import base_dir
from calibre.utils.ipc import eintr_retry_call from calibre.utils.ipc import eintr_retry_call
from calibre.utils.ipc.launch import Worker from calibre.utils.ipc.launch import Worker
from calibre.utils.ipc.worker import PARALLEL_FUNCS from calibre.utils.ipc.worker import PARALLEL_FUNCS
from calibre import detect_ncpus as cpu_count from calibre.utils.serialize import msgpack_dumps, pickle_loads
from calibre.constants import iswindows, DEBUG, islinux
from calibre.ptempfile import base_dir
from polyglot.builtins import string_or_bytes from polyglot.builtins import string_or_bytes
_counter = 0 _counter = 0
@ -212,7 +219,7 @@ class Server(Thread):
redirect_output = not gui redirect_output = not gui
env = { env = {
'CALIBRE_WORKER_ADDRESS' : hexlify(cPickle.dumps(self.listener.address, -1)), 'CALIBRE_WORKER_ADDRESS' : hexlify(msgpack_dumps(self.listener.address)),
'CALIBRE_WORKER_KEY' : hexlify(self.auth_key), 'CALIBRE_WORKER_KEY' : hexlify(self.auth_key),
'CALIBRE_WORKER_RESULT' : hexlify(rfile.encode('utf-8')), 'CALIBRE_WORKER_RESULT' : hexlify(rfile.encode('utf-8')),
} }
@ -281,7 +288,8 @@ class Server(Thread):
job.returncode = worker.returncode job.returncode = worker.returncode
elif os.path.exists(worker.rfile): elif os.path.exists(worker.rfile):
try: try:
job.result = cPickle.load(open(worker.rfile, 'rb')) with lopen(worker.rfile, 'rb') as f:
job.result = pickle_loads(f.read())
os.remove(worker.rfile) os.remove(worker.rfile)
except: except:
pass pass

View File

@ -7,7 +7,7 @@ __license__ = 'GPL v3'
__copyright__ = '2012, Kovid Goyal <kovid@kovidgoyal.net>' __copyright__ = '2012, Kovid Goyal <kovid@kovidgoyal.net>'
__docformat__ = 'restructuredtext en' __docformat__ = 'restructuredtext en'
import os, cPickle, traceback, time, importlib import os, traceback, time, importlib
from binascii import hexlify, unhexlify from binascii import hexlify, unhexlify
from multiprocessing.connection import Client from multiprocessing.connection import Client
from threading import Thread from threading import Thread
@ -16,6 +16,7 @@ from contextlib import closing
from calibre.constants import iswindows from calibre.constants import iswindows
from calibre.utils.ipc import eintr_retry_call from calibre.utils.ipc import eintr_retry_call
from calibre.utils.ipc.launch import Worker from calibre.utils.ipc.launch import Worker
from calibre.utils.serialize import msgpack_loads, msgpack_dumps
from polyglot.builtins import unicode_type, string_or_bytes from polyglot.builtins import unicode_type, string_or_bytes
@ -130,7 +131,7 @@ def create_worker(env, priority='normal', cwd=None, func='main'):
env = dict(env) env = dict(env)
env.update({ env.update({
'CALIBRE_WORKER_ADDRESS': hexlify(cPickle.dumps(listener.address, -1)), 'CALIBRE_WORKER_ADDRESS': hexlify(msgpack_dumps(listener.address)),
'CALIBRE_WORKER_KEY': hexlify(auth_key), 'CALIBRE_WORKER_KEY': hexlify(auth_key),
'CALIBRE_SIMPLE_WORKER': 'calibre.utils.ipc.simple_worker:%s' % func, 'CALIBRE_SIMPLE_WORKER': 'calibre.utils.ipc.simple_worker:%s' % func,
}) })
@ -270,7 +271,7 @@ def compile_code(src):
def main(): def main():
# The entry point for the simple worker process # The entry point for the simple worker process
address = cPickle.loads(unhexlify(os.environ['CALIBRE_WORKER_ADDRESS'])) address = msgpack_loads(unhexlify(os.environ['CALIBRE_WORKER_ADDRESS']))
key = unhexlify(os.environ['CALIBRE_WORKER_KEY']) key = unhexlify(os.environ['CALIBRE_WORKER_KEY'])
with closing(Client(address, authkey=key)) as conn: with closing(Client(address, authkey=key)) as conn:
args = eintr_retry_call(conn.recv) args = eintr_retry_call(conn.recv)
@ -300,7 +301,7 @@ def main():
def offload(): def offload():
# The entry point for the offload worker process # The entry point for the offload worker process
address = cPickle.loads(unhexlify(os.environ['CALIBRE_WORKER_ADDRESS'])) address = msgpack_loads(unhexlify(os.environ['CALIBRE_WORKER_ADDRESS']))
key = unhexlify(os.environ['CALIBRE_WORKER_KEY']) key = unhexlify(os.environ['CALIBRE_WORKER_KEY'])
func_cache = {} func_cache = {}
with closing(Client(address, authkey=key)) as conn: with closing(Client(address, authkey=key)) as conn:

View File

@ -7,7 +7,7 @@ __license__ = 'GPL v3'
__copyright__ = '2009, Kovid Goyal <kovid@kovidgoyal.net>' __copyright__ = '2009, Kovid Goyal <kovid@kovidgoyal.net>'
__docformat__ = 'restructuredtext en' __docformat__ = 'restructuredtext en'
import os, cPickle, sys, importlib import os, sys, importlib
from multiprocessing.connection import Client from multiprocessing.connection import Client
from threading import Thread from threading import Thread
from Queue import Queue from Queue import Queue
@ -18,6 +18,7 @@ from zipimport import ZipImportError
from calibre import prints from calibre import prints
from calibre.constants import iswindows, isosx from calibre.constants import iswindows, isosx
from calibre.utils.ipc import eintr_retry_call from calibre.utils.ipc import eintr_retry_call
from calibre.utils.serialize import msgpack_loads, pickle_dumps
PARALLEL_FUNCS = { PARALLEL_FUNCS = {
'lrfviewer' : 'lrfviewer' :
@ -182,7 +183,7 @@ def main():
print('Failed to run pipe worker with command:', sys.argv[-1]) print('Failed to run pipe worker with command:', sys.argv[-1])
raise raise
return return
address = cPickle.loads(unhexlify(os.environ['CALIBRE_WORKER_ADDRESS'])) address = msgpack_loads(unhexlify(os.environ['CALIBRE_WORKER_ADDRESS']))
key = unhexlify(os.environ['CALIBRE_WORKER_KEY']) key = unhexlify(os.environ['CALIBRE_WORKER_KEY'])
resultf = unhexlify(os.environ['CALIBRE_WORKER_RESULT']).decode('utf-8') resultf = unhexlify(os.environ['CALIBRE_WORKER_RESULT']).decode('utf-8')
with closing(Client(address, authkey=key)) as conn: with closing(Client(address, authkey=key)) as conn:
@ -198,7 +199,8 @@ def main():
result = func(*args, **kwargs) result = func(*args, **kwargs)
if result is not None and os.path.exists(os.path.dirname(resultf)): if result is not None and os.path.exists(os.path.dirname(resultf)):
cPickle.dump(result, open(resultf, 'wb'), -1) with lopen(resultf, 'wb') as f:
f.write(pickle_dumps(result))
notifier.queue.put(None) notifier.queue.put(None)