From 3c6879cfa9050ba94228d507ef14c99a5f179419 Mon Sep 17 00:00:00 2001 From: Kovid Goyal Date: Sun, 24 Jun 2012 14:11:08 +0530 Subject: [PATCH] More robust handling of EINTR during IPC --- src/calibre/utils/ipc/__init__.py | 11 ++++++++- src/calibre/utils/ipc/proxy.py | 11 ++++----- src/calibre/utils/ipc/server.py | 14 +++++------- src/calibre/utils/ipc/simple_worker.py | 31 +++++++------------------- src/calibre/utils/ipc/worker.py | 5 +++-- 5 files changed, 32 insertions(+), 40 deletions(-) diff --git a/src/calibre/utils/ipc/__init__.py b/src/calibre/utils/ipc/__init__.py index 93db2e9fc7..f0a8eb0cec 100644 --- a/src/calibre/utils/ipc/__init__.py +++ b/src/calibre/utils/ipc/__init__.py @@ -6,13 +6,22 @@ __license__ = 'GPL v3' __copyright__ = '2009, Kovid Goyal ' __docformat__ = 'restructuredtext en' -import os +import os, errno from threading import Thread from calibre.constants import iswindows, get_windows_username ADDRESS = None +def eintr_retry_call(func, *args, **kwargs): + while True: + try: + return func(*args, **kwargs) + except (OSError, IOError) as e: + if e.errno == errno.EINTR: + continue + raise + def gui_socket_address(): global ADDRESS if ADDRESS is None: diff --git a/src/calibre/utils/ipc/proxy.py b/src/calibre/utils/ipc/proxy.py index 3b037a42a6..897d79b423 100644 --- a/src/calibre/utils/ipc/proxy.py +++ b/src/calibre/utils/ipc/proxy.py @@ -15,6 +15,7 @@ from functools import partial from calibre import as_unicode, prints from calibre.constants import iswindows, DEBUG +from calibre.utils.ipc import eintr_retry_call def _encode(msg): raw = cPickle.dumps(msg, -1) @@ -68,7 +69,7 @@ class Writer(Thread): break try: self.data_written = True - self.conn.send_bytes(x) + eintr_retry_call(self.conn.send_bytes, x) except Exception as e: self.resultq.put(as_unicode(e)) else: @@ -112,7 +113,7 @@ class Server(Thread): def run(self): while self.keep_going: try: - conn = self.listener.accept() + conn = eintr_retry_call(self.listener.accept) self.handle_client(conn) except: pass @@ -125,7 +126,7 @@ class Server(Thread): def _handle_client(self, conn): while True: try: - func_name, args, kwargs = conn.recv() + func_name, args, kwargs = eintr_retry_call(conn.recv) except EOFError: try: conn.close() @@ -156,8 +157,8 @@ class Server(Thread): import traceback # Try to tell the client process what error happened try: - conn.send_bytes(_encode(('failed', (unicode(e), - as_unicode(traceback.format_exc()))))) + eintr_retry_call(conn.send_bytes, (_encode(('failed', (unicode(e), + as_unicode(traceback.format_exc())))))) except: pass raise diff --git a/src/calibre/utils/ipc/server.py b/src/calibre/utils/ipc/server.py index a4c5b5d881..2912971bf6 100644 --- a/src/calibre/utils/ipc/server.py +++ b/src/calibre/utils/ipc/server.py @@ -14,6 +14,7 @@ from multiprocessing.connection import Listener, arbitrary_address from collections import deque from binascii import hexlify +from calibre.utils.ipc import eintr_retry_call from calibre.utils.ipc.launch import Worker from calibre.utils.ipc.worker import PARALLEL_FUNCS from calibre import detect_ncpus as cpu_count @@ -38,7 +39,7 @@ class ConnectedWorker(Thread): def start_job(self, job): notification = PARALLEL_FUNCS[job.name][-1] is not None - self.conn.send((job.name, job.args, job.kwargs, job.description)) + eintr_retry_call(self.conn.send, (job.name, job.args, job.kwargs, job.description)) if notification: self.start() else: @@ -48,7 +49,7 @@ class ConnectedWorker(Thread): def run(self): while True: try: - x = self.conn.recv() + x = eintr_retry_call(self.conn.recv) self.notifications.put(x) except BaseException: break @@ -129,12 +130,7 @@ class Server(Thread): 'CALIBRE_WORKER_KEY' : hexlify(self.auth_key), 'CALIBRE_WORKER_RESULT' : hexlify(rfile.encode('utf-8')), } - for i in range(2): - # Try launch twice as occasionally on OS X - # Listener.accept fails with EINTR - cw = self.do_launch(env, gui, redirect_output, rfile) - if isinstance(cw, ConnectedWorker): - break + cw = self.do_launch(env, gui, redirect_output, rfile) if isinstance(cw, basestring): raise CriticalError('Failed to launch worker process:\n'+cw) if DEBUG: @@ -146,7 +142,7 @@ class Server(Thread): try: w(redirect_output=redirect_output) - conn = self.listener.accept() + conn = eintr_retry_call(self.listener.accept) if conn is None: raise Exception('Failed to launch worker process') except BaseException: diff --git a/src/calibre/utils/ipc/simple_worker.py b/src/calibre/utils/ipc/simple_worker.py index 68d879f8a8..ca84f9d84d 100644 --- a/src/calibre/utils/ipc/simple_worker.py +++ b/src/calibre/utils/ipc/simple_worker.py @@ -14,6 +14,7 @@ from threading import Thread from contextlib import closing from calibre.constants import iswindows +from calibre.utils.ipc import eintr_retry_call from calibre.utils.ipc.launch import Worker class WorkerError(Exception): @@ -35,30 +36,18 @@ class ConnectedWorker(Thread): def run(self): conn = tb = None - for i in range(2): - # On OS X an EINTR can interrupt the accept() call - try: - conn = self.listener.accept() - break - except: - tb = traceback.format_exc() - pass + try: + conn = eintr_retry_call(self.listener.accept) + except: + tb = traceback.format_exc() if conn is None: self.tb = tb return self.accepted = True with closing(conn): try: - try: - conn.send(self.args) - except: - # Maybe an EINTR - conn.send(self.args) - try: - self.res = conn.recv() - except: - # Maybe an EINTR - self.res = conn.recv() + eintr_retry_call(conn.send, self.args) + self.res = eintr_retry_call(conn.recv) except: self.tb = traceback.format_exc() @@ -202,11 +191,7 @@ def main(): address = cPickle.loads(unhexlify(os.environ['CALIBRE_WORKER_ADDRESS'])) key = unhexlify(os.environ['CALIBRE_WORKER_KEY']) with closing(Client(address, authkey=key)) as conn: - try: - args = conn.recv() - except: - # Maybe EINTR - args = conn.recv() + args = eintr_retry_call(conn.recv) try: mod, func, args, kwargs, module_is_source_code = args if module_is_source_code: diff --git a/src/calibre/utils/ipc/worker.py b/src/calibre/utils/ipc/worker.py index 0c43f00dca..08374400ac 100644 --- a/src/calibre/utils/ipc/worker.py +++ b/src/calibre/utils/ipc/worker.py @@ -16,6 +16,7 @@ from zipimport import ZipImportError from calibre import prints from calibre.constants import iswindows, isosx +from calibre.utils.ipc import eintr_retry_call PARALLEL_FUNCS = { 'lrfviewer' : @@ -75,7 +76,7 @@ class Progress(Thread): if x is None: break try: - self.conn.send(x) + eintr_retry_call(self.conn.send, x) except: break @@ -178,7 +179,7 @@ def main(): key = unhexlify(os.environ['CALIBRE_WORKER_KEY']) resultf = unhexlify(os.environ['CALIBRE_WORKER_RESULT']).decode('utf-8') with closing(Client(address, authkey=key)) as conn: - name, args, kwargs, desc = conn.recv() + name, args, kwargs, desc = eintr_retry_call(conn.recv) if desc: prints(desc) sys.stdout.flush()