More robust handling of EINTR during IPC

This commit is contained in:
Kovid Goyal 2012-06-24 14:11:08 +05:30
parent e127d1a5e1
commit 3c6879cfa9
5 changed files with 32 additions and 40 deletions

View File

@ -6,13 +6,22 @@ __license__ = 'GPL v3'
__copyright__ = '2009, Kovid Goyal <kovid@kovidgoyal.net>'
__docformat__ = 'restructuredtext en'
import os
import os, errno
from threading import Thread
from calibre.constants import iswindows, get_windows_username
ADDRESS = None
def eintr_retry_call(func, *args, **kwargs):
while True:
try:
return func(*args, **kwargs)
except (OSError, IOError) as e:
if e.errno == errno.EINTR:
continue
raise
def gui_socket_address():
global ADDRESS
if ADDRESS is None:

View File

@ -15,6 +15,7 @@ from functools import partial
from calibre import as_unicode, prints
from calibre.constants import iswindows, DEBUG
from calibre.utils.ipc import eintr_retry_call
def _encode(msg):
raw = cPickle.dumps(msg, -1)
@ -68,7 +69,7 @@ class Writer(Thread):
break
try:
self.data_written = True
self.conn.send_bytes(x)
eintr_retry_call(self.conn.send_bytes, x)
except Exception as e:
self.resultq.put(as_unicode(e))
else:
@ -112,7 +113,7 @@ class Server(Thread):
def run(self):
while self.keep_going:
try:
conn = self.listener.accept()
conn = eintr_retry_call(self.listener.accept)
self.handle_client(conn)
except:
pass
@ -125,7 +126,7 @@ class Server(Thread):
def _handle_client(self, conn):
while True:
try:
func_name, args, kwargs = conn.recv()
func_name, args, kwargs = eintr_retry_call(conn.recv)
except EOFError:
try:
conn.close()
@ -156,8 +157,8 @@ class Server(Thread):
import traceback
# Try to tell the client process what error happened
try:
conn.send_bytes(_encode(('failed', (unicode(e),
as_unicode(traceback.format_exc())))))
eintr_retry_call(conn.send_bytes, (_encode(('failed', (unicode(e),
as_unicode(traceback.format_exc()))))))
except:
pass
raise

View File

@ -14,6 +14,7 @@ from multiprocessing.connection import Listener, arbitrary_address
from collections import deque
from binascii import hexlify
from calibre.utils.ipc import eintr_retry_call
from calibre.utils.ipc.launch import Worker
from calibre.utils.ipc.worker import PARALLEL_FUNCS
from calibre import detect_ncpus as cpu_count
@ -38,7 +39,7 @@ class ConnectedWorker(Thread):
def start_job(self, job):
notification = PARALLEL_FUNCS[job.name][-1] is not None
self.conn.send((job.name, job.args, job.kwargs, job.description))
eintr_retry_call(self.conn.send, (job.name, job.args, job.kwargs, job.description))
if notification:
self.start()
else:
@ -48,7 +49,7 @@ class ConnectedWorker(Thread):
def run(self):
while True:
try:
x = self.conn.recv()
x = eintr_retry_call(self.conn.recv)
self.notifications.put(x)
except BaseException:
break
@ -129,12 +130,7 @@ class Server(Thread):
'CALIBRE_WORKER_KEY' : hexlify(self.auth_key),
'CALIBRE_WORKER_RESULT' : hexlify(rfile.encode('utf-8')),
}
for i in range(2):
# Try launch twice as occasionally on OS X
# Listener.accept fails with EINTR
cw = self.do_launch(env, gui, redirect_output, rfile)
if isinstance(cw, ConnectedWorker):
break
cw = self.do_launch(env, gui, redirect_output, rfile)
if isinstance(cw, basestring):
raise CriticalError('Failed to launch worker process:\n'+cw)
if DEBUG:
@ -146,7 +142,7 @@ class Server(Thread):
try:
w(redirect_output=redirect_output)
conn = self.listener.accept()
conn = eintr_retry_call(self.listener.accept)
if conn is None:
raise Exception('Failed to launch worker process')
except BaseException:

View File

@ -14,6 +14,7 @@ from threading import Thread
from contextlib import closing
from calibre.constants import iswindows
from calibre.utils.ipc import eintr_retry_call
from calibre.utils.ipc.launch import Worker
class WorkerError(Exception):
@ -35,30 +36,18 @@ class ConnectedWorker(Thread):
def run(self):
conn = tb = None
for i in range(2):
# On OS X an EINTR can interrupt the accept() call
try:
conn = self.listener.accept()
break
except:
tb = traceback.format_exc()
pass
try:
conn = eintr_retry_call(self.listener.accept)
except:
tb = traceback.format_exc()
if conn is None:
self.tb = tb
return
self.accepted = True
with closing(conn):
try:
try:
conn.send(self.args)
except:
# Maybe an EINTR
conn.send(self.args)
try:
self.res = conn.recv()
except:
# Maybe an EINTR
self.res = conn.recv()
eintr_retry_call(conn.send, self.args)
self.res = eintr_retry_call(conn.recv)
except:
self.tb = traceback.format_exc()
@ -202,11 +191,7 @@ def main():
address = cPickle.loads(unhexlify(os.environ['CALIBRE_WORKER_ADDRESS']))
key = unhexlify(os.environ['CALIBRE_WORKER_KEY'])
with closing(Client(address, authkey=key)) as conn:
try:
args = conn.recv()
except:
# Maybe EINTR
args = conn.recv()
args = eintr_retry_call(conn.recv)
try:
mod, func, args, kwargs, module_is_source_code = args
if module_is_source_code:

View File

@ -16,6 +16,7 @@ from zipimport import ZipImportError
from calibre import prints
from calibre.constants import iswindows, isosx
from calibre.utils.ipc import eintr_retry_call
PARALLEL_FUNCS = {
'lrfviewer' :
@ -75,7 +76,7 @@ class Progress(Thread):
if x is None:
break
try:
self.conn.send(x)
eintr_retry_call(self.conn.send, x)
except:
break
@ -178,7 +179,7 @@ def main():
key = unhexlify(os.environ['CALIBRE_WORKER_KEY'])
resultf = unhexlify(os.environ['CALIBRE_WORKER_RESULT']).decode('utf-8')
with closing(Client(address, authkey=key)) as conn:
name, args, kwargs, desc = conn.recv()
name, args, kwargs, desc = eintr_retry_call(conn.recv)
if desc:
prints(desc)
sys.stdout.flush()