Make use of pickle in ipc.pool work on py3 as well

This commit is contained in:
Kovid Goyal 2019-03-15 21:57:32 +05:30
parent 20b9cc3a81
commit d3ccd4369b
No known key found for this signature in database
GPG Key ID: 06BC317B515ACE7C

View File

@ -6,7 +6,7 @@ from __future__ import (unicode_literals, division, absolute_import,
__license__ = 'GPL v3' __license__ = 'GPL v3'
__copyright__ = '2014, Kovid Goyal <kovid at kovidgoyal.net>' __copyright__ = '2014, Kovid Goyal <kovid at kovidgoyal.net>'
import os, cPickle, sys import os, sys
from threading import Thread from threading import Thread
from collections import namedtuple from collections import namedtuple
from Queue import Queue from Queue import Queue
@ -16,6 +16,7 @@ from calibre.constants import iswindows, DEBUG
from calibre.ptempfile import PersistentTemporaryFile from calibre.ptempfile import PersistentTemporaryFile
from calibre.utils import join_with_timeout from calibre.utils import join_with_timeout
from calibre.utils.ipc import eintr_retry_call from calibre.utils.ipc import eintr_retry_call
from calibre.utils.serialize import msgpack_dumps, msgpack_loads, pickle_dumps, pickle_loads
Job = namedtuple('Job', 'id module func args kwargs') Job = namedtuple('Job', 'id module func args kwargs')
Result = namedtuple('Result', 'value err traceback') Result = namedtuple('Result', 'value err traceback')
@ -95,7 +96,7 @@ class Worker(object):
self.name = name or '' self.name = name or ''
def __call__(self, job): def __call__(self, job):
eintr_retry_call(self.conn.send_bytes, cPickle.dumps(job, -1)) eintr_retry_call(self.conn.send_bytes, pickle_dumps(job))
if job is not None: if job is not None:
self.job_id = job.id self.job_id = job.id
t = Thread(target=self.recv, name='PoolWorker-'+self.name) t = Thread(target=self.recv, name='PoolWorker-'+self.name)
@ -104,7 +105,7 @@ class Worker(object):
def recv(self): def recv(self):
try: try:
result = cPickle.loads(eintr_retry_call(self.conn.recv_bytes)) result = pickle_loads(eintr_retry_call(self.conn.recv_bytes))
wr = WorkerResult(self.job_id, result, False, self) wr = WorkerResult(self.job_id, result, False, self)
except Exception as err: except Exception as err:
import traceback import traceback
@ -130,7 +131,7 @@ class Pool(Thread):
self.results = Queue() self.results = Queue()
self.tracker = Queue() self.tracker = Queue()
self.terminal_failure = None self.terminal_failure = None
self.common_data = cPickle.dumps(None, -1) self.common_data = pickle_dumps(None)
self.worker_data = None self.worker_data = None
self.shutting_down = False self.shutting_down = False
@ -192,7 +193,7 @@ class Pool(Thread):
p.stdin.flush(), p.stdin.close() p.stdin.flush(), p.stdin.close()
conn = eintr_retry_call(self.listener.accept) conn = eintr_retry_call(self.listener.accept)
w = Worker(p, conn, self.events, self.name) w = Worker(p, conn, self.events, self.name)
if self.common_data != cPickle.dumps(None, -1): if self.common_data != pickle_dumps(None):
w.set_common_data(self.common_data) w.set_common_data(self.common_data)
return w return w
@ -211,7 +212,7 @@ class Pool(Thread):
from calibre.utils.ipc.server import create_listener from calibre.utils.ipc.server import create_listener
self.auth_key = os.urandom(32) self.auth_key = os.urandom(32)
self.address, self.listener = create_listener(self.auth_key) self.address, self.listener = create_listener(self.auth_key)
self.worker_data = cPickle.dumps((self.address, self.auth_key), -1) self.worker_data = msgpack_dumps((self.address, self.auth_key))
if self.start_worker() is False: if self.start_worker() is False:
return return
@ -243,12 +244,12 @@ class Pool(Thread):
return False return False
self.results.put(worker_result) self.results.put(worker_result)
else: else:
self.common_data = cPickle.dumps(event, -1) self.common_data = pickle_dumps(event)
if len(self.common_data) > MAX_SIZE: if len(self.common_data) > MAX_SIZE:
self.cd_file = PersistentTemporaryFile('pool_common_data') self.cd_file = PersistentTemporaryFile('pool_common_data')
with self.cd_file as f: with self.cd_file as f:
f.write(self.common_data) f.write(self.common_data)
self.common_data = cPickle.dumps(File(f.name), -1) self.common_data = pickle_dumps(File(f.name))
for worker in self.available_workers: for worker in self.available_workers:
try: try:
worker.set_common_data(self.common_data) worker.set_common_data(self.common_data)
@ -340,7 +341,7 @@ def worker_main(conn):
common_data = None common_data = None
while True: while True:
try: try:
job = cPickle.loads(eintr_retry_call(conn.recv_bytes)) job = pickle_loads(eintr_retry_call(conn.recv_bytes))
except EOFError: except EOFError:
break break
except KeyboardInterrupt: except KeyboardInterrupt:
@ -354,7 +355,9 @@ def worker_main(conn):
break break
if not isinstance(job, Job): if not isinstance(job, Job):
if isinstance(job, File): if isinstance(job, File):
common_data = cPickle.load(open(job.name, 'rb')) with lopen(job.name, 'rb') as f:
common_data = f.read()
common_data = pickle_loads(common_data)
else: else:
common_data = job common_data = job
continue continue
@ -374,7 +377,7 @@ def worker_main(conn):
import traceback import traceback
result = Result(None, as_unicode(err), traceback.format_exc()) result = Result(None, as_unicode(err), traceback.format_exc())
try: try:
eintr_retry_call(conn.send_bytes, cPickle.dumps(result, -1)) eintr_retry_call(conn.send_bytes, pickle_dumps(result))
except EOFError: except EOFError:
break break
except Exception: except Exception:
@ -388,7 +391,8 @@ def worker_main(conn):
def run_main(func): def run_main(func):
from multiprocessing.connection import Client from multiprocessing.connection import Client
from contextlib import closing from contextlib import closing
address, key = cPickle.loads(eintr_retry_call(sys.stdin.read)) stdin = getattr(sys.stdin, 'buffer', sys.stdin)
address, key = msgpack_loads(eintr_retry_call(stdin.read))
with closing(Client(address, authkey=key)) as conn: with closing(Client(address, authkey=key)) as conn:
raise SystemExit(func(conn)) raise SystemExit(func(conn))