More work on TTS

This commit is contained in:
Kovid Goyal 2024-08-31 09:36:22 +05:30
parent 15020ede75
commit 1813a15653
No known key found for this signature in database
GPG Key ID: 06BC317B515ACE7C
4 changed files with 207 additions and 71 deletions

View File

@ -278,8 +278,7 @@ class EngineSpecificConfig(QWidget):
class ConfigDialog(Dialog):
def __init__(self, current_tts_backend, parent=None):
self.current_tts_backend = current_tts_backend
def __init__(self, parent=None):
super().__init__(_('Configure Read aloud'), 'configure-read-aloud2', parent=parent)
def setup_ui(self):
@ -307,8 +306,10 @@ class ConfigDialog(Dialog):
def develop():
    """Run the Read aloud configuration dialog on its own (development helper)."""
    from calibre.gui2 import Application
    app = Application([])
    # ConfigDialog only accepts an optional parent widget. The earlier
    # construction that passed a TTS backend was overwritten immediately and
    # no longer matches the constructor signature, so it is not performed.
    d = ConfigDialog()
    d.exec()
    # Release the dialog before the application object.
    del d
    del app
if __name__ == '__main__':

View File

@ -2,10 +2,11 @@
# License: GPLv3 Copyright: 2024, Kovid Goyal <kovid at kovidgoyal.net>
from qt.core import QAction, QPlainTextEdit, QToolBar
from qt.core import QAction, QKeySequence, QPlainTextEdit, Qt, QTextCursor, QTextToSpeech, QToolBar
from calibre.gui2 import Application
from calibre.gui2.main_window import MainWindow
from calibre.gui2.tts2.manager import TTSManager
TEXT = '''\
Demonstration of DOCX support in calibre
@ -21,36 +22,81 @@ Set the output format in the top right corner of the conversion dialog to EPUB o
'''
def to_marked_text(text=TEXT):
    """Yield marked text for *text*, one whitespace-separated word at a time.

    For every word three items are produced in order: an integer mark (the
    running offset of the word in the space-joined output), the word itself
    and a single separating space.
    """
    offset = 0
    for token in text.split():
        yield from (offset, token, ' ')
        # each emitted word occupies its own length plus the one space after it
        offset += len(token) + 1
