Move piper backend to use new code

This commit is contained in:
Kovid Goyal 2025-07-29 13:14:40 +05:30
parent 422d1fe8f9
commit 661499e391
No known key found for this signature in database
GPG Key ID: 06BC317B515ACE7C
4 changed files with 105 additions and 322 deletions

View File

@ -2,44 +2,26 @@
# License: GPLv3 Copyright: 2024, Kovid Goyal <kovid at kovidgoyal.net>
import atexit
import io
import json
import os
import re
import sys
from collections import deque
from collections.abc import Iterable, Iterator
from contextlib import suppress
from dataclasses import dataclass
from functools import lru_cache
from itertools import count
from time import monotonic
from typing import BinaryIO
from qt.core import (
QAudio,
QAudioFormat,
QAudioSink,
QByteArray,
QIODevice,
QIODeviceBase,
QMediaDevices,
QObject,
QProcess,
Qt,
QTextToSpeech,
QWidget,
pyqtSignal,
sip,
)
from qt.core import QAudio, QAudioFormat, QAudioSink, QByteArray, QIODevice, QIODeviceBase, QMediaDevices, QObject, Qt, QTextToSpeech, QWidget, pyqtSignal, sip
from calibre.constants import cache_dir, is_debugging, iswindows, piper_cmdline
from calibre.constants import cache_dir, is_debugging, iswindows
from calibre.gui2 import error_dialog
from calibre.gui2.tts.types import TTS_EMBEDED_CONFIG, EngineSpecificSettings, Quality, TTSBackend, Voice, widget_parent
from calibre.spell.break_iterator import PARAGRAPH_SEPARATOR, split_into_sentences_for_tts
from calibre.utils.filenames import ascii_text
from calibre.utils.localization import canonicalize_lang, get_lang
from calibre.utils.resources import get_path as P
from calibre.utils.tts.piper import SynthesisResult, global_piper_instance, global_piper_instance_if_exists, play_pcm_data
HIGH_QUALITY_SAMPLE_RATE = 22050
@ -60,20 +42,14 @@ def audio_format(audio_rate: int = HIGH_QUALITY_SAMPLE_RATE) -> QAudioFormat:
return fmt
def piper_process_metadata(model_path, config_path, s: EngineSpecificSettings, voice: Voice) -> tuple[int, list[str]]:
def piper_process_metadata(callback, model_path, config_path, s: EngineSpecificSettings, voice: Voice) -> int:
if not model_path:
raise Exception('Could not download voice data')
if 'metadata' not in voice.engine_data:
with open(config_path) as f:
voice.engine_data['metadata'] = json.load(f)
audio_rate = voice.engine_data['metadata']['audio']['sample_rate']
length_scale = max(0.1, 1 + -1 * s.rate) # maps -1 to 1 to 2 to 0.1
cmdline = list(piper_cmdline()) + [
'--model', model_path, '--config', config_path, '--output-raw', '--json-input',
'--sentence-silence', str(s.sentence_delay), '--length_scale', str(length_scale)]
if is_debugging():
cmdline.append('--debug')
return audio_rate, cmdline
return global_piper_instance().set_voice(
callback, config_path, model_path, length_scale_multiplier=s.rate, sentence_delay=s.sentence_delay)
def piper_cache_dir() -> str:
@ -149,8 +125,7 @@ class Utterance:
id: int
start: int
length: int
payload_size: int
left_to_write: QByteArray
sentence: str
audio_data: QByteArray
started: bool = False
@ -241,47 +216,19 @@ class UtteranceAudioQueue(QIODevice):
def split_into_utterances(text: str, counter: count, lang: str = 'en'):
for start, sentence in split_into_sentences_for_tts(text, lang):
payload = json.dumps({'text': sentence}).encode('utf-8')
ba = QByteArray()
ba.reserve(len(payload) + 1)
ba.append(payload)
ba.append(UTTERANCE_SEPARATOR)
u = Utterance(id=next(counter), payload_size=len(ba), audio_data=QByteArray(),
left_to_write=ba, start=start, length=len(sentence))
u = Utterance(id=next(counter), audio_data=QByteArray(), sentence=sentence, start=start, length=len(sentence))
debug(f'Utterance created {u.id} {start=}: {sentence!r}')
yield u
@lru_cache(2)
def stderr_pat():
return re.compile(rb'\[piper\] \[([a-zA-Z0-9_]+?)\] (.+)')
def detect_end_of_data(data: bytes, callback):
lines = data.split(b'\n')
for line in lines[:-1]:
if m := stderr_pat().search(line):
which, payload = m.group(1), m.group(2)
if which == b'info':
debug(f'[piper-info] {payload.decode("utf-8", "replace")}')
if payload.startswith(b'Real-time factor:'):
callback(True, None)
elif which == b'error':
callback(False, payload.decode('utf-8', 'replace'))
elif which == b'debug':
debug(f'[piper-debug] {payload.decode("utf-8", "replace")}')
return lines[-1]
class Piper(TTSBackend):
engine_name: str = 'piper'
filler_char: str = PARAGRAPH_SEPARATOR
_synthesis_done = pyqtSignal()
_synthesis_done = pyqtSignal(object, object, object)
def __init__(self, engine_name: str = '', parent: QObject | None = None):
super().__init__(parent)
self._process: QProcess | None = None
self._audio_sink: QAudioSink | None = None
self._current_voice: Voice | None = None
@ -296,7 +243,7 @@ class Piper(TTSBackend):
self._errors_from_piper: list[str] = []
self._pending_stderr_data = b''
self._synthesis_done.connect(self._utterance_synthesized, type=Qt.ConnectionType.QueuedConnection)
self._synthesis_done.connect(self._on_synthesis_done, type=Qt.ConnectionType.QueuedConnection)
atexit.register(self.shutdown)
@property
@ -304,27 +251,16 @@ class Piper(TTSBackend):
self._load_voice_metadata()
return {'': self._voices}
def _wait_for_process_to_start(self) -> bool:
if not self.process.waitForStarted():
cmdline = [self.process.program()] + self.process.arguments()
if self.process.error() is QProcess.ProcessError.TimedOut:
self._set_error(f'Timed out waiting for piper process {cmdline} to start')
else:
self._set_error(f'Failed to start piper process: {cmdline}')
return False
return True
def say(self, text: str) -> None:
if self._last_error:
return
self.stop()
if not self._wait_for_process_to_start():
return
self.ensure_started()
lang = 'en'
if self._current_voice and self._current_voice.language_code:
lang = self._current_voice.language_code
self._utterances_being_synthesized.extend(split_into_utterances(text, self._utterance_counter, lang))
self._write_current_utterance()
self._queue_current_utterance()
def pause(self) -> None:
if self._audio_sink is not None:
@ -335,29 +271,27 @@ class Piper(TTSBackend):
self._audio_sink.resume()
def stop(self) -> None:
if self._process is not None:
if self._audio_sink is not None:
if self._state is not QTextToSpeech.State.Ready or self._utterances_being_synthesized or self._utterances_being_spoken:
self.shutdown()
# We cannot re-create self.process here as that will cause the
# We cannot call ensure_started() here as that will cause the
# audio device to go to active state which will cause a
# speaking event to be generated
def shutdown(self) -> None:
if self._process is not None:
if self._audio_sink is not None:
gp = global_piper_instance_if_exists()
if gp is not None:
gp.cancel()
self._audio_sink.stateChanged.disconnect()
self._process.readyReadStandardError.disconnect()
self._process.bytesWritten.disconnect()
self._process.readyReadStandardOutput.disconnect()
self._process.stateChanged.disconnect()
self._process.kill()
self._process.waitForFinished(-1)
# this dance is needed otherwise stop() is very slow on Linux
self._audio_sink.suspend()
self._audio_sink.reset()
self._audio_sink.stop()
sip.delete(self._audio_sink)
sip.delete(self._process)
self._process = self._audio_sink = None
self._audio_sink = None
self._utterances_being_synthesized.clear()
self._utterances_being_spoken.clear()
self._set_state(QTextToSpeech.State.Ready)
def reload_after_configure(self) -> None:
@ -379,9 +313,8 @@ class Piper(TTSBackend):
self._last_error = msg
self._set_state(QTextToSpeech.State.Error)
@property
def process(self) -> QProcess:
if self._process is None:
def ensure_started(self) -> None:
if self._audio_sink is None:
model_path = config_path = ''
try:
self._load_voice_metadata()
@ -394,18 +327,9 @@ class Piper(TTSBackend):
self._utterances_being_spoken.clear()
self._utterances_being_synthesized.clear()
self._errors_from_piper.clear()
self._process = QProcess(self)
self._pending_stderr_data = b''
self._set_state(QTextToSpeech.State.Ready)
audio_rate, cmdline = piper_process_metadata(model_path, config_path, s, voice)
self._process.setProgram(cmdline[0])
self._process.setArguments(cmdline[1:])
debug('Running piper:', cmdline)
self._process.readyReadStandardError.connect(self.piper_stderr_available)
self._process.readyReadStandardOutput.connect(self.piper_stdout_available)
self._process.bytesWritten.connect(self.bytes_written)
self._process.stateChanged.connect(self._update_status)
audio_rate = piper_process_metadata(self.on_synthesis_done, model_path, config_path, s, voice)
fmt = audio_format(audio_rate)
dev = None
if s.audio_device_id:
@ -420,47 +344,37 @@ class Piper(TTSBackend):
if s.volume is not None:
self._audio_sink.setVolume(s.volume)
self._audio_sink.stateChanged.connect(self._utterances_being_spoken.audio_state_changed)
self._process.start()
self._audio_sink.start(self._utterances_being_spoken)
return self._process
def piper_stdout_available(self) -> None:
if self._utterances_being_synthesized:
def on_synthesis_done(self, sr, err, tb):
self._synthesis_done.emit(sr, err, tb)
def _on_synthesis_done(self, sr: SynthesisResult, err: Exception, tb: str):
if self._audio_sink is None:
return
if err is not None:
self._errors_from_piper.append(str(err))
self._errors_from_piper.append(tb)
else:
u = self._utterances_being_synthesized[0]
while True:
ba = self.process.readAll()
if not len(ba):
break
debug('Synthesized data read:', len(ba), 'bytes')
u.audio_data.append(ba)
def piper_stderr_available(self) -> None:
if self._process is not None:
def callback(ok, payload):
if ok:
if self._utterances_being_synthesized:
self._synthesis_done.emit()
if u.id == sr.utterance_id:
u.audio_data.append(sr.audio_data)
if sr.is_last:
debug(f'Utterance {u.id} got {len(sr.audio_data)} bytes of audio data from piper')
self._utterances_being_synthesized.popleft()
u.synthesized = True
if len(u.audio_data):
self._utterances_being_spoken.add_utterance(u)
else:
self._errors_from_piper.append(payload.decode('utf-8', 'replace'))
data = self._pending_stderr_data + bytes(self._process.readAllStandardError())
self._pending_stderr_data = detect_end_of_data(data, callback)
def _utterance_synthesized(self):
self.piper_stdout_available() # just in case
u = self._utterances_being_synthesized.popleft()
u.synthesized = True
debug(f'Utterance {u.id} got {len(u.audio_data)} bytes of audio data from piper')
if len(u.audio_data):
self._utterances_being_spoken.add_utterance(u)
self._write_current_utterance()
debug(f'Synthesized data read for utterance {u.id}: {len(sr.audio_data)} bytes')
self._queue_current_utterance()
self._update_status()
def _update_status(self):
if self._process is not None and self._process.state() is QProcess.ProcessState.NotRunning:
if self._process.exitStatus() is not QProcess.ExitStatus.NormalExit or self._process.exitCode():
m = '\n'.join(self._errors_from_piper)
self._set_error(f'piper process failed with exit code: {self._process.exitCode()} and error messages: {m}')
return
if self._errors_from_piper:
m = '\n'.join(self._errors_from_piper)
self._set_error(f'piper failed with error: {m}')
return
if self._state is QTextToSpeech.State.Error:
return
state = self._utterances_being_spoken.audio_state
@ -478,21 +392,12 @@ class Piper(TTSBackend):
if not self._utterances_being_synthesized and not self._utterances_being_spoken:
self._set_state(QTextToSpeech.State.Ready)
def bytes_written(self, count: int) -> None:
self._write_current_utterance()
def _write_current_utterance(self) -> None:
def _queue_current_utterance(self) -> None:
if self._utterances_being_synthesized:
u = self._utterances_being_synthesized[0]
while len(u.left_to_write):
written = self.process.write(u.left_to_write)
if written < 0:
self._set_error(f'Failed to write to piper process with error: {self.process.errorString()}')
break
if not u.started and written:
u.started = True
debug(f'Utterance {u.id} synthesis started')
u.left_to_write = u.left_to_write.last(len(u.left_to_write) - written)
global_piper_instance().synthesize(u.id, u.sentence)
u.started = True
debug(f'Utterance {u.id} synthesis queued')
def audio_sink_state_changed(self, state: QAudio.State) -> None:
self._update_status()
@ -564,7 +469,7 @@ class PiperEmbedded:
lang = get_lang()
lang = canonicalize_lang(lang) or lang
self._default_voice = self._voice_for_lang.get(lang) or self._voice_for_lang['eng']
self._current_voice = self._process = self._pipe_reader = None
self._current_voice = None
self._current_audio_rate = 0
def resolve_voice(self, lang: str, voice_name: str) -> Voice:
@ -586,42 +491,27 @@ class PiperEmbedded:
if voice is not self._current_voice:
self._current_voice = voice
self.shutdown()
self.ensure_process_started()
piper_done, errors_from_piper = [], []
self.ensure_started()
needs_conversion = sample_rate != self._current_audio_rate
if needs_conversion:
from calibre_extensions.ffmpeg import resample_raw_audio_16bit
def callback(ok, payload):
if ok:
piper_done.append(True)
else:
errors_from_piper.append(payload.decode('utf-8', 'replace'))
for text in texts:
text = text.strip()
if not text:
yield b'', 0.
continue
payload = json.dumps({'text': text}).encode('utf-8')
self._process.stdin.write(payload)
self._process.stdin.write(UTTERANCE_SEPARATOR)
self._process.stdin.flush()
stderr_data = b''
buf = io.BytesIO()
piper_done, errors_from_piper = [], []
all_data = []
global_piper_instance().synthesize(1, text)
while True:
sr, exc, tb = self._queue.get()
if exc is not None:
raise Exception(f'failed to synthesize text to audio with error: {exc} and traceback: {tb}')
all_data.append(sr.audio_data)
if sr.is_last:
break
def stderr_callback(data: bytes) -> bool:
nonlocal stderr_data
stderr_data = detect_end_of_data(stderr_data + data, callback)
return not piper_done
try:
self._pipe_reader(buf.write, stderr_callback)
except Exception as e:
raise Exception(f'Reading output from piper process failed with error: {e} and STDERR: ' + '\n'.join(errors_from_piper))
raw_data = buf.getvalue()
raw_data = b''.join(all_data)
if needs_conversion:
raw_data = resample_raw_audio_16bit(raw_data, self._current_audio_rate, sample_rate)
yield raw_data, duration_of_raw_audio_data(raw_data, sample_rate)
@ -635,126 +525,23 @@ class PiperEmbedded:
return True
def shutdown(self):
if self._process is not None:
self._pipe_reader.close()
self._pipe_reader = None
self._process.stdin.close()
self._process.stdout.close()
self._process.stderr.close()
self._process.kill()
self._process.wait()
self._process = None
if self._current_audio_rate != 0:
gp = global_piper_instance_if_exists()
if gp is not None:
gp.cancel()
self._current_audio_rate = 0
__del__ = shutdown
def ensure_process_started(self):
if self._process is not None:
return
model_path, config_path = download_voice(self._current_voice, headless=True)
self._current_audio_rate, cmdline = piper_process_metadata(model_path, config_path, self._embedded_settings, self._current_voice)
import subprocess
self._process = subprocess.Popen(cmdline, stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.PIPE)
self._pipe_reader = (ThreadedPipeReader if iswindows else PipeReader)(self._process.stdout, self._process.stderr)
def on_synthesis_done(self, sr: SynthesisResult, exc: Exception, tb: str) -> None:
self._queue.put((sr, exc, tb))
class PipeReader:
TIMEOUT = 30. # seconds
def __init__(self, stdout: BinaryIO, stderr: BinaryIO):
self.stdout_fd = stdout.fileno()
self.stderr_fd = stderr.fileno()
os.set_blocking(self.stdout_fd, False)
os.set_blocking(self.stderr_fd, False)
def close(self):
self.stderr_fd = self.stdout_fd = -1
def __call__(self, stdout_callback, stderr_callback):
from select import select
out, err = self.stdout_fd, self.stderr_fd
readers = out, err
buf = memoryview(bytearray(io.DEFAULT_BUFFER_SIZE))
def readall(fd: int) -> bytes:
output = io.BytesIO()
while True:
try:
num = os.readv(fd, (buf,))
except BlockingIOError:
break
else:
output.write(buf[:num])
return output.getvalue()
def readinto(fd: int, callback) -> None:
while True:
try:
num = os.readv(fd, (buf,))
except BlockingIOError:
break
else:
callback(buf[:num])
while True:
r, _, _ = select(readers, (), (), self.TIMEOUT)
if not r:
raise TimeoutError('Timed out waiting for output from piper process')
if out in r:
readinto(out, stdout_callback)
if err in r:
data = readall(err)
if not stderr_callback(data):
# In case there is new data written to stdout
readinto(out, stdout_callback)
break
class ThreadedPipeReader(PipeReader):
def __init__(self, stdout: BinaryIO, stderr: BinaryIO):
from queue import Queue
from threading import Event, Thread
self.shutting_down = Event()
self.queue = Queue()
Thread(target=self._reader, args=(stdout.fileno(), True), daemon=True).start()
Thread(target=self._reader, args=(stderr.fileno(), False), daemon=True).start()
def close(self):
self.shutting_down.set()
def __call__(self, stdout_callback, stderr_callback):
from queue import Empty
while True:
data, is_stdout, err = self.queue.get(True, self.TIMEOUT)
if err is not None:
raise err
if data:
if is_stdout:
stdout_callback(data)
else:
if not stderr_callback(data):
# in case more data was written to stdout
while True:
try:
data, is_stdout, err = self.queue.get_nowait()
except Empty:
break
if err is not None:
raise err
if is_stdout:
stdout_callback(data)
break
def _reader(self, pipe_fd: int, is_stdout: bool):
while not self.shutting_down.is_set():
try:
data = os.read(pipe_fd, io.DEFAULT_BUFFER_SIZE)
except OSError as e:
if not self.shutting_down.is_set():
self.queue.put((b'', is_stdout, e))
break
else:
self.queue.put((data, is_stdout, None))
def ensure_started(self):
if self._current_audio_rate == 0:
from queue import Queue
model_path, config_path = download_voice(self._current_voice, headless=True)
self._queue = Queue()
self._current_audio_rate = piper_process_metadata(
self.on_synthesis_done, model_path, config_path, self._embedded_settings, self._current_voice)
def duration_of_raw_audio_data(data: bytes, sample_rate: int = HIGH_QUALITY_SAMPLE_RATE, bytes_per_sample: int = 2, num_channels: int = 1) -> float:
@ -765,25 +552,14 @@ def duration_of_raw_audio_data(data: bytes, sample_rate: int = HIGH_QUALITY_SAMP
# develop {{{
def develop_embedded():
import subprocess
from calibre.utils.speedups import ReadOnlyFileBuffer
from calibre_extensions.ffmpeg import transcode_single_audio_stream, wav_header_for_pcm_data
p = PiperEmbedded()
all_data = [b'']
sz = 0
all_data = []
for data, duration in p.text_to_raw_audio_data((
'Hello, good day to you.', 'This is the second sentence.', 'This is the final sentence.'
)):
print(f'{duration=} {len(data)=}')
all_data.append(data)
sz += len(data)
all_data[0] = wav_header_for_pcm_data(sz, HIGH_QUALITY_SAMPLE_RATE)
wav = ReadOnlyFileBuffer(b''.join(all_data), name='tts.wav')
m4a = io.BytesIO()
m4a.name = 'tts.m4a'
transcode_single_audio_stream(wav, m4a)
subprocess.run(['mpv', '-'], input=m4a.getvalue())
play_pcm_data(b''.join(all_data), HIGH_QUALITY_SAMPLE_RATE)
def develop():

View File

@ -9,7 +9,7 @@ from typing import Literal, NamedTuple
from qt.core import QApplication, QLocale, QObject, QTextToSpeech, QVoice, QWidget, pyqtSignal
from calibre.constants import islinux, ismacos, iswindows, piper_cmdline
from calibre.constants import islinux, ismacos, iswindows
from calibre.utils.config import JSONConfig
from calibre.utils.config_base import tweaks
from calibre.utils.localization import canonicalize_lang
@ -234,12 +234,11 @@ def available_engines() -> dict[str, EngineMetadata]:
), True)
elif x == 'speechd':
continue
if piper_cmdline():
ans['piper'] = EngineMetadata('piper', _('The Piper Neural Engine'), _(
'The "piper" engine can track the currently spoken sentence on screen. It uses a neural network '
'for natural sounding voices. The neural network is run locally on your computer, it is fairly resource intensive to run.'
), TrackingCapability.Sentence, can_change_pitch=False, voices_have_quality_metadata=True, has_managed_voices=True,
has_sentence_delay=True)
ans['piper'] = EngineMetadata('piper', _('The Piper Neural Engine'), _(
'The "piper" engine can track the currently spoken sentence on screen. It uses a neural network '
'for natural sounding voices. The neural network is run locally on your computer, it is fairly resource intensive to run.'
), TrackingCapability.Sentence, can_change_pitch=False, voices_have_quality_metadata=True, has_managed_voices=True,
has_sentence_delay=True)
if islinux:
try:
from speechd.paths import SPD_SPAWN_CMD

