mirror of
				https://github.com/kovidgoyal/calibre.git
				synced 2025-10-25 15:52:25 -04:00 
			
		
		
		
	
		
			
				
	
	
		
			139 lines
		
	
	
		
			3.6 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			139 lines
		
	
	
		
			3.6 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| #!/usr/bin/env python
 | |
| 
 | |
| 
 | |
| __license__ = 'GPL v3'
 | |
| __copyright__ = '2014, Kovid Goyal <kovid at kovidgoyal.net>'
 | |
| 
 | |
| import itertools
 | |
| import json
 | |
| import os
 | |
| import shlex
 | |
| import subprocess
 | |
| import sys
 | |
| from collections import namedtuple
 | |
| from contextlib import closing
 | |
| from functools import partial
 | |
| from multiprocessing.pool import ThreadPool as Pool
 | |
| from threading import Thread
 | |
| 
 | |
| from polyglot.builtins import as_bytes, unicode_type
 | |
| 
 | |
# Description of one external command to run: ``cmd`` is the argument list
# handed to subprocess.Popen, ``human_text`` an optional label used in log
# output, and ``cwd`` the working directory (None inherits the parent's).
Job = namedtuple('Job', 'cmd human_text cwd')

# Number of parallel workers, clamped to the range 1..16. os.cpu_count()
# returns None when the CPU count cannot be determined, so fall back to a
# single worker instead of failing with a TypeError at import time.
cpu_count = min(16, max(1, os.cpu_count() or 1))
 | |
| 
 | |
| 
 | |
def run_worker(job, decorate=True):
    """Run a single job's command to completion and report the outcome.

    ``job`` must expose ``cmd`` (the argument list), ``human_text`` (a label,
    may be empty or None, in which case the shell-quoted command is used) and
    ``cwd`` attributes.

    Returns a tuple ``(ok, stdout, stderr)`` of a bool and two strings:

    * ``ok`` is True when the process exited with status 0.
    * ``stdout`` is the decoded process output, prefixed with the label and a
      newline when ``decorate`` is true.
    * ``stderr`` is normally ``''`` because the child's stderr is redirected
      into its stdout.

    If the process cannot be started at all, ``(False, label, error message)``
    is returned instead of raising.
    """
    cmd = job.cmd
    human_text = job.human_text or shlex.join(cmd)
    cwd = job.cwd
    if cmd[0].lower().endswith('cl.exe'):
        # cl.exe is run from the directory named by COMPILER_CWD (or the
        # inherited directory when that variable is unset), overriding job.cwd.
        cwd = os.environ.get('COMPILER_CWD')
    try:
        p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, cwd=cwd)
    except Exception as err:
        return False, human_text, unicode_type(err)
    stdout, stderr = p.communicate()
    # Tool output is not guaranteed to be valid UTF-8; substitute undecodable
    # bytes rather than crashing and losing the output that explains a failure.
    # Empty/missing output is normalized to '' so callers always get str.
    stdout = stdout.decode('utf-8', 'replace') if stdout else ''
    stderr = stderr.decode('utf-8', 'replace') if stderr else ''
    if decorate:
        stdout = human_text + '\n' + stdout
    return p.returncode == 0, stdout, stderr
 | |
| 
 | |
| 
 | |
def create_job(cmd, human_text=None, cwd=None):
    """Bundle a command, its optional log label and working directory into a Job."""
    return Job(cmd=cmd, human_text=human_text, cwd=cwd)
 | |
| 
 | |
| 
 | |
def parallel_build(jobs, log, verbose=True):
    """Run ``jobs`` on a pool of ``cpu_count`` threads, logging their output.

    Results are consumed in submission order. Output of a job is passed to
    ``log`` when ``verbose`` is true or when the job failed. Returns False as
    soon as a failed job is encountered, True if every job succeeded.
    """
    pool = Pool(cpu_count)
    with closing(pool):
        for succeeded, out, err in pool.imap(run_worker, jobs):
            if not succeeded or verbose:
                log(out)
                if err:
                    log(err)
            if not succeeded:
                return False
    return True
 | |
| 
 | |
| 
 | |