class MainWindow(MainWindow):
    """Demo window: shows a read-only text and reads it aloud via TTSManager."""

    def __init__(self, text):
        """Build the text display, the toolbar actions and the TTS manager.

        :param text: the plain text to display and read aloud
        """
        super().__init__()
        self.display = d = QPlainTextEdit(self)
        self.toolbar = tb = QToolBar(self)
        self.tts = TTSManager(self)
        # Queued delivery: the slot runs from the event loop instead of
        # directly inside the code that emitted the state change.
        self.tts.state_changed.connect(self.state_changed, type=Qt.ConnectionType.QueuedConnection)
        self.tts.saying.connect(self.saying)
        self.addToolBar(tb)
        self.setCentralWidget(d)
        d.setPlainText(text)
        d.setReadOnly(True)
        # Marks are cursor positions in the display, so the offsets reported
        # through saying() can be used directly to select text.
        self.marked_text = self._build_marked_text()

        self.play_action = play = QAction('Play')
        play.setShortcut(QKeySequence(Qt.Key.Key_Space))
        play.setCheckable(True)
        play.toggled.connect(self.toggled)
        tb.addAction(play)

        self.stop_action = stop = QAction('Stop')
        stop.setShortcut(QKeySequence(Qt.Key.Key_Escape))
        stop.triggered.connect(self.tts.stop)
        tb.addAction(stop)

        # Faster/Slower are only added to the toolbar; nothing is connected
        # to them in this code.
        self.faster_action = faster = QAction('Faster')
        tb.addAction(faster)
        self.slower_action = slower = QAction('Slower')
        tb.addAction(slower)

        self.configure_action = configure = QAction('Configure')
        tb.addAction(configure)
        configure.triggered.connect(self.tts.configure)

        # Sync the actions with the initial TTS state, then size the window.
        self.state_changed(self.tts.state)
        self.resize(self.sizeHint())

    def _build_marked_text(self):
        """Return the displayed text as a list alternating marks and words.

        Each mark is the document position at which the following word
        (as delimited by QTextCursor's NextWord movement) starts.
        """
        c = self.display.textCursor()
        c.setPosition(0)
        marked_text = []
        while True:
            marked_text.append(c.position())
            if not c.movePosition(QTextCursor.MoveOperation.NextWord, QTextCursor.MoveMode.KeepAnchor):
                break
            marked_text.append(c.selectedText())
            # Collapse the selection so the next move selects only one word.
            c.setPosition(c.position())
        return marked_text

    def state_changed(self, state):
        """Reflect the TTS *state* in the status bar and the toolbar actions."""
        self.statusBar().showMessage(str(state))
        idle_states = (QTextToSpeech.State.Ready, QTextToSpeech.State.Paused, QTextToSpeech.State.Error)
        if state in idle_states:
            self.play_action.setChecked(False)
            if state is QTextToSpeech.State.Ready:
                # Move the cursor back to the start of the text.
                c = self.display.textCursor()
                c.setPosition(0)
                self.display.setTextCursor(c)
        else:
            self.play_action.setChecked(True)
        self.stop_action.setEnabled(state in (QTextToSpeech.State.Speaking, QTextToSpeech.State.Synthesizing))

    def toggled(self):
        """Start, resume or pause speech when the play action is toggled."""
        if self.play_action.isChecked():
            self.play_action.setText('Pause')
            if self.tts.state is QTextToSpeech.State.Paused:
                self.tts.resume()
            elif self.tts.state in (QTextToSpeech.State.Ready, QTextToSpeech.State.Error):
                self.tts.speak_marked_text(self.marked_text)
        else:
            if self.tts.state in (QTextToSpeech.State.Speaking, QTextToSpeech.State.Synthesizing):
                self.tts.pause()
            self.play_action.setText('Play')

    def saying(self, first, last):
        """Select the text currently being spoken.

        The selection starts at mark *first*, is extended to mark *last* when
        the two differ, and then to the next word boundary to the right.
        """
        c = self.display.textCursor()
        c.setPosition(first)
        if last != first:
            c.setPosition(last, QTextCursor.MoveMode.KeepAnchor)
        c.movePosition(QTextCursor.MoveOperation.WordRight, QTextCursor.MoveMode.KeepAnchor)
        self.display.setTextCursor(c)
def main():

View File

@ -2,21 +2,160 @@
# License: GPLv3 Copyright: 2024, Kovid Goyal <kovid at kovidgoyal.net>
from qt.core import QObject
from collections import deque
from typing import NamedTuple
from qt.core import QDialog, QObject, QTextToSpeech, pyqtSignal
from calibre.gui2 import error_dialog
class Utterance(NamedTuple):
    """One chunk of the text to be spoken, plus where it sits in the whole."""

    text: str
    # index into Tracker.positions from which mark lookup starts for this chunk
    index_in_positions: int
    # offset of this chunk's first character in the full concatenated text
    offset_in_text: int
    # last chunk-relative offset recorded via Tracker.boundary_reached()
    reached_offset: int = 0


class Position(NamedTuple):
    """A mark from the marked text and the text offset at which it occurs."""

    mark: int
    offset_in_text: int


