
# vim:fileencoding=utf-8
# License: GPL v3 Copyright: 2020, Kovid Goyal <kovid at kovidgoyal.net>
from __python__ import bound_methods, hash_literals
from elementmaker import E
from dom import unique_id
from gettext import gettext as _
from book_list.globals import get_session_data
from modals import create_custom_dialog, error_dialog
from widgets import create_button
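

# Tracker maps the flat text that is handed to the speech engine back to the
# position marks embedded in the original marked-up text, so the UI can
# highlight the word currently being spoken.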
class Tracker:

    def __init__(self):
        self.clear()

    def clear(self):
        self.positions = v'[]'
        self.last_pos = 0
        self.queue = v'[]'
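
    # marked_text is a flat list of numbers (position marks) and strings. The
    # strings are joined into chunks of at most ~2048 characters and queued for
    # synthesis one utterance at a time, while each mark is recorded together
    # with its character offset into the joined text.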
    def parse_marked_text(self, marked_text):
        self.clear()
        text = v'[]'
        text_len = chunk_len = index_in_positions = offset_in_text = 0
        limit = 2048

        def commit():
            self.queue.push({
                'text': ''.join(text), 'index_in_positions': index_in_positions,
                'offset_in_text': offset_in_text, 'reached_offset': 0})

        for x in marked_text:
            if jstype(x) is 'number':
                self.positions.push({'mark': x, 'offset_in_text': text_len})
            else:
                text_len += x.length
                chunk_len += x.length
                text.push(x)
                if chunk_len > limit:
                    commit()
                    chunk_len = 0
                    text = v'[]'
                    index_in_positions = max(0, self.positions.length - 1)
                    offset_in_text = text_len
        if text.length:
            commit()
        self.marked_text = marked_text
        return self.current_text()

    def pop_first(self):
        self.queue.splice(0, 1)

    def current_text(self):
        if self.queue.length:
            return self.queue[0].text
        return ''
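
    # Called when speech is restarted after re-configuration. Padding the
    # already-spoken prefix with spaces keeps character offsets unchanged, so
    # word tracking still lines up with the recorded mark positions.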
    def resume(self):
        self.last_pos = 0
        if self.queue.length:
            self.last_pos = self.queue[0].index_in_positions
            if self.queue[0].reached_offset:
                o = self.queue[0].reached_offset
                # make sure positions remain the same for word tracking
                self.queue[0].text = (' ' * o) + self.queue[0].text[o:]
        return self.current_text()

    def boundary_reached(self, start):
        if self.queue.length:
            self.queue[0].reached_offset = start
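
    # Map the character range of the word just spoken back onto the marks that
    # fall inside it, returning the first and last matching mark (or None).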
    def mark_word(self, start, length):
        if not self.queue.length:
            return
        start += self.queue[0].offset_in_text
        end = start + length
        matches = v'[]'
        while self.last_pos < self.positions.length:
            pos = self.positions[self.last_pos]
            if start <= pos.offset_in_text < end:
                matches.push(pos)
            elif pos.offset_in_text >= end:
                break
            self.last_pos += 1
        if matches.length:
            return matches[0].mark, matches[-1].mark
        return None
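

# Client drives the browser SpeechSynthesis API. Progress is reported through
# the onevent callback: 'begin', 'pause', 'resume', 'mark', 'end', 'cancel'
# and 'configured'.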
class Client:

    min_rate = 0.1
    max_rate = 2

    def __init__(self):
        self.stop_requested_at = None
        self.status = {'synthesizing': False, 'paused': False}
        self.tracker = Tracker()
        self.last_reached_mark = None
        self.onevent = def():
            pass
        data = get_session_data().get('tts_backend')
        self.current_voice_uri = data.voice or ''
        self.current_rate = data.rate or None
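
    # Build a SpeechSynthesisUtterance wired to the handlers below, applying
    # the configured voice and rate, if any.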
    def create_utterance(self, text):
        ut = new window.SpeechSynthesisUtterance(text)
        ut.onstart = self.utterance_started
        ut.onpause = self.utterance_paused
        ut.onend = self.utterance_ended
        ut.onerror = self.utterance_failed
        ut.onresume = self.utterance_resumed
        ut.addEventListener('boundary', self.utterance_boundary_reached)
        if self.current_voice_uri:
            for voice in window.speechSynthesis.getVoices():
                if voice.voiceURI is self.current_voice_uri:
                    ut.voice = voice
                    break
        if self.current_rate:
            ut.rate = self.current_rate
        return ut
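
    # Event handlers attached to each utterance by create_utterance().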
    def utterance_started(self, event):
        self.status = {'synthesizing': True, 'paused': False}
        self.onevent('begin')

    def utterance_paused(self, event):
        self.status = {'synthesizing': True, 'paused': True}
        self.onevent('pause')

    def speak(self, text):
        self.current_utterance = None
        if text and text.length:
            self.current_utterance = self.create_utterance(text)
            window.speechSynthesis.speak(self.current_utterance)

    def utterance_ended(self, event):
        self.status = {'synthesizing': False, 'paused': False}
        if self.stop_requested_at? and window.performance.now() - self.stop_requested_at < 1000:
            self.stop_requested_at = None
            return
        self.tracker.pop_first()
        text = self.tracker.current_text()
        if text and text.length:
            self.speak(text)
        else:
            self.onevent('end')

    def utterance_failed(self, event):
        self.status = {'synthesizing': False, 'paused': False}
        self.tracker.clear()
        if event.error is not 'interrupted' and event.error is not 'canceled':
            if event.error is 'synthesis-unavailable':
                msg = _('Text-to-Speech not available in this browser. You may need to install some Text-to-Speech software.')
            else:
                msg = _('An error has occurred with speech synthesis: ') + event.error
            error_dialog(_('Speaking failed'), msg)
        self.onevent('cancel')

    def utterance_boundary_reached(self, event):
        self.tracker.boundary_reached(event.charIndex)
        if event.name is 'word':
            x = self.tracker.mark_word(event.charIndex, event.charLength or 2)
            if x:
                first, last = x[0], x[1]
                self.onevent('mark', {'first': first, 'last': last})

    def utterance_resumed(self, event):
        self.status = {'synthesizing': True, 'paused': False}
        self.onevent('resume')

    def pause(self):
        window.speechSynthesis.pause()

    def resume(self):
        window.speechSynthesis.resume()
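
    # Detach all handlers from the current utterance before cancelling it, so
    # that changing the voice or rate does not surface as an error or as the
    # end of speech; resume_after_configure() then restarts from the position
    # the tracker saved at the last boundary event.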
    def pause_for_configure(self):
        if self.current_utterance:
            ut = self.current_utterance
            self.current_utterance = None
            ut.onstart = ut.onpause = ut.onend = ut.onerror = ut.onresume = None
        window.speechSynthesis.cancel()

    def resume_after_configure(self):
        text = self.tracker.resume()
        if text and text.length:
            self.speak(text)
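
    # stop_requested_at lets utterance_ended() distinguish a deliberate
    # cancellation (within the last second) from an utterance finishing
    # naturally, in which case the next queued chunk would be spoken.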
    def stop(self):
        self.tracker.clear()
        self.stop_requested_at = window.performance.now()
        window.speechSynthesis.cancel()
        self.status = {'synthesizing': False, 'paused': False}

    def speak_simple_text(self, text):
        self.stop()
        text = self.tracker.parse_marked_text(v'[text]')
        if text and text.length:
            self.speak(text)

    def speak_marked_text(self, text_segments, onevent):
        self.stop()
        self.onevent = onevent
        text = self.tracker.parse_marked_text(text_segments)
        if text and text.length:
            self.speak(text)

    def faster(self):
        self.change_rate(steps=1)

    def slower(self):
        self.change_rate(steps=-1)

    def save_settings(self):
        sd = get_session_data()
        sd.set('tts_backend', {'voice': self.current_voice_uri, 'rate': self.current_rate})
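
    # Adjust the speech rate in steps of 0.1, clamped to [min_rate, max_rate].
    # If speech is in progress it is restarted at the new rate.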
    def change_rate(self, steps=1):
        current_rate = self.current_rate or 1
        rate = (current_rate * 10 + steps) / 10
        rate = max(self.min_rate, min(rate, self.max_rate))
        if rate is not current_rate:
            is_speaking = bool(window.speechSynthesis.speaking)
            if is_speaking:
                self.pause_for_configure()
            self.current_rate = rate
            self.save_settings()
            if is_speaking:
                self.resume_after_configure()
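
    # Show a dialog with a rate slider and the list of available voices; the
    # chosen settings are saved and applied when the dialog is closed.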
    def configure(self):
        voice_id = unique_id()
        rate_id = unique_id()
        default_voice = None

        def restore_defaults():
            document.getElementById(voice_id).selectedIndex = -1
            document.getElementById(rate_id).value = 10

        create_custom_dialog(_('Configure Text-to-Speech'), def (parent_div, close_modal):
            nonlocal default_voice
            select = E.select(size='5', id=voice_id)
            voices = window.speechSynthesis.getVoices()
            voices.sort(def (a, b):
                a = a.name.toLowerCase()
                b = b.name.toLowerCase()
                return -1 if a < b else (0 if a is b else 1)
            )
            for voice in voices:
                dflt = ''
                if voice.default:
                    default_voice = voice.voiceURI
                    dflt = '-- {}'.format(_('default'))
                option = E.option(f'{voice.name} ({voice.lang}){dflt}', value=voice.voiceURI)
                if (self.current_voice_uri and voice.voiceURI is self.current_voice_uri) or (not self.current_voice_uri and voice.default):
                    option.setAttribute('selected', 'selected')
                select.appendChild(option)
            parent_div.appendChild(E.div(_('Speed of speech:')))
            parent_div.appendChild(E.input(type='range', id=rate_id, min=(self.min_rate * 10) + '', max=(self.max_rate * 10) + '', value=((self.current_rate or 1) * 10) + ''))
            parent_div.appendChild(E.div(_('Pick a voice below:')))
            parent_div.appendChild(select)
            if select.options.selectedIndex? and select.options[select.options.selectedIndex]:
                select.options[select.options.selectedIndex].scrollIntoView()
            parent_div.appendChild(E.div(
                style='margin: 1rem; display: flex; justify-content: space-between; align-items: flex-start',
                create_button(_('Restore defaults'), action=restore_defaults),
                create_button(_('Close'), action=close_modal)
            ))
        , on_close=def():
            voice = document.getElementById(voice_id).value
            rate = int(document.getElementById(rate_id).value) / 10
            if rate is 1:
                rate = None
            if voice is default_voice:
                voice = ''
            changed = voice is not self.current_voice_uri or rate is not self.current_rate
            if changed:
                self.current_voice_uri = voice
                self.current_rate = rate
                is_speaking = bool(window.speechSynthesis.speaking)
                if is_speaking:
                    self.pause_for_configure()
                self.save_settings()
                if is_speaking:
                    self.resume_after_configure()
            self.onevent('configured')
        )
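

# A minimal usage sketch (not part of the original module), assuming viewer
# code that already has marked-up text in which numbers are position marks:
#
#   client = Client()
#
#   def on_tts_event(which, data):
#       if which is 'mark':
#           highlight_marks(data.first, data.last)  # hypothetical UI helper
#       elif which is 'end':
#           clear_highlight()  # hypothetical UI helper
#
#   client.speak_marked_text(v"[1, 'Hello ', 2, 'world.']", on_tts_event)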