diff --git a/src/calibre/utils/ipc/pool.py b/src/calibre/utils/ipc/pool.py index a36de998cd..aff69e4354 100644 --- a/src/calibre/utils/ipc/pool.py +++ b/src/calibre/utils/ipc/pool.py @@ -6,7 +6,7 @@ from __future__ import (unicode_literals, division, absolute_import, __license__ = 'GPL v3' __copyright__ = '2014, Kovid Goyal ' -import os, cPickle, sys +import os, sys from threading import Thread from collections import namedtuple from Queue import Queue @@ -16,6 +16,7 @@ from calibre.constants import iswindows, DEBUG from calibre.ptempfile import PersistentTemporaryFile from calibre.utils import join_with_timeout from calibre.utils.ipc import eintr_retry_call +from calibre.utils.serialize import msgpack_dumps, msgpack_loads, pickle_dumps, pickle_loads Job = namedtuple('Job', 'id module func args kwargs') Result = namedtuple('Result', 'value err traceback') @@ -95,7 +96,7 @@ class Worker(object): self.name = name or '' def __call__(self, job): - eintr_retry_call(self.conn.send_bytes, cPickle.dumps(job, -1)) + eintr_retry_call(self.conn.send_bytes, pickle_dumps(job)) if job is not None: self.job_id = job.id t = Thread(target=self.recv, name='PoolWorker-'+self.name) @@ -104,7 +105,7 @@ class Worker(object): def recv(self): try: - result = cPickle.loads(eintr_retry_call(self.conn.recv_bytes)) + result = pickle_loads(eintr_retry_call(self.conn.recv_bytes)) wr = WorkerResult(self.job_id, result, False, self) except Exception as err: import traceback @@ -130,7 +131,7 @@ class Pool(Thread): self.results = Queue() self.tracker = Queue() self.terminal_failure = None - self.common_data = cPickle.dumps(None, -1) + self.common_data = pickle_dumps(None) self.worker_data = None self.shutting_down = False @@ -192,7 +193,7 @@ class Pool(Thread): p.stdin.flush(), p.stdin.close() conn = eintr_retry_call(self.listener.accept) w = Worker(p, conn, self.events, self.name) - if self.common_data != cPickle.dumps(None, -1): + if self.common_data != pickle_dumps(None): w.set_common_data(self.common_data) return w @@ -211,7 +212,7 @@ class Pool(Thread): from calibre.utils.ipc.server import create_listener self.auth_key = os.urandom(32) self.address, self.listener = create_listener(self.auth_key) - self.worker_data = cPickle.dumps((self.address, self.auth_key), -1) + self.worker_data = msgpack_dumps((self.address, self.auth_key)) if self.start_worker() is False: return @@ -243,12 +244,12 @@ class Pool(Thread): return False self.results.put(worker_result) else: - self.common_data = cPickle.dumps(event, -1) + self.common_data = pickle_dumps(event) if len(self.common_data) > MAX_SIZE: self.cd_file = PersistentTemporaryFile('pool_common_data') with self.cd_file as f: f.write(self.common_data) - self.common_data = cPickle.dumps(File(f.name), -1) + self.common_data = pickle_dumps(File(f.name)) for worker in self.available_workers: try: worker.set_common_data(self.common_data) @@ -340,7 +341,7 @@ def worker_main(conn): common_data = None while True: try: - job = cPickle.loads(eintr_retry_call(conn.recv_bytes)) + job = pickle_loads(eintr_retry_call(conn.recv_bytes)) except EOFError: break except KeyboardInterrupt: @@ -354,7 +355,9 @@ def worker_main(conn): break if not isinstance(job, Job): if isinstance(job, File): - common_data = cPickle.load(open(job.name, 'rb')) + with lopen(job.name, 'rb') as f: + common_data = f.read() + common_data = pickle_loads(common_data) else: common_data = job continue @@ -374,7 +377,7 @@ def worker_main(conn): import traceback result = Result(None, as_unicode(err), traceback.format_exc()) try: - eintr_retry_call(conn.send_bytes, cPickle.dumps(result, -1)) + eintr_retry_call(conn.send_bytes, pickle_dumps(result)) except EOFError: break except Exception: @@ -388,7 +391,8 @@ def worker_main(conn): def run_main(func): from multiprocessing.connection import Client from contextlib import closing - address, key = cPickle.loads(eintr_retry_call(sys.stdin.read)) + stdin = getattr(sys.stdin, 'buffer', sys.stdin) + address, key = msgpack_loads(eintr_retry_call(stdin.read)) with closing(Client(address, authkey=key)) as conn: raise SystemExit(func(conn))