class Tracker:
    """Split marked text into utterances and map spoken offsets back to marks.

    Marked text is an iterable of ``int`` marks interleaved with ``str``
    pieces of text.
    """

    def __init__(self):
        self.clear()

    def clear(self):
        """Forget all positions and queued utterances."""
        self.positions: list[Position] = []
        self.last_pos = 0
        self.queue: deque[Utterance] = deque()

    def parse_marked_text(self, marked_text, limit=32 * 1024):
        """Load *marked_text* and return the text of the first utterance.

        :param marked_text: iterable of int marks and str text pieces
        :param limit: once the accumulated chunk is longer than this many
            characters it is queued as its own utterance
        :return: text of the first queued utterance, or ``''`` if there is none
        """
        self.clear()
        text = []
        text_len = chunk_len = index_in_positions = offset_in_text = 0

        def commit():
            # Reads the enclosing variables at call time, so it always queues
            # the chunk accumulated so far.
            self.queue.append(Utterance(''.join(text), index_in_positions, offset_in_text))

        for x in marked_text:
            if isinstance(x, int):
                self.positions.append(Position(x, text_len))
            else:
                text_len += len(x)
                chunk_len += len(x)
                text.append(x)
                if chunk_len > limit:
                    commit()
                    chunk_len = 0
                    text = []
                    # The next chunk starts its mark search at the most
                    # recently seen position.
                    index_in_positions = max(0, len(self.positions) - 1)
                    offset_in_text = text_len
        if text:
            commit()
        self.marked_text = marked_text
        return self.current_text()

    def pop_first(self):
        """Drop the utterance at the head of the queue, if any."""
        if self.queue:
            self.queue.popleft()

    def current_text(self):
        """Return the text of the head utterance, or ``''`` when the queue is empty."""
        if self.queue:
            return self.queue[0].text
        return ''

    def resume(self):
        """Prepare the head utterance for being spoken again and return its text.

        Everything before the recorded ``reached_offset`` is replaced with
        spaces, so the text keeps its length and offsets reported for the
        remaining words still line up with the stored positions.
        """
        self.last_pos = 0
        if self.queue:
            current = self.queue[0]
            self.last_pos = current.index_in_positions
            if current.reached_offset:
                o = current.reached_offset
                # Utterance is an immutable NamedTuple: assigning to .text
                # raises AttributeError, so swap in an updated copy instead.
                self.queue[0] = current._replace(text=(' ' * o) + current.text[o:])
        return self.current_text()

    def boundary_reached(self, start):
        """Record *start* as the offset reached within the head utterance."""
        if self.queue:
            self.queue[0] = self.queue[0]._replace(reached_offset=start)

    def mark_word_or_sentence(self, start, length):
        """Return the first and last marks inside the spoken span, if any.

        :param start: offset of the span relative to the head utterance
        :param length: length of the span in characters
        :return: ``(first_mark, last_mark)`` for the positions that fall in
            the span, or ``None`` when the queue is empty or nothing matches
        """
        if not self.queue:
            return None
        start += self.queue[0].offset_in_text
        end = start + length
        matches = []
        # Positions are scanned forward only; last_pos remembers where the
        # previous call stopped.
        while self.last_pos < len(self.positions):
            pos = self.positions[self.last_pos]
            if start <= pos.offset_in_text < end:
                matches.append(pos)
            elif pos.offset_in_text >= end:
                break
            self.last_pos += 1
        if matches:
            return matches[0].mark, matches[-1].mark
        return None
class TTSManager(QObject):
    """Drive a lazily created TTS backend and report progress as marks.

    Signals:
        state_changed: re-emitted from the backend after ``self.state`` is updated
        saying: the first and last marks of the span currently being spoken
    """

    state_changed = pyqtSignal(QTextToSpeech.State)
    saying = pyqtSignal(int, int)

    def __init__(self, parent=None):
        super().__init__(parent)
        self._tts = None
        self.state = QTextToSpeech.State.Ready
        self.tracker = Tracker()

    @property
    def tts(self):
        """The TTS backend, created and wired up on first access."""
        if self._tts is None:
            from calibre.gui2.tts2.types import create_tts_backend
            self._tts = create_tts_backend(parent=self)
            self._tts.state_changed.connect(self._state_changed)
            self._tts.saying.connect(self._saying)
        return self._tts

    def _parent_widget(self):
        """Return the nearest QWidget ancestor of this object, or None.

        This manager is a plain QObject, while dialogs need a QWidget (or
        None) as their parent.
        """
        from qt.core import QWidget
        p = self.parent()
        while p is not None and not isinstance(p, QWidget):
            p = p.parent()
        return p

    def stop(self) -> None:
        """Discard all tracked text and stop the backend."""
        self.tracker.clear()
        self.tts.stop()

    def pause(self) -> None:
        """Pause the backend."""
        self.tts.pause()

    def resume(self) -> None:
        """Resume the backend."""
        self.tts.resume()

    def speak_simple_text(self, text: str) -> None:
        """Speak *text* as marked text carrying a single mark, 0, at its start."""
        self.speak_marked_text([0, text])

    def speak_marked_text(self, marked_text):
        """Stop any current speech and start speaking *marked_text*.

        :param marked_text: iterable of int marks interleaved with str text
        """
        self.stop()
        self.tts.say(self.tracker.parse_marked_text(marked_text))

    def configure(self) -> None:
        """Pause speech, show the configuration dialog, then continue speaking.

        When the dialog is accepted the current backend is dropped so that
        the next access to ``self.tts`` creates a new one.
        """
        from calibre.gui2.tts2.config import ConfigDialog
        self.tts.pause()
        d = ConfigDialog(parent=self._parent_widget())
        if d.exec() == QDialog.DialogCode.Accepted:
            # NOTE(review): stop() also clears the tracker, so
            # tracker.resume() below returns '' and nothing is re-spoken
            # after a backend change - confirm whether only the backend
            # should be stopped here.
            self.stop()
            self._tts = None
        if self._tts is None:
            self.tts.say(self.tracker.resume())
        else:
            self.tts.resume()

    def _state_changed(self, state: QTextToSpeech.State) -> None:
        """Store the backend *state*, report errors, and re-emit the change."""
        self.state = state
        if state is QTextToSpeech.State.Error:
            error_dialog(self._parent_widget(), _('Read aloud failed'), self.tts.error_message(), show=True)
        self.state_changed.emit(state)

    def _saying(self, offset: int, length: int) -> None:
        """Translate a spoken (offset, length) span into marks and emit them."""
        self.tracker.boundary_reached(offset)
        x = self.tracker.mark_word_or_sentence(offset, length)
        if x is not None:
            self.saying.emit(x[0], x[1])

