Fix various bugs in the worker process control code and switch to using local sockets to communicate on Unix

This commit is contained in:
Kovid Goyal 2008-07-19 02:10:49 -07:00
parent d2c5ba9de4
commit 1a45fc3d58
3 changed files with 164 additions and 92 deletions

View File

@ -1241,9 +1241,10 @@ path_to_ebook to the database.
if single_instance is not None and single_instance.is_running() and \ if single_instance is not None and single_instance.is_running() and \
single_instance.send_message('launched:'+repr(args)): single_instance.send_message('launched:'+repr(args)):
return 0 return 0
extra = '' if iswindows else \
('If you\'re sure it is not running, delete the file %s.'%os.path.expanduser('~/.calibre_calibre GUI.lock'))
QMessageBox.critical(None, 'Cannot Start '+__appname__, QMessageBox.critical(None, 'Cannot Start '+__appname__,
'<p>%s is already running.</p>'%__appname__) '<p>%s is already running. %s</p>'%(__appname__, extra))
return 1 return 1
initialize_file_icon_provider() initialize_file_icon_provider()
try: try:

View File

@ -6,7 +6,7 @@ __docformat__ = 'restructuredtext en'
''' '''
Used to run jobs in parallel in separate processes. Features output streaming, Used to run jobs in parallel in separate processes. Features output streaming,
support for progress notification as well as job killing. The worker processes support for progress notification as well as job killing. The worker processes
are controlled via a simple protocol run over TCP/IP sockets. The control happens are controlled via a simple protocol run over sockets. The control happens
mainly in two classes, :class:`Server` and :class:`Overseer`. The worker is mainly in two classes, :class:`Server` and :class:`Overseer`. The worker is
encapsulated in the function :function:`worker`. Every worker process encapsulated in the function :function:`worker`. Every worker process
has the environment variable :envvar:`CALIBRE_WORKER` defined. has the environment variable :envvar:`CALIBRE_WORKER` defined.
@ -25,7 +25,7 @@ the worker interrupts the job and dies. The sending of progress and console outp
is buffered and asynchronous to prevent the job from being IO bound. is buffered and asynchronous to prevent the job from being IO bound.
''' '''
import sys, os, gc, cPickle, traceback, atexit, cStringIO, time, signal, \ import sys, os, gc, cPickle, traceback, atexit, cStringIO, time, signal, \
subprocess, socket, collections, binascii, re, tempfile, thread subprocess, socket, collections, binascii, re, tempfile, thread, tempfile
from select import select from select import select
from functools import partial from functools import partial
from threading import RLock, Thread, Event from threading import RLock, Thread, Event
@ -33,6 +33,7 @@ from threading import RLock, Thread, Event
from calibre.ptempfile import PersistentTemporaryFile from calibre.ptempfile import PersistentTemporaryFile
from calibre import iswindows, detect_ncpus, isosx from calibre import iswindows, detect_ncpus, isosx
DEBUG = False
#: A mapping from job names to functions that perform the jobs #: A mapping from job names to functions that perform the jobs
PARALLEL_FUNCS = { PARALLEL_FUNCS = {
@ -51,11 +52,14 @@ PARALLEL_FUNCS = {
isfrozen = hasattr(sys, 'frozen') isfrozen = hasattr(sys, 'frozen')
isworker = False
win32event = __import__('win32event') if iswindows else None win32event = __import__('win32event') if iswindows else None
win32process = __import__('win32process') if iswindows else None win32process = __import__('win32process') if iswindows else None
msvcrt = __import__('msvcrt') if iswindows else None msvcrt = __import__('msvcrt') if iswindows else None
SOCKET_TYPE = socket.AF_UNIX if not iswindows else socket.AF_INET
class WorkerStatus(object): class WorkerStatus(object):
''' '''
A platform independent class to control child processes. Provides the A platform independent class to control child processes. Provides the
@ -223,6 +227,7 @@ class WorkerMother(object):
mother = WorkerMother() mother = WorkerMother()
_comm_lock = RLock()
def write(socket, msg, timeout=5): def write(socket, msg, timeout=5):
''' '''
Write a message on socket. If `msg` is unicode, it is encoded in utf-8. Write a message on socket. If `msg` is unicode, it is encoded in utf-8.
@ -230,8 +235,13 @@ def write(socket, msg, timeout=5):
`msg` is broken into chunks of size 4096 and sent. The :function:`read` function `msg` is broken into chunks of size 4096 and sent. The :function:`read` function
automatically re-assembles the chunks into whole message. automatically re-assembles the chunks into whole message.
''' '''
if isworker:
_comm_lock.acquire()
try:
if isinstance(msg, unicode): if isinstance(msg, unicode):
msg = msg.encode('utf-8') msg = msg.encode('utf-8')
if DEBUG:
print >>sys.__stdout__, 'write(%s):'%('worker' if isworker else 'overseer'), repr(msg)
length = None length = None
while len(msg) > 0: while len(msg) > 0:
if length is None: if length is None:
@ -245,7 +255,9 @@ def write(socket, msg, timeout=5):
raise RuntimeError('Write to socket timed out') raise RuntimeError('Write to socket timed out')
if socket.sendall(chunk) is not None: if socket.sendall(chunk) is not None:
raise RuntimeError('Failed to write chunk to socket') raise RuntimeError('Failed to write chunk to socket')
finally:
if isworker:
_comm_lock.release()
def read(socket, timeout=5): def read(socket, timeout=5):
''' '''
@ -253,6 +265,9 @@ def read(socket, timeout=5):
function. Raises a `RuntimeError` if the message is corrupted. Can return an function. Raises a `RuntimeError` if the message is corrupted. Can return an
empty string. empty string.
''' '''
if isworker:
_comm_lock.acquire()
try:
buf = cStringIO.StringIO() buf = cStringIO.StringIO()
length = None length = None
while select([socket],[],[],timeout)[0]: while select([socket],[],[],timeout)[0]:
@ -265,12 +280,18 @@ def read(socket, timeout=5):
if buf.tell() >= length: if buf.tell() >= length:
break break
if not length: if not length:
if DEBUG:
print >>sys.__stdout__, 'read(%s):'%('worker' if isworker else 'overseer'), 'nothing'
return '' return ''
msg = buf.getvalue()[:length] msg = buf.getvalue()[:length]
if len(msg) < length: if len(msg) < length:
raise RuntimeError('Corrupted packet received') raise RuntimeError('Corrupted packet received')
if DEBUG:
print >>sys.__stdout__, 'read(%s):'%('worker' if isworker else 'overseer'), repr(msg)
return msg return msg
finally:
if isworker:
_comm_lock.release()
class RepeatingTimer(Thread): class RepeatingTimer(Thread):
''' '''
@ -306,11 +327,13 @@ class Overseer(object):
INTERVAL = 0.1 INTERVAL = 0.1
def __init__(self, server, port, timeout=5): def __init__(self, server, port, timeout=5):
self.worker_status = mother.spawn_worker('127.0.0.1:%d'%port) self.worker_status = mother.spawn_worker('127.0.0.1:'+str(port))
self.socket = server.accept()[0] self.socket = server.accept()[0]
# Needed if terminate called when interpreter is shutting down # Needed if terminate called when interpreter is shutting down
self.os = os self.os = os
self.signal = signal self.signal = signal
self.on_probation = False
self.terminated = False
self.working = False self.working = False
self.timeout = timeout self.timeout = timeout
@ -329,6 +352,7 @@ class Overseer(object):
def terminate(self): def terminate(self):
'Kill worker process.' 'Kill worker process.'
self.terminated = True
try: try:
if self.socket: if self.socket:
self.write('STOP:') self.write('STOP:')
@ -363,7 +387,9 @@ class Overseer(object):
def __eq__(self, other): def __eq__(self, other):
return hasattr(other, 'process') and hasattr(other, 'worker_pid') and self.worker_pid == other.worker_pid return hasattr(other, 'process') and hasattr(other, 'worker_pid') and self.worker_pid == other.worker_pid
def __bool__(self): def is_viable(self):
if self.terminated:
return False
return self.worker_status.is_alive() return self.worker_status.is_alive()
def select(self, timeout=0): def select(self, timeout=0):
@ -386,6 +412,7 @@ class Overseer(object):
self.output = job.output if callable(job.output) else sys.stdout.write self.output = job.output if callable(job.output) else sys.stdout.write
self.progress = job.progress if callable(job.progress) else None self.progress = job.progress if callable(job.progress) else None
self.job = job self.job = job
self.last_report = time.time()
def control(self): def control(self):
''' '''
@ -397,8 +424,21 @@ class Overseer(object):
''' '''
if select([self.socket],[],[],0)[0]: if select([self.socket],[],[],0)[0]:
msg = self.read() msg = self.read()
if msg:
self.on_probation = False
self.last_report = time.time()
else:
if self.on_probation:
self.terminate()
return Result(None, ControlError('Worker process died unexpectedly'), '')
else:
self.on_probation = True
return
word, msg = msg.partition(':')[0], msg.partition(':')[-1] word, msg = msg.partition(':')[0], msg.partition(':')[-1]
if word == 'RESULT': if word == 'PING':
self.write('OK')
return
elif word == 'RESULT':
self.write('OK') self.write('OK')
return Result(cPickle.loads(msg), None, None) return Result(cPickle.loads(msg), None, None)
elif word == 'OUTPUT': elif word == 'OUTPUT':
@ -421,10 +461,10 @@ class Overseer(object):
return Result(None, *cPickle.loads(msg)) return Result(None, *cPickle.loads(msg))
else: else:
self.terminate() self.terminate()
return Result(None, ControlError('Worker sent invalid msg: %s', repr(msg)), '') return Result(None, ControlError('Worker sent invalid msg: %s'%repr(msg)), '')
if not self.worker_status.is_alive(): if not self.worker_status.is_alive() or time.time() - self.last_report > 180:
return Result(None, ControlError('Worker process died unexpectedly with returncode: %d'%self.process.returncode), '') self.terminate()
return Result(None, ControlError('Worker process died unexpectedly with returncode: %s'%str(self.process.returncode)), '')
class Job(object): class Job(object):
@ -458,18 +498,23 @@ class Server(Thread):
KILL_RESULT = Overseer.KILL_RESULT KILL_RESULT = Overseer.KILL_RESULT
START_PORT = 10013 START_PORT = 10013
PID = os.getpid()
def __init__(self, number_of_workers=detect_ncpus()): def __init__(self, number_of_workers=detect_ncpus()):
Thread.__init__(self) Thread.__init__(self)
self.setDaemon(True) self.setDaemon(True)
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.server_socket = socket.socket(SOCKET_TYPE, socket.SOCK_STREAM)
self.port = self.START_PORT self.port = tempfile.mktemp(prefix='calibre_server')+'_%d_'%self.PID if not iswindows else self.START_PORT
while True: while True:
try: try:
self.server_socket.bind(('localhost', self.port)) address = ('localhost', self.port) if iswindows else self.port
self.server_socket.bind(address)
break break
except: except socket.error:
self.port += 1 self.port += (1 if iswindows else '1')
if not iswindows:
atexit.register(os.unlink, self.port)
self.server_socket.listen(5) self.server_socket.listen(5)
self.number_of_workers = number_of_workers self.number_of_workers = number_of_workers
self.pool, self.jobs, self.working, self.results = [], collections.deque(), [], {} self.pool, self.jobs, self.working, self.results = [], collections.deque(), [], {}
@ -525,7 +570,7 @@ class Server(Thread):
res = Result(None, unicode(err), traceback.format_exc()) res = Result(None, unicode(err), traceback.format_exc())
job.done(res) job.done(res)
o = None o = None
if o: if o and o.is_viable():
with self.working_lock: with self.working_lock:
self.working.append(o) self.working.append(o)
@ -542,7 +587,7 @@ class Server(Thread):
done.append(o) done.append(o)
for o in done: for o in done:
self.working.remove(o) self.working.remove(o)
if o: if o and o.is_viable():
with self.pool_lock: with self.pool_lock:
self.pool.append(o) self.pool.append(o)
@ -601,9 +646,11 @@ class BufferedSender(object):
self.socket = socket self.socket = socket
self.wbuf, self.pbuf = [], [] self.wbuf, self.pbuf = [], []
self.wlock, self.plock = RLock(), RLock() self.wlock, self.plock = RLock(), RLock()
self.last_report = None
self.timer = RepeatingTimer(0.5, self.send, 'BufferedSender') self.timer = RepeatingTimer(0.5, self.send, 'BufferedSender')
self.timer.start() self.timer.start()
def write(self, msg): def write(self, msg):
if not isinstance(msg, basestring): if not isinstance(msg, basestring):
msg = unicode(msg) msg = unicode(msg)
@ -624,12 +671,14 @@ class BufferedSender(object):
print >>sys.__stderr__, 'Cannot pipe to overseer' print >>sys.__stderr__, 'Cannot pipe to overseer'
return return
reported = False
with self.wlock: with self.wlock:
if self.wbuf: if self.wbuf:
msg = cPickle.dumps(self.wbuf, -1) msg = cPickle.dumps(self.wbuf, -1)
self.wbuf = [] self.wbuf = []
write(self.socket, 'OUTPUT:'+msg) write(self.socket, 'OUTPUT:'+msg)
read(self.socket, 10) read(self.socket, 10)
reported = True
with self.plock: with self.plock:
if self.pbuf: if self.pbuf:
@ -637,6 +686,15 @@ class BufferedSender(object):
self.pbuf = [] self.pbuf = []
write(self.socket, 'PROGRESS:'+msg) write(self.socket, 'PROGRESS:'+msg)
read(self.socket, 10) read(self.socket, 10)
reported = True
if self.last_report is not None:
if reported:
self.last_report = time.time()
elif time.time() - self.last_report > 60:
write(self.socket, 'PING:')
read(self.socket, 10)
self.last_report = time.time()
def notify(self, percent, msg=''): def notify(self, percent, msg=''):
with self.plock: with self.plock:
@ -652,6 +710,8 @@ def get_func(name):
return func, kwdargs, notification return func, kwdargs, notification
def work(client_socket, func, args, kwdargs): def work(client_socket, func, args, kwdargs):
sys.stdout.last_report = time.time()
try:
func, kargs, notification = get_func(func) func, kargs, notification = get_func(func)
if notification is not None and hasattr(sys.stdout, 'notify'): if notification is not None and hasattr(sys.stdout, 'notify'):
kargs[notification] = sys.stdout.notify kargs[notification] = sys.stdout.notify
@ -660,11 +720,15 @@ def work(client_socket, func, args, kwdargs):
if hasattr(sys.stdout, 'send'): if hasattr(sys.stdout, 'send'):
sys.stdout.send() sys.stdout.send()
return res return res
finally:
sys.stdout.last_report = None
time.sleep(5) # Give any in progress BufferedSend time to complete
def worker(host, port): def worker(host, port):
client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) client_socket = socket.socket(SOCKET_TYPE, socket.SOCK_STREAM)
client_socket.connect((host, port)) address = (host, port) if iswindows else port
client_socket.connect(address)
write(client_socket, 'CALIBRE_WORKER:%d'%os.getpid()) write(client_socket, 'CALIBRE_WORKER:%d'%os.getpid())
msg = read(client_socket, timeout=10) msg = read(client_socket, timeout=10)
if msg != 'OK': if msg != 'OK':
@ -685,10 +749,11 @@ def worker(host, port):
try: try:
result = work(client_socket, func, args, kwdargs) result = work(client_socket, func, args, kwdargs)
write(client_socket, 'RESULT:'+ cPickle.dumps(result)) write(client_socket, 'RESULT:'+ cPickle.dumps(result))
except (Exception, SystemExit), err: except BaseException, err:
exception = (err.__class__.__name__, unicode(str(err), 'utf-8', 'replace')) exception = (err.__class__.__name__, unicode(str(err), 'utf-8', 'replace'))
tb = traceback.format_exc() tb = traceback.format_exc()
write(client_socket, 'ERROR:'+cPickle.dumps((exception, tb),-1)) msg = 'ERROR:'+cPickle.dumps((exception, tb),-1)
write(client_socket, msg)
if read(client_socket, 10) != 'OK': if read(client_socket, 10) != 'OK':
break break
gc.collect() gc.collect()
@ -714,11 +779,13 @@ def free_spirit(path):
func(*args, **kargs) func(*args, **kargs)
def main(args=sys.argv): def main(args=sys.argv):
global isworker
isworker = True
args = args[1].split(':') args = args[1].split(':')
if len(args) == 1: if len(args) == 1:
free_spirit(binascii.unhexlify(re.sub(r'[^a-f0-9A-F]', '', args[0]))) free_spirit(binascii.unhexlify(re.sub(r'[^a-f0-9A-F]', '', args[0])))
else: else:
worker(args[0].replace("'", ''), int(args[1])) worker(args[0].replace("'", ''), int(args[1]) if iswindows else args[1])
return 0 return 0
if __name__ == '__main__': if __name__ == '__main__':

View File

@ -130,11 +130,13 @@ lib.FcConfigBuildFonts.restype = c_int
_init_error = None _init_error = None
_initialized = False _initialized = False
from threading import Timer from threading import Thread
def _do_init():
class FontScanner(Thread):
def run(self):
# Initialize the fontconfig library. This has to be done manually # Initialize the fontconfig library. This has to be done manually
# for the OS X bundle as it may have its own private fontconfig. # for the OS X bundle as it may have its own private fontconfig.
if hasattr(sys, 'frameworks_dir'): if getattr(sys, 'frameworks_dir', False):
config_dir = os.path.join(os.path.dirname(getattr(sys, 'frameworks_dir')), 'Resources', 'fonts') config_dir = os.path.join(os.path.dirname(getattr(sys, 'frameworks_dir')), 'Resources', 'fonts')
if isinstance(config_dir, unicode): if isinstance(config_dir, unicode):
config_dir = config_dir.encode(sys.getfilesystemencoding()) config_dir = config_dir.encode(sys.getfilesystemencoding())
@ -155,11 +157,13 @@ def _do_init():
_initialized = True _initialized = True
_init_timer = Timer(0.1, _do_init) _scanner = FontScanner()
_init_timer.start() _scanner.start()
def join(): def join():
_init_timer.join() _scanner.join(120)
if _scanner.isAlive():
raise RuntimeError('Scanning for system fonts seems to have hung. Try again in a little while.')
if _init_error is not None: if _init_error is not None:
raise RuntimeError(_init_error) raise RuntimeError(_init_error)