From 6bfe478ca41e89921b6595eaacf37e8f1ea08fba Mon Sep 17 00:00:00 2001 From: Kovid Goyal Date: Mon, 14 Oct 2024 11:39:00 +0530 Subject: [PATCH] Get reading output from piper process via Python working on Unix --- src/calibre/gui2/tts/piper.py | 86 +++++++++++++++-------------------- 1 file changed, 37 insertions(+), 49 deletions(-) diff --git a/src/calibre/gui2/tts/piper.py b/src/calibre/gui2/tts/piper.py index c378eaebc5..425fec86d4 100644 --- a/src/calibre/gui2/tts/piper.py +++ b/src/calibre/gui2/tts/piper.py @@ -2,6 +2,7 @@ # License: GPLv3 Copyright: 2024, Kovid Goyal import atexit +import io import json import os import re @@ -11,8 +12,6 @@ from contextlib import suppress from dataclasses import dataclass from functools import lru_cache from itertools import count -from queue import Empty, Queue -from threading import Event from time import monotonic from typing import BinaryIO, Iterable, Iterator @@ -33,7 +32,7 @@ from qt.core import ( sip, ) -from calibre.constants import cache_dir, is_debugging, piper_cmdline +from calibre.constants import cache_dir, is_debugging, iswindows, piper_cmdline from calibre.gui2 import error_dialog from calibre.gui2.tts.types import TTS_EMBEDED_CONFIG, EngineSpecificSettings, Quality, TTSBackend, Voice, widget_parent from calibre.spell.break_iterator import PARAGRAPH_SEPARATOR, split_into_sentences_for_tts @@ -567,7 +566,7 @@ class PiperEmbedded: lang = get_lang() lang = canonicalize_lang(lang) or lang self._default_voice = self._voice_for_lang.get(lang) or self._voice_for_lang['eng'] - self._current_voice = self._process = self._process_shutdown_event = None + self._current_voice = self._process = None self._current_audio_rate = 0 def resolve_voice(self, lang: str, voice_name: str) -> Voice: @@ -591,16 +590,13 @@ class PiperEmbedded: self.shutdown() self.ensure_process_started() piper_done, errors_from_piper = [], [] - last_output_at = monotonic() needs_conversion = sample_rate != self._current_audio_rate if needs_conversion: from calibre_extensions.ffmpeg import resample_raw_audio_16bit def callback(ok, payload): - nonlocal last_output_at if ok: piper_done.append(True) - last_output_at = monotonic() else: errors_from_piper.append(payload.decode('utf-8', 'replace')) @@ -615,22 +611,20 @@ class PiperEmbedded: self._process.stdin.flush() stderr_data = b'' buf, piper_done, errors_from_piper = [], [], [] - last_output_at = monotonic() - while not piper_done: - try: - is_stdout, exception, data = self._from_process_queue.get(True, 1.0) - except Empty: - if self._process.poll() is not None: - raise Exception(f'piper process died with error code: {self._process.poll()} and stderr: {"".join(errors_from_piper)}') - if monotonic() - last_output_at > timeout: - raise TimeoutError(f'piper process produced no output for {timeout} seconds. stderr: {"".join(errors_from_piper)}') - else: - if exception is not None: - raise exception - if is_stdout: - buf.append(data) - else: - stderr_data = detect_end_of_data(stderr_data + data, callback) + + def stdout_callback(data: bytes) -> None: + buf.append(data) + + def stderr_callback(data: bytes) -> bool: + nonlocal stderr_data + stderr_data = detect_end_of_data(stderr_data + data, callback) + return not piper_done + + try: + unix_pipe_readers(self._process.stdout, self._process.stderr, stdout_callback, stderr_callback) + except Exception as e: + raise Exception(f'Reading output from piper process failed with error: {e} and STDERR: ' + '\n'.join(errors_from_piper)) + raw_data = b''.join(buf) if needs_conversion: raw_data = resample_raw_audio_16bit(raw_data, self._current_audio_rate, sample_rate) @@ -643,17 +637,12 @@ class PiperEmbedded: def shutdown(self): if self._process is not None: - self._process_shutdown_event.set() - self._to_process_queue.put(None) - self._process_shutdown_event = None self._process.stdin.close() - self._process.kill() - self._process.wait() self._process.stdout.close() self._process.stderr.close() + self._process.kill() + self._process.wait() self._process = None - self._stdout_reader.join() - self._stderr_reader.join() __del__ = shutdown def ensure_process_started(self): @@ -662,25 +651,26 @@ class PiperEmbedded: model_path, config_path = download_voice(self._current_voice, headless=True) self._current_audio_rate, cmdline = piper_process_metadata(model_path, config_path, self._embedded_settings, self._current_voice) import subprocess - from threading import Thread - self._process_shutdown_event = Event() - self._from_process_queue = Queue() self._process = subprocess.Popen(cmdline, stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.PIPE) - self._stdout_reader = Thread(target=self.reader, args=(self._process_shutdown_event, self._process.stdout, True), daemon=True) - self._stderr_reader = Thread(target=self.reader, args=(self._process_shutdown_event, self._process.stderr, False), daemon=True) - self._stdout_reader.start() - self._stderr_reader.start() - def reader(self, shutdown_event: Event, pipe: BinaryIO, is_stdout: bool = False) -> None: - while not shutdown_event.is_set(): - try: - data = pipe.read() - except Exception as e: - if not shutdown_event.is_set(): - self._from_process_queue.put((is_stdout, e, b'')) + +def unix_pipe_readers(stdout: BinaryIO, stderr: BinaryIO, stdout_callback, stderr_callback, timeout: float = 10.) -> None: + from select import select + out, err = stdout.fileno(), stderr.fileno() + os.set_blocking(out, False) + os.set_blocking(err, False) + readers = out, err + while True: + r, _, _ = select(readers, (), (), timeout) + if not r: + raise TimeoutError('Timed out waiting for output from piper process') + if out in r: + data = stdout.read(io.DEFAULT_BUFFER_SIZE) + stdout_callback(data) + if err in r: + data = stderr.read(io.DEFAULT_BUFFER_SIZE) + if not stderr_callback(data): break - else: - self._from_process_queue.put((is_stdout, None, data)) def duration_of_raw_audio_data(data: bytes, sample_rate: int = HIGH_QUALITY_SAMPLE_RATE, bytes_per_sample: int = 2, num_channels: int = 1) -> float: @@ -691,7 +681,6 @@ def duration_of_raw_audio_data(data: bytes, sample_rate: int = HIGH_QUALITY_SAMP def develop_embedded(): import subprocess - from io import BytesIO from calibre.utils.speedups import ReadOnlyFileBuffer from calibre_extensions.ffmpeg import transcode_single_audio_stream, wav_header_for_pcm_data @@ -706,7 +695,7 @@ def develop_embedded(): sz += len(data) all_data[0] = wav_header_for_pcm_data(sz, HIGH_QUALITY_SAMPLE_RATE) wav = ReadOnlyFileBuffer(b''.join(all_data), name='tts.wav') - mp4 = BytesIO() + mp4 = io.BytesIO() mp4.name = 'tts.mp4' transcode_single_audio_stream(wav, mp4) subprocess.run(['mpv', '-'], input=mp4.getvalue()) @@ -717,7 +706,6 @@ def develop(): # {{{ from qt.core import QSocketNotifier - from calibre.constants import iswindows from calibre.gui2 import Application app = Application([]) p = Piper()