View File

@ -1,53 +1,12 @@
#!/usr/bin/env python
# License: GPLv3 Copyright: 2024, Kovid Goyal <kovid at kovidgoyal.net>
from typing import NamedTuple
from qt.core import QMediaDevices, QObject, QTextToSpeech, pyqtSignal
from calibre.gui2.tts2.types import EngineSpecificSettings, Voice, qvoice_to_voice
class Pos(NamedTuple):
mark: int
offset_in_text: int
class Tracker:
def reset(self) -> None:
self.positions: list[Pos] = []
self.last_pos: int = 0
def parse_marked_text(self, marked_text: list[str | int]) -> str:
self.reset()
text: list[str] = []
text_len: int = 0
for x in marked_text:
if isinstance(x, int):
self.positions.append(Pos(x, text_len))
else:
text_len += len(x)
text.append(x)
return ''.join(text)
def mark_word(self, start: int, length: int) -> tuple[int, int] | None:
end = start + length
matches: list[Pos] = []
while True:
if self.last_pos >= len(self.positions):
break
pos = self.positions[self.last_pos]
if start <= pos.offset_in_text < end:
matches.append(pos)
elif pos.offset_in_text >= end:
break
self.last_pos += 1
if matches:
return matches[0].mark, matches[-1].mark
return None
class QtTTSBackend(QObject):
saying = pyqtSignal(int, int)
@ -55,7 +14,6 @@ class QtTTSBackend(QObject):
def __init__(self, engine_name: str = '', parent: QObject|None = None):
super().__init__(parent)
self.tracker = Tracker()
self._voices = None
self._create_engine(engine_name)
@ -86,9 +44,6 @@ class QtTTSBackend(QObject):
def shutdown(self) -> None:
self.tts.stop(QTextToSpeech.BoundaryHint.Immediate)
def speak_simple_text(self, text: str) -> None:
self.tts.say(text)
def pause(self) -> None:
self.tts.pause()
@ -98,11 +53,8 @@ class QtTTSBackend(QObject):
def stop(self) -> None:
self.tts.stop()
def resume_after_configure(self) -> None:
raise NotImplementedError('TODO: Implement me')
def speak_marked_text(self, marked_text: list[str | int]) -> None:
self.tts.say(self.tracker.parse_marked_text(marked_text))
def say(self, text: str) -> None:
self.tts.say(text)
def error_message(self) -> str:
return self.tts.errorString()
@ -142,6 +94,4 @@ class QtTTSBackend(QObject):
self._current_settings = settings
def _saying_word(self, word: str, utterance_id: int, start: int, length: int) -> None:
x = self.tracker.mark_word(start, length)
if x is not None:
self.saying.emit(x[0], x[1])
self.saying.emit(start, length)