diff --git a/setup/parallel_build.py b/setup/parallel_build.py index 448a5bd695..69140492a9 100644 --- a/setup/parallel_build.py +++ b/setup/parallel_build.py @@ -5,45 +5,15 @@ __license__ = 'GPL v3' __copyright__ = '2014, Kovid Goyal ' -import subprocess, os +import subprocess, os, itertools, json, sys from multiprocessing.dummy import Pool +from threading import Thread from functools import partial from contextlib import closing -from setup import iswindows -from polyglot.builtins import unicode_type +from polyglot.builtins import unicode_type, as_bytes -if iswindows: - from ctypes import windll, Structure, POINTER, c_size_t - from ctypes.wintypes import WORD, DWORD, LPVOID - - class SYSTEM_INFO(Structure): - _fields_ = [ - ("wProcessorArchitecture", WORD), - ("wReserved", WORD), - ("dwPageSize", DWORD), - ("lpMinimumApplicationAddress", LPVOID), - ("lpMaximumApplicationAddress", LPVOID), - ("dwActiveProcessorMask", c_size_t), - ("dwNumberOfProcessors", DWORD), - ("dwProcessorType", DWORD), - ("dwAllocationGranularity", DWORD), - ("wProcessorLevel", WORD), - ("wProcessorRevision", WORD)] - gsi = windll.kernel32.GetSystemInfo - gsi.argtypes = [POINTER(SYSTEM_INFO)] - gsi.restype = None - si = SYSTEM_INFO() - gsi(si) - cpu_count = si.dwNumberOfProcessors -else: - from multiprocessing import cpu_count - try: - cpu_count = cpu_count() - except NotImplementedError: - cpu_count = 1 - -cpu_count = min(16, max(1, cpu_count)) +cpu_count = min(16, max(1, os.cpu_count())) def run_worker(job, decorate=True): @@ -95,3 +65,44 @@ def parallel_check_output(jobs, log): log(stderr) raise SystemExit(1) yield stdout + + +def get_tasks(it, size): + it = iter(it) + while 1: + x = tuple(itertools.islice(it, size)) + if not x: + return + yield x + + +def batched_parallel_jobs(cmd, jobs, cwd=None): + chunksize, extra = divmod(len(jobs), cpu_count) + if extra: + chunksize += 1 + workers = [] + + def get_output(p): + p.output = p.communicate(as_bytes(json.dumps(p.jobs_batch))) + + for batch in get_tasks(jobs, chunksize): + p = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=cwd) + p.jobs_batch = batch + p.output_thread = t = Thread(target=get_output, args=(p,)) + t.daemon = True + t.start() + workers.append(p) + + failed = False + ans = [] + for p in workers: + p.output_thread.join() + if p.wait() != 0: + sys.stderr.buffer.write(p.output[1]) + sys.stderr.buffer.flush() + failed = True + else: + ans.extend(json.loads(p.output[0])) + if failed: + raise SystemExit('Worker process failed') + return ans diff --git a/setup/translations.py b/setup/translations.py index 6edd0be117..873ab47a4a 100644 --- a/setup/translations.py +++ b/setup/translations.py @@ -12,7 +12,7 @@ from locale import normalize as normalize_locale from functools import partial from setup import Command, __appname__, __version__, require_git_master, build_cache_dir, edit_file, dump_json -from setup.parallel_build import parallel_check_output +from setup.parallel_build import batched_parallel_jobs from polyglot.builtins import codepoint_to_chr, iteritems, range is_ci = os.environ.get('CI', '').lower() == 'true' @@ -320,8 +320,7 @@ class Translations(POT): # {{{ self.compile_changelog_translations() def compile_group(self, files, handle_stats=None, file_ok=None, action_per_file=None): - from calibre.constants import islinux - jobs, ok_files = [], [] + ok_files = [] hashmap = {} def stats_cache(src, data=None): @@ -349,20 +348,21 @@ class Translations(POT): # {{{ else: if file_ok is None or file_ok(data, src): # self.info('\t' + os.path.relpath(src, self.j(self.d(self.SRC), 'translations'))) - if islinux: - msgfmt = ['msgfmt'] - else: - msgfmt = [sys.executable, self.j(self.SRC, 'calibre', 'translations', 'msgfmt.py')] - jobs.append(msgfmt + ['--statistics', '-o', dest, src]) ok_files.append((src, dest)) hashmap[src] = current_hash if action_per_file is not None: action_per_file(src) - self.info(f'\tCompiling {len(jobs)} files') - for (src, dest), line in zip(ok_files, parallel_check_output(jobs, self.info)): + self.info(f'\tCompiling {len(ok_files)} files') + items = [] + results = batched_parallel_jobs( + [sys.executable, self.j(self.SRC, 'calibre', 'translations', 'msgfmt.py'), 'STDIN'], + ok_files) + for (src, dest), nums in zip(ok_files, results): + items.append((src, dest, nums)) + + for (src, dest, nums) in items: self.write_cache(open(dest, 'rb').read(), hashmap[src], src) - nums = tuple(map(int, re.findall(r'\d+', line))) stats_cache(src, nums) if handle_stats is not None: handle_stats(src, nums) diff --git a/src/calibre/translations/msgfmt.py b/src/calibre/translations/msgfmt.py index 5054d7c737..44f92df4a9 100644 --- a/src/calibre/translations/msgfmt.py +++ b/src/calibre/translations/msgfmt.py @@ -50,7 +50,6 @@ def usage(code, msg=''): def add(ctxt, id, str, fuzzy): "Add a non-fuzzy translation to the dictionary." - global MESSAGES if not fuzzy and str: if id: STATS['translated'] += 1 @@ -65,7 +64,6 @@ def add(ctxt, id, str, fuzzy): def generate(): "Return the generated output." - global MESSAGES # the keys are sorted in the .mo file keys = sorted(MESSAGES.keys()) offsets = [] @@ -236,9 +234,28 @@ def make(filename, outfile): print(msg, file=sys.stderr) +def make_with_stats(filename, outfile): + MESSAGES.clear() + STATS['translated'] = STATS['untranslated'] = 0 + make(filename, outfile) + return STATS['translated'], STATS['untranslated'] + + +def run_batch(pairs): + for (filename, outfile) in pairs: + yield make_with_stats(filename, outfile) + + def main(): + args = sys.argv[1:] + if args == ['STDIN']: + import json + results = tuple(run_batch(json.loads(sys.stdin.buffer.read()))) + sys.stdout.buffer.write(json.dumps(results).encode('utf-8')) + sys.stdout.close() + return try: - opts, args = getopt.getopt(sys.argv[1:], 'hVso:', + opts, args = getopt.getopt(args, 'hVso:', ['help', 'version', 'statistics', 'output-file=']) except getopt.error as msg: usage(1, msg) @@ -263,8 +280,7 @@ def main(): return for filename in args: - STATS['translated'] = STATS['untranslated'] = 0 - make(filename, outfile) + translated, untranslated = make_with_stats(filename, outfile) if output_stats: print(STATS['translated'], 'translated messages,', STATS['untranslated'], 'untranslated messages.')