Merge from trunk

This commit is contained in:
Charles Haley 2012-06-24 11:52:19 +02:00
commit 57d3864f94
5 changed files with 32 additions and 40 deletions

View File

@ -6,13 +6,22 @@ __license__ = 'GPL v3'
__copyright__ = '2009, Kovid Goyal <kovid@kovidgoyal.net>' __copyright__ = '2009, Kovid Goyal <kovid@kovidgoyal.net>'
__docformat__ = 'restructuredtext en' __docformat__ = 'restructuredtext en'
import os import os, errno
from threading import Thread from threading import Thread
from calibre.constants import iswindows, get_windows_username from calibre.constants import iswindows, get_windows_username
ADDRESS = None ADDRESS = None
def eintr_retry_call(func, *args, **kwargs):
while True:
try:
return func(*args, **kwargs)
except (OSError, IOError) as e:
if e.errno == errno.EINTR:
continue
raise
def gui_socket_address(): def gui_socket_address():
global ADDRESS global ADDRESS
if ADDRESS is None: if ADDRESS is None:

View File

@ -15,6 +15,7 @@ from functools import partial
from calibre import as_unicode, prints from calibre import as_unicode, prints
from calibre.constants import iswindows, DEBUG from calibre.constants import iswindows, DEBUG
from calibre.utils.ipc import eintr_retry_call
def _encode(msg): def _encode(msg):
raw = cPickle.dumps(msg, -1) raw = cPickle.dumps(msg, -1)
@ -68,7 +69,7 @@ class Writer(Thread):
break break
try: try:
self.data_written = True self.data_written = True
self.conn.send_bytes(x) eintr_retry_call(self.conn.send_bytes, x)
except Exception as e: except Exception as e:
self.resultq.put(as_unicode(e)) self.resultq.put(as_unicode(e))
else: else:
@ -112,7 +113,7 @@ class Server(Thread):
def run(self): def run(self):
while self.keep_going: while self.keep_going:
try: try:
conn = self.listener.accept() conn = eintr_retry_call(self.listener.accept)
self.handle_client(conn) self.handle_client(conn)
except: except:
pass pass
@ -125,7 +126,7 @@ class Server(Thread):
def _handle_client(self, conn): def _handle_client(self, conn):
while True: while True:
try: try:
func_name, args, kwargs = conn.recv() func_name, args, kwargs = eintr_retry_call(conn.recv)
except EOFError: except EOFError:
try: try:
conn.close() conn.close()
@ -156,8 +157,8 @@ class Server(Thread):
import traceback import traceback
# Try to tell the client process what error happened # Try to tell the client process what error happened
try: try:
conn.send_bytes(_encode(('failed', (unicode(e), eintr_retry_call(conn.send_bytes, (_encode(('failed', (unicode(e),
as_unicode(traceback.format_exc()))))) as_unicode(traceback.format_exc()))))))
except: except:
pass pass
raise raise

View File

@ -14,6 +14,7 @@ from multiprocessing.connection import Listener, arbitrary_address
from collections import deque from collections import deque
from binascii import hexlify from binascii import hexlify
from calibre.utils.ipc import eintr_retry_call
from calibre.utils.ipc.launch import Worker from calibre.utils.ipc.launch import Worker
from calibre.utils.ipc.worker import PARALLEL_FUNCS from calibre.utils.ipc.worker import PARALLEL_FUNCS
from calibre import detect_ncpus as cpu_count from calibre import detect_ncpus as cpu_count
@ -38,7 +39,7 @@ class ConnectedWorker(Thread):
def start_job(self, job): def start_job(self, job):
notification = PARALLEL_FUNCS[job.name][-1] is not None notification = PARALLEL_FUNCS[job.name][-1] is not None
self.conn.send((job.name, job.args, job.kwargs, job.description)) eintr_retry_call(self.conn.send, (job.name, job.args, job.kwargs, job.description))
if notification: if notification:
self.start() self.start()
else: else:
@ -48,7 +49,7 @@ class ConnectedWorker(Thread):
def run(self): def run(self):
while True: while True:
try: try:
x = self.conn.recv() x = eintr_retry_call(self.conn.recv)
self.notifications.put(x) self.notifications.put(x)
except BaseException: except BaseException:
break break
@ -129,12 +130,7 @@ class Server(Thread):
'CALIBRE_WORKER_KEY' : hexlify(self.auth_key), 'CALIBRE_WORKER_KEY' : hexlify(self.auth_key),
'CALIBRE_WORKER_RESULT' : hexlify(rfile.encode('utf-8')), 'CALIBRE_WORKER_RESULT' : hexlify(rfile.encode('utf-8')),
} }
for i in range(2):
# Try launch twice as occasionally on OS X
# Listener.accept fails with EINTR
cw = self.do_launch(env, gui, redirect_output, rfile) cw = self.do_launch(env, gui, redirect_output, rfile)
if isinstance(cw, ConnectedWorker):
break
if isinstance(cw, basestring): if isinstance(cw, basestring):
raise CriticalError('Failed to launch worker process:\n'+cw) raise CriticalError('Failed to launch worker process:\n'+cw)
if DEBUG: if DEBUG:
@ -146,7 +142,7 @@ class Server(Thread):
try: try:
w(redirect_output=redirect_output) w(redirect_output=redirect_output)
conn = self.listener.accept() conn = eintr_retry_call(self.listener.accept)
if conn is None: if conn is None:
raise Exception('Failed to launch worker process') raise Exception('Failed to launch worker process')
except BaseException: except BaseException:

