diff --git a/setup/parallel_build.py b/setup/parallel_build.py index 69140492a9..9c1a0bbaa9 100644 --- a/setup/parallel_build.py +++ b/setup/parallel_build.py @@ -6,20 +6,22 @@ __license__ = 'GPL v3' __copyright__ = '2014, Kovid Goyal ' import subprocess, os, itertools, json, sys -from multiprocessing.dummy import Pool +from multiprocessing.pool import ThreadPool as Pool +from collections import namedtuple from threading import Thread from functools import partial from contextlib import closing from polyglot.builtins import unicode_type, as_bytes +Job = namedtuple('Job', 'cmd human_text cwd') cpu_count = min(16, max(1, os.cpu_count())) def run_worker(job, decorate=True): - cmd, human_text = job + cmd, human_text = job.cmd, job.human_text human_text = human_text or ' '.join(cmd) - cwd = None + cwd = job.cwd if cmd[0].lower().endswith('cl.exe'): cwd = os.environ.get('COMPILER_CWD') try: @@ -37,7 +39,7 @@ def run_worker(job, decorate=True): return ok, stdout, (stderr or '') -def create_job(cmd, human_text=None): +def create_job(cmd, human_text=None, cwd=None): return (cmd, human_text) @@ -54,6 +56,18 @@ def parallel_build(jobs, log, verbose=True): return True +def parallel_build_silent(jobs): + p = Pool(cpu_count) + results = [] + failed = False + with closing(p): + for (ok, stdout, stderr), job in zip(p.imap(partial(run_worker, decorate=False), jobs), jobs): + results.append((ok, job.cmd, job.human_text, stdout, stderr)) + if not ok: + failed = True + return failed, results + + def parallel_check_output(jobs, log): p = Pool(cpu_count) with closing(p): @@ -67,7 +81,11 @@ def parallel_check_output(jobs, log): yield stdout -def get_tasks(it, size): +def get_tasks(it): + it = tuple(it) + size, extra = divmod(len(it), cpu_count) + if extra: + size += 1 it = iter(it) while 1: x = tuple(itertools.islice(it, size)) @@ -77,15 +95,12 @@ def get_tasks(it, size): def batched_parallel_jobs(cmd, jobs, cwd=None): - chunksize, extra = divmod(len(jobs), cpu_count) - if extra: - chunksize += 1 workers = [] def get_output(p): p.output = p.communicate(as_bytes(json.dumps(p.jobs_batch))) - for batch in get_tasks(jobs, chunksize): + for batch in get_tasks(jobs): p = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=cwd) p.jobs_batch = batch p.output_thread = t = Thread(target=get_output, args=(p,)) @@ -106,3 +121,13 @@ def batched_parallel_jobs(cmd, jobs, cwd=None): if failed: raise SystemExit('Worker process failed') return ans + + +def threaded_func_jobs(func, jobs_args): + + def f(args): + return func(*args) + + p = Pool(cpu_count) + with closing(p): + return p.map(f, jobs_args)