Get reading output from piper process via Python working on Unix

This commit is contained in:
Kovid Goyal 2024-10-14 11:39:00 +05:30
parent cfaf12db02
commit 6bfe478ca4
No known key found for this signature in database
GPG Key ID: 06BC317B515ACE7C

View File

@ -2,6 +2,7 @@
# License: GPLv3 Copyright: 2024, Kovid Goyal <kovid at kovidgoyal.net> # License: GPLv3 Copyright: 2024, Kovid Goyal <kovid at kovidgoyal.net>
import atexit import atexit
import io
import json import json
import os import os
import re import re
@ -11,8 +12,6 @@ from contextlib import suppress
from dataclasses import dataclass from dataclasses import dataclass
from functools import lru_cache from functools import lru_cache
from itertools import count from itertools import count
from queue import Empty, Queue
from threading import Event
from time import monotonic from time import monotonic
from typing import BinaryIO, Iterable, Iterator from typing import BinaryIO, Iterable, Iterator
@ -33,7 +32,7 @@ from qt.core import (
sip, sip,
) )
from calibre.constants import cache_dir, is_debugging, piper_cmdline from calibre.constants import cache_dir, is_debugging, iswindows, piper_cmdline
from calibre.gui2 import error_dialog from calibre.gui2 import error_dialog
from calibre.gui2.tts.types import TTS_EMBEDED_CONFIG, EngineSpecificSettings, Quality, TTSBackend, Voice, widget_parent from calibre.gui2.tts.types import TTS_EMBEDED_CONFIG, EngineSpecificSettings, Quality, TTSBackend, Voice, widget_parent
from calibre.spell.break_iterator import PARAGRAPH_SEPARATOR, split_into_sentences_for_tts from calibre.spell.break_iterator import PARAGRAPH_SEPARATOR, split_into_sentences_for_tts
@ -567,7 +566,7 @@ class PiperEmbedded:
lang = get_lang() lang = get_lang()
lang = canonicalize_lang(lang) or lang lang = canonicalize_lang(lang) or lang
self._default_voice = self._voice_for_lang.get(lang) or self._voice_for_lang['eng'] self._default_voice = self._voice_for_lang.get(lang) or self._voice_for_lang['eng']
self._current_voice = self._process = self._process_shutdown_event = None self._current_voice = self._process = None
self._current_audio_rate = 0 self._current_audio_rate = 0
def resolve_voice(self, lang: str, voice_name: str) -> Voice: def resolve_voice(self, lang: str, voice_name: str) -> Voice:
@ -591,16 +590,13 @@ class PiperEmbedded:
self.shutdown() self.shutdown()
self.ensure_process_started() self.ensure_process_started()
piper_done, errors_from_piper = [], [] piper_done, errors_from_piper = [], []
last_output_at = monotonic()
needs_conversion = sample_rate != self._current_audio_rate needs_conversion = sample_rate != self._current_audio_rate
if needs_conversion: if needs_conversion:
from calibre_extensions.ffmpeg import resample_raw_audio_16bit from calibre_extensions.ffmpeg import resample_raw_audio_16bit
def callback(ok, payload): def callback(ok, payload):
nonlocal last_output_at
if ok: if ok:
piper_done.append(True) piper_done.append(True)
last_output_at = monotonic()
else: else:
errors_from_piper.append(payload.decode('utf-8', 'replace')) errors_from_piper.append(payload.decode('utf-8', 'replace'))
@ -615,22 +611,20 @@ class PiperEmbedded:
self._process.stdin.flush() self._process.stdin.flush()
stderr_data = b'' stderr_data = b''
buf, piper_done, errors_from_piper = [], [], [] buf, piper_done, errors_from_piper = [], [], []
last_output_at = monotonic()
while not piper_done: def stdout_callback(data: bytes) -> None:
try:
is_stdout, exception, data = self._from_process_queue.get(True, 1.0)
except Empty:
if self._process.poll() is not None:
raise Exception(f'piper process died with error code: {self._process.poll()} and stderr: {"".join(errors_from_piper)}')
if monotonic() - last_output_at > timeout:
raise TimeoutError(f'piper process produced no output for {timeout} seconds. stderr: {"".join(errors_from_piper)}')
else:
if exception is not None:
raise exception
if is_stdout:
buf.append(data) buf.append(data)
else:
def stderr_callback(data: bytes) -> bool:
nonlocal stderr_data
stderr_data = detect_end_of_data(stderr_data + data, callback) stderr_data = detect_end_of_data(stderr_data + data, callback)
return not piper_done
try:
unix_pipe_readers(self._process.stdout, self._process.stderr, stdout_callback, stderr_callback)
except Exception as e:
raise Exception(f'Reading output from piper process failed with error: {e} and STDERR: ' + '\n'.join(errors_from_piper))
raw_data = b''.join(buf) raw_data = b''.join(buf)
if needs_conversion: if needs_conversion:
raw_data = resample_raw_audio_16bit(raw_data, self._current_audio_rate, sample_rate) raw_data = resample_raw_audio_16bit(raw_data, self._current_audio_rate, sample_rate)
@ -643,17 +637,12 @@ class PiperEmbedded:
def shutdown(self): def shutdown(self):
if self._process is not None: if self._process is not None:
self._process_shutdown_event.set()
self._to_process_queue.put(None)
self._process_shutdown_event = None
self._process.stdin.close() self._process.stdin.close()
self._process.kill()
self._process.wait()
self._process.stdout.close() self._process.stdout.close()
self._process.stderr.close() self._process.stderr.close()
self._process.kill()
self._process.wait()
self._process = None self._process = None
self._stdout_reader.join()
self._stderr_reader.join()
__del__ = shutdown __del__ = shutdown
def ensure_process_started(self): def ensure_process_started(self):
@ -662,25 +651,26 @@ class PiperEmbedded:
model_path, config_path = download_voice(self._current_voice, headless=True) model_path, config_path = download_voice(self._current_voice, headless=True)
self._current_audio_rate, cmdline = piper_process_metadata(model_path, config_path, self._embedded_settings, self._current_voice) self._current_audio_rate, cmdline = piper_process_metadata(model_path, config_path, self._embedded_settings, self._current_voice)
import subprocess import subprocess
from threading import Thread
self._process_shutdown_event = Event()
self._from_process_queue = Queue()
self._process = subprocess.Popen(cmdline, stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.PIPE) self._process = subprocess.Popen(cmdline, stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.PIPE)
self._stdout_reader = Thread(target=self.reader, args=(self._process_shutdown_event, self._process.stdout, True), daemon=True)
self._stderr_reader = Thread(target=self.reader, args=(self._process_shutdown_event, self._process.stderr, False), daemon=True)
self._stdout_reader.start()
self._stderr_reader.start()
def reader(self, shutdown_event: Event, pipe: BinaryIO, is_stdout: bool = False) -> None:
while not shutdown_event.is_set(): def unix_pipe_readers(stdout: BinaryIO, stderr: BinaryIO, stdout_callback, stderr_callback, timeout: float = 10.) -> None:
try: from select import select
data = pipe.read() out, err = stdout.fileno(), stderr.fileno()
except Exception as e: os.set_blocking(out, False)
if not shutdown_event.is_set(): os.set_blocking(err, False)
self._from_process_queue.put((is_stdout, e, b'')) readers = out, err
while True:
r, _, _ = select(readers, (), (), timeout)
if not r:
raise TimeoutError('Timed out waiting for output from piper process')
if out in r:
data = stdout.read(io.DEFAULT_BUFFER_SIZE)
stdout_callback(data)
if err in r:
data = stderr.read(io.DEFAULT_BUFFER_SIZE)
if not stderr_callback(data):
break break
else:
self._from_process_queue.put((is_stdout, None, data))
def duration_of_raw_audio_data(data: bytes, sample_rate: int = HIGH_QUALITY_SAMPLE_RATE, bytes_per_sample: int = 2, num_channels: int = 1) -> float: def duration_of_raw_audio_data(data: bytes, sample_rate: int = HIGH_QUALITY_SAMPLE_RATE, bytes_per_sample: int = 2, num_channels: int = 1) -> float:
@ -691,7 +681,6 @@ def duration_of_raw_audio_data(data: bytes, sample_rate: int = HIGH_QUALITY_SAMP
def develop_embedded(): def develop_embedded():
import subprocess import subprocess
from io import BytesIO
from calibre.utils.speedups import ReadOnlyFileBuffer from calibre.utils.speedups import ReadOnlyFileBuffer
from calibre_extensions.ffmpeg import transcode_single_audio_stream, wav_header_for_pcm_data from calibre_extensions.ffmpeg import transcode_single_audio_stream, wav_header_for_pcm_data
@ -706,7 +695,7 @@ def develop_embedded():
sz += len(data) sz += len(data)
all_data[0] = wav_header_for_pcm_data(sz, HIGH_QUALITY_SAMPLE_RATE) all_data[0] = wav_header_for_pcm_data(sz, HIGH_QUALITY_SAMPLE_RATE)
wav = ReadOnlyFileBuffer(b''.join(all_data), name='tts.wav') wav = ReadOnlyFileBuffer(b''.join(all_data), name='tts.wav')
mp4 = BytesIO() mp4 = io.BytesIO()
mp4.name = 'tts.mp4' mp4.name = 'tts.mp4'
transcode_single_audio_stream(wav, mp4) transcode_single_audio_stream(wav, mp4)
subprocess.run(['mpv', '-'], input=mp4.getvalue()) subprocess.run(['mpv', '-'], input=mp4.getvalue())
@ -717,7 +706,6 @@ def develop(): # {{{
from qt.core import QSocketNotifier from qt.core import QSocketNotifier
from calibre.constants import iswindows
from calibre.gui2 import Application from calibre.gui2 import Application
app = Application([]) app = Application([])
p = Piper() p = Piper()