View File

@ -14,6 +14,7 @@ from threading import Thread
from contextlib import closing from contextlib import closing
from calibre.constants import iswindows from calibre.constants import iswindows
from calibre.utils.ipc import eintr_retry_call
from calibre.utils.ipc.launch import Worker from calibre.utils.ipc.launch import Worker
class WorkerError(Exception): class WorkerError(Exception):
@ -35,30 +36,18 @@ class ConnectedWorker(Thread):
def run(self): def run(self):
conn = tb = None conn = tb = None
for i in range(2):
# On OS X an EINTR can interrupt the accept() call
try: try:
conn = self.listener.accept() conn = eintr_retry_call(self.listener.accept)
break
except: except:
tb = traceback.format_exc() tb = traceback.format_exc()
pass
if conn is None: if conn is None:
self.tb = tb self.tb = tb
return return
self.accepted = True self.accepted = True
with closing(conn): with closing(conn):
try: try:
try: eintr_retry_call(conn.send, self.args)
conn.send(self.args) self.res = eintr_retry_call(conn.recv)
except:
# Maybe an EINTR
conn.send(self.args)
try:
self.res = conn.recv()
except:
# Maybe an EINTR
self.res = conn.recv()
except: except:
self.tb = traceback.format_exc() self.tb = traceback.format_exc()
@ -202,11 +191,7 @@ def main():
address = cPickle.loads(unhexlify(os.environ['CALIBRE_WORKER_ADDRESS'])) address = cPickle.loads(unhexlify(os.environ['CALIBRE_WORKER_ADDRESS']))
key = unhexlify(os.environ['CALIBRE_WORKER_KEY']) key = unhexlify(os.environ['CALIBRE_WORKER_KEY'])
with closing(Client(address, authkey=key)) as conn: with closing(Client(address, authkey=key)) as conn:
try: args = eintr_retry_call(conn.recv)
args = conn.recv()
except:
# Maybe EINTR
args = conn.recv()
try: try:
mod, func, args, kwargs, module_is_source_code = args mod, func, args, kwargs, module_is_source_code = args
if module_is_source_code: if module_is_source_code:

View File

@ -16,6 +16,7 @@ from zipimport import ZipImportError
from calibre import prints from calibre import prints
from calibre.constants import iswindows, isosx from calibre.constants import iswindows, isosx
from calibre.utils.ipc import eintr_retry_call
PARALLEL_FUNCS = { PARALLEL_FUNCS = {
'lrfviewer' : 'lrfviewer' :
@ -75,7 +76,7 @@ class Progress(Thread):
if x is None: if x is None:
break break
try: try:
self.conn.send(x) eintr_retry_call(self.conn.send, x)
except: except:
break break
@ -178,7 +179,7 @@ def main():
key = unhexlify(os.environ['CALIBRE_WORKER_KEY']) key = unhexlify(os.environ['CALIBRE_WORKER_KEY'])
resultf = unhexlify(os.environ['CALIBRE_WORKER_RESULT']).decode('utf-8') resultf = unhexlify(os.environ['CALIBRE_WORKER_RESULT']).decode('utf-8')
with closing(Client(address, authkey=key)) as conn: with closing(Client(address, authkey=key)) as conn:
name, args, kwargs, desc = conn.recv() name, args, kwargs, desc = eintr_retry_call(conn.recv)
if desc: if desc:
prints(desc) prints(desc)
sys.stdout.flush() sys.stdout.flush()