View File

@ -123,14 +123,9 @@ class BuildTest(unittest.TestCase):
from speechd.client import SSIPClient
del SSIPClient
@unittest.skipIf('SKIP_SPEECH_TESTS' in os.environ, 'Speech support is opted out')
def test_piper(self):
import subprocess
from calibre.constants import piper_cmdline
self.assertTrue(piper_cmdline())
raw = subprocess.check_output(piper_cmdline() + ('-h',), stderr=subprocess.STDOUT).decode()
self.assertIn('--sentence_silence', raw)
from calibre.utils.tts.piper import simple_test
simple_test()
def test_zeroconf(self):
import ifaddr

View File

@ -83,6 +83,13 @@ class SynthesisResult(NamedTuple):
is_last: bool
def simple_test():
piper.initialize(espeak_data_dir())
piper.set_espeak_voice_by_name('en-us')
if not piper.phonemize('simple test'):
raise ValueError('No phonemes returned by phonemize()')
class Piper(Thread):
def __init__(self):
@ -172,6 +179,10 @@ def global_piper_instance() -> Piper:
return _global_piper_instance
def global_piper_instance_if_exists() -> Piper | None:
return _global_piper_instance
def play_wav_data(wav_data: bytes):
from qt.core import QAudioOutput, QBuffer, QByteArray, QCoreApplication, QIODevice, QMediaPlayer, QUrl
app = QCoreApplication([])
@ -190,6 +201,11 @@ def play_wav_data(wav_data: bytes):
app.exec()
def play_pcm_data(pcm_data, sample_rate):
from calibre_extensions.ffmpeg import wav_header_for_pcm_data
play_wav_data(wav_header_for_pcm_data(len(pcm_data), sample_rate) + pcm_data)
def develop():
from calibre.gui2.tts.piper import piper_cache_dir
p = global_piper_instance()
@ -210,10 +226,7 @@ def develop():
print(f'Got {len(sr.audio_data)} bytes of audio data', flush=True)
if sr.is_last:
break
from calibre_extensions.ffmpeg import wav_header_for_pcm_data
pcm_data = b''.join(all_data)
wav_data = wav_header_for_pcm_data(len(pcm_data), sample_rate) + pcm_data
play_wav_data(wav_data)
play_pcm_data(b''.join(all_data), sample_rate)
if __name__ == '__main__':