Make jobs a bit nicer

Kovid Goyal 2019-12-06 11:11:44 +05:30
parent 6811bb0cf7
commit 86081e47d8


@@ -6,20 +6,22 @@ __license__ = 'GPL v3'
 __copyright__ = '2014, Kovid Goyal <kovid at kovidgoyal.net>'
 
 import subprocess, os, itertools, json, sys
-from multiprocessing.dummy import Pool
+from multiprocessing.pool import ThreadPool as Pool
+from collections import namedtuple
 from threading import Thread
 from functools import partial
 from contextlib import closing
 
 from polyglot.builtins import unicode_type, as_bytes
 
+Job = namedtuple('Job', 'cmd human_text cwd')
 cpu_count = min(16, max(1, os.cpu_count()))
 
 
 def run_worker(job, decorate=True):
-    cmd, human_text = job
+    cmd, human_text = job.cmd, job.human_text
     human_text = human_text or ' '.join(cmd)
-    cwd = None
+    cwd = job.cwd
     if cmd[0].lower().endswith('cl.exe'):
         cwd = os.environ.get('COMPILER_CWD')
     try:
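
For orientation, a minimal standalone sketch of the new Job record and how run_worker now reads it; the echo command is a made-up example, not part of this commit:

from collections import namedtuple

Job = namedtuple('Job', 'cmd human_text cwd')

job = Job(cmd=['echo', 'hello'], human_text='Saying hello', cwd=None)
# Attribute access replaces the old 2-tuple unpacking:
cmd, human_text, cwd = job.cmd, job.human_text, job.cwd
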
@@ -37,7 +39,7 @@ def run_worker(job, decorate=True):
     return ok, stdout, (stderr or '')
 
 
-def create_job(cmd, human_text=None):
-    return (cmd, human_text)
+def create_job(cmd, human_text=None, cwd=None):
+    return Job(cmd, human_text, cwd)
 
 
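A hedged usage sketch for the extended signature: callers can now pin a working directory per job; the command and path below are hypothetical:

job = create_job(['make', '-j4'], human_text='Building extensions', cwd='/tmp/build')
# job is a Job namedtuple, so the new field is addressable by name:
assert job.cwd == '/tmp/build'
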
@@ -54,6 +56,18 @@ def parallel_build(jobs, log, verbose=True):
     return True
 
 
+def parallel_build_silent(jobs):
+    p = Pool(cpu_count)
+    results = []
+    failed = False
+    with closing(p):
+        for (ok, stdout, stderr), job in zip(p.imap(partial(run_worker, decorate=False), jobs), jobs):
+            results.append((ok, job.cmd, job.human_text, stdout, stderr))
+            if not ok:
+                failed = True
+    return failed, results
+
+
 def parallel_check_output(jobs, log):
     p = Pool(cpu_count)
     with closing(p):
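
A sketch of how the new silent variant might be consumed, assuming a trivial jobs list; nothing here is prescribed by the commit itself:

jobs = [create_job(['python', '-c', 'print(1)'])]
failed, results = parallel_build_silent(jobs)
for ok, cmd, human_text, stdout, stderr in results:
    if not ok:
        print('FAILED:', human_text or ' '.join(cmd))
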
@@ -67,7 +81,11 @@ def parallel_check_output(jobs, log):
             yield stdout
 
 
-def get_tasks(it, size):
+def get_tasks(it):
+    it = tuple(it)
+    size, extra = divmod(len(it), cpu_count)
+    if extra:
+        size += 1
     it = iter(it)
     while 1:
         x = tuple(itertools.islice(it, size))
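
get_tasks now derives its own batch size instead of taking one from the caller; the divmod rounding-up it uses is just ceiling division, as this standalone sketch with assumed numbers shows:

# 10 items across 4 workers: divmod(10, 4) == (2, 2), so round up to 3
size, extra = divmod(10, 4)
if extra:
    size += 1
assert size == 3
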
@@ -77,15 +95,12 @@ def get_tasks(it, size):
 
 
 def batched_parallel_jobs(cmd, jobs, cwd=None):
-    chunksize, extra = divmod(len(jobs), cpu_count)
-    if extra:
-        chunksize += 1
     workers = []
 
     def get_output(p):
         p.output = p.communicate(as_bytes(json.dumps(p.jobs_batch)))
 
-    for batch in get_tasks(jobs, chunksize):
+    for batch in get_tasks(jobs):
         p = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=cwd)
         p.jobs_batch = batch
         p.output_thread = t = Thread(target=get_output, args=(p,))
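
For context, a speculative sketch of the worker side implied by get_output above: each subprocess appears to receive its batch as JSON on stdin. This reader is an assumption about the protocol, not code from this commit:

import json, sys

# Hypothetical worker entry point: parse the batch the parent pipes in
batch = json.loads(sys.stdin.buffer.read())
for job in batch:
    pass  # process each job, then report results on stdout
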
@@ -106,3 +121,13 @@ def batched_parallel_jobs(cmd, jobs, cwd=None):
     if failed:
         raise SystemExit('Worker process failed')
     return ans
+
+
+def threaded_func_jobs(func, jobs_args):
+
+    def f(args):
+        return func(*args)
+
+    p = Pool(cpu_count)
+    with closing(p):
+        return p.map(f, jobs_args)
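
A minimal usage sketch for the new helper, with a made-up function: it fans func out over argument tuples on the thread pool and returns the results in order:

def add(a, b):
    return a + b

results = threaded_func_jobs(add, [(1, 2), (3, 4)])
assert results == [3, 7]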