From 1a45fc3d58e7ee05c191ffde6218312381acbd2f Mon Sep 17 00:00:00 2001 From: Kovid Goyal Date: Sat, 19 Jul 2008 02:10:49 -0700 Subject: [PATCH] Fix various bugs in the worker process control code and switch to using local sockets to communicate on Unix --- src/calibre/gui2/main.py | 5 +- src/calibre/parallel.py | 197 +++++++++++++++++++++----------- src/calibre/utils/fontconfig.py | 54 +++++---- 3 files changed, 164 insertions(+), 92 deletions(-) diff --git a/src/calibre/gui2/main.py b/src/calibre/gui2/main.py index 5dafe3c683..e73ada9cb8 100644 --- a/src/calibre/gui2/main.py +++ b/src/calibre/gui2/main.py @@ -1241,9 +1241,10 @@ path_to_ebook to the database. if single_instance is not None and single_instance.is_running() and \ single_instance.send_message('launched:'+repr(args)): return 0 - + extra = '' if iswindows else \ + ('If you\'re sure it is not running, delete the file %s.'%os.path.expanduser('~/.calibre_calibre GUI.lock')) QMessageBox.critical(None, 'Cannot Start '+__appname__, - '

%s is already running.

'%__appname__) + '

%s is already running. %s

'%(__appname__, extra)) return 1 initialize_file_icon_provider() try: diff --git a/src/calibre/parallel.py b/src/calibre/parallel.py index d3531c5525..d33728042b 100644 --- a/src/calibre/parallel.py +++ b/src/calibre/parallel.py @@ -6,7 +6,7 @@ __docformat__ = 'restructuredtext en' ''' Used to run jobs in parallel in separate processes. Features output streaming, support for progress notification as well as job killing. The worker processes -are controlled via a simple protocol run over TCP/IP sockets. The control happens +are controlled via a simple protocol run over sockets. The control happens mainly in two class, :class:`Server` and :class:`Overseer`. The worker is encapsulated in the function :function:`worker`. Every worker process has the environment variable :envvar:`CALIBRE_WORKER` defined. @@ -25,7 +25,7 @@ the worker interrupts the job and dies. The sending of progress and console outp is buffered and asynchronous to prevent the job from being IO bound. ''' import sys, os, gc, cPickle, traceback, atexit, cStringIO, time, signal, \ - subprocess, socket, collections, binascii, re, tempfile, thread + subprocess, socket, collections, binascii, re, tempfile, thread, tempfile from select import select from functools import partial from threading import RLock, Thread, Event @@ -33,6 +33,7 @@ from threading import RLock, Thread, Event from calibre.ptempfile import PersistentTemporaryFile from calibre import iswindows, detect_ncpus, isosx +DEBUG = False #: A mapping from job names to functions that perform the jobs PARALLEL_FUNCS = { @@ -51,11 +52,14 @@ PARALLEL_FUNCS = { isfrozen = hasattr(sys, 'frozen') +isworker = False win32event = __import__('win32event') if iswindows else None win32process = __import__('win32process') if iswindows else None msvcrt = __import__('msvcrt') if iswindows else None +SOCKET_TYPE = socket.AF_UNIX if not iswindows else socket.AF_INET + class WorkerStatus(object): ''' A platform independent class to control child processes. Provides the @@ -223,6 +227,7 @@ class WorkerMother(object): mother = WorkerMother() +_comm_lock = RLock() def write(socket, msg, timeout=5): ''' Write a message on socket. If `msg` is unicode, it is encoded in utf-8. @@ -230,22 +235,29 @@ def write(socket, msg, timeout=5): `msg` is broken into chunks of size 4096 and sent. The :function:`read` function automatically re-assembles the chunks into whole message. ''' - if isinstance(msg, unicode): - msg = msg.encode('utf-8') - length = None - while len(msg) > 0: - if length is None: - length = len(msg) - chunk = ('%-12d'%length) + msg[:4096-12] - msg = msg[4096-12:] - else: - chunk, msg = msg[:4096], msg[4096:] - w = select([], [socket], [], timeout)[1] - if not w: - raise RuntimeError('Write to socket timed out') - if socket.sendall(chunk) is not None: - raise RuntimeError('Failed to write chunk to socket') - + if isworker: + _comm_lock.acquire() + try: + if isinstance(msg, unicode): + msg = msg.encode('utf-8') + if DEBUG: + print >>sys.__stdout__, 'write(%s):'%('worker' if isworker else 'overseer'), repr(msg) + length = None + while len(msg) > 0: + if length is None: + length = len(msg) + chunk = ('%-12d'%length) + msg[:4096-12] + msg = msg[4096-12:] + else: + chunk, msg = msg[:4096], msg[4096:] + w = select([], [socket], [], timeout)[1] + if not w: + raise RuntimeError('Write to socket timed out') + if socket.sendall(chunk) is not None: + raise RuntimeError('Failed to write chunk to socket') + finally: + if isworker: + _comm_lock.release() def read(socket, timeout=5): ''' @@ -253,24 +265,33 @@ def read(socket, timeout=5): function. Raises a `RuntimeError` if the message is corrpted. Can return an empty string. ''' - buf = cStringIO.StringIO() - length = None - while select([socket],[],[],timeout)[0]: - msg = socket.recv(4096) - if not msg: - break - if length is None: - length, msg = int(msg[:12]), msg[12:] - buf.write(msg) - if buf.tell() >= length: - break - if not length: - return '' - msg = buf.getvalue()[:length] - if len(msg) < length: - raise RuntimeError('Corrupted packet received') - - return msg + if isworker: + _comm_lock.acquire() + try: + buf = cStringIO.StringIO() + length = None + while select([socket],[],[],timeout)[0]: + msg = socket.recv(4096) + if not msg: + break + if length is None: + length, msg = int(msg[:12]), msg[12:] + buf.write(msg) + if buf.tell() >= length: + break + if not length: + if DEBUG: + print >>sys.__stdout__, 'read(%s):'%('worker' if isworker else 'overseer'), 'nothing' + return '' + msg = buf.getvalue()[:length] + if len(msg) < length: + raise RuntimeError('Corrupted packet received') + if DEBUG: + print >>sys.__stdout__, 'read(%s):'%('worker' if isworker else 'overseer'), repr(msg) + return msg + finally: + if isworker: + _comm_lock.release() class RepeatingTimer(Thread): ''' @@ -306,11 +327,13 @@ class Overseer(object): INTERVAL = 0.1 def __init__(self, server, port, timeout=5): - self.worker_status = mother.spawn_worker('127.0.0.1:%d'%port) + self.worker_status = mother.spawn_worker('127.0.0.1:'+str(port)) self.socket = server.accept()[0] # Needed if terminate called hwen interpreter is shutting down self.os = os self.signal = signal + self.on_probation = False + self.terminated = False self.working = False self.timeout = timeout @@ -329,6 +352,7 @@ class Overseer(object): def terminate(self): 'Kill worker process.' + self.terminated = True try: if self.socket: self.write('STOP:') @@ -363,7 +387,9 @@ class Overseer(object): def __eq__(self, other): return hasattr(other, 'process') and hasattr(other, 'worker_pid') and self.worker_pid == other.worker_pid - def __bool__(self): + def is_viable(self): + if self.terminated: + return False return self.worker_status.is_alive() def select(self, timeout=0): @@ -386,6 +412,7 @@ class Overseer(object): self.output = job.output if callable(job.output) else sys.stdout.write self.progress = job.progress if callable(job.progress) else None self.job = job + self.last_report = time.time() def control(self): ''' @@ -397,8 +424,21 @@ class Overseer(object): ''' if select([self.socket],[],[],0)[0]: msg = self.read() + if msg: + self.on_probation = False + self.last_report = time.time() + else: + if self.on_probation: + self.terminate() + return Result(None, ControlError('Worker process died unexpectedly'), '') + else: + self.on_probation = True + return word, msg = msg.partition(':')[0], msg.partition(':')[-1] - if word == 'RESULT': + if word == 'PING': + self.write('OK') + return + elif word == 'RESULT': self.write('OK') return Result(cPickle.loads(msg), None, None) elif word == 'OUTPUT': @@ -421,11 +461,11 @@ class Overseer(object): return Result(None, *cPickle.loads(msg)) else: self.terminate() - return Result(None, ControlError('Worker sent invalid msg: %s', repr(msg)), '') - if not self.worker_status.is_alive(): - return Result(None, ControlError('Worker process died unexpectedly with returncode: %d'%self.process.returncode), '') + return Result(None, ControlError('Worker sent invalid msg: %s'%repr(msg)), '') + if not self.worker_status.is_alive() or time.time() - self.last_report > 180: + self.terminate() + return Result(None, ControlError('Worker process died unexpectedly with returncode: %s'%str(self.process.returncode)), '') - class Job(object): @@ -458,18 +498,23 @@ class Server(Thread): KILL_RESULT = Overseer.KILL_RESULT START_PORT = 10013 + PID = os.getpid() + def __init__(self, number_of_workers=detect_ncpus()): Thread.__init__(self) self.setDaemon(True) - self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.port = self.START_PORT + self.server_socket = socket.socket(SOCKET_TYPE, socket.SOCK_STREAM) + self.port = tempfile.mktemp(prefix='calibre_server')+'_%d_'%self.PID if not iswindows else self.START_PORT while True: try: - self.server_socket.bind(('localhost', self.port)) + address = ('localhost', self.port) if iswindows else self.port + self.server_socket.bind(address) break - except: - self.port += 1 + except socket.error: + self.port += (1 if iswindows else '1') + if not iswindows: + atexit.register(os.unlink, self.port) self.server_socket.listen(5) self.number_of_workers = number_of_workers self.pool, self.jobs, self.working, self.results = [], collections.deque(), [], {} @@ -525,7 +570,7 @@ class Server(Thread): res = Result(None, unicode(err), traceback.format_exc()) job.done(res) o = None - if o: + if o and o.is_viable(): with self.working_lock: self.working.append(o) @@ -542,7 +587,7 @@ class Server(Thread): done.append(o) for o in done: self.working.remove(o) - if o: + if o and o.is_viable(): with self.pool_lock: self.pool.append(o) @@ -601,9 +646,11 @@ class BufferedSender(object): self.socket = socket self.wbuf, self.pbuf = [], [] self.wlock, self.plock = RLock(), RLock() + self.last_report = None self.timer = RepeatingTimer(0.5, self.send, 'BufferedSender') self.timer.start() + def write(self, msg): if not isinstance(msg, basestring): msg = unicode(msg) @@ -623,20 +670,31 @@ class BufferedSender(object): if not select([], [self.socket], [], 30)[1]: print >>sys.__stderr__, 'Cannot pipe to overseer' return - + + reported = False with self.wlock: if self.wbuf: msg = cPickle.dumps(self.wbuf, -1) self.wbuf = [] write(self.socket, 'OUTPUT:'+msg) read(self.socket, 10) + reported = True with self.plock: if self.pbuf: msg = cPickle.dumps(self.pbuf, -1) self.pbuf = [] write(self.socket, 'PROGRESS:'+msg) - read(self.socket, 10) + read(self.socket, 10) + reported = True + + if self.last_report is not None: + if reported: + self.last_report = time.time() + elif time.time() - self.last_report > 60: + write(self.socket, 'PING:') + read(self.socket, 10) + self.last_report = time.time() def notify(self, percent, msg=''): with self.plock: @@ -652,19 +710,25 @@ def get_func(name): return func, kwdargs, notification def work(client_socket, func, args, kwdargs): - func, kargs, notification = get_func(func) - if notification is not None and hasattr(sys.stdout, 'notify'): - kargs[notification] = sys.stdout.notify - kargs.update(kwdargs) - res = func(*args, **kargs) - if hasattr(sys.stdout, 'send'): - sys.stdout.send() - return res + sys.stdout.last_report = time.time() + try: + func, kargs, notification = get_func(func) + if notification is not None and hasattr(sys.stdout, 'notify'): + kargs[notification] = sys.stdout.notify + kargs.update(kwdargs) + res = func(*args, **kargs) + if hasattr(sys.stdout, 'send'): + sys.stdout.send() + return res + finally: + sys.stdout.last_report = None + time.sleep(5) # Give any in progress BufferedSend time to complete def worker(host, port): - client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - client_socket.connect((host, port)) + client_socket = socket.socket(SOCKET_TYPE, socket.SOCK_STREAM) + address = (host, port) if iswindows else port + client_socket.connect(address) write(client_socket, 'CALIBRE_WORKER:%d'%os.getpid()) msg = read(client_socket, timeout=10) if msg != 'OK': @@ -685,10 +749,11 @@ def worker(host, port): try: result = work(client_socket, func, args, kwdargs) write(client_socket, 'RESULT:'+ cPickle.dumps(result)) - except (Exception, SystemExit), err: + except BaseException, err: exception = (err.__class__.__name__, unicode(str(err), 'utf-8', 'replace')) tb = traceback.format_exc() - write(client_socket, 'ERROR:'+cPickle.dumps((exception, tb),-1)) + msg = 'ERROR:'+cPickle.dumps((exception, tb),-1) + write(client_socket, msg) if read(client_socket, 10) != 'OK': break gc.collect() @@ -714,11 +779,13 @@ def free_spirit(path): func(*args, **kargs) def main(args=sys.argv): + global isworker + isworker = True args = args[1].split(':') if len(args) == 1: free_spirit(binascii.unhexlify(re.sub(r'[^a-f0-9A-F]', '', args[0]))) else: - worker(args[0].replace("'", ''), int(args[1])) + worker(args[0].replace("'", ''), int(args[1]) if iswindows else args[1]) return 0 if __name__ == '__main__': diff --git a/src/calibre/utils/fontconfig.py b/src/calibre/utils/fontconfig.py index 3e74362720..4275d03479 100644 --- a/src/calibre/utils/fontconfig.py +++ b/src/calibre/utils/fontconfig.py @@ -130,36 +130,40 @@ lib.FcConfigBuildFonts.restype = c_int _init_error = None _initialized = False -from threading import Timer -def _do_init(): - # Initialize the fontconfig library. This has to be done manually - # for the OS X bundle as it may have its own private fontconfig. - if hasattr(sys, 'frameworks_dir'): - config_dir = os.path.join(os.path.dirname(getattr(sys, 'frameworks_dir')), 'Resources', 'fonts') - if isinstance(config_dir, unicode): - config_dir = config_dir.encode(sys.getfilesystemencoding()) - config = lib.FcConfigCreate() - if not lib.FcConfigParseAndLoad(config, os.path.join(config_dir, 'fonts.conf'), 1): - _init_error = 'Could not parse the fontconfig configuration' +from threading import Thread + +class FontScanner(Thread): + def run(self): + # Initialize the fontconfig library. This has to be done manually + # for the OS X bundle as it may have its own private fontconfig. + if getattr(sys, 'frameworks_dir', False): + config_dir = os.path.join(os.path.dirname(getattr(sys, 'frameworks_dir')), 'Resources', 'fonts') + if isinstance(config_dir, unicode): + config_dir = config_dir.encode(sys.getfilesystemencoding()) + config = lib.FcConfigCreate() + if not lib.FcConfigParseAndLoad(config, os.path.join(config_dir, 'fonts.conf'), 1): + _init_error = 'Could not parse the fontconfig configuration' + return + if not lib.FcConfigBuildFonts(config): + _init_error = 'Could not build fonts' + return + if not lib.FcConfigSetCurrent(config): + _init_error = 'Could not set font config' + return + elif not lib.FcInit(): + _init_error = _('Could not initialize the fontconfig library') return - if not lib.FcConfigBuildFonts(config): - _init_error = 'Could not build fonts' - return - if not lib.FcConfigSetCurrent(config): - _init_error = 'Could not set font config' - return - elif not lib.FcInit(): - _init_error = _('Could not initialize the fontconfig library') - return - global _initialized - _initialized = True + global _initialized + _initialized = True -_init_timer = Timer(0.1, _do_init) -_init_timer.start() +_scanner = FontScanner() +_scanner.start() def join(): - _init_timer.join() + _scanner.join(120) + if _scanner.isAlive(): + raise RuntimeError('Scanning for system fonts seems to have hung. Try again in a little while.') if _init_error is not None: raise RuntimeError(_init_error)