Wire up auto-download of piper voice

Kovid Goyal 2024-09-02 21:14:36 +05:30
parent f6af198d4a
commit ac923e630a
GPG Key ID: 06BC317B515ACE7C
4 changed files with 100 additions and 37 deletions

View File

@@ -32,13 +32,13 @@ class PiperVoices(ReVendor):
         src = self.download_securely(url).decode('utf-8')
         lang_map = {}
         current_lang = current_voice = ''
-        lang_pat = re.compile(r'`(.+?)`')
+        lang_pat = re.compile(r'\((.+?)\)')
         model_pat = re.compile(r'\[model\]\((.+?)\)')
         config_pat = re.compile(r'\[config\]\((.+?)\)')
         for line in src.splitlines():
             if line.startswith('* '):
                 if m := lang_pat.search(line):
-                    current_lang = m.group(1)
+                    current_lang = m.group(1).partition(',')[0].replace('`', '')
                     lang_map[current_lang] = {}
                     current_voice = ''
             else:
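
Note: the language lines this loop parses look roughly like the example below (a hedged reconstruction from the regexes above, not copied from the actual voices list). The old pattern grabbed the backtick-quoted code directly; the new one takes the first comma-separated item inside the parentheses and strips the backticks, so extra text after the code no longer breaks the match.

* English (`en_US`, English, United States)

The per-voice bullet lines that follow are still matched via their [model](...) and [config](...) markdown links.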
@@ -62,6 +62,8 @@ class PiperVoices(ReVendor):
                     lang_map[current_lang][current_voice] = {}
         if not lang_map:
             raise SystemExit(f'Failed to read any piper voices from: {url}')
+        if 'en_US' not in lang_map:
+            raise SystemExit(f'Failed to read en_US piper voices from: {url}')
         with open(self.output_file_path, 'w') as f:
             json.dump({'version': 1, 'lang_map': lang_map}, f, indent=2, sort_keys=False)
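
For reference, the data written to piper-voices.json has this overall shape (a hedged sketch: only the version/lang_map nesting and the model/config keys follow from the code above and from its consumer in piper.py; the voice name, quality level and URLs are illustrative):

piper_voices = {  # hypothetical variable name, for illustration only
    'version': 1,
    'lang_map': {
        'en_US': {                # language code parsed from the voices list
            'libritts': {         # voice name
                'high': {         # piper quality level
                    'model': 'https://example.com/en_US-libritts-high.onnx',        # illustrative URL
                    'config': 'https://example.com/en_US-libritts-high.onnx.json',  # illustrative URL
                },
            },
        },
    },
}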

View File

@@ -6,7 +6,7 @@ from collections import deque
 from contextlib import contextmanager
 from typing import TYPE_CHECKING, NamedTuple

-from qt.core import QApplication, QDialog, QObject, QTextToSpeech, QWidget, pyqtSignal
+from qt.core import QApplication, QDialog, QObject, QTextToSpeech, pyqtSignal

 from calibre.gui2 import error_dialog
 from calibre.gui2.widgets import BusyCursor
@@ -125,7 +125,10 @@ class TTSManager(QObject):
         if self._tts is None:
             with BusyCursor():
                 from calibre.gui2.tts2.types import create_tts_backend
-                self._tts = create_tts_backend()
+                try:
+                    self._tts = create_tts_backend()
+                except AttributeError as e:
+                    raise Exception(str(e)) from e
                 self._tts.state_changed.connect(self._state_changed)
                 self._tts.saying.connect(self._saying)
         return self._tts
@@ -185,11 +188,9 @@ class TTSManager(QObject):

     def configure(self) -> None:
         from calibre.gui2.tts2.config import ConfigDialog
-        p = self
-        while p is not None and not isinstance(p, QWidget):
-            p = p.parent()
+        from calibre.gui2.tts2.types import widget_parent
         with self.resume_after() as rd:
-            d = ConfigDialog(parent=p)
+            d = ConfigDialog(parent=widget_parent(self))
             if d.exec() == QDialog.DialogCode.Accepted and self._tts is not None:
                 rd.needs_full_resume = True
                 if d.engine_changed:

View File

@@ -11,12 +11,12 @@ from dataclasses import dataclass
 from itertools import count
 from time import monotonic

-from qt.core import QAudio, QAudioFormat, QAudioSink, QByteArray, QIODevice, QIODeviceBase, QObject, QProcess, Qt, QTextToSpeech, pyqtSignal, sip
+from qt.core import QAudio, QAudioFormat, QAudioSink, QByteArray, QDialog, QIODevice, QIODeviceBase, QObject, QProcess, Qt, QTextToSpeech, pyqtSignal, sip

-from calibre.constants import is_debugging
-from calibre.gui2.tts2.types import Quality, TTSBackend, Voice, piper_cmdline
+from calibre.constants import cache_dir, is_debugging
+from calibre.gui2.tts2.types import EngineSpecificSettings, Quality, TTSBackend, Voice, piper_cmdline, widget_parent
 from calibre.spell.break_iterator import sentence_positions, split_into_words_and_positions
-from calibre.utils.localization import canonicalize_lang
+from calibre.utils.localization import canonicalize_lang, get_lang
 from calibre.utils.resources import get_path as P
@@ -178,7 +178,7 @@ class Piper(TTSBackend):
         self._utterances_being_spoken.saying.connect(self.saying)
         self._utterances_being_spoken.update_status.connect(self._update_status, type=Qt.ConnectionType.QueuedConnection)
         self._state = QTextToSpeech.State.Ready
-        self._voices = None
+        self._voices = self._voice_for_lang = None
         self._last_error = ''
         self._errors_from_piper: list[str] = []
         self._pending_stderr_data = b''
@@ -189,25 +189,7 @@ class Piper(TTSBackend):

     @property
     def available_voices(self) -> dict[str, tuple[Voice, ...]]:
-        if self._voices is None:
-            d = json.loads(P('piper-voices.json', data=True))
-            ans = []
-            for bcp_code, voice_map in d['lang_map'].items():
-                lang, sep, country = bcp_code.partition('_')
-                lang = canonicalize_lang(lang) or lang
-                for voice_name, qual_map in voice_map.items():
-                    best_qual = voice = None
-                    for qual, e in qual_map.items():
-                        q = Quality.from_piper_quality(qual)
-                        if best_qual is None or q.value < best_qual.value:
-                            best_qual = q
-                            voice = Voice(voice_name, lang, country, quality=q, engine_data={
-                                'model_url': e['model'], 'config_url': e['config'],
-                                'model_filename': f'{bcp_code}-{voice_name}-{qual}.onnx',
-                            })
-                    if voice:
-                        ans.append(voice)
-            self._voices = tuple(ans)
+        self._load_voice_metadata()
         return {'': self._voices}

     def say(self, text: str) -> None:
@@ -275,6 +257,20 @@ class Piper(TTSBackend):
     @property
     def process(self) -> QProcess:
         if self._process is None:
+            model_path = config_path = ''
+            try:
+                self._load_voice_metadata()
+                s = EngineSpecificSettings.create_from_config(self.engine_name)
+                rate = max(0.1, 1 + s.rate)  # maps -1 to 1 to 0.1 to 2
+                voice = self._voice_name_map.get(s.voice_name) or self._default_voice
+                model_path, config_path = self._ensure_voice_is_downloaded(voice)
+            except AttributeError as e:
+                raise Exception(str(e)) from e
+            if not model_path:
+                raise Exception('Could not download voice data')
+            with open(config_path) as f:
+                voice_metadata = json.load(f)
+            audio_rate = voice_metadata['audio']['sample_rate']
             self._utterances_being_spoken.clear()
             self._utterances_being_synthesized.clear()
             self._errors_from_piper.clear()
@@ -282,10 +278,9 @@ class Piper(TTSBackend):
             self._pending_stderr_data = b''
             self._set_state(QTextToSpeech.State.Ready)

-            model_path = '/t/en_US-libritts-high.onnx' # TODO: Dont hardcode voice
-            rate = 1.0 # TODO: Make rate configurable
             cmdline = list(piper_cmdline()) + [
-                '--model', model_path, '--output-raw', '--json-input', '--sentence-silence', '0', '--length_scale', str(rate)]
+                '--model', model_path, '--config', config_path, '--output-raw', '--json-input',
+                '--sentence-silence', '0', '--length_scale', str(rate)]
             if is_debugging():
                 cmdline.append('--debug')
             self._process.setProgram(cmdline[0])
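
With the hardcoded model removed, the piper command line is built from the downloaded voice and the configured rate. Roughly, it ends up looking like this (a hedged example: the binary path, cache paths and rate value are illustrative; the flags come from the code above):

cmdline = [
    '/path/to/piper',  # from piper_cmdline()
    '--model', '/cache/piper-voices/en_US-libritts-high.onnx',
    '--config', '/cache/piper-voices/en_US-libritts-high.onnx.json',
    '--output-raw', '--json-input', '--sentence-silence', '0', '--length_scale', '1.0',
]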
@@ -296,7 +291,7 @@ class Piper(TTSBackend):
             self._process.stateChanged.connect(self._update_status)
             fmt = QAudioFormat()
             fmt.setSampleFormat(QAudioFormat.SampleFormat.Int16)
-            fmt.setSampleRate(22050) # TODO: Read this from voice JSON
+            fmt.setSampleRate(audio_rate)
             fmt.setChannelConfig(QAudioFormat.ChannelConfig.ChannelConfigMono)
             self._audio_sink = QAudioSink(fmt, self) # TODO: Make audio device configurable
             self._audio_sink.stateChanged.connect(self._utterances_being_spoken.audio_state_changed)
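
The sample rate now comes from the voice's JSON config instead of being hardcoded. The relevant part of that config looks roughly like this (a hedged sketch: only the audio.sample_rate path is taken from the code above; the value shown is just the old default):

voice_metadata = {
    'audio': {
        'sample_rate': 22050,  # fed to QAudioFormat.setSampleRate()
    },
    # ... other fields in the voice's .onnx.json config ...
}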
@@ -383,6 +378,65 @@ class Piper(TTSBackend):
     def audio_sink_state_changed(self, state: QAudio.State) -> None:
         self._update_status()

+    def _load_voice_metadata(self) -> None:
+        if self._voices is not None:
+            return
+        d = json.loads(P('piper-voices.json', data=True))
+        ans = []
+        lang_voices_map = {}
+        self._voice_name_map = {}
+        for bcp_code, voice_map in d['lang_map'].items():
+            lang, sep, country = bcp_code.partition('_')
+            lang = canonicalize_lang(lang) or lang
+            voices_for_lang = lang_voices_map.setdefault(lang, [])
+            for voice_name, qual_map in voice_map.items():
+                best_qual = voice = None
+                for qual, e in qual_map.items():
+                    q = Quality.from_piper_quality(qual)
+                    if best_qual is None or q.value < best_qual.value:
+                        best_qual = q
+                        voice = Voice(bcp_code + ':' + voice_name, lang, country, human_name=voice_name, quality=q, engine_data={
+                            'model_url': e['model'], 'config_url': e['config'],
+                            'model_filename': f'{bcp_code}-{voice_name}-{qual}.onnx',
+                        })
+                if voice:
+                    ans.append(voice)
+                    self._voice_name_map[voice.name] = voice
+                    voices_for_lang.append(voice)
+        self._voices = tuple(ans)
+        self._voice_for_lang = {}
+        for lang, voices in lang_voices_map.items():
+            voices.sort(key=lambda v: v.quality.value)
+            self._voice_for_lang[lang] = voices[0]
+            if lang == 'eng':
+                for v in voices:
+                    if v.human_name == 'libritts':
+                        self._voice_for_lang[lang] = v
+                        break
+
+    @property
+    def _default_voice(self) -> Voice:
+        self._load_voice_metadata()
+        lang = get_lang()
+        lang = canonicalize_lang(lang) or lang
+        return self._voice_for_lang.get(lang) or self._voice_for_lang['eng']
+
+    def _ensure_voice_is_downloaded(self, voice: Voice) -> tuple[str, str]:
+        fname = voice.engine_data['model_filename']
+        model_path = os.path.join(cache_dir(), 'piper-voices', fname)
+        config_path = os.path.join(os.path.dirname(model_path), fname + '.json')
+        if os.path.exists(model_path) and os.path.exists(config_path):
+            return model_path, config_path
+        os.makedirs(os.path.dirname(model_path), exist_ok=True)
+        from calibre.gui2.tts2.download import DownloadResources
+        d = DownloadResources(_('Downloading voice data'), _('Downloading neural network for the {} voice').format(voice.human_name), {
+            voice.engine_data['model_url']: (model_path, _('Neural network data')),
+            voice.engine_data['config_url']: (config_path, _('Neural network metadata')),
+        }, parent=widget_parent(self))
+        if d.exec() == QDialog.DialogCode.Accepted:
+            return model_path, config_path
+        return '', ''
+

 def develop(): # {{{
     import tty
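
A note on where voices end up: _ensure_voice_is_downloaded() caches each voice under a piper-voices directory in the calibre cache, with the config stored next to the model as <model filename>.json. A hedged sketch of the resulting paths (the filename is illustrative):

import os
from calibre.constants import cache_dir

fname = 'en_US-libritts-high.onnx'  # voice.engine_data['model_filename']
model_path = os.path.join(cache_dir(), 'piper-voices', fname)
config_path = model_path + '.json'  # equivalent to joining dirname(model_path) with fname + '.json'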

View File

@@ -7,7 +7,7 @@ from enum import Enum, auto
 from functools import lru_cache
 from typing import Literal, NamedTuple

-from qt.core import QApplication, QLocale, QObject, QTextToSpeech, QVoice, pyqtSignal
+from qt.core import QApplication, QLocale, QObject, QTextToSpeech, QVoice, QWidget, pyqtSignal

 from calibre.constants import bundled_binaries_dir, islinux, ismacos, iswindows
 from calibre.utils.config import JSONConfig
@@ -207,6 +207,12 @@ def default_engine_name() -> str:
     return 'flite'


+def widget_parent(p: QObject) -> QWidget | None:
+    while p is not None and not isinstance(p, QWidget):
+        p = p.parent()
+    return p
+
+
 class TTSBackend(QObject):
     saying = pyqtSignal(int, int)  # offset, length
     state_changed = pyqtSignal(QTextToSpeech.State)