mirror of
https://github.com/kovidgoyal/calibre.git
synced 2025-05-31 12:14:15 -04:00
139 lines
3.6 KiB
Python
139 lines
3.6 KiB
Python
#!/usr/bin/env python
|
|
|
|
|
|
__license__ = 'GPL v3'
|
|
__copyright__ = '2014, Kovid Goyal <kovid at kovidgoyal.net>'
|
|
|
|
import itertools
|
|
import json
|
|
import os
|
|
import shlex
|
|
import subprocess
|
|
import sys
|
|
from collections import namedtuple
|
|
from contextlib import closing
|
|
from functools import partial
|
|
from multiprocessing.pool import ThreadPool as Pool
|
|
from threading import Thread
|
|
|
|
from polyglot.builtins import as_bytes, unicode_type
|
|
|
|
Job = namedtuple('Job', 'cmd human_text cwd')
|
|
|
|
cpu_count = min(16, max(1, os.cpu_count()))
|
|
|
|
|
|
def run_worker(job, decorate=True):
|
|
cmd, human_text = job.cmd, job.human_text
|
|
human_text = human_text or shlex.join(cmd)
|
|
cwd = job.cwd
|
|
if cmd[0].lower().endswith('cl.exe'):
|
|
cwd = os.environ.get('COMPILER_CWD')
|
|
try:
|
|
p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, cwd=cwd)
|
|
except Exception as err:
|
|
return False, human_text, unicode_type(err)
|
|
stdout, stderr = p.communicate()
|
|
if stdout:
|
|
stdout = stdout.decode('utf-8')
|
|
if stderr:
|
|
stderr = stderr.decode('utf-8')
|
|
if decorate:
|
|
stdout = human_text + '\n' + (stdout or '')
|
|
ok = p.returncode == 0
|
|
return ok, stdout, (stderr or '')
|
|
|
|
|
|
def create_job(cmd, human_text=None, cwd=None):
|
|
return Job(cmd, human_text, cwd)
|
|
|
|
|
|
def parallel_build(jobs, log, verbose=True):
|
|
p = Pool(cpu_count)
|
|
with closing(p):
|
|
for ok, stdout, stderr in p.imap(run_worker, jobs):
|
|
if verbose or not ok:
|
|
log(stdout)
|
|
if stderr:
|
|
log(stderr)
|
|
if not ok:
|
|
return False
|
|
return True
|
|
|
|
|
|
def parallel_build_silent(jobs):
|
|
p = Pool(cpu_count)
|
|
results = []
|
|
failed = False
|
|
with closing(p):
|
|
for (ok, stdout, stderr), job in zip(p.imap(partial(run_worker, decorate=False), jobs), jobs):
|
|
results.append((ok, job.cmd, job.human_text, stdout, stderr))
|
|
if not ok:
|
|
failed = True
|
|
return failed, results
|
|
|
|
|
|
def parallel_check_output(jobs, log):
|
|
p = Pool(cpu_count)
|
|
with closing(p):
|
|
for ok, stdout, stderr in p.imap(
|
|
partial(run_worker, decorate=False), ((j, '') for j in jobs)):
|
|
if not ok:
|
|
log(stdout)
|
|
if stderr:
|
|
log(stderr)
|
|
raise SystemExit(1)
|
|
yield stdout
|
|
|
|
|
|
def get_tasks(it):
|
|
it = tuple(it)
|
|
size, extra = divmod(len(it), cpu_count)
|
|
if extra:
|
|
size += 1
|
|
it = iter(it)
|
|
while 1:
|
|
x = tuple(itertools.islice(it, size))
|
|
if not x:
|
|
return
|
|
yield x
|
|
|
|
|
|
def batched_parallel_jobs(cmd, jobs, cwd=None):
|
|
workers = []
|
|
|
|
def get_output(p):
|
|
p.output = p.communicate(as_bytes(json.dumps(p.jobs_batch)))
|
|
|
|
for batch in get_tasks(jobs):
|
|
p = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=cwd)
|
|
p.jobs_batch = batch
|
|
p.output_thread = t = Thread(target=get_output, args=(p,))
|
|
t.daemon = True
|
|
t.start()
|
|
workers.append(p)
|
|
|
|
failed = False
|
|
ans = []
|
|
for p in workers:
|
|
p.output_thread.join()
|
|
if p.wait() != 0:
|
|
sys.stderr.buffer.write(p.output[1])
|
|
sys.stderr.buffer.flush()
|
|
failed = True
|
|
else:
|
|
ans.extend(json.loads(p.output[0]))
|
|
if failed:
|
|
raise SystemExit('Worker process failed')
|
|
return ans
|
|
|
|
|
|
def threaded_func_jobs(func, jobs_args):
|
|
|
|
def f(args):
|
|
return func(*args)
|
|
|
|
p = Pool(cpu_count)
|
|
with closing(p):
|
|
return p.map(f, jobs_args)
|