def parallel_build_silent(jobs):
    """Run ``jobs`` in parallel without logging and collect every result.

    Returns ``(failed, results)`` where ``failed`` is True if any job did not
    succeed and ``results`` is a list, in submission order, of
    ``(ok, cmd, human_text, stdout, stderr)`` tuples.
    """
    # The jobs are consumed twice: once by the pool and once by zip() below to
    # pair each result with its job. Materialize them so that a one-shot
    # iterator is not split between the two consumers.
    jobs = tuple(jobs)
    results = []
    failed = False
    p = Pool(cpu_count)
    with closing(p):
        worker = partial(run_worker, decorate=False)
        for (ok, stdout, stderr), job in zip(p.imap(worker, jobs), jobs):
            results.append((ok, job.cmd, job.human_text, stdout, stderr))
            if not ok:
                failed = True
    return failed, results
 | |
| 
 | |
| 
 | |
def parallel_check_output(jobs, log):
    """Run each command in ``jobs`` in parallel, yielding its output in order.

    ``jobs`` is an iterable of command argument lists. This is a generator:
    the output of each successful command is yielded in submission order. On
    the first failing command its output is passed to ``log`` and
    ``SystemExit(1)`` is raised.
    """
    p = Pool(cpu_count)
    with closing(p):
        # run_worker reads the .cmd, .human_text and .cwd attributes of its
        # argument, so every bare command has to be wrapped in a Job; a plain
        # (cmd, '') tuple would fail with AttributeError inside the worker.
        wrapped = (Job(cmd, '', None) for cmd in jobs)
        for ok, stdout, stderr in p.imap(partial(run_worker, decorate=False), wrapped):
            if not ok:
                log(stdout)
                if stderr:
                    log(stderr)
                raise SystemExit(1)
            yield stdout
 | |
| 
 | |
| 
 | |
def get_tasks(it):
    """Split ``it`` into at most ``cpu_count`` consecutive tuples.

    Every yielded tuple has the same length except possibly the last, which
    holds whatever remains. Nothing is yielded for an empty input.
    """
    items = tuple(it)
    if not items:
        return
    # Ceiling division: the smallest chunk length that fits all the items
    # into no more than cpu_count chunks.
    chunk_len = -(-len(items) // cpu_count)
    for start in range(0, len(items), chunk_len):
        yield items[start:start + chunk_len]
 | |
| 
 | |
| 
 | |
def batched_parallel_jobs(cmd, jobs, cwd=None):
    """Distribute ``jobs`` over several ``cmd`` worker processes via JSON.

    The jobs are split into batches with get_tasks(). One ``cmd`` process is
    started per batch; the batch is written to its stdin as JSON and its
    stdout is parsed as a JSON list. The parsed lists are concatenated in
    batch order and returned.

    If any worker exits with a non-zero status its stderr is copied to this
    process's stderr and, after all workers have been waited for,
    ``SystemExit('Worker process failed')`` is raised.
    """

    def feed_and_collect(proc):
        # Send the batch and read both output pipes until the process closes
        # them; the (stdout, stderr) pair is stashed on the process object.
        payload = as_bytes(json.dumps(proc.jobs_batch))
        proc.output = proc.communicate(payload)

    procs = []
    for chunk in get_tasks(jobs):
        proc = subprocess.Popen(
            cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=cwd)
        proc.jobs_batch = chunk
        # A thread per process keeps every worker's pipes drained concurrently.
        reader = Thread(target=feed_and_collect, args=(proc,))
        reader.daemon = True
        proc.output_thread = reader
        reader.start()
        procs.append(proc)

    collected = []
    any_failed = False
    for proc in procs:
        proc.output_thread.join()
        if proc.wait() == 0:
            collected.extend(json.loads(proc.output[0]))
            continue
        any_failed = True
        err_stream = sys.stderr.buffer
        err_stream.write(proc.output[1])
        err_stream.flush()
    if any_failed:
        raise SystemExit('Worker process failed')
    return collected
 | |
| 
 | |
| 
 | |
def threaded_func_jobs(func, jobs_args):
    """Call ``func(*args)`` for every ``args`` in ``jobs_args`` on a thread pool.

    Uses ``cpu_count`` threads and returns the list of results in the same
    order as ``jobs_args``.
    """

    def call_unpacked(arg_tuple):
        return func(*arg_tuple)

    with closing(Pool(cpu_count)) as pool:
        return pool.map(call_unpacked, jobs_args)
 |