diff --git a/src/calibre/utils/ipc/pool.py b/src/calibre/utils/ipc/pool.py index 1ff31031b3..f840728568 100644 --- a/src/calibre/utils/ipc/pool.py +++ b/src/calibre/utils/ipc/pool.py @@ -49,6 +49,32 @@ if iswindows: from calibre.utils.ipc.launch import windows_null_file worker_kwargs['stdout'] = worker_kwargs['stderr'] = windows_null_file +def get_stdout(process): + import time + while process.poll() is None: + try: + raw = process.stdout.read(1) + if raw: + try: + sys.stdout.write(raw) + except EnvironmentError: + pass + else: + time.sleep(0.1) + except (EOFError, EnvironmentError): + break + +def start_worker(code, name=''): + from calibre.utils.ipc.simple_worker import start_pipe_worker + if name: + name = '-' + name + p = start_pipe_worker(code, **worker_kwargs) + if get_stdout_from_child: + t = Thread(target=get_stdout, name='PoolWorkerGetStdout' + name, args=(p,)) + t.daemon = True + t.start() + return p + class Failure(Exception): def __init__(self, tf): @@ -63,10 +89,6 @@ class Worker(object): self.process, self.conn = p, conn self.events = events self.name = name or '' - if get_stdout_from_child: - t = Thread(target=self.get_stdout, name='PoolWorkerGetOutput-'+self.name) - t.daemon = True - t.start() def __call__(self, job): eintr_retry_call(self.conn.send_bytes, cPickle.dumps(job, -1)) @@ -89,21 +111,6 @@ class Worker(object): def set_common_data(self, data): eintr_retry_call(self.conn.send_bytes, data) - def get_stdout(self): - import time - while self.process.poll() is None: - try: - raw = self.process.stdout.read(1) - if raw: - try: - sys.stdout.write(raw) - except EnvironmentError: - pass - else: - time.sleep(0.1) - except (EOFError, EnvironmentError): - break - class Pool(Thread): @@ -175,9 +182,7 @@ class Pool(Thread): self.shutdown_workers(wait_time=wait_time) def create_worker(self): - from calibre.utils.ipc.simple_worker import start_pipe_worker - p = start_pipe_worker( - 'from {0} import run_main, {1}; run_main({1})'.format(self.__class__.__module__, 'worker_main'), **worker_kwargs) + p = start_worker('from {0} import run_main, {1}; run_main({1})'.format(self.__class__.__module__, 'worker_main')) sys.stdout.flush() eintr_retry_call(p.stdin.write, self.worker_data) p.stdin.flush(), p.stdin.close()