Refactor for re-use

This commit is contained in:
Kovid Goyal 2014-11-20 17:20:27 +05:30
parent c356e09dcf
commit 78a5c3fd0f

View File

@ -49,6 +49,32 @@ if iswindows:
from calibre.utils.ipc.launch import windows_null_file
worker_kwargs['stdout'] = worker_kwargs['stderr'] = windows_null_file
def get_stdout(process):
import time
while process.poll() is None:
try:
raw = process.stdout.read(1)
if raw:
try:
sys.stdout.write(raw)
except EnvironmentError:
pass
else:
time.sleep(0.1)
except (EOFError, EnvironmentError):
break
def start_worker(code, name=''):
from calibre.utils.ipc.simple_worker import start_pipe_worker
if name:
name = '-' + name
p = start_pipe_worker(code, **worker_kwargs)
if get_stdout_from_child:
t = Thread(target=get_stdout, name='PoolWorkerGetStdout' + name, args=(p,))
t.daemon = True
t.start()
return p
class Failure(Exception):
def __init__(self, tf):
@ -63,10 +89,6 @@ class Worker(object):
self.process, self.conn = p, conn
self.events = events
self.name = name or ''
if get_stdout_from_child:
t = Thread(target=self.get_stdout, name='PoolWorkerGetOutput-'+self.name)
t.daemon = True
t.start()
def __call__(self, job):
eintr_retry_call(self.conn.send_bytes, cPickle.dumps(job, -1))
@ -89,21 +111,6 @@ class Worker(object):
def set_common_data(self, data):
eintr_retry_call(self.conn.send_bytes, data)
def get_stdout(self):
import time
while self.process.poll() is None:
try:
raw = self.process.stdout.read(1)
if raw:
try:
sys.stdout.write(raw)
except EnvironmentError:
pass
else:
time.sleep(0.1)
except (EOFError, EnvironmentError):
break
class Pool(Thread):
@ -175,9 +182,7 @@ class Pool(Thread):
self.shutdown_workers(wait_time=wait_time)
def create_worker(self):
from calibre.utils.ipc.simple_worker import start_pipe_worker
p = start_pipe_worker(
'from {0} import run_main, {1}; run_main({1})'.format(self.__class__.__module__, 'worker_main'), **worker_kwargs)
p = start_worker('from {0} import run_main, {1}; run_main({1})'.format(self.__class__.__module__, 'worker_main'))
sys.stdout.flush()
eintr_retry_call(p.stdin.write, self.worker_data)
p.stdin.flush(), p.stdin.close()