From 6051cf3995030e2c86c43647b8a89d9cdea333a8 Mon Sep 17 00:00:00 2001 From: Kovid Goyal Date: Tue, 15 Oct 2024 08:55:16 +0530 Subject: [PATCH] Get pipe reader working on windows where we cant wait for data to become available on a non-blocking pipe --- src/calibre/gui2/tts/piper.py | 134 ++++++++++++++++++++++++++++------ 1 file changed, 110 insertions(+), 24 deletions(-) diff --git a/src/calibre/gui2/tts/piper.py b/src/calibre/gui2/tts/piper.py index 425fec86d4..965397ed31 100644 --- a/src/calibre/gui2/tts/piper.py +++ b/src/calibre/gui2/tts/piper.py @@ -566,7 +566,7 @@ class PiperEmbedded: lang = get_lang() lang = canonicalize_lang(lang) or lang self._default_voice = self._voice_for_lang.get(lang) or self._voice_for_lang['eng'] - self._current_voice = self._process = None + self._current_voice = self._process = self._pipe_reader = None self._current_audio_rate = 0 def resolve_voice(self, lang: str, voice_name: str) -> Voice: @@ -610,10 +610,8 @@ class PiperEmbedded: self._process.stdin.write(UTTERANCE_SEPARATOR) self._process.stdin.flush() stderr_data = b'' - buf, piper_done, errors_from_piper = [], [], [] - - def stdout_callback(data: bytes) -> None: - buf.append(data) + buf = io.BytesIO() + piper_done, errors_from_piper = [], [] def stderr_callback(data: bytes) -> bool: nonlocal stderr_data @@ -621,11 +619,11 @@ class PiperEmbedded: return not piper_done try: - unix_pipe_readers(self._process.stdout, self._process.stderr, stdout_callback, stderr_callback) + self._pipe_reader(buf.write, stderr_callback) except Exception as e: raise Exception(f'Reading output from piper process failed with error: {e} and STDERR: ' + '\n'.join(errors_from_piper)) - raw_data = b''.join(buf) + raw_data = buf.getvalue() if needs_conversion: raw_data = resample_raw_audio_16bit(raw_data, self._current_audio_rate, sample_rate) yield raw_data, duration_of_raw_audio_data(raw_data, sample_rate) @@ -637,6 +635,8 @@ class PiperEmbedded: def shutdown(self): if self._process is not None: + self._pipe_reader.close() + self._pipe_reader = None self._process.stdin.close() self._process.stdout.close() self._process.stderr.close() @@ -652,25 +652,111 @@ class PiperEmbedded: self._current_audio_rate, cmdline = piper_process_metadata(model_path, config_path, self._embedded_settings, self._current_voice) import subprocess self._process = subprocess.Popen(cmdline, stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.PIPE) + self._pipe_reader = (ThreadedPipeReader if iswindows else PipeReader)(self._process.stdout, self._process.stderr) -def unix_pipe_readers(stdout: BinaryIO, stderr: BinaryIO, stdout_callback, stderr_callback, timeout: float = 10.) -> None: - from select import select - out, err = stdout.fileno(), stderr.fileno() - os.set_blocking(out, False) - os.set_blocking(err, False) - readers = out, err - while True: - r, _, _ = select(readers, (), (), timeout) - if not r: - raise TimeoutError('Timed out waiting for output from piper process') - if out in r: - data = stdout.read(io.DEFAULT_BUFFER_SIZE) - stdout_callback(data) - if err in r: - data = stderr.read(io.DEFAULT_BUFFER_SIZE) - if not stderr_callback(data): +class PipeReader: + + TIMEOUT = 10. # seconds + + def __init__(self, stdout: BinaryIO, stderr: BinaryIO): + self.stdout_fd = stdout.fileno() + self.stderr_fd = stderr.fileno() + os.set_blocking(self.stdout_fd, False) + os.set_blocking(self.stderr_fd, False) + + def close(self): + self.stderr_fd = self.stdout_fd = -1 + + def __call__(self, stdout_callback, stderr_callback): + from select import select + out, err = self.stdout_fd, self.stderr_fd + readers = out, err + buf = memoryview(bytearray(io.DEFAULT_BUFFER_SIZE)) + + def readall(fd: int) -> bytes: + output = io.BytesIO() + while True: + try: + num = os.readv(fd, (buf,)) + except BlockingIOError: + break + else: + output.write(buf[:num]) + return output.getvalue() + + def readinto(fd: int, callback) -> None: + while True: + try: + num = os.readv(fd, (buf,)) + except BlockingIOError: + break + else: + callback(buf[:num]) + + while True: + r, _, _ = select(readers, (), (), self.TIMEOUT) + if not r: + raise TimeoutError('Timed out waiting for output from piper process') + if out in r: + readinto(out, stdout_callback) + if err in r: + data = readall(err) + if not stderr_callback(data): + # In case there is new data written to stdout + readinto(out, stdout_callback) + break + + +class ThreadedPipeReader(PipeReader): + + def __init__(self, stdout: BinaryIO, stderr: BinaryIO): + from queue import Queue + from threading import Event, Thread + self.shutting_down = Event() + self.queue = Queue() + self.stdout_thread = t = Thread(target=self._reader, args=(stdout.fileno(), True), daemon=True) + t.start() + self.stderr_thread = t = Thread(target=self._reader, args=(stderr.fileno(), False), daemon=True) + t.start() + + def close(self): + self.shutting_down.set() + + def __call__(self, stdout_callback, stderr_callback): + from queue import Empty + while True: + data, is_stdout, err = self.queue.get(True, self.TIMEOUT) + if err is not None: + raise err + if data: + if is_stdout: + stdout_callback(data) + else: + if not stderr_callback(data): + # in case more data was written to stdout + while True: + try: + data, is_stdout, err = self.queue.get_nowait() + except Empty: + break + if err is not None: + raise err + if is_stdout: + stdout_callback(data) + break + + def _reader(self, pipe_fd: int, is_stdout: bool): + while not self.shutting_down.is_set(): + try: + data = os.read(pipe_fd, io.DEFAULT_BUFFER_SIZE) + except OSError as e: + if not self.shutting_down.is_set(): + self.queue.put((b'', is_stdout, e)) break + else: + self.queue.put((data, is_stdout, None)) + def duration_of_raw_audio_data(data: bytes, sample_rate: int = HIGH_QUALITY_SAMPLE_RATE, bytes_per_sample: int = 2, num_channels: int = 1) -> float: @@ -690,7 +776,7 @@ def develop_embedded(): for data, duration in p.text_to_raw_audio_data(( 'Hello, good day to you.', 'This is the second sentence.', 'This is the final sentence.' )): - print(duration, len(data)) + print(f'{duration=} {len(data)=}') all_data.append(data) sz += len(data) all_data[0] = wav_header_for_pcm_data(sz, HIGH_QUALITY_SAMPLE_RATE)