Remove the old TTS backends

Kovid Goyal 2024-09-03 19:21:30 +05:30
parent 0786d2d1a9
commit 3146ffafca
No known key found for this signature in database
GPG Key ID: 06BC317B515ACE7C
21 changed files with 2 additions and 4511 deletions

View File

@@ -176,23 +176,6 @@
"libraries": "shell32 wininet advapi32 gdi32 rstrtmgr",
"cflags": "/X"
},
{
"name": "winsapi",
"only": "windows",
"headers": "calibre/utils/cpp_binding.h calibre/utils/windows/common.h",
"sources": "calibre/utils/windows/winsapi.cpp",
"libraries": "SAPI Ole32",
"cflags": "/X"
},
{
"name": "winspeech",
"only": "windows",
"headers": "calibre/utils/cpp_binding.h calibre/utils/windows/common.h",
"sources": "calibre/utils/windows/winspeech.cpp",
"libraries": "WindowsApp",
"needs_c++": "20",
"cflags": "/X /Zc:__cplusplus /bigobj /permissive- /WX /Zc:twoPhase-"
},
{
"name": "wpd",
"only": "windows",
@@ -217,7 +200,7 @@
{
"name": "cocoa",
"only": "macos",
- "sources": "calibre/utils/cocoa.m calibre/gui2/tts/nsss.m",
+ "sources": "calibre/utils/cocoa.m",
"ldflags": "-framework Cocoa -framework UserNotifications"
},
{

View File

@@ -275,7 +275,7 @@ class ExtensionsImporter:
'uchardet',
)
if iswindows:
- extra = ('winutil', 'wpd', 'winfonts', 'winsapi', 'winspeech')
+ extra = ('winutil', 'wpd', 'winfonts',)
elif ismacos:
extra = ('usbobserver', 'cocoa', 'libusb', 'libmtp')
elif isfreebsd or ishaiku or islinux:

View File

@@ -1,42 +0,0 @@
#!/usr/bin/env python
# License: GPL v3 Copyright: 2020, Kovid Goyal <kovid at kovidgoyal.net>
from enum import Enum, auto
class EventType(Enum):
mark = auto()
begin = auto()
end = auto()
cancel = auto()
pause = auto()
resume = auto()
class Event:
def __init__(self, etype, data=None):
self.type = etype
self.data = data
def __repr__(self):
return f'Event(type={self.type}, data={self.data})'
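# Interleave integer marks (rendered with mark_template) and escaped text,
# yielding joined chunks that stay within chunk_size characters when it is set.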
def add_markup(text_parts, mark_template, escape_marked_text, chunk_size=0):
buf = []
size = 0
for x in text_parts:
if isinstance(x, int):
item = mark_template.format(x)
else:
item = escape_marked_text(x)
sz = len(item)
if chunk_size and size + sz > chunk_size:
yield ''.join(buf).strip()
size = 0
buf = []
size += sz
buf.append(item)
if size:
yield ''.join(buf).strip()

View File

@@ -1,186 +0,0 @@
#!/usr/bin/env python
# License: GPL v3 Copyright: 2020, Kovid Goyal <kovid at kovidgoyal.net>
import re
from itertools import count
from qt.core import QDialogButtonBox, QLabel, QMainWindow, Qt, QTimer, QVBoxLayout, QWidget, pyqtSignal
from calibre.gui2 import Application
from .common import EventType
from .implementation import Client
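# Wrap every word of the text in a numbered mark so the backend can report
# progress; returns the marked-up string and a map of mark number -> (start, end).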
def add_markup(text):
buf = []
first = True
counter = count()
pos_map = {}
last = None
bm = Client.mark_template
for m in re.finditer(r'\w+', text):
start, end = m.start(), m.end()
if first:
first = False
if start:
buf.append(Client.escape_marked_text(text[:start]))
elif start > last:
buf.append(Client.escape_marked_text(text[last:start]))
num = next(counter)
buf.append(bm.format(num))
pos_map[num] = start, end
buf.append(Client.escape_marked_text(m.group()))
last = end
if last is None:
buf.append(Client.escape_marked_text(text))
else:
buf.append(Client.escape_marked_text(text[last:]))
return ''.join(buf), pos_map
class TTSWidget(QWidget):
dispatch_on_main_thread_signal = pyqtSignal(object)
mark_changed = pyqtSignal(object)
show_message = pyqtSignal(object)
show_status = pyqtSignal(object)
def __init__(self, parent=None):
QWidget.__init__(self, parent)
self.mark_changed.connect(self.on_mark_change)
self.dispatch_on_main_thread_signal.connect(self.dispatch_on_main_thread, type=Qt.ConnectionType.QueuedConnection)
self.tts = Client({}, self.dispatch_on_main_thread_signal.emit)
self.l = l = QVBoxLayout(self)
self.la = la = QLabel(self)
la.setTextFormat(Qt.TextFormat.RichText)
la.setWordWrap(True)
self.text = '''\
In their duty through weakness of will, which is the
same as saying through shrinking from toil and pain. These cases are
perfectly simple and easy to distinguish. In a free hour, when our
power of choice is untrammelled and when nothing prevents our being
able to do what we like best, every pleasure is to be welcomed and
every pain avoided.
But in certain circumstances and owing to the claims of duty or the obligations
of business it will frequently occur that pleasures have to be repudiated and
annoyances accepted. The wise man therefore always holds in these matters to
this.
Born and I will give you a complete account of the system, and expound the
actual teachings of the great explorer of the truth, the master-builder of
human happiness. No one rejects, dislikes, or avoids pleasure itself, because
it is pleasure, but because those who do not know how to pursue pleasure
rationally encounter consequences that are extremely painful.
Nor again is there anyone who loves or pursues or desires to obtain pain of
itself, because it is pain, but because occasionally circumstances occur in
which toil and pain can procure him some great pleasure. To take a trivial
example, which of.
'''
self.ssml, self.pos_map = add_markup(self.text)
self.current_mark = None
l.addWidget(la)
self.bb = bb = QDialogButtonBox(self)
l.addWidget(bb)
self.play_button = b = bb.addButton('Play', QDialogButtonBox.ButtonRole.ActionRole)
b.clicked.connect(self.play_clicked)
self.pause_button = b = bb.addButton('Pause', QDialogButtonBox.ButtonRole.ActionRole)
b.clicked.connect(self.pause_clicked)
self.resume_button = b = bb.addButton('Resume', QDialogButtonBox.ButtonRole.ActionRole)
b.clicked.connect(self.resume_clicked)
self.stop_button = b = bb.addButton('Stop', QDialogButtonBox.ButtonRole.ActionRole)
b.clicked.connect(self.stop_clicked)
self.render_text()
def render_text(self):
text = self.text
if self.current_mark is not None:
start, end = self.pos_map[self.current_mark]
text = text[:end] + '</b>' + text[end:]
text = text[:start] + '<b>' + text[start:]
lines = ['<p>']
for line in text.splitlines():
if not line.strip():
lines.append('<p>')
else:
lines.append(line)
self.la.setText('\n'.join(lines))
def play_clicked(self):
self.tts.speak_marked_text(self.ssml, self.handle_event)
def pause_clicked(self):
self.tts.pause()
def resume_clicked(self):
self.tts.resume()
def stop_clicked(self):
self.tts.stop()
def dispatch_on_main_thread(self, func):
try:
func()
except Exception:
import traceback
traceback.print_exc()
def handle_event(self, event):
status = str(self.tts.status)
self.show_status.emit(status)
if event.type is EventType.mark:
try:
mark = int(event.data)
except Exception:
return
self.mark_changed.emit(mark)
else:
self.show_message.emit(f'Got event: {event.type.name}')
def on_mark_change(self, mark):
self.current_mark = mark
self.render_text()
def main():
app = Application([])
w = QMainWindow()
sb = w.statusBar()
la = QLabel(sb)
sb.addPermanentWidget(la)
tts = TTSWidget(w)
tts.show_message.connect(sb.showMessage)
tts.show_status.connect(la.setText)
w.setCentralWidget(tts)
w.show()
app.exec()
tts.dispatch_on_main_thread_signal.disconnect()
tts.mark_changed.disconnect()
tts.tts.shutdown()
def headless():
app = Application([])
c = Client()
text = '[[sync 0x123456]]very [[sync 0x80]]good [[sync 0x81]]indeed'
def callback():
for ev in c.get_events():
if ev.type is EventType.mark:
print('mark:', hex(ev.data))
if ev.type in (EventType.end, EventType.cancel):
print(ev.type)
app.quit()
def run():
c.speak_marked_text(text, callback)
QTimer.singleShot(10, run)
QTimer.singleShot(5000, app.quit)
app.exec()
if __name__ == '__main__':
main()

View File

@@ -1,9 +0,0 @@
#!/usr/bin/env python
# License: GPL v3 Copyright: 2020, Kovid Goyal <kovid at kovidgoyal.net>
class TTSSystemUnavailable(Exception):
def __init__(self, message, details):
Exception.__init__(self, message)
self.short_msg = message
self.details = details

View File

@@ -1,16 +0,0 @@
#!/usr/bin/env python
# License: GPL v3 Copyright: 2020, Kovid Goyal <kovid at kovidgoyal.net>
from calibre.constants import ismacos, iswindows
if iswindows:
from calibre.utils.config_base import tweaks
if tweaks.get('prefer_winsapi'):
from .windows_sapi import Client
else:
from .windows import Client
elif ismacos:
from .macos import Client
else:
from .linux import Client
Client

View File

@@ -1,220 +0,0 @@
#!/usr/bin/env python
# License: GPL v3 Copyright: 2020, Kovid Goyal <kovid at kovidgoyal.net>
from functools import partial
from calibre import prepare_string_for_xml
from .common import Event, EventType, add_markup
from .errors import TTSSystemUnavailable
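# Minimal SSML envelope understood by speech-dispatcher's SSML data mode.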
def wrap_in_ssml(text):
return ('<?xml version="1.0"?>\n<speak version="1.0" xmlns="http://www.w3.org/2001/10/synthesis"><s>' +
text + '</s></speak>')
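# TTS backend that talks to speech-dispatcher via its Python client library.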
class Client:
mark_template = '<mark name="{}"/>'
name = 'speechd'
min_rate = -100
max_rate = 100
chunk_size = 0
@classmethod
def escape_marked_text(cls, text):
return prepare_string_for_xml(text)
def __init__(self, settings=None, dispatch_on_main_thread=lambda f: f()):
self.ssip_client = None
self.status = {'synthesizing': False, 'paused': False}
self.settings = settings or {}
self.dispatch_on_main_thread = dispatch_on_main_thread
self.current_marked_text = None
self.last_mark = None
self.next_cancel_is_for_pause = False
self.next_begin_is_for_resume = False
self.current_callback = None
self.settings_applied = False
self.system_default_output_module = None
def create_ssip_client(self):
from speechd.client import Priority, SpawnError, SSIPClient, SSIPCommunicationError
try:
self.ssip_client = SSIPClient('calibre')
except SSIPCommunicationError as err:
ex = err.additional_exception()
if isinstance(ex, SpawnError):
raise TTSSystemUnavailable(_('Could not find speech-dispatcher on your system. Please install it.'), str(err))
raise
except SpawnError as err:
raise TTSSystemUnavailable(_('Could not find speech-dispatcher on your system. Please install it.'), str(err))
self.ssip_client.set_priority(Priority.TEXT)
def __del__(self):
if self.ssip_client is not None:
try:
self.ssip_client.cancel()
except Exception:
pass
self.ssip_client.close()
self.ssip_client = None
shutdown = __del__
def ensure_state(self, use_ssml=False):
if self.ssip_client is None:
self.create_ssip_client()
if self.system_default_output_module is None:
self.system_default_output_module = self.ssip_client.get_output_module()
if self.system_default_output_module == '(null)':
mods = self.ssip_client.list_output_modules()
if not mods:
raise ValueError(_('Speech dispatcher on this system is not configured with any available voices. Install some voices first.'))
self.system_default_output_module = mods[0]
if not self.settings_applied:
self.apply_settings()
self.set_use_ssml(use_ssml)
def apply_settings(self, new_settings=None):
if new_settings is not None:
self.settings = new_settings
if self.settings_applied:
self.shutdown()
self.settings_applied = False
self.ensure_state()
om = self.settings.get('output_module')
if om:
self.ssip_client.set_output_module(om)
voice = self.settings.get('voice')
if voice:
self.ssip_client.set_synthesis_voice(voice[0])
rate = self.settings.get('rate')
if rate:
self.ssip_client.set_rate(rate)
self.settings_applied = True
def set_use_ssml(self, on):
from speechd.client import DataMode, SSIPCommunicationError
mode = DataMode.SSML if on else DataMode.TEXT
try:
self.ssip_client.set_data_mode(mode)
except SSIPCommunicationError:
self.ssip_client.close()
self.ssip_client = None
self.ensure_state(on)
def speak_simple_text(self, text):
self.stop()
self.ensure_state(use_ssml=False)
self.current_marked_text = self.last_mark = None
def callback(callback_type, index_mark=None):
self.dispatch_on_main_thread(partial(self.update_status, callback_type, index_mark))
self.ssip_client.speak(text, callback)
def update_status(self, callback_type, index_mark=None):
from speechd.client import CallbackType
event = None
if callback_type is CallbackType.INDEX_MARK:
self.last_mark = index_mark
event = Event(EventType.mark, index_mark)
elif callback_type is CallbackType.BEGIN:
self.status = {'synthesizing': True, 'paused': False}
event = Event(EventType.resume if self.next_begin_is_for_resume else EventType.begin)
self.next_begin_is_for_resume = False
elif callback_type is CallbackType.END:
self.status = {'synthesizing': False, 'paused': False}
event = Event(EventType.end)
elif callback_type is CallbackType.CANCEL:
if self.next_cancel_is_for_pause:
self.status = {'synthesizing': True, 'paused': True}
event = Event(EventType.pause)
else:
self.status = {'synthesizing': False, 'paused': False}
event = Event(EventType.cancel)
self.next_cancel_is_for_pause = False
return event
def speak_marked_text(self, marked_text, callback=lambda ev: None):
self.stop()
text = ''.join(add_markup(marked_text, self.mark_template, self.escape_marked_text, self.chunk_size))
self.current_marked_text = text
self.last_mark = None
def callback_wrapper(callback_type, index_mark=None):
event = self.update_status(callback_type, index_mark)
if event is not None:
try:
callback(event)
except Exception:
import traceback
traceback.print_exc()
def cw(callback_type, index_mark=None):
self.dispatch_on_main_thread(partial(callback_wrapper, callback_type, index_mark))
self.current_callback = cw
self.ensure_state(use_ssml=True)
self.ssip_client.speak(wrap_in_ssml(text), callback=self.current_callback)
def pause(self):
if self.status['synthesizing'] and not self.status['paused']:
self.next_cancel_is_for_pause = True
self.ssip_client.stop()
def resume(self):
if self.current_marked_text is None or not self.status['synthesizing'] or not self.status['paused']:
return
self.next_begin_is_for_resume = True
if self.last_mark is None:
text = self.current_marked_text
else:
mark = self.mark_template.format(self.last_mark)
idx = self.current_marked_text.find(mark)
if idx == -1:
text = self.current_marked_text
else:
text = self.current_marked_text[idx:]
self.ensure_state(use_ssml=True)
self.ssip_client.speak(wrap_in_ssml(text), callback=self.current_callback)
resume_after_configure = resume
def stop(self):
self.current_callback = self.current_marked_text = self.last_mark = None
self.next_cancel_is_for_pause = False
self.next_begin_is_for_resume = False
if self.ssip_client is not None:
self.ssip_client.stop()
def config_widget(self, backend_settings, parent):
from calibre.gui2.tts.linux_config import Widget
return Widget(self, backend_settings, parent)
def get_voice_data(self):
ans = getattr(self, 'voice_data', None)
if ans is None:
self.ensure_state()
ans = self.voice_data = {}
output_module = self.ssip_client.get_output_module()
for om in self.ssip_client.list_output_modules():
self.ssip_client.set_output_module(om)
ans[om] = tuple(self.ssip_client.list_synthesis_voices())
self.ssip_client.set_output_module(output_module)
return ans
def change_rate(self, steps=1):
rate = current_rate = self.settings.get('rate') or 0
step_size = (self.max_rate - self.min_rate) // 10
rate += steps * step_size
rate = max(self.min_rate, min(rate, self.max_rate))
if rate != current_rate:
self.settings['rate'] = rate
prev_state = self.status.copy()
self.apply_settings()
if prev_state['synthesizing'] and not prev_state['paused']:
self.status['synthesizing'] = True
self.status['paused'] = True
self.resume_after_configure()
return self.settings

View File

@@ -1,215 +0,0 @@
#!/usr/bin/env python
# License: GPL v3 Copyright: 2020, Kovid Goyal <kovid at kovidgoyal.net>
from contextlib import suppress
from qt.core import (
QAbstractItemView,
QAbstractTableModel,
QByteArray,
QComboBox,
QFontMetrics,
QFormLayout,
QItemSelectionModel,
QSlider,
QSortFilterProxyModel,
Qt,
QTableView,
QWidget,
)
from calibre.gui2.widgets import BusyCursor
class VoicesModel(QAbstractTableModel):
system_default_voice = ('', '', '')
def __init__(self, voice_data, default_output_module, parent=None):
super().__init__(parent)
self.voice_data = voice_data
try:
self.current_voices = voice_data[default_output_module]
except KeyError as e:
raise ValueError(_('Speech dispatcher on this system is not configured with any available voices. Install some voices first.')) from e
self.column_headers = (_('Name'), _('Language'), _('Variant'))
def rowCount(self, parent=None):
return len(self.current_voices) + 1
def columnCount(self, parent=None):
return len(self.column_headers)
def headerData(self, section, orientation, role=Qt.ItemDataRole.DisplayRole):
if role == Qt.ItemDataRole.DisplayRole and orientation == Qt.Orientation.Horizontal:
return self.column_headers[section]
return super().headerData(section, orientation, role)
def data(self, index, role=Qt.ItemDataRole.DisplayRole):
if role == Qt.ItemDataRole.DisplayRole:
row = index.row()
with suppress(IndexError):
if row == 0:
return (_('System default'), '', '')[index.column()]
data = self.current_voices[row - 1]
ans = data[index.column()]
if not ans or ans == 'none':
ans = ''
return ans
if role == Qt.ItemDataRole.UserRole:
row = index.row()
with suppress(IndexError):
if row == 0:
return self.system_default_voice
return self.current_voices[row - 1]
def change_output_module(self, om):
self.beginResetModel()
try:
self.current_voices = self.voice_data[om]
finally:
self.endResetModel()
def index_for_voice(self, v):
r = 0
if v != self.system_default_voice:
try:
idx = self.current_voices.index(v)
except Exception:
return
r = idx + 1
return self.index(r, 0)
class Widget(QWidget):
def __init__(self, tts_client, initial_backend_settings=None, parent=None):
QWidget.__init__(self, parent)
self.l = l = QFormLayout(self)
self.tts_client = tts_client
self.speed = s = QSlider(Qt.Orientation.Horizontal, self)
s.setTickPosition(QSlider.TickPosition.TicksAbove)
s.setMinimumWidth(200)
l.addRow(_('&Speed of speech:'), s)
s.setRange(self.tts_client.min_rate, self.tts_client.max_rate)
s.setSingleStep(10)
s.setTickInterval((s.maximum() - s.minimum()) // 2)
self.output_modules = om = QComboBox(self)
with BusyCursor():
self.voice_data = self.tts_client.get_voice_data()
self.system_default_output_module = self.tts_client.system_default_output_module
om.addItem(_('System default'), self.system_default_output_module)
for x in self.voice_data:
om.addItem(x, x)
l.addRow(_('Speech s&ynthesizer:'), om)
self.voices = v = QTableView(self)
self.voices_model = VoicesModel(self.voice_data, self.system_default_output_module, parent=v)
self.proxy_model = p = QSortFilterProxyModel(self)
p.setFilterCaseSensitivity(Qt.CaseSensitivity.CaseInsensitive)
p.setSourceModel(self.voices_model)
v.setModel(p)
v.setSelectionBehavior(QAbstractItemView.SelectionBehavior.SelectRows)
v.setSortingEnabled(True)
h = v.horizontalHeader()
h.resizeSection(0, QFontMetrics(self.font()).averageCharWidth() * 30)
v.verticalHeader().close()
v.setSelectionMode(QAbstractItemView.SelectionMode.SingleSelection)
v.sortByColumn(0, Qt.SortOrder.AscendingOrder)
om.currentIndexChanged.connect(self.output_module_changed)
l.addRow(v)
self.backend_settings = initial_backend_settings or {}
def restore_state(self, prefs):
data = prefs.get(f'{self.tts_client.name}-voice-table-state')
if data is not None:
self.voices.horizontalHeader().restoreState(QByteArray(data))
def save_state(self, prefs):
data = bytearray(self.voices.horizontalHeader().saveState())
prefs.set(f'{self.tts_client.name}-voice-table-state', data)
def restore_to_defaults(self):
self.backend_settings = {}
def sizeHint(self):
ans = super().sizeHint()
ans.setHeight(max(ans.height(), 600))
return ans
@property
def selected_voice(self):
for x in self.voices.selectedIndexes():
return x.data(Qt.ItemDataRole.UserRole)
@selected_voice.setter
def selected_voice(self, val):
val = val or VoicesModel.system_default_voice
idx = self.voices_model.index_for_voice(tuple(val))
if idx is not None:
idx = self.proxy_model.mapFromSource(idx)
self.voices.selectionModel().select(idx, QItemSelectionModel.SelectionFlag.ClearAndSelect | QItemSelectionModel.SelectionFlag.Rows)
self.voices.scrollTo(idx)
@property
def selected_output_module(self):
return self.output_modules.currentData()
@selected_output_module.setter
def selected_output_module(self, val):
if not val:
self.output_modules.setCurrentIndex(0)
return
idx = self.output_modules.findData(val)
if idx < 0:
idx = 0
self.output_modules.setCurrentIndex(idx)
def output_module_changed(self, idx):
om = self.selected_output_module
self.voices_model.change_output_module(om)
@property
def rate(self):
return self.speed.value()
@rate.setter
def rate(self, val):
val = int(val or 0)
self.speed.setValue(val)
@property
def backend_settings(self):
ans = {}
om = self.selected_output_module
if om != self.system_default_output_module:
ans['output_module'] = om
voice = self.selected_voice
if voice and voice != VoicesModel.system_default_voice:
ans['voice'] = voice
rate = self.rate
if rate:
ans['rate'] = rate
return ans
@backend_settings.setter
def backend_settings(self, val):
om = val.get('output_module') or self.system_default_output_module
self.selected_output_module = om
voice = val.get('voice') or VoicesModel.system_default_voice
self.selected_voice = voice
self.rate = val.get('rate') or 0
if __name__ == '__main__':
from calibre.gui2 import Application
from calibre.gui2.tts.implementation import Client
app = Application([])
c = Client({})
w = Widget(c, {})
w.show()
app.exec()
print(w.backend_settings)

View File

@@ -1,149 +0,0 @@
#!/usr/bin/env python
# License: GPL v3 Copyright: 2020, Kovid Goyal <kovid at kovidgoyal.net>
from .common import Event, EventType, add_markup
class Client:
mark_template = '[[sync 0x{:x}]]'
END_MARK = 0xffffffff
name = 'nsss'
min_rate = 10
max_rate = 340
chunk_size = 0
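# Break up [[ and ]] so spoken text cannot be mistaken for embedded sync commands.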
@classmethod
def escape_marked_text(cls, text):
return text.replace('[[', ' [ [ ').replace(']]', ' ] ] ')
def __init__(self, settings=None, dispatch_on_main_thread=lambda f: f()):
from calibre_extensions.cocoa import NSSpeechSynthesizer
self.nsss = NSSpeechSynthesizer(self.handle_message)
self.default_system_rate = self.nsss.get_current_rate()
self.default_system_voice = self.nsss.get_current_voice()
self.current_callback = None
self.current_marked_text = self.last_mark = None
self.dispatch_on_main_thread = dispatch_on_main_thread
self.status = {'synthesizing': False, 'paused': False}
self.settings = settings or {}
self.ignore_next_stop_event = False
self.apply_settings()
def apply_settings(self, new_settings=None):
if self.status['paused']:
self.nsss.resume()
self.ignore_next_stop_event = True
self.status = {'synthesizing': False, 'paused': False}
if new_settings is not None:
self.settings = new_settings
self.nsss.set_current_voice(self.settings.get('voice') or self.default_system_voice)
rate = self.settings.get('rate', self.default_system_rate)
self.nsss.set_current_rate(rate)
def __del__(self):
self.nsss = None
shutdown = __del__
def handle_message(self, message_type, data):
from calibre_extensions.cocoa import END, MARK
event = None
if message_type == MARK:
self.last_mark = data
event = Event(EventType.mark, data)
elif message_type == END:
if self.ignore_next_stop_event:
self.ignore_next_stop_event = False
return
event = Event(EventType.end if data else EventType.cancel)
self.status = {'synthesizing': False, 'paused': False}
if event is not None and self.current_callback is not None:
try:
self.current_callback(event)
except Exception:
import traceback
traceback.print_exc()
def speak_simple_text(self, text):
self.current_callback = None
self.current_marked_text = self.last_mark = None
self.nsss.speak(self.escape_marked_text(text))
self.status = {'synthesizing': True, 'paused': False}
def speak_marked_text(self, marked_text, callback):
text = ''.join(add_markup(marked_text, self.mark_template, self.escape_marked_text, self.chunk_size))
self.current_callback = callback
self.current_marked_text = text
self.last_mark = None
self.nsss.speak(text)
self.status = {'synthesizing': True, 'paused': False}
self.current_callback(Event(EventType.begin))
def pause(self):
if self.status['synthesizing']:
self.nsss.pause()
self.status = {'synthesizing': True, 'paused': True}
if self.current_callback is not None:
self.current_callback(Event(EventType.pause))
def resume(self):
if self.status['paused']:
self.nsss.resume()
self.status = {'synthesizing': True, 'paused': False}
if self.current_callback is not None:
self.current_callback(Event(EventType.resume))
def resume_after_configure(self):
if self.status['paused']:
self.resume()
return
if self.last_mark is None:
idx = -1
else:
mark = self.mark_template.format(self.last_mark)
idx = self.current_marked_text.find(mark)
if idx == -1:
text = self.current_marked_text
else:
text = self.current_marked_text[idx:]
self.nsss.speak(text)
self.status = {'synthesizing': True, 'paused': False}
if self.current_callback is not None:
self.current_callback(Event(EventType.resume))
def stop(self):
self.nsss.stop()
@property
def rate(self):
return self.nsss.get_current_rate()
@rate.setter
def rate(self, val):
val = val or self.default_system_rate
self.nsss.set_current_rate(float(val))
def get_voice_data(self):
ans = getattr(self, 'voice_data', None)
if ans is None:
ans = self.voice_data = self.nsss.get_all_voices()
return ans
def config_widget(self, backend_settings, parent):
from calibre.gui2.tts.macos_config import Widget
return Widget(self, backend_settings, parent)
def change_rate(self, steps=1):
rate = current_rate = self.settings.get('rate', self.default_system_rate)
step_size = (self.max_rate - self.min_rate) // 10
rate += steps * step_size
rate = max(self.min_rate, min(rate, self.max_rate))
if rate != current_rate:
self.settings['rate'] = rate
prev_state = self.status.copy()
self.pause()
self.apply_settings()
if prev_state['synthesizing']:
self.status = {'synthesizing': True, 'paused': False}
self.resume_after_configure()
return self.settings

View File

@@ -1,188 +0,0 @@
#!/usr/bin/env python
# License: GPL v3 Copyright: 2020, Kovid Goyal <kovid at kovidgoyal.net>
from contextlib import suppress
from qt.core import (
QAbstractItemView,
QAbstractTableModel,
QByteArray,
QFontMetrics,
QFormLayout,
QItemSelectionModel,
QSlider,
QSortFilterProxyModel,
Qt,
QTableView,
QWidget,
)
from calibre.gui2.widgets import BusyCursor
class VoicesModel(QAbstractTableModel):
system_default_voice = ''
def __init__(self, voice_data, parent=None):
super().__init__(parent)
self.voice_data = voice_data
gmap = {'VoiceGenderNeuter': _('neutral'), 'VoiceGenderFemale': _('female'), 'VoiceGenderMale': _('male')}
def gender(x):
return gmap.get(x, x)
def language(x):
return x.get('language_display_name') or x['locale_id'] or ''
self.current_voices = tuple((x['name'], language(x), x['age'], gender(x['gender'])) for x in voice_data.values())
self.voice_ids = tuple(voice_data)
self.column_headers = _('Name'), _('Language'), _('Age'), _('Gender')
def rowCount(self, parent=None):
return len(self.current_voices) + 1
def columnCount(self, parent=None):
return len(self.column_headers)
def headerData(self, section, orientation, role=Qt.ItemDataRole.DisplayRole):
if role == Qt.ItemDataRole.DisplayRole and orientation == Qt.Orientation.Horizontal:
return self.column_headers[section]
return super().headerData(section, orientation, role)
def data(self, index, role=Qt.ItemDataRole.DisplayRole):
if role == Qt.ItemDataRole.DisplayRole:
row = index.row()
with suppress(IndexError):
if row == 0:
return (_('System default'), '', '', '')[index.column()]
data = self.current_voices[row - 1]
col = index.column()
ans = data[col] or ''
return ans
if role == Qt.ItemDataRole.UserRole:
row = index.row()
with suppress(IndexError):
if row == 0:
return self.system_default_voice
return self.voice_ids[row - 1]
def index_for_voice(self, v):
r = 0
if v != self.system_default_voice:
try:
idx = self.voice_ids.index(v)
except Exception:
return
r = idx + 1
return self.index(r, 0)
class Widget(QWidget):
def __init__(self, tts_client, initial_backend_settings=None, parent=None):
QWidget.__init__(self, parent)
self.l = l = QFormLayout(self)
self.tts_client = tts_client
with BusyCursor():
self.voice_data = self.tts_client.get_voice_data()
self.default_system_rate = self.tts_client.default_system_rate
self.speed = s = QSlider(Qt.Orientation.Horizontal, self)
s.setMinimumWidth(200)
l.addRow(_('&Speed of speech (words per minute):'), s)
s.setRange(self.tts_client.min_rate, self.tts_client.max_rate)
s.setTickPosition(QSlider.TickPosition.TicksAbove)
s.setTickInterval((s.maximum() - s.minimum()) // 2)
s.setSingleStep(10)
self.voices = v = QTableView(self)
self.voices_model = VoicesModel(self.voice_data, parent=v)
self.proxy_model = p = QSortFilterProxyModel(self)
p.setFilterCaseSensitivity(Qt.CaseSensitivity.CaseInsensitive)
p.setSourceModel(self.voices_model)
v.setModel(p)
v.setSelectionBehavior(QAbstractItemView.SelectionBehavior.SelectRows)
v.setSortingEnabled(True)
v.horizontalHeader().resizeSection(0, QFontMetrics(self.font()).averageCharWidth() * 20)
v.horizontalHeader().resizeSection(1, QFontMetrics(self.font()).averageCharWidth() * 30)
v.verticalHeader().close()
v.setSelectionMode(QAbstractItemView.SelectionMode.SingleSelection)
v.sortByColumn(0, Qt.SortOrder.AscendingOrder)
l.addRow(v)
self.backend_settings = initial_backend_settings or {}
def restore_state(self, prefs):
data = prefs.get(f'{self.tts_client.name}-voice-table-state')
if data is not None:
self.voices.horizontalHeader().restoreState(QByteArray(data))
def save_state(self, prefs):
data = bytearray(self.voices.horizontalHeader().saveState())
prefs.set(f'{self.tts_client.name}-voice-table-state', data)
def restore_to_defaults(self):
self.backend_settings = {}
def sizeHint(self):
ans = super().sizeHint()
ans.setHeight(max(ans.height(), 600))
ans.setWidth(max(ans.width(), 500))
return ans
@property
def selected_voice(self):
for x in self.voices.selectedIndexes():
return x.data(Qt.ItemDataRole.UserRole)
@selected_voice.setter
def selected_voice(self, val):
val = val or VoicesModel.system_default_voice
idx = self.voices_model.index_for_voice(val)
if idx is not None:
idx = self.proxy_model.mapFromSource(idx)
self.voices.selectionModel().select(idx, QItemSelectionModel.SelectionFlag.ClearAndSelect | QItemSelectionModel.SelectionFlag.Rows)
self.voices.scrollTo(idx)
@property
def rate(self):
return self.speed.value()
@rate.setter
def rate(self, val):
val = int(val or self.default_system_rate)
self.speed.setValue(val)
@property
def backend_settings(self):
ans = {}
voice = self.selected_voice
if voice and voice != VoicesModel.system_default_voice:
ans['voice'] = voice
rate = self.rate
if rate and rate != self.default_system_rate:
ans['rate'] = rate
return ans
@backend_settings.setter
def backend_settings(self, val):
voice = val.get('voice') or VoicesModel.system_default_voice
self.selected_voice = voice
self.rate = val.get('rate') or self.default_system_rate
def develop():
from calibre.gui2 import Application
from calibre.gui2.tts.implementation import Client
app = Application([])
c = Client()
w = Widget(c, {})
w.show()
app.exec()
print(w.backend_settings)
if __name__ == '__main__':
develop()

View File

@@ -1,311 +0,0 @@
/*
* nsss.m
* Copyright (C) 2020 Kovid Goyal <kovid at kovidgoyal.net>
*
* Distributed under terms of the GPL3 license.
*/
#define PY_SSIZE_T_CLEAN
#include <Python.h>
#import <AppKit/AppKit.h>
// Structures {{{
typedef struct {
PyObject_HEAD
NSSpeechSynthesizer *nsss;
PyObject *callback;
} NSSS;
typedef enum { MARK, END } MessageType;
static PyTypeObject NSSSType = {
PyVarObject_HEAD_INIT(NULL, 0)
};
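// Invoke the Python callback with (message type, value) while holding the GIL.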
static void
dispatch_message(NSSS *self, MessageType which, unsigned int val) {
PyGILState_STATE state = PyGILState_Ensure();
PyObject *ret = PyObject_CallFunction(self->callback, "iI", which, val);
if (ret) Py_DECREF(ret);
else PyErr_Print();
PyGILState_Release(state);
}
@interface SynthesizerDelegate : NSObject <NSSpeechSynthesizerDelegate> {
@private
NSSS *nsss;
}
- (id)initWithNSSS:(NSSS *)x;
@end
@implementation SynthesizerDelegate
- (id)initWithNSSS:(NSSS *)x {
self = [super init];
nsss = x;
return self;
}
- (void)speechSynthesizer:(NSSpeechSynthesizer *)sender didFinishSpeaking:(BOOL)success {
dispatch_message(nsss, END, success);
}
- (void)speechSynthesizer:(NSSpeechSynthesizer *)sender didEncounterSyncMessage:(NSString *)message {
NSError *err = nil;
NSNumber *syncProp = (NSNumber*) [sender objectForProperty: NSSpeechRecentSyncProperty error: &err];
if (syncProp && !err) dispatch_message(nsss, MARK, syncProp.unsignedIntValue);
}
@end
// }}}
static PyObject *
NSSS_new(PyTypeObject *type, PyObject *args, PyObject *kwds) {
PyObject *callback;
if (!PyArg_ParseTuple(args, "O", &callback)) return NULL;
if (!PyCallable_Check(callback)) { PyErr_SetString(PyExc_TypeError, "callback must be a callable"); return NULL; }
NSSS *self = (NSSS *) type->tp_alloc(type, 0);
if (self) {
self->callback = callback;
Py_INCREF(callback);
self->nsss = [[NSSpeechSynthesizer alloc] initWithVoice:nil];
if (self->nsss) {
self->nsss.delegate = [[SynthesizerDelegate alloc] initWithNSSS:self];
} else return PyErr_NoMemory();
}
return (PyObject*)self;
}
static void
NSSS_dealloc(NSSS *self) {
if (self->nsss) {
if (self->nsss.delegate) [self->nsss.delegate release];
self->nsss.delegate = nil;
[self->nsss release];
}
self->nsss = nil;
Py_CLEAR(self->callback);
}
static PyObject*
as_python(NSObject *x) {
if (!x) Py_RETURN_NONE;
if ([x isKindOfClass:[NSString class]]) {
NSString *s = (NSString*)x;
return PyUnicode_FromString([s UTF8String]);
}
if ([x isKindOfClass:[NSNumber class]]) {
NSNumber *n = (NSNumber*)x;
return PyFloat_FromDouble([n doubleValue]);
}
Py_RETURN_NONE;
}
static PyObject*
NSSS_get_all_voices(NSSS *self, PyObject *args) {
PyObject *ans = PyDict_New();
if (!ans) return NULL;
NSLocale *locale = [NSLocale autoupdatingCurrentLocale];
for (NSSpeechSynthesizerVoiceName voice_id in [NSSpeechSynthesizer availableVoices]) {
NSDictionary *attributes = [NSSpeechSynthesizer attributesForVoice:voice_id];
if (attributes) {
NSObject *lang_key = [attributes objectForKey:NSVoiceLocaleIdentifier];
const char *lang_name = NULL;
if (lang_key && [lang_key isKindOfClass:[NSString class]]) {
NSString *display_name = [locale displayNameForKey:NSLocaleIdentifier value:(NSString*)lang_key];
if (display_name) lang_name = [display_name UTF8String];
}
#define E(x, y) #x, as_python([attributes objectForKey:y])
PyObject *v = Py_BuildValue("{sN sN sN sN sN sz}",
E(name, NSVoiceName), E(age, NSVoiceAge), E(gender, NSVoiceGender),
E(demo_text, NSVoiceDemoText), E(locale_id, NSVoiceLocaleIdentifier), "language_display_name", lang_name);
if (!v) { Py_DECREF(ans); return NULL; }
#undef E
if (PyDict_SetItemString(ans, [voice_id UTF8String], v) != 0) {
Py_DECREF(ans); Py_DECREF(v); return NULL;
}
Py_DECREF(v);
}
}
return ans;
}
static PyObject*
NSSS_set_command_delimiters(NSSS *self, PyObject *args) {
// this function doesn't actually work
// https://openradar.appspot.com/6524554
const char *left, *right;
if (!PyArg_ParseTuple(args, "ss", &left, &right)) return NULL;
NSError *err = nil;
[self->nsss setObject:@{NSSpeechCommandPrefix:@(left), NSSpeechCommandSuffix:@(right)} forProperty:NSSpeechCommandDelimiterProperty error:&err];
if (err) {
PyErr_SetString(PyExc_OSError, [[NSString stringWithFormat:@"Failed to set delimiters: %@", err] UTF8String]);
return NULL;
}
Py_RETURN_NONE;
}
static PyObject*
NSSS_get_current_voice(NSSS *self, PyObject *args) {
return Py_BuildValue("s", [[self->nsss voice] UTF8String]);
}
static PyObject*
NSSS_set_current_voice(NSSS *self, PyObject *args) {
const char *name;
if (!PyArg_ParseTuple(args, "s", &name)) return NULL;
BOOL ok = [self->nsss setVoice:@(name)];
if (ok) Py_RETURN_TRUE;
Py_RETURN_FALSE;
}
static PyObject*
NSSS_any_application_speaking(NSSS *self, PyObject *args) {
return Py_BuildValue("O", NSSpeechSynthesizer.anyApplicationSpeaking ? Py_True : Py_False);
}
static PyObject*
NSSS_speaking(NSSS *self, PyObject *args) {
return Py_BuildValue("O", self->nsss.speaking ? Py_True : Py_False);
}
static PyObject*
NSSS_get_current_volume(NSSS *self, PyObject *args) {
return Py_BuildValue("f", self->nsss.volume);
}
static PyObject*
NSSS_set_current_volume(NSSS *self, PyObject *args) {
float vol;
if (!PyArg_ParseTuple(args, "f", &vol)) return NULL;
self->nsss.volume = vol;
return Py_BuildValue("f", self->nsss.volume);
}
static PyObject*
NSSS_get_current_rate(NSSS *self, PyObject *args) {
return Py_BuildValue("f", self->nsss.rate);
}
static PyObject*
NSSS_set_current_rate(NSSS *self, PyObject *args) {
float vol;
if (!PyArg_ParseTuple(args, "f", &vol)) return NULL;
self->nsss.rate = vol;
return Py_BuildValue("f", self->nsss.rate);
}
static PyObject*
NSSS_speak(NSSS *self, PyObject *args) {
const char *text;
if (!PyArg_ParseTuple(args, "s", &text)) return NULL;
if ([self->nsss startSpeakingString:@(text)]) Py_RETURN_TRUE;
Py_RETURN_FALSE;
}
static PyObject*
NSSS_start_saving_to_path(NSSS *self, PyObject *args) {
const char *text, *path;
if (!PyArg_ParseTuple(args, "ss", &text, &path)) return NULL;
NSURL *url = [NSURL fileURLWithPath:@(path) isDirectory: NO];
BOOL ok = [self->nsss startSpeakingString:@(text) toURL:url];
if (ok) Py_RETURN_TRUE;
Py_RETURN_FALSE;
}
static PyObject*
NSSS_status(NSSS *self, PyObject *args) {
NSError *err = nil;
NSDictionary *status = [self->nsss objectForProperty:NSSpeechStatusProperty error:&err];
if (err) {
PyErr_SetString(PyExc_OSError, [[err localizedDescription] UTF8String]);
return NULL;
}
PyObject *ans = PyDict_New();
if (ans) {
NSNumber *result = [status objectForKey:NSSpeechStatusOutputBusy];
if (result) {
if (PyDict_SetItemString(ans, "synthesizing", [result boolValue] ? Py_True : Py_False) != 0) { Py_CLEAR(ans); return NULL; }
}
result = [status objectForKey:NSSpeechStatusOutputPaused];
if (result) {
if (PyDict_SetItemString(ans, "paused", [result boolValue] ? Py_True : Py_False) != 0) { Py_CLEAR(ans); return NULL; }
}
}
return ans;
}
static PyObject*
NSSS_pause(NSSS *self, PyObject *args) {
unsigned int boundary = NSSpeechWordBoundary;
if (!PyArg_ParseTuple(args, "|I", &boundary)) return NULL;
[self->nsss pauseSpeakingAtBoundary:boundary];
Py_RETURN_NONE;
}
static PyObject*
NSSS_resume(NSSS *self, PyObject *args) {
[self->nsss continueSpeaking];
Py_RETURN_NONE;
}
static PyObject*
NSSS_stop(NSSS *self, PyObject *args) {
[self->nsss stopSpeaking];
Py_RETURN_NONE;
}
// Boilerplate {{{
#define M(name, args) { #name, (PyCFunction)NSSS_##name, args, ""}
static PyMethodDef NSSS_methods[] = {
M(get_all_voices, METH_NOARGS),
M(status, METH_NOARGS),
M(resume, METH_NOARGS),
M(stop, METH_NOARGS),
M(speak, METH_VARARGS),
M(start_saving_to_path, METH_VARARGS),
M(speaking, METH_NOARGS),
M(any_application_speaking, METH_NOARGS),
M(get_current_voice, METH_NOARGS),
M(set_current_voice, METH_VARARGS),
M(get_current_volume, METH_NOARGS),
M(set_current_volume, METH_VARARGS),
M(get_current_rate, METH_NOARGS),
M(set_current_rate, METH_VARARGS),
M(set_command_delimiters, METH_VARARGS),
M(pause, METH_VARARGS),
{NULL, NULL, 0, NULL}
};
#undef M
int
nsss_init_module(PyObject *module) {
NSSSType.tp_name = "cocoa.NSSpeechSynthesizer";
NSSSType.tp_doc = "Wrapper for NSSpeechSynthesizer";
NSSSType.tp_basicsize = sizeof(NSSS);
NSSSType.tp_itemsize = 0;
NSSSType.tp_flags = Py_TPFLAGS_DEFAULT;
NSSSType.tp_new = NSSS_new;
NSSSType.tp_methods = NSSS_methods;
NSSSType.tp_dealloc = (destructor)NSSS_dealloc;
if (PyType_Ready(&NSSSType) < 0) return -1;
Py_INCREF(&NSSSType);
if (PyModule_AddObject(module, "NSSpeechSynthesizer", (PyObject *) &NSSSType) < 0) {
Py_DECREF(&NSSSType);
return -1;
}
PyModule_AddIntMacro(module, MARK);
PyModule_AddIntMacro(module, END);
PyModule_AddIntMacro(module, NSSpeechImmediateBoundary);
PyModule_AddIntMacro(module, NSSpeechWordBoundary);
PyModule_AddIntMacro(module, NSSpeechSentenceBoundary);
return 0;
}
// }}}

View File

@@ -1,239 +0,0 @@
#!/usr/bin/env python
# License: GPL v3 Copyright: 2020, Kovid Goyal <kovid at kovidgoyal.net>
from functools import partial
from calibre.utils.windows.winspeech import Error, MarkReached, MediaState, MediaStateChanged, WinSpeech
from .common import Event, EventType
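# Split marked text into chunks of at most chunk_size characters, carrying a
# trailing mark over to the next chunk so resume positions stay accurate.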
def split_into_chunks(marked_text, chunk_size):
chunk = []
tlen = 0
for x in marked_text:
if isinstance(x, int):
chunk.append(x)
else:
sz = len(x)
if tlen + sz > chunk_size:
mark = None
if chunk:
if isinstance(chunk[-1], int):
mark = chunk[-1]
del chunk[-1]
yield chunk
chunk = [] if mark is None else [mark]
tlen = sz
chunk.append(x)
else:
chunk.append(x)
tlen += sz
if chunk:
yield chunk
def chunk_has_text(chunk):
for x in chunk:
if isinstance(x, str) and x:
return True
return False
class Client:
mark_template = ''
name = 'winspeech'
min_rate = 0.5
max_rate = 6.0
default_system_rate = 1.0
chunk_size = 64 * 1024
@classmethod
def escape_marked_text(cls, text):
return text
def __init__(self, settings=None, dispatch_on_main_thread=lambda f: f()):
self.backend = WinSpeech(self.dispatch_msg)
self.last_mark = -1
self.current_callback = None
self.dispatch_on_main_thread = dispatch_on_main_thread
self.synthesizing = False
self.settings = settings or {}
self.clear_chunks()
self.default_system_audio_device = self.backend.get_audio_device().device
self.default_system_voice = self.backend.default_voice().voice
self.apply_settings()
def get_all_voices(self):
return self.backend.all_voices().voices
def get_all_audio_devices(self):
return self.backend.all_audio_devices().devices
def __del__(self):
if self.backend is not None:
self.backend.shutdown()
self.backend = None
shutdown = __del__
def dispatch_msg(self, msg):
self.dispatch_on_main_thread(partial(self.handle_event, msg))
def speak_current_chunk(self):
chunk = self.current_chunks[self.current_chunk_idx]
if chunk_has_text(chunk):
self.backend.speak(chunk, is_cued=True)
else:
self.handle_end_event()
def handle_end_event(self):
if self.current_chunk_idx >= len(self.current_chunks) - 1:
self.clear_chunks()
self.callback_ignoring_errors(Event(EventType.end))
else:
self.current_chunk_idx += 1
self.speak_current_chunk()
def handle_event(self, x):
if isinstance(x, MarkReached):
if self.current_chunks:
self.last_mark = x.id
self.callback_ignoring_errors(Event(EventType.mark, x.id))
elif isinstance(x, MediaStateChanged):
if self.current_chunks:
if x.state is MediaState.ended:
self.handle_end_event()
elif x.state is MediaState.failed:
self.clear_chunks()
self.callback_ignoring_errors(Event(EventType.cancel))
e = x.as_exception()
e.display_to_user = True
raise e
elif x.state is MediaState.opened:
self.callback_ignoring_errors(Event(EventType.resume if self.next_start_is_resume else EventType.begin))
self.next_start_is_resume = False
elif isinstance(x, Error):
raise x.as_exception(check_for_no_audio_devices=True)
else:
raise KeyError(f'Unknown event type: {x}')
def speak_simple_text(self, text):
self.backend.pause()
self.clear_chunks()
self.current_callback = None
if text:
self.backend.speak(text)
def speak_marked_text(self, text, callback):
self.backend.pause()
self.clear_chunks()
self.current_callback = callback
self.current_chunks = tuple(split_into_chunks(text, self.chunk_size))
self.current_chunk_idx = -100
if self.current_chunks:
self.current_chunk_idx = 0
self.speak_current_chunk()
self.synthesizing = True
def callback_ignoring_errors(self, ev):
if self.current_callback is not None:
try:
self.current_callback(ev)
except Exception:
import traceback
traceback.print_exc()
def clear_chunks(self):
self.synthesizing = False
self.next_start_is_resume = False
self.current_chunk_idx = -100
self.current_chunks = ()
self.last_mark = -1
def stop(self):
self.backend.pause()
self.synthesizing = False
self.clear_chunks()
if self.current_callback is not None:
self.current_callback(Event(EventType.cancel))
def pause(self):
self.backend.pause()
self.synthesizing = False
if self.current_callback is not None:
self.current_callback(Event(EventType.pause))
def resume(self):
self.backend.play()
self.synthesizing = True
if self.current_callback is not None:
self.current_callback(Event(EventType.resume))
def apply_settings(self, new_settings=None):
was_synthesizing = self.synthesizing
if self.synthesizing:
self.pause()
if new_settings is not None:
self.settings = new_settings
try:
self.backend.set_voice(self.settings.get('voice'), self.default_system_voice)
except OSError:
import traceback
traceback.print_exc()
self.settings.pop('voice', None)
try:
self.backend.set_rate(self.settings.get('rate', self.default_system_rate))
except OSError:
import traceback
traceback.print_exc()
self.settings.pop('rate', None)
try:
self.backend.set_audio_device(self.settings.get('sound_output'), self.default_system_audio_device)
except OSError:
import traceback
traceback.print_exc()
self.settings.pop('sound_output', None)
if was_synthesizing:
self.resume_after_configure()
def config_widget(self, backend_settings, parent):
from calibre.gui2.tts.windows_config import Widget
return Widget(self, backend_settings, parent)
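# Return the chunks remaining after the last mark that was reached, trimming
# the chunk containing that mark so playback resumes just past it.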
def chunks_from_last_mark(self):
if self.last_mark > -1:
for i, chunk in enumerate(self.current_chunks):
for ci, x in enumerate(chunk):
if x == self.last_mark:
chunks = self.current_chunks[i:]
chunk = chunk[ci + 1:]
if chunk:
chunks = (chunk,) + chunks[1:]
else:
chunks = chunks[1:]
return chunks
return ()
def resume_after_configure(self):
self.current_chunks = self.chunks_from_last_mark()
self.current_chunk_idx = -100
self.last_mark = -1
self.next_start_is_resume = True
self.synthesizing = bool(self.current_chunks)
if self.synthesizing:
self.current_chunk_idx = 0
self.speak_current_chunk()
def change_rate(self, steps=1):
rate = current_rate = self.settings.get('rate', self.default_system_rate)
if rate < 1:
step_size = 0.1
else:
step_size = 0.5
rate += steps * step_size
rate = max(self.min_rate, min(rate, self.max_rate))
if rate != current_rate:
self.settings['rate'] = rate
self.apply_settings()
return self.settings

View File

@@ -1,214 +0,0 @@
#!/usr/bin/env python
# License: GPL v3 Copyright: 2020, Kovid Goyal <kovid at kovidgoyal.net>
from contextlib import suppress
from qt.core import (
QAbstractItemView,
QAbstractTableModel,
QByteArray,
QComboBox,
QFontMetrics,
QFormLayout,
QItemSelectionModel,
QSlider,
QSortFilterProxyModel,
Qt,
QTableView,
QWidget,
)
from calibre.gui2.widgets import BusyCursor
class VoicesModel(QAbstractTableModel):
system_default_voice = '__default__'
def __init__(self, voice_data, parent=None):
super().__init__(parent)
self.voice_data = voice_data
self.current_voices = tuple((x.display_name, x.language, x.gender, x.id) for x in voice_data)
self.column_headers = _('Name'), _('Language'), _('Gender')
def rowCount(self, parent=None):
return len(self.current_voices) + 1
def columnCount(self, parent=None):
return len(self.column_headers)
def headerData(self, section, orientation, role=Qt.ItemDataRole.DisplayRole):
if role == Qt.ItemDataRole.DisplayRole and orientation == Qt.Orientation.Horizontal:
return self.column_headers[section]
return super().headerData(section, orientation, role)
def data(self, index, role=Qt.ItemDataRole.DisplayRole):
if role == Qt.ItemDataRole.DisplayRole:
row = index.row()
with suppress(IndexError):
if row == 0:
return (_('System default'), '', '', '')[index.column()]
data = self.current_voices[row - 1]
col = index.column()
ans = data[col] or ''
return ans
if role == Qt.ItemDataRole.UserRole:
row = index.row()
with suppress(IndexError):
if row == 0:
return self.system_default_voice
return self.current_voices[row - 1][3]
def index_for_voice(self, v):
r = 0
if v != self.system_default_voice:
for i, x in enumerate(self.current_voices):
if x[3] == v:
r = i + 1
break
else:
return
return self.index(r, 0)
class Widget(QWidget):
def __init__(self, tts_client, initial_backend_settings=None, parent=None):
QWidget.__init__(self, parent)
self.l = l = QFormLayout(self)
self.tts_client = tts_client
with BusyCursor():
self.voice_data = self.tts_client.get_all_voices()
self.default_system_rate = self.tts_client.default_system_rate
self.all_sound_outputs = self.tts_client.get_all_audio_devices()
self.default_system_audio_device = self.tts_client.default_system_audio_device
self.speed = s = QSlider(Qt.Orientation.Horizontal, self)
s.setMinimumWidth(200)
l.addRow(_('&Speed of speech:'), s)
s.setRange(int(self.tts_client.min_rate * 100), int(100 * self.tts_client.max_rate))
s.setSingleStep(10)
s.setPageStep(40)
self.voices = v = QTableView(self)
self.voices_model = VoicesModel(self.voice_data, parent=v)
self.proxy_model = p = QSortFilterProxyModel(self)
p.setFilterCaseSensitivity(Qt.CaseSensitivity.CaseInsensitive)
p.setSourceModel(self.voices_model)
v.setModel(p)
v.setSelectionBehavior(QAbstractItemView.SelectionBehavior.SelectRows)
v.setSortingEnabled(True)
v.horizontalHeader().resizeSection(0, QFontMetrics(self.font()).averageCharWidth() * 25)
v.horizontalHeader().resizeSection(1, QFontMetrics(self.font()).averageCharWidth() * 30)
v.verticalHeader().close()
v.setSelectionMode(QAbstractItemView.SelectionMode.SingleSelection)
v.sortByColumn(0, Qt.SortOrder.AscendingOrder)
l.addRow(v)
self.sound_outputs = so = QComboBox(self)
so.addItem(_('System default'), ())
for x in self.all_sound_outputs:
so.addItem(x.name, x.spec())
l.addRow(_('Sound output:'), so)
self.backend_settings = initial_backend_settings or {}
def restore_state(self, prefs):
data = prefs.get(f'{self.tts_client.name}-voice-table-state')
if data is not None:
self.voices.horizontalHeader().restoreState(QByteArray(data))
def save_state(self, prefs):
data = bytearray(self.voices.horizontalHeader().saveState())
prefs.set(f'{self.tts_client.name}-voice-table-state', data)
def restore_to_defaults(self):
self.backend_settings = {}
def sizeHint(self):
ans = super().sizeHint()
ans.setHeight(max(ans.height(), 600))
ans.setWidth(max(ans.width(), 500))
return ans
@property
def selected_voice(self):
for x in self.voices.selectedIndexes():
return x.data(Qt.ItemDataRole.UserRole)
@selected_voice.setter
def selected_voice(self, val):
val = val or VoicesModel.system_default_voice
idx = self.voices_model.index_for_voice(val)
if idx is not None:
idx = self.proxy_model.mapFromSource(idx)
self.voices.selectionModel().select(idx, QItemSelectionModel.SelectionFlag.ClearAndSelect | QItemSelectionModel.SelectionFlag.Rows)
self.voices.scrollTo(idx)
@property
def rate(self):
return self.speed.value() / 100
@rate.setter
def rate(self, val):
val = int((val or self.default_system_rate) * 100)
self.speed.setValue(val)
@property
def sound_output(self):
return self.sound_outputs.currentData()
@sound_output.setter
def sound_output(self, val):
idx = 0
if val:
val = tuple(val)
for q in range(self.sound_outputs.count()):
x = self.sound_outputs.itemData(q)
if x == val:
idx = q
break
self.sound_outputs.setCurrentIndex(idx)
@property
def backend_settings(self):
ans = {}
voice = self.selected_voice
if voice and voice != VoicesModel.system_default_voice:
ans['voice'] = voice
rate = self.rate
if rate and rate != self.default_system_rate:
ans['rate'] = rate
so = self.sound_output
if so:
ans['sound_output'] = so
return ans
@backend_settings.setter
def backend_settings(self, val):
voice = val.get('voice') or VoicesModel.system_default_voice
self.selected_voice = voice
self.rate = val.get('rate', self.default_system_rate)
self.sound_output = val.get('sound_output') or ()
def develop():
from calibre.gui2 import Application
from calibre.gui2.tts.implementation import Client
from calibre.gui2.viewer.config import vprefs
s = vprefs.get('tts_winspeech') or {}
print(s)
print(flush=True)
app = Application([])
c = Client()
w = Widget(c, s)
w.show()
app.exec()
print(flush=True)
print(w.backend_settings)
if __name__ == '__main__':
develop()

View File

@@ -1,285 +0,0 @@
#!/usr/bin/env python
# License: GPL v3 Copyright: 2020, Kovid Goyal <kovid at kovidgoyal.net>
from threading import Thread
from time import monotonic
from typing import NamedTuple
from calibre import prepare_string_for_xml
from .common import Event, EventType, add_markup
class QueueEntry(NamedTuple):
stream_number: int
text: str
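# Tracks the text queued with SAPI keyed by stream number, so stream events can
# be mapped back to queue positions and the last bookmark that was reached.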
class SpeechQueue:
def __init__(self):
self.clear()
def __len__(self):
return len(self.items)
def clear(self, keep_mark=False):
self.items = []
self.pos = -1
if not keep_mark:
self.last_mark = None
def add(self, stream_number, text):
self.items.append(QueueEntry(stream_number, text))
def start(self, stream_number):
self.pos = -1
for i, x in enumerate(self.items):
if x.stream_number == stream_number:
self.pos = i
break
@property
def is_at_start(self):
return self.pos == 0
@property
def is_at_end(self):
return self.pos >= len(self.items) - 1
@property
def current_stream_number(self):
if -1 < self.pos < len(self.items):
return self.items[self.pos].stream_number
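# Yield the current item's text from the last reached bookmark onwards,
# followed by the text of all queued items after it.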
def resume_from_last_mark(self, mark_template):
if self.pos < 0 or self.pos >= len(self.items):
return
item = self.items[self.pos]
if self.last_mark is None:
idx = -1
else:
idx = item.text.find(mark_template.format(self.last_mark))
if idx == -1:
text = item.text
else:
text = item.text[idx:]
yield text
for i in range(self.pos + 1, len(self.items)):
yield self.items[i].text
class Client:
mark_template = '<bookmark mark="{}"/>'
name = 'sapi'
min_rate = -10
max_rate = 10
chunk_size = 128 * 1024
@classmethod
def escape_marked_text(cls, text):
return prepare_string_for_xml(text)
def __init__(self, settings=None, dispatch_on_main_thread=lambda f: f()):
self.create_voice()
self.ignore_next_stop_event = None
self.ignore_next_start_event = False
self.default_system_rate = self.sp_voice.get_current_rate()
self.default_system_voice = self.sp_voice.get_current_voice()
self.default_system_sound_output = self.sp_voice.get_current_sound_output()
self.current_stream_queue = SpeechQueue()
self.current_callback = None
self.dispatch_on_main_thread = dispatch_on_main_thread
self.synthesizing = False
self.pause_count = 0
self.settings = settings or {}
self.apply_settings()
@property
def status(self):
return {'synthesizing': self.synthesizing, 'paused': self.pause_count > 0}
def clear_pauses(self):
while self.pause_count:
self.sp_voice.resume()
self.pause_count -= 1
def create_voice(self):
from calibre.utils.windows.winsapi import ISpVoice
self.sp_voice = ISpVoice()
self.events_thread = Thread(name='SAPIEvents', target=self.wait_for_events, daemon=True)
self.events_thread.start()
def __del__(self):
if self.sp_voice is not None:
self.sp_voice.shutdown_event_loop()
self.events_thread.join(5)
self.sp_voice = None
shutdown = __del__
def apply_settings(self, new_settings=None):
if self.pause_count:
self.clear_pauses()
self.ignore_next_stop_event = monotonic()
self.synthesizing = False
if new_settings is not None:
self.settings = new_settings
try:
self.sp_voice.set_current_rate(self.settings.get('rate', self.default_system_rate))
except OSError:
self.settings.pop('rate', None)
try:
self.sp_voice.set_current_voice(self.settings.get('voice') or self.default_system_voice)
except OSError:
self.settings.pop('voice', None)
try:
self.sp_voice.set_current_sound_output(self.settings.get('sound_output') or self.default_system_sound_output)
except OSError:
self.settings.pop('sound_output', None)
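# Runs on the SAPIEvents thread: block until SAPI signals events, then hand
# processing off to the main thread.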
def wait_for_events(self):
while True:
if self.sp_voice.wait_for_event() is False:
break
self.dispatch_on_main_thread(self.handle_events)
def handle_events(self):
from calibre_extensions.winsapi import SPEI_END_INPUT_STREAM, SPEI_START_INPUT_STREAM, SPEI_TTS_BOOKMARK
c = self.current_callback
for (stream_number, event_type, event_data) in self.sp_voice.get_events():
if event_type == SPEI_TTS_BOOKMARK:
self.current_stream_queue.last_mark = event_data
event = Event(EventType.mark, event_data)
elif event_type == SPEI_START_INPUT_STREAM:
self.current_stream_queue.start(stream_number)
if self.ignore_next_start_event:
self.ignore_next_start_event = False
continue
self.synthesizing = True
if not self.current_stream_queue.is_at_start:
continue
event = Event(EventType.begin)
elif event_type == SPEI_END_INPUT_STREAM:
if self.ignore_next_stop_event is not None and monotonic() - self.ignore_next_stop_event < 2:
self.ignore_next_stop_event = None
continue
self.synthesizing = False
if not self.current_stream_queue.is_at_end:
continue
event = Event(EventType.end)
else:
continue
if c is not None and stream_number == self.current_stream_queue.current_stream_number:
try:
c(event)
except Exception:
import traceback
traceback.print_exc()
def speak_implementation(self, *args):
try:
return self.sp_voice.speak(*args)
except OSError as err:
# see https://docs.microsoft.com/en-us/previous-versions/office/developer/speech-technologies/jj127491(v=msdn.10)
import re
hr = int(re.search(r'\[hr=(0x\S+)', str(err)).group(1), 16)
if hr == 0x8004503a:
raise OSError(_('No active audio output devices found. Connect headphones or speakers.')) from err
raise
def speak(self, text, is_xml=False, want_events=True, purge=True):
from calibre_extensions.winsapi import SPF_ASYNC, SPF_IS_NOT_XML, SPF_IS_XML, SPF_PURGEBEFORESPEAK
flags = SPF_IS_XML if is_xml else SPF_IS_NOT_XML
if purge:
flags |= SPF_PURGEBEFORESPEAK
return self.speak_implementation(text, flags | SPF_ASYNC, want_events)
def purge(self):
from calibre_extensions.winsapi import SPF_PURGEBEFORESPEAK
self.speak_implementation('', SPF_PURGEBEFORESPEAK, False)
self.synthesizing = False
def speak_simple_text(self, text):
self.current_callback = None
self.current_stream_queue.clear()
number = self.speak(text)
self.clear_pauses()
self.current_stream_queue.add(number, text)
def speak_marked_text(self, text, callback):
self.clear_pauses()
self.current_stream_queue.clear()
if self.synthesizing:
self.ignore_next_stop_event = monotonic()
self.current_callback = callback
for i, chunk in enumerate(add_markup(text, self.mark_template, self.escape_marked_text, self.chunk_size)):
number = self.speak(chunk, is_xml=True, purge=i == 0)
self.current_stream_queue.add(number, chunk)
def stop(self):
self.clear_pauses()
self.purge()
if self.current_callback is not None:
self.current_callback(Event(EventType.cancel))
self.current_callback = None
def pause(self):
self.sp_voice.pause()
self.pause_count += 1
if self.current_callback is not None:
self.current_callback(Event(EventType.pause))
def resume(self):
if self.pause_count:
self.clear_pauses()
if self.current_callback is not None:
self.current_callback(Event(EventType.resume))
def resume_after_configure(self):
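        # If we are merely paused, just un-pause. Otherwise re-queue speech
        # from the chunk containing the last reported mark, suppressing the
        # start event generated by the re-queued stream.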
if self.pause_count:
self.clear_pauses()
return
chunks = tuple(self.current_stream_queue.resume_from_last_mark(self.mark_template))
self.ignore_next_start_event = True
self.current_stream_queue.clear(keep_mark=True)
self.purge()
for chunk in chunks:
number = self.speak(chunk, is_xml=True, purge=False)
self.current_stream_queue.add(number, chunk)
if self.current_callback is not None:
self.current_callback(Event(EventType.resume))
self.synthesizing = bool(chunks)
def get_voice_data(self):
ans = getattr(self, 'voice_data', None)
if ans is None:
ans = self.voice_data = self.sp_voice.get_all_voices()
return ans
def get_sound_outputs(self):
ans = getattr(self, 'sound_outputs', None)
if ans is None:
ans = self.sound_outputs = self.sp_voice.get_all_sound_outputs()
return ans
def config_widget(self, backend_settings, parent):
from calibre.gui2.tts.windows_sapi_config import Widget
return Widget(self, backend_settings, parent)
def change_rate(self, steps=1):
rate = current_rate = self.settings.get('rate', self.default_system_rate)
step_size = (self.max_rate - self.min_rate) // 10
rate += steps * step_size
rate = max(self.min_rate, min(rate, self.max_rate))
if rate != current_rate:
self.settings['rate'] = rate
was_synthesizing = self.synthesizing
self.pause()
self.apply_settings()
if was_synthesizing:
self.synthesizing = True
self.resume_after_configure()
return self.settings
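
# A minimal usage sketch (illustrative only; assumes a Qt main loop that
# supplies dispatch_on_main_thread and that Client is constructed as
# Client(settings, dispatch_on_main_thread); only methods shown above are used):
#
#   def on_event(ev):
#       print(ev)  # Event(type=EventType.begin), mark events, etc.
#
#   client = Client({}, dispatch_on_main_thread)
#   client.speak_marked_text(marked_up_text, on_event)
#   client.pause(); client.resume()
#   client.stop()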

View File

@@ -1,210 +0,0 @@
#!/usr/bin/env python
# License: GPL v3 Copyright: 2020, Kovid Goyal <kovid at kovidgoyal.net>
from contextlib import suppress
from qt.core import (
QAbstractItemView,
QAbstractTableModel,
QByteArray,
QComboBox,
QFontMetrics,
QFormLayout,
QItemSelectionModel,
QSlider,
QSortFilterProxyModel,
Qt,
QTableView,
QWidget,
)
from calibre.gui2.widgets import BusyCursor
class VoicesModel(QAbstractTableModel):
system_default_voice = ''
def __init__(self, voice_data, parent=None):
super().__init__(parent)
self.voice_data = voice_data
def language(x):
return x.get('language_display_name') or x.get('language') or ''
self.current_voices = tuple((x['name'], language(x), x.get('age', ''), x.get('gender', ''), x['id']) for x in voice_data)
self.column_headers = _('Name'), _('Language'), _('Age'), _('Gender')
def rowCount(self, parent=None):
return len(self.current_voices) + 1
def columnCount(self, parent=None):
return len(self.column_headers)
def headerData(self, section, orientation, role=Qt.ItemDataRole.DisplayRole):
if role == Qt.ItemDataRole.DisplayRole and orientation == Qt.Orientation.Horizontal:
return self.column_headers[section]
return super().headerData(section, orientation, role)
def data(self, index, role=Qt.ItemDataRole.DisplayRole):
if role == Qt.ItemDataRole.DisplayRole:
row = index.row()
with suppress(IndexError):
if row == 0:
return (_('System default'), '', '', '')[index.column()]
data = self.current_voices[row - 1]
col = index.column()
ans = data[col] or ''
return ans
if role == Qt.ItemDataRole.UserRole:
row = index.row()
with suppress(IndexError):
if row == 0:
return self.system_default_voice
return self.current_voices[row - 1][4]
def index_for_voice(self, v):
r = 0
if v != self.system_default_voice:
for i, x in enumerate(self.current_voices):
if x[4] == v:
r = i + 1
break
else:
return
return self.index(r, 0)
class Widget(QWidget):
def __init__(self, tts_client, initial_backend_settings=None, parent=None):
QWidget.__init__(self, parent)
self.l = l = QFormLayout(self)
self.tts_client = tts_client
with BusyCursor():
self.voice_data = self.tts_client.get_voice_data()
self.default_system_rate = self.tts_client.default_system_rate
self.all_sound_outputs = self.tts_client.get_sound_outputs()
self.speed = s = QSlider(Qt.Orientation.Horizontal, self)
s.setMinimumWidth(200)
l.addRow(_('&Speed of speech (words per minute):'), s)
s.setRange(self.tts_client.min_rate, self.tts_client.max_rate)
s.setSingleStep(1)
s.setPageStep(2)
self.voices = v = QTableView(self)
self.voices_model = VoicesModel(self.voice_data, parent=v)
self.proxy_model = p = QSortFilterProxyModel(self)
p.setFilterCaseSensitivity(Qt.CaseSensitivity.CaseInsensitive)
p.setSourceModel(self.voices_model)
v.setModel(p)
v.setSelectionBehavior(QAbstractItemView.SelectionBehavior.SelectRows)
v.setSortingEnabled(True)
v.horizontalHeader().resizeSection(0, QFontMetrics(self.font()).averageCharWidth() * 25)
v.horizontalHeader().resizeSection(1, QFontMetrics(self.font()).averageCharWidth() * 30)
        v.verticalHeader().close()
v.setSelectionMode(QAbstractItemView.SelectionMode.SingleSelection)
v.sortByColumn(0, Qt.SortOrder.AscendingOrder)
l.addRow(v)
self.sound_outputs = so = QComboBox(self)
so.addItem(_('System default'), '')
for x in self.all_sound_outputs:
so.addItem(x.get('description') or x['id'], x['id'])
l.addRow(_('Sound output:'), so)
self.backend_settings = initial_backend_settings or {}
def restore_state(self, prefs):
data = prefs.get(f'{self.tts_client.name}-voice-table-state')
if data is not None:
self.voices.horizontalHeader().restoreState(QByteArray(data))
def save_state(self, prefs):
data = bytearray(self.voices.horizontalHeader().saveState())
prefs.set(f'{self.tts_client.name}-voice-table-state', data)
def restore_to_defaults(self):
self.backend_settings = {}
def sizeHint(self):
ans = super().sizeHint()
ans.setHeight(max(ans.height(), 600))
ans.setWidth(max(ans.width(), 500))
return ans
@property
def selected_voice(self):
for x in self.voices.selectedIndexes():
return x.data(Qt.ItemDataRole.UserRole)
@selected_voice.setter
def selected_voice(self, val):
val = val or VoicesModel.system_default_voice
idx = self.voices_model.index_for_voice(val)
if idx is not None:
idx = self.proxy_model.mapFromSource(idx)
self.voices.selectionModel().select(idx, QItemSelectionModel.SelectionFlag.ClearAndSelect | QItemSelectionModel.SelectionFlag.Rows)
self.voices.scrollTo(idx)
@property
def rate(self):
return self.speed.value()
@rate.setter
def rate(self, val):
val = int(val or self.default_system_rate)
self.speed.setValue(val)
@property
def sound_output(self):
return self.sound_outputs.currentData()
@sound_output.setter
def sound_output(self, val):
val = val or ''
idx = 0
if val:
q = self.sound_outputs.findData(val)
if q > -1:
idx = q
self.sound_outputs.setCurrentIndex(idx)
@property
def backend_settings(self):
ans = {}
voice = self.selected_voice
if voice and voice != VoicesModel.system_default_voice:
ans['voice'] = voice
rate = self.rate
if rate and rate != self.default_system_rate:
ans['rate'] = rate
so = self.sound_output
if so:
ans['sound_output'] = so
return ans
@backend_settings.setter
def backend_settings(self, val):
voice = val.get('voice') or VoicesModel.system_default_voice
self.selected_voice = voice
self.rate = val.get('rate') or self.default_system_rate
self.sound_output = val.get('sound_output') or ''
def develop():
from calibre.gui2 import Application
from calibre.gui2.tts.implementation import Client
app = Application([])
c = Client()
w = Widget(c, {})
w.show()
app.exec()
print(w.backend_settings)
if __name__ == '__main__':
develop()

View File

@@ -303,8 +303,6 @@ def find_tests(which_tests=None, exclude_tests=None):
    if iswindows:
        from calibre.utils.windows.wintest import find_tests
        a(find_tests())
        from calibre.utils.windows.winsapi import find_tests
        a(find_tests())
    a(unittest.defaultTestLoader.loadTestsFromTestCase(TestImports))
    if ok('dbcli'):
        from calibre.db.cli.tests import find_tests

View File

@@ -1,619 +0,0 @@
/*
* winsapi.cpp
* Copyright (C) 2020 Kovid Goyal <kovid at kovidgoyal.net>
*
* Distributed under terms of the GPL3 license.
*/
#define _ATL_APARTMENT_THREADED
#include "common.h"
#include <atlbase.h>
extern CComModule _Module;
#include <atlcom.h>
#include <sapi.h>
#pragma warning( push )
#pragma warning( disable : 4996 ) // sphelper.h uses deprecated GetVersionEx
#include <sphelper.h>
#pragma warning( pop )
// Structures {{{
typedef struct {
PyObject_HEAD
ISpVoice *voice;
HANDLE shutdown_events_thread, events_available;
} Voice;
static PyTypeObject VoiceType = {
PyVarObject_HEAD_INIT(NULL, 0)
};
static const ULONGLONG speak_events = SPFEI(SPEI_START_INPUT_STREAM) | SPFEI(SPEI_END_INPUT_STREAM) | SPFEI(SPEI_TTS_BOOKMARK);
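// The only event types Voice_speak() subscribes to: input stream start/end
// and TTS bookmarks.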
static PyObject *
Voice_new(PyTypeObject *type, PyObject *args, PyObject *kwds) {
HRESULT hr = CoInitialize(NULL);
if (hr != S_OK && hr != S_FALSE) {
if (hr == RPC_E_CHANGED_MODE) {
return error_from_hresult(hr, "COM initialization failed as it was already initialized in multi-threaded mode");
}
return PyErr_NoMemory();
}
Voice *self = (Voice *) type->tp_alloc(type, 0);
if (self) {
if (FAILED(hr = CoCreateInstance(CLSID_SpVoice, NULL, CLSCTX_ALL, IID_ISpVoice, (void **)&self->voice))) {
Py_CLEAR(self);
return error_from_hresult(hr, "Failed to create ISpVoice instance");
}
if (FAILED(hr = self->voice->SetNotifyWin32Event())) {
Py_CLEAR(self);
return error_from_hresult(hr, "Failed to set event based notify mechanism");
}
self->events_available = self->voice->GetNotifyEventHandle();
if (self->events_available == INVALID_HANDLE_VALUE) {
Py_CLEAR(self);
PyErr_SetString(PyExc_OSError, "Failed to get events handle for ISpVoice");
return NULL;
}
self->shutdown_events_thread = CreateEventW(NULL, true, false, NULL);
if (self->shutdown_events_thread == INVALID_HANDLE_VALUE) {
Py_CLEAR(self);
PyErr_SetFromWindowsErr(0);
return NULL;
}
}
return (PyObject*)self;
}
static void
Voice_dealloc(Voice *self) {
if (self->voice) { self->voice->Release(); self->voice = NULL; }
if (self->shutdown_events_thread != INVALID_HANDLE_VALUE) {
CloseHandle(self->shutdown_events_thread);
self->shutdown_events_thread = INVALID_HANDLE_VALUE;
}
CoUninitialize();
}
// }}}
// Enumeration {{{
static PyObject*
Voice_get_all_sound_outputs(Voice *self, PyObject *args) {
HRESULT hr = S_OK;
CComPtr<IEnumSpObjectTokens> iterator = NULL;
if (FAILED(hr = SpEnumTokens(SPCAT_AUDIOOUT, NULL, NULL, &iterator))) {
return error_from_hresult(hr, "Failed to create audio output category iterator");
}
pyobject_raii ans(PyList_New(0));
if (!ans) return NULL;
while (true) {
CComPtr<ISpObjectToken> token = NULL;
if (FAILED(hr = iterator->Next(1, &token, NULL)) || hr == S_FALSE || !token) break;
pyobject_raii dict(PyDict_New());
if (!dict) return NULL;
com_wchar_raii id, description;
if (FAILED(hr = token->GetId(id.unsafe_address()))) continue;
pyobject_raii idpy(PyUnicode_FromWideChar(id.ptr(), -1));
if (!idpy) return NULL;
if (PyDict_SetItemString(dict.ptr(), "id", idpy.ptr()) != 0) return NULL;
if (FAILED(hr = SpGetDescription(token, description.unsafe_address(), NULL))) continue;
pyobject_raii descriptionpy(PyUnicode_FromWideChar(description.ptr(), -1));
if (!descriptionpy) return NULL;
if (PyDict_SetItemString(dict.ptr(), "description", descriptionpy.ptr()) != 0) return NULL;
if (PyList_Append(ans.ptr(), dict.ptr()) != 0) return NULL;
}
return PyList_AsTuple(ans.ptr());
}
static PyObject*
Voice_get_current_sound_output(Voice *self, PyObject *args) {
HRESULT hr = S_OK;
CComPtr<ISpObjectToken> token = NULL;
if (FAILED(hr = self->voice->GetOutputObjectToken(&token))) return error_from_hresult(hr, "Failed to get current output object token");
if (hr == S_FALSE) Py_RETURN_NONE;
com_wchar_raii id;
if (FAILED(hr = token->GetId(id.unsafe_address()))) return error_from_hresult(hr, "Failed to get ID for current audio output token");
return PyUnicode_FromWideChar(id.ptr(), -1);
}
static PyObject*
Voice_set_current_sound_output(Voice *self, PyObject *args) {
wchar_raii id;
int allow_format_changes = 1;
if (!PyArg_ParseTuple(args, "|O&p", py_to_wchar, &id, &allow_format_changes)) return NULL;
HRESULT hr = S_OK;
if (id) {
CComPtr<ISpObjectToken> token = NULL;
if (FAILED(hr = SpGetTokenFromId(id.ptr(), &token))) {
return error_from_hresult(hr, "Failed to find sound output with id", PyTuple_GET_ITEM(args, 0));
}
if (FAILED(hr = self->voice->SetOutput(token, allow_format_changes))) return error_from_hresult(hr, "Failed to set sound output to", PyTuple_GET_ITEM(args, 0));
} else {
if (FAILED(hr = self->voice->SetOutput(NULL, allow_format_changes))) return error_from_hresult(hr, "Failed to set sound output to default");
}
Py_RETURN_NONE;
}
static PyObject*
Voice_get_current_voice(Voice *self, PyObject *args) {
HRESULT hr = S_OK;
CComPtr<ISpObjectToken> token = NULL;
if (FAILED(hr = self->voice->GetVoice(&token))) {
return error_from_hresult(hr, "Failed to get current voice");
}
com_wchar_raii id;
if (FAILED(hr = token->GetId(id.unsafe_address()))) return error_from_hresult(hr, "Failed to get ID for current voice");
return PyUnicode_FromWideChar(id.ptr(), -1);
}
static PyObject*
Voice_set_current_voice(Voice *self, PyObject *args) {
wchar_raii id;
if (!PyArg_ParseTuple(args, "|O&", py_to_wchar, &id)) return NULL;
HRESULT hr = S_OK;
if (id) {
CComPtr<ISpObjectToken> token = NULL;
if (FAILED(hr = SpGetTokenFromId(id.ptr(), &token))) {
return error_from_hresult(hr, "Failed to find voice with id", PyTuple_GET_ITEM(args, 0));
}
if (FAILED(hr = self->voice->SetVoice(token))) return error_from_hresult(hr, "Failed to set voice to", PyTuple_GET_ITEM(args, 0));
} else {
if (FAILED(hr = self->voice->SetVoice(NULL))) return error_from_hresult(hr, "Failed to set voice to default");
}
Py_RETURN_NONE;
}
static PyObject*
Voice_get_all_voices(Voice *self, PyObject *args) {
HRESULT hr = S_OK;
CComPtr<IEnumSpObjectTokens> iterator = NULL;
if (FAILED(hr = SpEnumTokens(SPCAT_VOICES, NULL, NULL, &iterator))) {
return error_from_hresult(hr, "Failed to create voice category iterator");
}
pyobject_raii ans(PyList_New(0));
if (!ans) return NULL;
while (true) {
CComPtr<ISpObjectToken> token = NULL;
if (FAILED(hr = iterator->Next(1, &token, NULL)) || hr == S_FALSE || !token) break;
pyobject_raii dict(PyDict_New());
if (!dict) return NULL;
com_wchar_raii id, description;
if (FAILED(hr = token->GetId(id.unsafe_address()))) continue;
pyobject_raii idpy(PyUnicode_FromWideChar(id.ptr(), -1));
if (!idpy) return NULL;
if (PyDict_SetItemString(dict.ptr(), "id", idpy.ptr()) != 0) return NULL;
if (FAILED(hr = SpGetDescription(token, description.unsafe_address(), NULL))) continue;
pyobject_raii descriptionpy(PyUnicode_FromWideChar(description.ptr(), -1));
if (!descriptionpy) return NULL;
if (PyDict_SetItemString(dict.ptr(), "description", descriptionpy.ptr()) != 0) return NULL;
CComPtr<ISpDataKey> attributes = NULL;
if (FAILED(hr = token->OpenKey(L"Attributes", &attributes))) continue;
#define ATTR(name) {\
com_wchar_raii val; \
if (SUCCEEDED(attributes->GetStringValue(TEXT(#name), val.unsafe_address()))) { \
pyobject_raii pyval(PyUnicode_FromWideChar(val.ptr(), -1)); if (!pyval) return NULL; \
if (PyDict_SetItemString(dict.ptr(), #name, pyval.ptr()) != 0) return NULL; \
}\
}
ATTR(gender); ATTR(name); ATTR(vendor); ATTR(age);
#undef ATTR
com_wchar_raii val;
if (SUCCEEDED(attributes->GetStringValue(L"language", val.unsafe_address()))) {
int lcid = wcstol(val.ptr(), NULL, 16);
wchar_t buf[LOCALE_NAME_MAX_LENGTH];
if (LCIDToLocaleName(lcid, buf, LOCALE_NAME_MAX_LENGTH, 0) > 0) {
pyobject_raii pyval(PyUnicode_FromWideChar(buf, -1)); if (!pyval) return NULL;
if (PyDict_SetItemString(dict.ptr(), "language", pyval.ptr()) != 0) return NULL;
wchar_t display_name[1024];
int res = GetLocaleInfoEx(buf, LOCALE_SLOCALIZEDDISPLAYNAME, display_name, sizeof(display_name)/sizeof(display_name[0]));
if (res > 0) {
pyobject_raii pd(PyUnicode_FromWideChar(display_name, -1)); if (!pd) return NULL;
if (PyDict_SetItemString(dict.ptr(), "language_display_name", pd.ptr()) != 0) return NULL;
}
}
}
if (PyList_Append(ans.ptr(), dict.ptr()) != 0) return NULL;
}
return PyList_AsTuple(ans.ptr());
}
// }}}
// Volume and rate {{{
static PyObject*
Voice_get_current_volume(Voice *self, PyObject *args) {
HRESULT hr = S_OK;
USHORT volume;
if (FAILED(hr = self->voice->GetVolume(&volume))) return error_from_hresult(hr);
return PyLong_FromUnsignedLong((unsigned long)volume);
}
static PyObject*
Voice_get_current_rate(Voice *self, PyObject *args) {
HRESULT hr = S_OK;
long rate;
if (FAILED(hr = self->voice->GetRate(&rate))) return error_from_hresult(hr);
return PyLong_FromLong(rate);
}
static PyObject*
Voice_set_current_rate(Voice *self, PyObject *args) {
HRESULT hr = S_OK;
long rate;
if (!PyArg_ParseTuple(args, "l", &rate)) return NULL;
if (rate < -10 || rate > 10) { PyErr_SetString(PyExc_ValueError, "rate must be between -10 and 10"); return NULL; }
if (FAILED(hr = self->voice->SetRate(rate))) return error_from_hresult(hr);
Py_RETURN_NONE;
}
static PyObject*
Voice_set_current_volume(Voice *self, PyObject *args) {
HRESULT hr = S_OK;
unsigned short volume;
if (!PyArg_ParseTuple(args, "H", &volume)) return NULL;
if (FAILED(hr = self->voice->SetVolume(volume))) return error_from_hresult(hr);
Py_RETURN_NONE;
}
// }}}
static PyObject*
Voice_speak(Voice *self, PyObject *args) {
wchar_raii text_or_path;
unsigned long flags = SPF_DEFAULT;
int want_events = 0;
HRESULT hr = S_OK;
if (!PyArg_ParseTuple(args, "O&|kp", py_to_wchar, &text_or_path, &flags, &want_events)) return NULL;
ULONGLONG events = want_events ? speak_events : 0;
if (FAILED(hr = self->voice->SetInterest(events, events))) {
return error_from_hresult(hr, "Failed to ask for events");
}
ULONG stream_number;
Py_BEGIN_ALLOW_THREADS;
hr = self->voice->Speak(text_or_path.ptr(), flags, &stream_number);
Py_END_ALLOW_THREADS;
if (FAILED(hr)) return error_from_hresult(hr, "Failed to speak");
return PyLong_FromUnsignedLong(stream_number);
}
static PyObject*
Voice_wait_until_done(Voice *self, PyObject *args) {
unsigned long timeout = INFINITE;
if (!PyArg_ParseTuple(args, "|k", &timeout)) return NULL;
    HRESULT hr;
Py_BEGIN_ALLOW_THREADS;
hr = self->voice->WaitUntilDone(timeout);
Py_END_ALLOW_THREADS;
if (hr == S_OK) Py_RETURN_TRUE;
Py_RETURN_FALSE;
}
static PyObject*
Voice_pause(Voice *self, PyObject *args) {
HRESULT hr = self->voice->Pause();
if (FAILED(hr)) return error_from_hresult(hr);
Py_RETURN_NONE;
}
static PyObject*
Voice_resume(Voice *self, PyObject *args) {
HRESULT hr = self->voice->Resume();
if (FAILED(hr)) return error_from_hresult(hr);
Py_RETURN_NONE;
}
static PyObject*
Voice_create_recording_wav(Voice *self, PyObject *args) {
HRESULT hr = S_OK;
wchar_raii path, text;
int do_events = 0;
SPSTREAMFORMAT format = SPSF_22kHz16BitMono;
if (!PyArg_ParseTuple(args, "O&O&|ip", py_to_wchar_no_none, &path, py_to_wchar_no_none, &text, &format, &do_events)) return NULL;
CComPtr <ISpStream> stream = NULL;
CSpStreamFormat audio_fmt;
if (FAILED(hr = audio_fmt.AssignFormat(format))) return error_from_hresult(hr, "Invalid Audio format");
CComPtr<ISpObjectToken> token = NULL;
if (FAILED(hr = self->voice->GetOutputObjectToken(&token))) return error_from_hresult(hr, "Failed to get current output object token");
bool uses_default_output = hr == S_FALSE;
if (FAILED(hr = SPBindToFile(path.ptr(), SPFM_CREATE_ALWAYS, &stream, &audio_fmt.FormatId(), audio_fmt.WaveFormatExPtr())))
return error_from_hresult(hr, "Failed to open file", PyTuple_GET_ITEM(args, 0));
if (FAILED(hr = self->voice->SetOutput(stream, TRUE))) {
stream->Close();
return error_from_hresult(hr, "Failed to set output to wav file", PyTuple_GET_ITEM(args, 0));
}
Py_BEGIN_ALLOW_THREADS;
hr = self->voice->Speak(text.ptr(), SPF_DEFAULT, NULL);
Py_END_ALLOW_THREADS;
stream->Close();
self->voice->SetOutput(uses_default_output ? NULL: token, TRUE);
if (FAILED(hr)) return error_from_hresult(hr, "Failed to speak into wav file", PyTuple_GET_ITEM(args, 0));
Py_RETURN_NONE;
}
static PyObject*
Voice_shutdown_event_loop(Voice *self, PyObject *args) {
if (!SetEvent(self->shutdown_events_thread)) return PyErr_SetFromWindowsErr(0);
Py_RETURN_NONE;
}
static PyObject*
Voice_get_events(Voice *self, PyObject *args) {
HRESULT hr;
const ULONG asz = 32;
ULONG num_events;
SPEVENT events[asz];
PyObject *ret;
long long val;
int etype;
PyObject *ans = PyList_New(0);
if (!ans) return NULL;
while (true) {
Py_BEGIN_ALLOW_THREADS;
hr = self->voice->GetEvents(asz, events, &num_events);
Py_END_ALLOW_THREADS;
if (hr != S_OK && hr != S_FALSE) break;
if (num_events == 0) break;
for (ULONG i = 0; i < num_events; i++) {
etype = events[i].eEventId;
bool ok = false;
switch(etype) {
case SPEI_TTS_BOOKMARK:
val = events[i].wParam;
ok = true;
break;
case SPEI_START_INPUT_STREAM:
case SPEI_END_INPUT_STREAM:
val = 0;
ok = true;
break;
}
if (ok) {
ret = Py_BuildValue("kiL", events[i].ulStreamNum, etype, val);
if (!ret) { Py_CLEAR(ans); return NULL; }
int x = PyList_Append(ans, ret);
Py_DECREF(ret);
if (x != 0) { Py_CLEAR(ans); return NULL; }
}
}
}
return ans;
}
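// Blocks until either shutdown_event_loop() is called (returns False) or
// SAPI signals that events are available for get_events() (returns True).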
static PyObject*
Voice_wait_for_event(Voice *self, PyObject *args) {
const HANDLE handles[2] = {self->shutdown_events_thread, self->events_available};
DWORD ev;
Py_BEGIN_ALLOW_THREADS;
ev = WaitForMultipleObjects(2, handles, false, INFINITE);
Py_END_ALLOW_THREADS;
switch (ev) {
case WAIT_OBJECT_0:
Py_RETURN_FALSE;
case WAIT_OBJECT_0 + 1:
Py_RETURN_TRUE;
}
Py_RETURN_NONE;
}
// Boilerplate {{{
#define M(name, args) { #name, (PyCFunction)Voice_##name, args, ""}
static PyMethodDef Voice_methods[] = {
M(get_all_voices, METH_NOARGS),
M(get_all_sound_outputs, METH_NOARGS),
M(speak, METH_VARARGS),
M(wait_until_done, METH_VARARGS),
M(pause, METH_NOARGS),
M(resume, METH_NOARGS),
M(create_recording_wav, METH_VARARGS),
M(get_current_rate, METH_NOARGS),
M(get_current_volume, METH_NOARGS),
M(get_current_voice, METH_NOARGS),
M(get_current_sound_output, METH_NOARGS),
M(set_current_voice, METH_VARARGS),
M(set_current_rate, METH_VARARGS),
M(set_current_volume, METH_VARARGS),
M(set_current_sound_output, METH_VARARGS),
M(shutdown_event_loop, METH_NOARGS),
M(wait_for_event, METH_NOARGS),
M(get_events, METH_NOARGS),
{NULL, NULL, 0, NULL}
};
#undef M
#define M(name, args) { #name, name, args, ""}
static PyMethodDef winsapi_methods[] = {
{NULL, NULL, 0, NULL}
};
#undef M
static int
exec_module(PyObject *m) {
VoiceType.tp_name = "winsapi.ISpVoice";
VoiceType.tp_doc = "Wrapper for ISpVoice";
VoiceType.tp_basicsize = sizeof(Voice);
VoiceType.tp_itemsize = 0;
VoiceType.tp_flags = Py_TPFLAGS_DEFAULT;
VoiceType.tp_new = Voice_new;
VoiceType.tp_methods = Voice_methods;
VoiceType.tp_dealloc = (destructor)Voice_dealloc;
if (PyType_Ready(&VoiceType) < 0) return -1;
Py_INCREF(&VoiceType);
if (PyModule_AddObject(m, "ISpVoice", (PyObject *) &VoiceType) < 0) {
Py_DECREF(&VoiceType);
return -1;
}
#define AI(name) if (PyModule_AddIntMacro(m, name) != 0) { Py_DECREF(&VoiceType); return -1; }
AI(SPF_DEFAULT);
AI(SPF_ASYNC);
AI(SPF_PURGEBEFORESPEAK);
AI(SPF_IS_FILENAME);
AI(SPF_IS_XML);
AI(SPF_IS_NOT_XML);
AI(SPF_PERSIST_XML);
AI(SPF_NLP_SPEAK_PUNC);
AI(SPF_PARSE_SSML);
AI(SPF_PARSE_AUTODETECT);
AI(SPF_NLP_MASK);
AI(SPF_PARSE_MASK);
AI(SPF_VOICE_MASK);
AI(SPF_UNUSED_FLAGS);
AI(INFINITE);
AI(SPSF_Default);
AI(SPSF_NoAssignedFormat);
AI(SPSF_Text);
AI(SPSF_NonStandardFormat);
AI(SPSF_ExtendedAudioFormat);
// Standard PCM wave formats
AI(SPSF_8kHz8BitMono);
AI(SPSF_8kHz8BitStereo);
AI(SPSF_8kHz16BitMono);
AI(SPSF_8kHz16BitStereo);
AI(SPSF_11kHz8BitMono);
AI(SPSF_11kHz8BitStereo);
AI(SPSF_11kHz16BitMono);
AI(SPSF_11kHz16BitStereo);
AI(SPSF_12kHz8BitMono);
AI(SPSF_12kHz8BitStereo);
AI(SPSF_12kHz16BitMono);
AI(SPSF_12kHz16BitStereo);
AI(SPSF_16kHz8BitMono);
AI(SPSF_16kHz8BitStereo);
AI(SPSF_16kHz16BitMono);
AI(SPSF_16kHz16BitStereo);
AI(SPSF_22kHz8BitMono);
AI(SPSF_22kHz8BitStereo);
AI(SPSF_22kHz16BitMono);
AI(SPSF_22kHz16BitStereo);
AI(SPSF_24kHz8BitMono);
AI(SPSF_24kHz8BitStereo);
AI(SPSF_24kHz16BitMono);
AI(SPSF_24kHz16BitStereo);
AI(SPSF_32kHz8BitMono);
AI(SPSF_32kHz8BitStereo);
AI(SPSF_32kHz16BitMono);
AI(SPSF_32kHz16BitStereo);
AI(SPSF_44kHz8BitMono);
AI(SPSF_44kHz8BitStereo);
AI(SPSF_44kHz16BitMono);
AI(SPSF_44kHz16BitStereo);
AI(SPSF_48kHz8BitMono);
AI(SPSF_48kHz8BitStereo);
AI(SPSF_48kHz16BitMono);
AI(SPSF_48kHz16BitStereo);
// TrueSpeech format
AI(SPSF_TrueSpeech_8kHz1BitMono);
// A-Law formats
AI(SPSF_CCITT_ALaw_8kHzMono);
AI(SPSF_CCITT_ALaw_8kHzStereo);
AI(SPSF_CCITT_ALaw_11kHzMono);
AI(SPSF_CCITT_ALaw_11kHzStereo);
AI(SPSF_CCITT_ALaw_22kHzMono);
AI(SPSF_CCITT_ALaw_22kHzStereo);
AI(SPSF_CCITT_ALaw_44kHzMono);
AI(SPSF_CCITT_ALaw_44kHzStereo);
// u-Law formats
AI(SPSF_CCITT_uLaw_8kHzMono);
AI(SPSF_CCITT_uLaw_8kHzStereo);
AI(SPSF_CCITT_uLaw_11kHzMono);
AI(SPSF_CCITT_uLaw_11kHzStereo);
AI(SPSF_CCITT_uLaw_22kHzMono);
AI(SPSF_CCITT_uLaw_22kHzStereo);
AI(SPSF_CCITT_uLaw_44kHzMono);
AI(SPSF_CCITT_uLaw_44kHzStereo);
// ADPCM formats
AI(SPSF_ADPCM_8kHzMono);
AI(SPSF_ADPCM_8kHzStereo);
AI(SPSF_ADPCM_11kHzMono);
AI(SPSF_ADPCM_11kHzStereo);
AI(SPSF_ADPCM_22kHzMono);
AI(SPSF_ADPCM_22kHzStereo);
AI(SPSF_ADPCM_44kHzMono);
AI(SPSF_ADPCM_44kHzStereo);
// GSM 6.10 formats
AI(SPSF_GSM610_8kHzMono);
AI(SPSF_GSM610_11kHzMono);
AI(SPSF_GSM610_22kHzMono);
AI(SPSF_GSM610_44kHzMono);
AI(SPEI_UNDEFINED);
//--- TTS engine
AI(SPEI_START_INPUT_STREAM);
AI(SPEI_END_INPUT_STREAM);
AI(SPEI_VOICE_CHANGE);
AI(SPEI_TTS_BOOKMARK);
AI(SPEI_WORD_BOUNDARY);
AI(SPEI_PHONEME);
AI(SPEI_SENTENCE_BOUNDARY);
AI(SPEI_VISEME);
AI(SPEI_TTS_AUDIO_LEVEL);
//--- Engine vendors use these reserved bits
AI(SPEI_TTS_PRIVATE);
AI(SPEI_MIN_TTS);
AI(SPEI_MAX_TTS);
//--- Speech Recognition
AI(SPEI_END_SR_STREAM);
AI(SPEI_SOUND_START);
AI(SPEI_SOUND_END);
AI(SPEI_PHRASE_START);
AI(SPEI_RECOGNITION);
AI(SPEI_HYPOTHESIS);
AI(SPEI_SR_BOOKMARK);
AI(SPEI_PROPERTY_NUM_CHANGE);
AI(SPEI_PROPERTY_STRING_CHANGE);
AI(SPEI_FALSE_RECOGNITION);
AI(SPEI_INTERFERENCE);
AI(SPEI_REQUEST_UI);
AI(SPEI_RECO_STATE_CHANGE);
AI(SPEI_ADAPTATION);
AI(SPEI_START_SR_STREAM);
AI(SPEI_RECO_OTHER_CONTEXT);
AI(SPEI_SR_AUDIO_LEVEL);
AI(SPEI_SR_RETAINEDAUDIO);
//--- Engine vendors use these reserved bits
AI(SPEI_SR_PRIVATE);
AI(SPEI_MIN_SR);
AI(SPEI_MAX_SR);
//--- Reserved: Do not use
AI(SPEI_RESERVED1);
AI(SPEI_RESERVED2);
#undef AI
return 0;
}
static PyModuleDef_Slot slots[] = { {Py_mod_exec, (void*)exec_module}, {0, NULL} };
static struct PyModuleDef module_def = {PyModuleDef_HEAD_INIT};
CALIBRE_MODINIT_FUNC PyInit_winsapi(void) {
module_def.m_name = "winsapi";
module_def.m_doc = "SAPI wrapper";
module_def.m_methods = winsapi_methods;
module_def.m_slots = slots;
return PyModuleDef_Init(&module_def);
}

View File

@@ -1,79 +0,0 @@
#!/usr/bin/env python
# License: GPL v3 Copyright: 2020, Kovid Goyal <kovid at kovidgoyal.net>
from calibre_extensions.winsapi import ISpVoice
def develop():
spv = ISpVoice()
spv.create_recording_wav('test.wav', 'Hello, world!')
def find_tests():
import os
import unittest
is_ci = os.environ.get('CI', '').lower() == 'true'
class TestSAPI(unittest.TestCase):
def setUp(self):
self.sapi = ISpVoice()
def tearDown(self):
self.sapi = None
def test_enumeration_of_voices(self):
default_voice = self.sapi.get_current_voice()
self.assertTrue(default_voice)
all_voices = self.sapi.get_all_voices()
self.assertTrue(all_voices)
self.assertIn(default_voice, {x['id'] for x in all_voices})
for voice in all_voices:
for key in ('name', 'gender', 'age', 'language', 'description'):
self.assertIn(key, voice)
self.sapi.set_current_voice(voice['id'])
self.assertEqual(self.sapi.get_current_voice(), voice['id'])
self.sapi.set_current_voice()
self.assertEqual(self.sapi.get_current_voice(), default_voice)
@unittest.skipIf(is_ci, 'No sound output on CI')
def test_enumeration_of_sound_outputs(self):
default_output = self.sapi.get_current_sound_output()
self.assertTrue(default_output)
all_outputs = self.sapi.get_all_sound_outputs()
self.assertTrue(all_outputs)
self.assertIn(default_output, {x['id'] for x in all_outputs})
for output in all_outputs:
for key in ('id', 'description',):
self.assertIn(key, output)
                self.sapi.set_current_sound_output(output['id'])
self.assertEqual(self.sapi.get_current_sound_output(), output['id'])
self.sapi.set_current_sound_output()
self.assertEqual(self.sapi.get_current_sound_output(), default_output)
def test_volume_and_rate(self):
dr = self.sapi.get_current_rate()
new_rate = dr // 2 + 1
self.sapi.set_current_rate(new_rate)
self.assertEqual(self.sapi.get_current_rate(), new_rate)
self.sapi.set_current_rate(dr)
dv = self.sapi.get_current_volume()
new_vol = dv // 2 + 3
self.sapi.set_current_volume(new_vol)
self.assertEqual(self.sapi.get_current_volume(), new_vol)
self.sapi.set_current_volume(dv)
def test_record_as_audio_file(self):
import tempfile
with tempfile.TemporaryDirectory() as tdir:
wav_path = os.path.join(tdir, 'test.wav')
self.sapi.create_recording_wav(wav_path, 'testing microsoft voices')
self.assertGreater(os.path.getsize(wav_path), 256)
return unittest.defaultTestLoader.loadTestsFromTestCase(TestSAPI)
def run_tests():
from calibre.utils.run_tests import run_tests
run_tests(find_tests)
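
# Illustrative invocation, assuming a calibre development environment:
#   calibre-debug -c "from calibre.utils.windows.winsapi import run_tests; run_tests()"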

View File

@@ -1,935 +0,0 @@
/*
* winspeech.cpp
* Copyright (C) 2023 Kovid Goyal <kovid at kovidgoyal.net>
*
* Distributed under terms of the GPL3 license.
*/
#include "common.h"
#include <atomic>
#include <filesystem>
#include <string_view>
#include <fstream>
#include <mutex>
#include <filesystem>
#include <functional>
#include <iostream>
#include <unordered_map>
#include <io.h>
#include <winrt/base.h>
#include <winrt/windows.foundation.h>
#include <winrt/windows.foundation.collections.h>
#include <winrt/windows.storage.streams.h>
#include <winrt/windows.media.speechsynthesis.h>
#include <winrt/windows.media.core.h>
#include <winrt/windows.media.playback.h>
#include <winrt/windows.media.devices.h>
#include <winrt/windows.devices.enumeration.h>
#ifdef max
#undef max
#endif
using namespace winrt::Windows::Foundation;
using namespace winrt::Windows::Foundation::Collections;
using namespace winrt::Windows::Media::SpeechSynthesis;
using namespace winrt::Windows::Media::Playback;
using namespace winrt::Windows::Media::Core;
using namespace winrt::Windows::Media::Devices;
using namespace winrt::Windows::Devices::Enumeration;
using namespace winrt::Windows::Storage::Streams;
typedef uint64_t id_type;
static std::mutex output_lock;
static DWORD main_thread_id;
template<typename T> static void
__debug_multiple_impl(T x) {
if constexpr (std::is_same_v<T, wchar_t*> || std::is_same_v<T, std::wstring> || std::is_same_v<T, winrt::hstring> || std::is_same_v<T, std::wstring_view>) {
std::cerr << winrt::to_string(x);
} else {
std::cerr << x;
}
}
template<typename T> static void
__debug_multiple(T x) {
__debug_multiple_impl(x);
std::cerr << std::endl;
}
template<typename T, typename... Args> static void
__debug_multiple(T x, Args... args) {
__debug_multiple_impl(x);
std::cerr << " ";
__debug_multiple(args...);
}
template<typename... Args> static void
debug(Args... args) {
std::scoped_lock _sl_(output_lock);
DWORD tid = GetCurrentThreadId();
if (tid == main_thread_id) std::cerr << "thread-main"; else std::cerr << "thread-" << tid;
std::cerr << ": ";
__debug_multiple(args...);
}
static std::atomic_bool main_loop_is_running;
enum {
STDIN_FAILED = 1,
STDIN_MSG,
EXIT_REQUESTED
};
static std::vector<std::wstring_view>
split(std::wstring_view const &src, std::wstring const &delim = L" ") {
size_t pos;
std::vector<std::wstring_view> ans; ans.reserve(16);
std::wstring_view sv(src);
while ((pos = sv.find(delim)) != std::wstring_view::npos) {
if (pos > 0) ans.emplace_back(sv.substr(0, pos));
        sv = sv.substr(pos + delim.size());
}
if (sv.size() > 0) ans.emplace_back(sv);
return ans;
}
static std::wstring
join(std::vector<std::wstring_view> parts, std::wstring const &delim = L" ") {
std::wstring ans; ans.reserve(1024);
for (auto const &x : parts) {
ans.append(x);
ans.append(delim);
}
    if (!ans.empty()) ans.erase(ans.size() - delim.size());
return ans;
}
static id_type
parse_id(std::wstring_view const& s) {
id_type ans = 0;
for (auto ch : s) {
auto delta = ch - '0';
if (delta < 0 || delta > 9) {
throw std::wstring(L"Not a valid id: ") + std::wstring(s);
}
ans = (ans * 10) + delta;
}
return ans;
}
static double
parse_double(const wchar_t *raw) {
std::wistringstream s(raw, std::ios_base::in);
s.imbue(std::locale("C"));
double ans;
s >> ans;
return ans;
}
static void
serialize_string_for_json(std::string const &src, std::ostream &out) {
out << '"';
for (auto ch : src) {
switch(ch) {
case '\\':
out << "\\\\"; break;
case '"':
out << "\\\""; break;
case '\n':
out << "\\n"; break;
case '\r':
out << "\\r"; break;
default:
out << ch; break;
}
}
out << '"';
}
template<typename T> static void
serialize_integer(std::ostream &out, T val, int base = 10) {
std::array<char, 16> str;
if (auto [ptr, ec] = std::to_chars(str.data(), str.data() + str.size(), val, base); ec == std::errc()) {
out << std::string_view(str.data(), ptr - str.data());
} else {
throw std::exception(std::make_error_code(ec).message().c_str());
}
}
template<typename T> static void
serialize_float(std::ostream &out, T val, std::chars_format fmt = std::chars_format::fixed) {
std::array<char, 16> str;
if (auto [ptr, ec] = std::to_chars(str.data(), str.data() + str.size(), val, fmt); ec == std::errc()) {
out << std::string_view(str.data(), ptr - str.data());
} else {
throw std::exception(std::make_error_code(ec).message().c_str());
}
}
class json_val { // {{{
private:
enum { DT_INT, DT_UINT, DT_STRING, DT_LIST, DT_OBJECT, DT_NONE, DT_BOOL, DT_FLOAT } type;
std::string s;
bool b;
double f;
int64_t i;
uint64_t u;
std::vector<json_val> list;
std::map<std::string, json_val> object;
void serialize(std::ostream &out) const {
switch(type) {
case DT_NONE:
out << "nil"; break;
case DT_BOOL:
out << (b ? "true" : "false"); break;
case DT_INT:
// this is not really correct since JS has various limits on numeric types, but good enough for us
serialize_integer(out, i); break;
case DT_UINT:
// this is not really correct since JS has various limits on numeric types, but good enough for us
serialize_integer(out, u); break;
case DT_FLOAT:
// again not technically correct
serialize_float(out, f); break;
case DT_STRING:
return serialize_string_for_json(s, out);
case DT_LIST: {
out << '[';
bool first = true;
for (auto const &i : list) {
if (!first) out << ", ";
first = false;
i.serialize(out);
}
out << ']';
break;
}
case DT_OBJECT: {
out << '{';
bool first = true;
for (const auto& [key, value]: object) {
if (!first) out << ", ";
first = false;
serialize_string_for_json(key, out);
out << ": ";
value.serialize(out);
}
out << '}';
break;
}
}
}
public:
json_val() : type(DT_NONE) {}
json_val(std::string &&text) : type(DT_STRING), s(text) {}
json_val(const char *ns) : type(DT_STRING), s(ns) {}
json_val(winrt::hstring const& text) : type(DT_STRING), s(winrt::to_string(text)) {}
json_val(std::wstring const& text) : type(DT_STRING), s(winrt::to_string(text)) {}
json_val(std::string_view text) : type(DT_STRING), s(text) {}
json_val(std::vector<json_val> &&items) : type(DT_LIST), list(items) {}
json_val(std::map<std::string, json_val> &&m) : type(DT_OBJECT), object(m) {}
json_val(std::initializer_list<std::pair<const std::string, json_val>> const& vals) : type(DT_OBJECT), object(vals) { }
static json_val from_hresult(HRESULT hr) {
json_val ans; ans.type = DT_STRING;
std::array<char, 16> str;
str[0] = '0'; str[1] = 'x';
if (auto [ptr, ec] = std::to_chars(str.data()+2, str.data() + str.size(), (uint32_t)hr, 16); ec == std::errc()) {
ans.s = std::string(str.data(), ptr - str.data());
} else {
throw std::exception(std::make_error_code(ec).message().c_str());
}
return ans;
}
json_val(VoiceInformation const& voice) : type(DT_OBJECT) {
const char *gender = "";
switch (voice.Gender()) {
case VoiceGender::Male: gender = "male"; break;
case VoiceGender::Female: gender = "female"; break;
}
object = {
{"display_name", voice.DisplayName()},
{"description", voice.Description()},
{"id", voice.Id()},
{"language", voice.Language()},
{"gender", gender},
};
}
json_val(IVectorView<VoiceInformation> const& voices) : type(DT_LIST) {
list.reserve(voices.Size());
for(auto const& voice : voices) {
list.emplace_back(voice);
}
}
json_val(TimedMetadataTrackErrorCode const ec) : type(DT_STRING) {
switch(ec) {
case TimedMetadataTrackErrorCode::DataFormatError:
s = "data_format_error"; break;
case TimedMetadataTrackErrorCode::NetworkError:
s = "network_error"; break;
case TimedMetadataTrackErrorCode::InternalError:
s = "internal_error"; break;
case TimedMetadataTrackErrorCode::None:
s = "none"; break;
}
}
json_val(DeviceInformationKind const dev) : type(DT_STRING) {
switch(dev) {
case DeviceInformationKind::Unknown:
s = "unknown"; break;
case DeviceInformationKind::AssociationEndpoint:
s = "association_endpoint"; break;
case DeviceInformationKind::AssociationEndpointContainer:
s = "association_endpoint_container"; break;
case DeviceInformationKind::AssociationEndpointService:
s = "association_endpoint_service"; break;
case DeviceInformationKind::Device:
s = "device"; break;
case DeviceInformationKind::DevicePanel:
s = "device_panel"; break;
case DeviceInformationKind::DeviceInterface:
s = "device_interface"; break;
case DeviceInformationKind::DeviceInterfaceClass:
s = "device_interface_class"; break;
case DeviceInformationKind::DeviceContainer:
s = "device_container"; break;
}
}
json_val(DeviceInformation const& dev) : type(DT_OBJECT) {
object = {
{"id", dev.Id()},
{"name", dev.Name()},
{"kind", dev.Kind()},
{"is_default", dev.IsDefault()},
{"is_enabled", dev.IsEnabled()},
};
}
json_val(DeviceInformationCollection const& devices) : type(DT_LIST) {
list.reserve(devices.Size());
for(auto const& dev : devices) {
list.emplace_back(json_val(dev));
}
}
json_val(MediaPlaybackState const& state) : type(DT_STRING) {
switch(state) {
case MediaPlaybackState::None: s = "none"; break;
case MediaPlaybackState::Opening: s = "opening"; break;
case MediaPlaybackState::Buffering: s = "buffering"; break;
case MediaPlaybackState::Playing: s = "playing"; break;
case MediaPlaybackState::Paused: s = "paused"; break;
}
}
json_val(MediaPlayerError const& e) : type(DT_STRING) {
// https://learn.microsoft.com/en-us/uwp/api/windows.media.playback.mediaplayererror
switch(e) {
case MediaPlayerError::Unknown: s = "unknown"; break;
case MediaPlayerError::Aborted: s = "aborted"; break;
case MediaPlayerError::NetworkError: s = "network_error"; break;
case MediaPlayerError::DecodingError: s = "decoding_error"; break;
case MediaPlayerError::SourceNotSupported: s = "source_not_supported"; break;
}
}
json_val(winrt::Windows::Foundation::TimeSpan const &t) : type(DT_INT) {
i = std::chrono::nanoseconds(t).count();
}
json_val(winrt::hstring const &label, SpeechCue const &cue) : type(DT_OBJECT) {
object = {
{"type", label},
{"text", cue.Text()},
{"start_time", cue.StartTime()},
{"start_pos_in_text", cue.StartPositionInInput().Value()},
{"end_pos_in_text", cue.EndPositionInInput().Value()},
};
}
template<typename T> json_val(T const x) {
if constexpr (std::is_same_v<T, bool>) {
type = DT_BOOL;
b = x;
} else if constexpr (std::is_unsigned_v<T>) {
type = DT_UINT;
u = x;
} else if constexpr (std::is_integral_v<T>) {
type = DT_INT;
i = x;
} else if constexpr (std::is_floating_point_v<T>) {
type = DT_FLOAT;
f = x;
} else {
static_assert(!sizeof(T), "Unknown type T cannot be converted to JSON");
}
}
friend std::ostream& operator<<(std::ostream &os, const json_val &self) {
self.serialize(os);
return os;
}
}; // }}}
static void
output(id_type cmd_id, std::string_view const &msg_type, json_val const &&msg) {
std::scoped_lock sl(output_lock);
try {
std::cout << cmd_id << " " << msg_type << " " << msg << std::endl;
} catch(...) {}
}
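// For example (illustrative): output(1, "echo", {{"msg", "hi"}}) writes the
// line: 1 echo {"msg": "hi"}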
static void
output_error(id_type cmd_id, std::string_view const &msg, std::string_view const &error, int64_t line, HRESULT hr=S_OK) {
std::map<std::string, json_val> m = {{"msg", msg}, {"error", error}, {"file", "winspeech.cpp"}, {"line", line}};
if (hr != S_OK) m["hr"] = json_val::from_hresult(hr);
output(cmd_id, "error", std::move(m));
}
static bool
run_catching_exceptions(std::function<void(void)> f, std::string_view const &msg, int64_t line, id_type cmd_id=0) {
bool ok = false;
try {
f();
ok = true;
} catch(winrt::hresult_error const& ex) {
output_error(cmd_id, msg, winrt::to_string(ex.message()), line, ex.to_abi());
} catch(const std::system_error& ex) {
output_error(cmd_id, msg, "system_error with code: " + std::to_string(ex.code().value()) + " and meaning: " + ex.what(), line);
} catch (std::exception const &ex) {
output_error(cmd_id, msg, ex.what(), line);
} catch (std::string const &ex) {
output_error(cmd_id, msg, ex, line);
} catch (std::wstring const &ex) {
output_error(cmd_id, msg, winrt::to_string(ex), line);
} catch (...) {
output_error(cmd_id, msg, "Unknown exception type was raised", line);
}
return ok;
}
struct Revokers {
MediaPlaybackSession::PlaybackStateChanged_revoker playback_state_changed;
MediaPlayer::MediaEnded_revoker media_ended; MediaPlayer::MediaOpened_revoker media_opened;
MediaPlayer::MediaFailed_revoker media_failed; MediaPlayer::SourceChanged_revoker source_changed;
MediaPlaybackItem::TimedMetadataTracksChanged_revoker timed_metadata_tracks_changed;
std::vector<TimedMetadataTrack::CueEntered_revoker> cue_entered;
std::vector<TimedMetadataTrack::CueExited_revoker> cue_exited;
std::vector<TimedMetadataTrack::TrackFailed_revoker> track_failed;
};
struct Mark {
uint32_t id, pos_in_text;
Mark(uint32_t id, uint32_t pos) : id(id), pos_in_text(pos) {}
};
struct Marks {
std::vector<Mark> entries;
int32_t last_reported_mark_index;
Marks() : entries(), last_reported_mark_index(-1) {}
};
static SpeechSynthesizer speech_synthesizer{nullptr};
static MediaPlayer media_player{nullptr};
static size_t
decode_into(std::string_view src, std::wstring_view dest) {
int n = MultiByteToWideChar(CP_UTF8, 0, src.data(), (int)src.size(), (wchar_t*)dest.data(), (int)dest.size());
if (n == 0 && src.size() > 0) {
throw std::system_error(GetLastError(), std::system_category(), "Failed to decode cued text");
}
return n;
}
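// Layout of cued text in shared memory, as consumed by parse_cued_text()
// below: UTF-8 text with each mark encoded inline as a NUL byte followed by
// a 4-byte native-endian uint32 mark id.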
static std::wstring_view
parse_cued_text(std::string_view src, Marks &marks, std::wstring_view dest) {
size_t dest_pos = 0;
if (dest.size() < src.size()) throw std::exception("Destination buffer for parse_cued_text() too small");
while (src.size()) {
auto pos = src.find('\0');
size_t limit = pos == std::string_view::npos ? src.size() : pos;
if (limit) {
dest_pos += decode_into(src.substr(0, limit), dest.substr(dest_pos, dest.size() - dest_pos));
src = src.substr(limit, src.size() - limit);
}
if (pos != std::string_view::npos) {
src = src.substr(1, src.size() - 1);
if (src.size() >= 4) {
uint32_t mark = *((uint32_t*)src.data());
marks.entries.emplace_back(mark, (uint32_t)dest_pos);
src = src.substr(4, src.size() - 4);
}
}
}
return dest.substr(0, dest_pos);
}
static std::wstring_view
read_from_shm(id_type cmd_id, const std::wstring_view size, const std::wstring &address, std::vector<wchar_t> &buf, Marks &marks, bool is_cued=false) {
id_type shm_size = parse_id(size);
handle_raii_null handle(OpenFileMappingW(FILE_MAP_READ, false, address.data()));
if (!handle) {
output_error(cmd_id, "Could not open shared memory at: " + winrt::to_string(address), winrt::to_string(get_last_error()), __LINE__);
return {};
}
mapping_raii mapping(MapViewOfFile(handle.ptr(), FILE_MAP_READ, 0, 0, (SIZE_T)shm_size));
if (!mapping) {
output_error(cmd_id, "Could not map shared memory", winrt::to_string(get_last_error()), __LINE__);
return {};
}
buf.reserve(shm_size + 2);
std::string_view src((const char*)mapping.ptr(), shm_size);
std::wstring_view dest(buf.data(), buf.capacity());
if (is_cued) return parse_cued_text(src, marks, dest);
return std::wstring_view(buf.data(), decode_into(src, dest));
}
// Speak {{{
static Revokers speak_revoker = {};
static void
register_metadata_handler_for_track(MediaPlaybackTimedMetadataTrackList const &tracks, uint32_t index, id_type cmd_id, std::shared_ptr<Marks> marks) {
TimedMetadataTrack track = tracks.GetAt(index);
tracks.SetPresentationMode((unsigned int)index, TimedMetadataTrackPresentationMode::ApplicationPresented);
speak_revoker.cue_entered.emplace_back(track.CueEntered(winrt::auto_revoke, [cmd_id, marks](auto track, const auto& args) {
if (!main_loop_is_running.load()) return;
auto label = track.Label();
auto cue = args.Cue().template as<SpeechCue>();
output(cmd_id, "cue_entered", {label, cue});
if (label != L"SpeechWord") return;
uint32_t pos = cue.StartPositionInInput().Value();
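        // Map this word-boundary cue to a mark: prefer an exact position
        // match, fall back to the preceding mark, and skip ahead when the
        // cue lands within 3 characters of the next mark.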
for (int32_t i = std::max(0, marks->last_reported_mark_index); i < (int32_t)marks->entries.size(); i++) {
int32_t idx = -1;
if (marks->entries[i].pos_in_text > pos) {
idx = i-1;
if (idx == marks->last_reported_mark_index && marks->entries[i].pos_in_text - pos < 3) idx = i;
} else if (marks->entries[i].pos_in_text == pos) idx = i;
if (idx > -1) {
output(cmd_id, "mark_reached", {{"id", marks->entries[idx].id}});
marks->last_reported_mark_index = idx;
break;
}
}
}));
speak_revoker.cue_exited.emplace_back(track.CueExited(winrt::auto_revoke, [cmd_id](auto track, const auto& args) {
if (main_loop_is_running.load()) output(
cmd_id, "cue_exited", json_val(track.Label(), args.Cue().template as<SpeechCue>()));
}));
speak_revoker.track_failed.emplace_back(track.TrackFailed(winrt::auto_revoke, [cmd_id](auto, const auto& args) {
auto error = args.Error();
if (main_loop_is_running.load()) output(
cmd_id, "track_failed", {{"code", error.ErrorCode()}, {"hr", json_val::from_hresult(error.ExtendedError())}});
}));
};
static void
handle_speak(id_type cmd_id, std::vector<std::wstring_view> &parts) {
bool is_ssml = false, is_shm = false, is_cued = false;
try {
is_ssml = parts.at(0) == L"ssml";
is_shm = parts.at(1) == L"shm";
is_cued = parts.at(0) == L"cued";
} catch (std::exception const&) {
throw std::string("Not a well formed speak command");
}
parts.erase(parts.begin(), parts.begin() + 2);
std::wstring address;
auto marks = std::make_shared<Marks>();
std::vector<wchar_t> buf;
std::wstring_view text;
if (is_shm) {
text = read_from_shm(cmd_id, parts.at(0), std::wstring(parts.at(1)), buf, *marks, is_cued);
if (text.size() == 0) return;
} else {
address = join(parts);
if (address.size() == 0) throw std::string("Address missing");
buf.reserve(address.size() + 1);
text = std::wstring_view(buf.data(), address.size());
address.copy(buf.data(), address.size());
}
*((wchar_t*)text.data() + text.size()) = 0; // ensure NULL termination
output(cmd_id, "synthesizing", {{"ssml", is_ssml}, {"num_marks", marks->entries.size()}, {"text_length", text.size()}});
SpeechSynthesisStream stream{nullptr};
if (!run_catching_exceptions([&]() {
speech_synthesizer.Options().IncludeSentenceBoundaryMetadata(true);
speech_synthesizer.Options().IncludeWordBoundaryMetadata(true);
if (is_ssml) stream = speech_synthesizer.SynthesizeSsmlToStreamAsync(text).get();
else stream = speech_synthesizer.SynthesizeTextToStreamAsync(text).get();
}, "Failed to synthesize speech", __LINE__, cmd_id)) return;
speak_revoker = {}; // delete any revokers previously installed
MediaSource source(MediaSource::CreateFromStream(stream, stream.ContentType()));
speak_revoker.playback_state_changed = media_player.PlaybackSession().PlaybackStateChanged(
winrt::auto_revoke, [cmd_id](auto session, auto const&) {
if (main_loop_is_running.load()) output(
cmd_id, "playback_state_changed", {{"state", session.PlaybackState()}});
});
speak_revoker.media_opened = media_player.MediaOpened(winrt::auto_revoke, [cmd_id](auto player, auto const&) {
if (main_loop_is_running.load()) output(
cmd_id, "media_state_changed", {{"state", "opened"}});
});
speak_revoker.media_ended = media_player.MediaEnded(winrt::auto_revoke, [cmd_id](auto player, auto const&) {
if (main_loop_is_running.load()) output(
cmd_id, "media_state_changed", {{"state", "ended"}});
});
speak_revoker.media_failed = media_player.MediaFailed(winrt::auto_revoke, [cmd_id](auto player, auto const& args) {
if (main_loop_is_running.load()) output(
cmd_id, "media_state_changed", {{"state", "failed"}, {"error", args.ErrorMessage()}, {"hr", json_val::from_hresult(args.ExtendedErrorCode())}, {"code", args.Error()}});
});
auto playback_item = std::make_shared<MediaPlaybackItem>(source);
speak_revoker.timed_metadata_tracks_changed = playback_item->TimedMetadataTracksChanged(winrt::auto_revoke,
[cmd_id, playback_item_weak_ref = std::weak_ptr(playback_item), marks](auto, auto const &args) {
auto change_type = args.CollectionChange();
long index;
switch (change_type) {
case CollectionChange::ItemInserted: index = args.Index(); break;
case CollectionChange::Reset: index = -1; break;
default: index = -2; break;
}
auto pi{ playback_item_weak_ref.lock() };
if (index > -2 && pi && main_loop_is_running.load()) register_metadata_handler_for_track(pi->TimedMetadataTracks(), index, cmd_id, marks);
});
for (uint32_t i = 0; i < playback_item->TimedMetadataTracks().Size(); i++) {
register_metadata_handler_for_track(playback_item->TimedMetadataTracks(), i, cmd_id, marks);
}
media_player.Source(*playback_item);
}
// }}}
// Save {{{
static void
save_stream(SpeechSynthesisStream const &&stream, std::filesystem::path path, id_type cmd_id) {
unsigned long long stream_size = stream.Size(), bytes_read = 0;
DataReader reader(stream);
unsigned int n;
const static unsigned int chunk_size = 16 * 1024;
std::array<uint8_t, chunk_size> buf;
std::ofstream outfile;
if (!run_catching_exceptions([&](){
outfile.open(path.string(), std::ios::out | std::ios::trunc);
}, "Failed to create file: " + path.string(), __LINE__, cmd_id)) return;
while (bytes_read < stream_size) {
if (!run_catching_exceptions([&]() {
n = reader.LoadAsync(chunk_size).get();
}, "Failed to load data from DataReader", __LINE__, cmd_id)) return;
if (n > 0) {
bytes_read += n;
if (!run_catching_exceptions([&]() {
reader.ReadBytes(winrt::array_view(buf.data(), buf.data() + n));
outfile.write((const char*)buf.data(), n);
if (!outfile.good()) throw "Failed to write to output file";
}, "Failed to save bytes from DataReader to file", __LINE__, cmd_id)) return;
}
}
output(cmd_id, "saved", {{"size", bytes_read}});
}
static void
handle_save(id_type cmd_id, std::vector<std::wstring_view> &parts) {
bool is_ssml;
try {
is_ssml = parts.at(0) == L"ssml";
} catch (std::exception const&) {
throw "Not a well formed save command"s;
}
std::vector<wchar_t> buf;
std::wstring address;
Marks marks;
std::wstring_view text = read_from_shm(cmd_id, parts.at(1), std::wstring(parts.at(2)), buf, marks);
if (text.size() == 0) return;
parts.erase(parts.begin(), parts.begin() + 3);
*((wchar_t*)text.data() + text.size()) = 0; // ensure NULL termination
auto filename = join(parts);
auto path = std::filesystem::absolute(filename);
output(cmd_id, "saving", {{"ssml", is_ssml}, {"output_path", path.string()}});
SpeechSynthesisStream stream{nullptr};
speech_synthesizer.Options().IncludeSentenceBoundaryMetadata(false);
speech_synthesizer.Options().IncludeWordBoundaryMetadata(false);
if (!run_catching_exceptions([&]() {
if (is_ssml) stream = speech_synthesizer.SynthesizeSsmlToStreamAsync(text).get();
else stream = speech_synthesizer.SynthesizeTextToStreamAsync(text).get();
}, "Failed to synthesize speech", __LINE__, cmd_id)) return;
save_stream(std::move(stream), path, cmd_id);
}
// }}}
typedef std::function<void(id_type, std::vector<std::wstring_view>, int64_t*)> handler_function;
static DeviceInformationKind
get_device_kind(const std::wstring x) {
if (x == L"device") return DeviceInformationKind::Device;
if (x == L"association_endpoint") return DeviceInformationKind::AssociationEndpoint;
if (x == L"association_endpoint_container") return DeviceInformationKind::AssociationEndpointContainer;
if (x == L"association_endpoint_service") return DeviceInformationKind::AssociationEndpointService;
if (x == L"device_container") return DeviceInformationKind::DeviceContainer;
if (x == L"device_interface") return DeviceInformationKind::DeviceInterface;
if (x == L"device_interface_class") return DeviceInformationKind::DeviceInterfaceClass;
if (x == L"device_panel") return DeviceInformationKind::DevicePanel;
return DeviceInformationKind::Unknown;
}
static const std::unordered_map<std::string, handler_function> handlers = {
{"exit", [](id_type cmd_id, std::vector<std::wstring_view> parts, int64_t* exit_code) {
        try {
            *exit_code = parse_id(parts.at(0));
        } catch(...) { *exit_code = 0; }
}},
{"echo", [](id_type cmd_id, std::vector<std::wstring_view> parts, int64_t*) {
output(cmd_id, "echo", {{"msg", join(parts)}});
}},
{"play", [](id_type cmd_id, std::vector<std::wstring_view> parts, int64_t*) {
media_player.Play();
output(cmd_id, "play", {{"playback_state", media_player.PlaybackSession().PlaybackState()}});
}},
{"pause", [](id_type cmd_id, std::vector<std::wstring_view> parts, int64_t*) {
media_player.Pause();
output(cmd_id, "pause", {{"playback_state", media_player.PlaybackSession().PlaybackState()}});
}},
{"state", [](id_type cmd_id, std::vector<std::wstring_view> parts, int64_t*) {
auto ps = media_player.PlaybackSession();
if (ps) output(cmd_id, "state", {{"playback_state", ps.PlaybackState()}});
else output(cmd_id, "state", {{"playback_state", ""}});
}},
{"default_voice", [](id_type cmd_id, std::vector<std::wstring_view> parts, int64_t*) {
output(cmd_id, "default_voice", {{"voice", SpeechSynthesizer::DefaultVoice()}});
}},
{"all_voices", [](id_type cmd_id, std::vector<std::wstring_view> parts, int64_t*) {
output(cmd_id, "all_voices", {{"voices", SpeechSynthesizer::AllVoices()}});
}},
{"all_audio_devices", [](id_type cmd_id, std::vector<std::wstring_view> parts, int64_t*) {
output(cmd_id, "all_audio_devices", {{"devices", DeviceInformation::FindAllAsync(MediaDevice::GetAudioRenderSelector()).get()}});
}},
{"speak", [](id_type cmd_id, std::vector<std::wstring_view> parts, int64_t*) {
handle_speak(cmd_id, parts);
}},
{"audio_device", [](id_type cmd_id, std::vector<std::wstring_view> parts, int64_t*) {
bool found = false;
if (parts.size()) {
auto device_kind = std::wstring(parts.at(0));
parts.erase(parts.begin(), parts.begin() + 1);
auto device_id = join(parts);
auto di = DeviceInformation::CreateFromIdAsync(device_id, {}, get_device_kind(device_kind)).get();
if (di) {
media_player.AudioDevice(di);
found = true;
}
}
auto x = media_player.AudioDevice();
if (x) output(cmd_id, "audio_device", {{"device", x}, {"found", found}});
else output(cmd_id, "audio_device", {{"device", ""}, {"found", found}});
}},
{"voice", [](id_type cmd_id, std::vector<std::wstring_view> parts, int64_t*) {
bool found = false;
if (parts.size()) {
auto voice_id = winrt::hstring(parts.at(0));
if (voice_id == L"__default__") {
voice_id = SpeechSynthesizer::DefaultVoice().Id();
}
for (auto const &candidate : SpeechSynthesizer::AllVoices()) {
if (candidate.Id() == voice_id) {
speech_synthesizer.Voice(candidate);
found = true;
break;
}
}
}
auto x = speech_synthesizer.Voice();
if (x) output(cmd_id, "voice", {{"voice", speech_synthesizer.Voice()}, {"found", found}});
else output(cmd_id, "voice", {{"voice", ""}, {"found", found}});
}},
{"volume", [](id_type cmd_id, std::vector<std::wstring_view> parts, int64_t*) {
if (parts.size()) {
auto vol = parse_double(parts.at(0).data());
            if (vol < 0 || vol > 1) throw std::out_of_range("Invalid volume value, must be between 0 and 1");
speech_synthesizer.Options().AudioVolume(vol);
}
output(cmd_id, "volume", {{"value", speech_synthesizer.Options().AudioVolume()}});
}},
{"rate", [](id_type cmd_id, std::vector<std::wstring_view> parts, int64_t*) {
if (parts.size()) {
auto rate = parse_double(parts.at(0).data());
            if (rate < 0.5 || rate > 6.0) throw std::out_of_range("Invalid rate value, must be between 0.5 and 6");
speech_synthesizer.Options().SpeakingRate(rate);
}
output(cmd_id, "rate", {{"value", speech_synthesizer.Options().SpeakingRate()}});
}},
{"pitch", [](id_type cmd_id, std::vector<std::wstring_view> parts, int64_t*) {
if (parts.size()) {
auto pitch = parse_double(parts.at(0).data());
if (pitch < 0 || pitch > 2) throw std::out_of_range("Invalid pitch value, must be between 0 and 2");
speech_synthesizer.Options().AudioPitch(pitch);
}
output(cmd_id, "pitch", {{"value", speech_synthesizer.Options().AudioPitch()}});
}},
{"save", [](id_type cmd_id, std::vector<std::wstring_view> parts, int64_t*) {
handle_save(cmd_id, parts);
}},
};
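// Wire protocol for commands read from STDIN: each line has the form
//   <cmd_id> <command> [arguments...]
// where <cmd_id> is a non-zero decimal id that is echoed back in every
// response, letting the Python client match replies to requests.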
static int64_t
handle_stdin_message(winrt::hstring const &&msg) {
if (msg == L"exit") {
return 0;
}
id_type cmd_id;
std::wstring_view command;
bool ok = false;
std::vector<std::wstring_view> parts;
int64_t exit_code = -1;
if (!run_catching_exceptions([&]() {
parts = split(msg);
command = parts.at(1); cmd_id = parse_id(parts.at(0));
if (cmd_id == 0) {
throw std::exception("Command id of zero is not allowed");
}
parts.erase(parts.begin(), parts.begin() + 2);
ok = true;
}, "Invalid input message: " + winrt::to_string(msg), __LINE__)) return exit_code;
handler_function handler;
std::string cmd(winrt::to_string(command));
try {
handler = handlers.at(cmd.c_str());
} catch (const std::out_of_range&) {
output_error(cmd_id, "Unknown command", cmd, __LINE__);
return exit_code;
}
run_catching_exceptions([&]() {
handler(cmd_id, parts, &exit_code);
}, "Error handling input message", __LINE__, cmd_id);
return exit_code;
}
#define INITIALIZE_FAILURE_MESSAGE "Failed to initialize SpeechSynthesizer and MediaPlayer"
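// Entry point exposed to Python. Sets the stdio streams to the C locale,
// initializes COM, creates the SpeechSynthesizer and MediaPlayer, then
// blocks reading commands from STDIN until an exit command or EOF.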
static PyObject*
run_main_loop(PyObject*, PyObject*) {
if (!run_catching_exceptions([]() {
std::cout.imbue(std::locale("C"));
std::cin.imbue(std::locale("C"));
std::cerr.imbue(std::locale("C"));
std::wcin.imbue(std::locale("C"));
std::wcout.imbue(std::locale("C"));
std::wcerr.imbue(std::locale("C"));
}, "Failed to set stdio locales to C", __LINE__)) {
return PyLong_FromLongLong(1);
}
if (!run_catching_exceptions([]() {
winrt::init_apartment(winrt::apartment_type::multi_threaded);
}, "Failed to initialize COM", __LINE__)) {
return PyLong_FromLongLong(1);
}
main_thread_id = GetCurrentThreadId();
if (!run_catching_exceptions([]() {
speech_synthesizer = SpeechSynthesizer();
media_player = MediaPlayer();
media_player.AudioCategory(MediaPlayerAudioCategory::Speech);
media_player.AutoPlay(true);
}, INITIALIZE_FAILURE_MESSAGE, __LINE__)) {
return PyLong_FromLongLong(1);
}
if (_isatty(_fileno(stdin))) {
std::cout << "Welcome to winspeech. Type exit to quit." << std::endl;
}
int64_t exit_code = -1;
main_loop_is_running.store(true);
Py_BEGIN_ALLOW_THREADS;
std::string input_buffer;
while (exit_code < 0) {
try {
if (!std::getline(std::cin, input_buffer)) {
if (!std::cin.eof()) exit_code = 1;
break;
}
rtrim(input_buffer);
if (input_buffer.size() > 0) {
run_catching_exceptions([&]() {
exit_code = handle_stdin_message(winrt::to_hstring(input_buffer));
}, "Error handling STDIN message", __LINE__);
if (exit_code >= 0) break;
}
} catch(...) {
exit_code = 1;
output_error(0, "Unknown exception type reading and handling line of input", "", __LINE__);
break;
}
}
Py_END_ALLOW_THREADS;
main_loop_is_running.store(false);
try {
speak_revoker = {};
speech_synthesizer = SpeechSynthesizer{nullptr};
media_player = MediaPlayer{nullptr};
} catch(...) {}
return PyLong_FromLongLong(exit_code);
}
#define M(name, args) { #name, name, args, ""}
static PyMethodDef methods[] = {
M(run_main_loop, METH_NOARGS),
{NULL, NULL, 0, NULL}
};
#undef M
static int
exec_module(PyObject *m) {
PyModule_AddStringMacro(m, INITIALIZE_FAILURE_MESSAGE);
return 0;
}
static PyModuleDef_Slot slots[] = { {Py_mod_exec, (void*)exec_module}, {0, NULL} };
static struct PyModuleDef module_def = {PyModuleDef_HEAD_INIT};
PyMODINIT_FUNC PyInit_winspeech(void) {
module_def.m_name = "winspeech";
module_def.m_doc = "Windows Speech API wrapper";
module_def.m_methods = methods;
module_def.m_slots = slots;
return PyModuleDef_Init(&module_def);
}

View File

@ -1,573 +0,0 @@
#!/usr/bin/env python
# License: GPLv3 Copyright: 2023, Kovid Goyal <kovid at kovidgoyal.net>
import json
import os
import struct
import sys
from contextlib import closing, suppress
from enum import Enum, auto
from itertools import count
from queue import Empty, Queue
from threading import Thread
from time import monotonic
from typing import NamedTuple, Optional, Tuple
from calibre.constants import DEBUG
from calibre.utils.ipc.simple_worker import start_pipe_worker
from calibre.utils.shm import SharedMemory
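# A sample SSML document with bookmark marks. It is not referenced by the
# code below; kept as a reference when testing SSML input by hand.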
SSML_SAMPLE = '''
<speak version="1.0" xmlns="http://www.w3.org/2001/10/synthesis" xml:lang="en-US">
<voice name="en-US-AriaNeural">
We are selling <bookmark mark='flower_1'/>roses and <bookmark mark='flower_2'/>daisies.
</voice>
</speak>
'''
def start_worker():
return start_pipe_worker('from calibre_extensions.winspeech import run_main_loop; raise SystemExit(run_main_loop())')
def max_buffer_size(text) -> int:
if isinstance(text, str):
text = [text]
ans = 0
for x in text:
if isinstance(x, int):
ans += 5
else:
ans += 4 * len(x)
return ans
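# Binary encoding used for text passed to the worker through shared memory:
# cue marks (ints) become a NUL byte followed by a 4-byte unsigned integer
# in native byte order; everything else is written out as UTF-8 text.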
def encode_to_file_object(text, output) -> int:
if isinstance(text, str):
text = [text]
p = struct.pack
sz = 0
for x in text:
if isinstance(x, int):
output.write(b'\0')
output.write(p('=I', x))
sz += 5
else:
b = x.encode('utf-8')
output.write(b)
sz += len(b)
return sz
# message decoding {{{
class Saving(NamedTuple):
related_to: int
ssml: bool
output_path: str
class Saved(NamedTuple):
related_to: int
size: int
class CueEntered(NamedTuple):
related_to: int
start_pos_in_text: int
end_pos_in_text: int
start_time: int
type: str
text: str
class CueExited(CueEntered):
related_to: int
start_pos_in_text: int
end_pos_in_text: int
start_time: int
type: str
class MarkReached(NamedTuple):
related_to: int
id: int
class SpeechError(OSError):
def __init__(self, err, msg=''):
val = 'There was an error in the Windows Speech subsystem. '
if msg:
val += f'{msg}. '
val += err.msg + ': ' + err.error + f'\nFile: {err.file} Line: {err.line}'
if err.hr:
# List of mediaserver errors is here: https://www.hresult.info/FACILITY_MEDIASERVER
val += f' HRESULT: 0x{err.hr:x}'
super().__init__(val)
class NoAudioDevices(OSError):
display_to_user = True
def __init__(self):
super().__init__(_('No active audio output devices found.'
' Connect headphones or speakers. If you are using Remote Desktop then enable Remote Audio for it.'))
class NoMediaPack(OSError):
display_to_user = True
def __init__(self):
super().__init__(_('This computer is missing the Windows MediaPack, or the DLLs are corrupted. This is needed for Read aloud. Instructions'
' for installing it are available at {}').format(
'https://support.medal.tv/support/solutions/articles/48001157311-windows-is-missing-media-pack'))
class Error(NamedTuple):
msg: str
error: str = ''
line: int = 0
file: str = 'winspeech.py'
hr: int = 0
related_to: int = 0
def as_exception(self, msg='', check_for_no_audio_devices=False):
from calibre_extensions.winspeech import INITIALIZE_FAILURE_MESSAGE
if check_for_no_audio_devices and self.hr == 0xc00d36fa:
return NoAudioDevices()
if check_for_no_audio_devices and self.hr == 0x80070002 and self.msg == INITIALIZE_FAILURE_MESSAGE:
return NoMediaPack()
return SpeechError(self, msg)
class Synthesizing(NamedTuple):
related_to: int
ssml: bool
num_marks: int
text_length: int
class TrackFailed(NamedTuple):
related_to: int
code: str
hr: str
class PlaybackState(Enum):
none = auto()
opening = auto()
buffering = auto()
playing = auto()
paused = auto()
class PlaybackStateChanged(NamedTuple):
related_to: int
state: PlaybackState
class MediaState(Enum):
opened = auto()
ended = auto()
failed = auto()
class MediaPlayerError(Enum):
unknown = auto()
aborted = auto()
network_error = auto()
decoding_error = auto()
source_not_supported = auto()
class MediaStateChanged(NamedTuple):
related_to: int
state: MediaState
error: str = ""
code: MediaPlayerError = MediaPlayerError.unknown
hr: int = 0
def as_exception(self):
err = Error("Playback of speech stream failed", self.error + f' ({self.code})', hr=self.hr)
return err.as_exception(check_for_no_audio_devices=True)
class Echo(NamedTuple):
related_to: int
msg: str
class Play(NamedTuple):
related_to: int
playback_state: PlaybackState
class Pause(NamedTuple):
related_to: int
playback_state: PlaybackState
class State(NamedTuple):
related_to: int
playback_state: PlaybackState
class VoiceInformation(NamedTuple):
display_name: str
description: str
id: str
language: str
gender: str
class DefaultVoice(NamedTuple):
related_to: int
voice: VoiceInformation
class Voice(NamedTuple):
related_to: int
voice: Optional[VoiceInformation]
found: bool = True
class DeviceInformation(NamedTuple):
id: str
name: str
kind: str
is_default: bool
is_enabled: bool
def spec(self) -> Tuple[str, str]:
return self.kind, self.id
class AudioDevice(NamedTuple):
related_to: int
device: Optional[DeviceInformation]
found: bool = True
class AllAudioDevices(NamedTuple):
related_to: int
devices: Tuple[DeviceInformation, ...]
class AllVoices(NamedTuple):
related_to: int
voices: Tuple[VoiceInformation, ...]
class Volume(NamedTuple):
related_to: int
value: float
class Rate(NamedTuple):
related_to: int
value: float
class Pitch(NamedTuple):
related_to: int
value: float
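# Each message from the worker is a single line of the form:
#   <related_id> <message_type> <JSON payload>
# parse_message() decodes it into one of the NamedTuples defined above.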
def parse_message(line):
parts = line.strip().split(b' ', 2)
msg_id, msg_type, ans = int(parts[0]), parts[1].decode(), json.loads(parts[2])
ans['related_to'] = msg_id
if msg_type == 'cue_entered':
return CueEntered(**ans)
if msg_type == 'cue_exited':
return CueExited(**ans)
if msg_type == 'mark_reached':
return MarkReached(**ans)
if msg_type == 'playback_state_changed':
ans['state'] = getattr(PlaybackState, ans['state'])
return PlaybackStateChanged(**ans)
if msg_type == 'media_state_changed':
ans['state'] = getattr(MediaState, ans['state'])
if 'code' in ans:
ans['code'] = getattr(MediaPlayerError, ans['code'])
if 'hr' in ans:
ans['hr'] = int(ans['hr'], 16)
return MediaStateChanged(**ans)
if msg_type == 'error':
if 'hr' in ans:
ans['hr'] = int(ans['hr'], 16)
return Error(**ans)
if msg_type == 'synthesizing':
return Synthesizing(**ans)
if msg_type == 'track_failed':
return TrackFailed(**ans)
if msg_type == 'saving':
return Saving(**ans)
if msg_type == 'saved':
return Saved(**ans)
if msg_type == 'echo':
return Echo(**ans)
if msg_type == 'play':
ans['playback_state'] = getattr(PlaybackState, ans['playback_state'])
return Play(**ans)
if msg_type == 'pause':
ans['playback_state'] = getattr(PlaybackState, ans['playback_state'])
return Pause(**ans)
if msg_type == 'state':
ans['playback_state'] = getattr(PlaybackState, ans['playback_state'])
return State(**ans)
if msg_type == 'default_voice':
ans['voice'] = VoiceInformation(**ans['voice'])
return DefaultVoice(**ans)
if msg_type == 'all_voices':
ans['voices'] = tuple(VoiceInformation(**x) for x in ans['voices'])
return AllVoices(**ans)
if msg_type == 'all_audio_devices':
ans['devices'] = tuple(DeviceInformation(**x) for x in ans['devices'])
return AllAudioDevices(**ans)
if msg_type == 'audio_device':
if ans['device']:
ans['device'] = DeviceInformation(**ans['device'])
else:
ans['device'] = None
return AudioDevice(**ans)
if msg_type == 'voice':
if ans['voice']:
ans['voice'] = VoiceInformation(**ans['voice'])
else:
ans['voice'] = None
return Voice(**ans)
if msg_type == 'volume':
return Volume(**ans)
if msg_type == 'rate':
return Rate(**ans)
if msg_type == 'pitch':
return Pitch(**ans)
return Error(f'Unknown message type: {msg_type}')
# }}}
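# Client for the winspeech worker process. Commands are written to the
# worker's STDIN; replies and asynchronous events are read from its STDOUT
# on a background thread and are either handed to a pending wait_for() call
# or forwarded to the event dispatcher. Command ids start at one because
# the worker rejects an id of zero.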
class WinSpeech:
def __init__(self, event_dispatcher=print):
self._worker = None
self.queue = Queue()
self.msg_id_counter = count()
next(self.msg_id_counter)
self.pending_messages = []
self.current_speak_cmd_id = 0
self.waiting_for = -1
self.event_dispatcher = event_dispatcher
@property
def worker(self):
if self._worker is None:
self._worker = start_worker()
Thread(name='WinspeechQueue', target=self._get_messages, args=(self._worker, self.queue), daemon=True).start()
return self._worker
def __del__(self):
if self._worker is not None:
self.send_command('exit')
with suppress(Exception):
self._worker.wait(0.3)
if self._worker.poll() is None:
self._worker.kill()
self._worker = None
shutdown = __del__
def _get_messages(self, worker, queue):
def send_msg(msg):
if self.waiting_for == msg.related_to:
self.queue.put(msg)
else:
self.dispatch_message(msg)
try:
for line in worker.stdout:
line = line.strip()
if DEBUG:
with suppress(Exception):
print('winspeech:\x1b[32m<-\x1b[39m', line.decode('utf-8', 'replace'), flush=True)
send_msg(parse_message(line))
except OSError as e:
send_msg(Error('Failed to read from worker', str(e)))
except Exception as e:
send_msg(Error('Failed to parse message from worker', str(e)))
def send_command(self, cmd):
cmd_id = next(self.msg_id_counter)
w = self.worker
cmd = f'{cmd_id} {cmd}'
if DEBUG:
with suppress(Exception):
print('winspeech:\x1b[31m->\x1b[39m', cmd, flush=True)
w.stdin.write(f'{cmd}\n'.encode())
w.stdin.flush()
return cmd_id
def wait_for(self, error_msg, *classes, related_to=-1, timeout=4):
orig, self.waiting_for = self.waiting_for, related_to
try:
limit = monotonic() + timeout
while True:
left = limit - monotonic()
if left <= 0:
break
try:
x = self.queue.get(True, left)
except Empty:
break
if (not classes or isinstance(x, classes)) and (not related_to or x.related_to == related_to):
return x
if isinstance(x, Error) and (not related_to or x.related_to == related_to):
raise x.as_exception(error_msg)
raise TimeoutError('Timed out waiting for: ' + error_msg)
finally:
self.waiting_for = orig
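# The text is handed to the worker via shared memory (see
# encode_to_file_object() above); in cued text, ints become bookmark
# marks and strings the spoken text.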
def speak(self, text, is_cued=False, is_xml=False):
with SharedMemory(size=max_buffer_size(text)) as shm:
st = 'cued' if is_cued else ('ssml' if is_xml else 'text')
sz = encode_to_file_object(text, shm)
self.current_speak_cmd_id = self.send_command(f'speak {st} shm {sz} {shm.name}')
self.wait_for('speech synthesis to start', Synthesizing, related_to=self.current_speak_cmd_id, timeout=8)
return self.current_speak_cmd_id
def dispatch_message(self, x):
if x.related_to == self.current_speak_cmd_id:
if isinstance(x, (Error, MediaStateChanged, MarkReached)):
self.event_dispatcher(x)
def pause(self):
self.wait_for('pause', Pause, related_to=self.send_command('pause'))
def play(self):
self.wait_for('play', Play, related_to=self.send_command('play'))
def set_rate(self, val):
val = float(val)
self.wait_for('Setting the rate', Rate, related_to=self.send_command(f'rate {val}'))
def set_voice(self, spec, default_system_voice):
val = spec or getattr(default_system_voice, 'id', '__default__')
x = self.wait_for('Setting the voice', Voice, related_to=self.send_command(f'voice {val}'))
if not x.found:
raise Error(f'Failed to find the voice: {val}').as_exception()
def set_audio_device(self, spec, default_system_audio_device):
if not spec and not default_system_audio_device:
return
if not spec:
spec = default_system_audio_device.spec()
x = self.wait_for('Setting the audio device', AudioDevice, related_to=self.send_command(f'audio_device {spec[0]} {spec[1]}'))
if not x.found:
raise Error(f'Failed to find the audio device: {spec}').as_exception()
def get_audio_device(self):
return self.wait_for('Audio device', AudioDevice, related_to=self.send_command('audio_device'))
def default_voice(self):
return self.wait_for('Default voice', DefaultVoice, related_to=self.send_command('default_voice'))
def all_voices(self):
return self.wait_for('All voices', AllVoices, related_to=self.send_command('all_voices'))
def all_audio_devices(self):
return self.wait_for('All audio devices', AllAudioDevices, related_to=self.send_command('all_audio_devices'))
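# A minimal usage sketch, added for illustration only (the handler name
# below is hypothetical, not part of this file):
#
#   def on_event(ev):
#       if isinstance(ev, MarkReached):
#           print('reached mark', ev.id)
#
#   tts = WinSpeech(event_dispatcher=on_event)
#   tts.set_rate(1.0)
#   tts.speak([1, 'Hello ', 2, 'world'], is_cued=True)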
# develop {{{
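# The helpers below speak the raw wire protocol directly; they are intended
# to be run by hand during development.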
def develop_loop(*commands):
p = start_worker()
q = Queue()
def echo_output(p):
for line in p.stdout:
sys.stdout.buffer.write(b'\x1b[33m' + line + b'\x1b[39m]]'[:-2])
sys.stdout.buffer.flush()
q.put(parse_message(line))
def send(*a):
cmd = ' '.join(map(str, a)) + '\n'
p.stdin.write(cmd.encode())
p.stdin.flush()
Thread(name='Echo', target=echo_output, args=(p,), daemon=True).start()
exit_code = 0
with closing(p.stdin), closing(p.stdout):
try:
send('1 echo Synthesizer started')
send('1 volume 0.1')
for command in commands:
if isinstance(command, str):
send(command)
else:
while True:
m = q.get()
if m.related_to != command:
continue
if isinstance(m, MediaStateChanged) and m.state in (MediaState.ended, MediaState.failed):
break
if isinstance(m, Saved):
break
if isinstance(m, Error):
exit_code = 1
break
send(f'333 echo Synthesizer exiting with exit code: {exit_code}')
send(f'334 exit {exit_code}')
ec = p.wait(1)
print(f'Worker exited with code: {ec}', file=sys.stderr, flush=True)
raise SystemExit(ec)
finally:
if p.poll() is None:
p.kill()
raise SystemExit(1)
def develop_speech(text='Lucca Brazzi sleeps with the fishes.', mark_words=True):
print('\x1b[32mSpeaking', text, '\x1b[39m]]'[:-2], flush=True)
st = 'ssml' if '<speak' in text else 'text'
if mark_words:
st = 'cued'
words = text.split()
text = []
for i, w in enumerate(words):
text.append(i+1)
text.append(w)
if i + 1 < len(words):
text.append(' ')
with SharedMemory(size=max_buffer_size(text)) as shm:
sz = encode_to_file_object(text, shm)
develop_loop(f'2 speak {st} shm {sz} {shm.name}', 2)
def develop_save(text='Lucca Brazzi sleeps with the fishes.', filename="speech.wav"):
print('\x1b[32mSaving', text, '\x1b[39m]]'[:-2], flush=True)
st = 'ssml' if '<speak' in text else 'text'
with SharedMemory(size=max_buffer_size(text)) as shm:
sz = encode_to_file_object(text, shm)
develop_loop(f'2 save {st} {sz} {shm.name} {filename}', 2)
def develop_interactive():
import subprocess
from calibre.debug import run_calibre_debug
print('\x1b[32mInteractive winspeech', '\x1b[39m]]'[:-2], flush=True)
p = run_calibre_debug('-c', 'from calibre_extensions.winspeech import run_main_loop; raise SystemExit(run_main_loop())',
stdin=subprocess.PIPE)
try:
while True:
line = input()
if p.poll() is not None:
raise SystemExit(p.returncode)
p.stdin.write((line + '\n').encode())
p.stdin.flush()
except KeyboardInterrupt:
print('Exiting on interrupt', flush=True)
finally:
if p.poll() is None:
p.kill()
# }}}