Code to generate TTS-based SMIL overlays in EPUB files

This commit is contained in:
Kovid Goyal 2024-10-16 08:49:59 +05:30
parent a2be9e6981
commit 08b10373a0
No known key found for this signature in database
GPG Key ID: 06BC317B515ACE7C
3 changed files with 221 additions and 5 deletions

View File

@ -128,6 +128,18 @@ def href_to_name(href, root, base=None):
return None return None
def seconds_to_timestamp(duration: float) -> str:
    '''
    Format a duration in seconds as a ``HH:MM:SS[.mmm]`` clock value.

    The duration is rounded to the nearest millisecond. The fractional part
    is omitted when it is zero, otherwise it is always written as exactly
    three digits so that, for example, 50 milliseconds is rendered as
    ``.050`` and not ``.50`` (which would read as half a second).

    :param duration: A non-negative number of seconds.
    :return: The formatted timestamp.
    '''
    # Work in integer milliseconds to avoid floating point artifacts such as
    # (1.005 - 1) * 1000 == 4.999..., and so that rounding up to a whole
    # second carries correctly into the seconds/minutes/hours fields.
    total_ms = int(round(duration * 1000))
    total_seconds, millis = divmod(total_ms, 1000)
    total_minutes, seconds = divmod(total_seconds, 60)
    hours, minutes = divmod(total_minutes, 60)
    ans = f'{hours:02d}:{minutes:02d}:{seconds:02d}'
    if millis:
        ans += f'.{millis:03d}'
    return ans
class ContainerBase: # {{{ class ContainerBase: # {{{
''' '''
A base class that implements just the parsing methods. Useful to create A base class that implements just the parsing methods. Useful to create
@ -667,11 +679,14 @@ class Container(ContainerBase): # {{{
' The version set on the OPF\'s <package> element as a tuple of integers ' ' The version set on the OPF\'s <package> element as a tuple of integers '
return parse_opf_version(self.opf_version) return parse_opf_version(self.opf_version)
@property
def manifest_items(self):
    ' The <item> elements in the OPF manifest that have both an href and an id '
    query = '//opf:manifest/opf:item[@href and @id]'
    items = self.opf_xpath(query)
    return items
@property
def manifest_id_map(self):
    ' Mapping of manifest id to canonical names '
    id_to_name = {}
    for manifest_item in self.manifest_items:
        item_id = manifest_item.get('id')
        # hrefs in the manifest are resolved relative to the OPF file
        id_to_name[item_id] = self.href_to_name(manifest_item.get('href'), self.opf_name)
    return id_to_name
@property @property
def manifest_type_map(self): def manifest_type_map(self):
@ -869,6 +884,12 @@ class Container(ContainerBase): # {{{
self.remove_from_xml(meta) self.remove_from_xml(meta)
self.dirty(self.opf_name) self.dirty(self.opf_name)
for meta in self.opf_xpath('//opf:meta[@refines]'):
q = meta.get('refines')
if q.startswith('#') and q[1:] in removed:
self.remove_from_xml(meta)
self.dirty(self.opf_name)
if remove_from_guide: if remove_from_guide:
for item in self.opf_xpath('//opf:guide/opf:reference[@href]'): for item in self.opf_xpath('//opf:guide/opf:reference[@href]'):
if self.href_to_name(item.get('href'), self.opf_name) == name: if self.href_to_name(item.get('href'), self.opf_name) == name:
@ -882,6 +903,21 @@ class Container(ContainerBase): # {{{
self.parsed_cache.pop(name, None) self.parsed_cache.pop(name, None)
self.dirtied.discard(name) self.dirtied.discard(name)
def set_media_overlay_durations(self, duration_map=None):
    '''
    Replace all ``media:duration`` metadata in the OPF.

    Every existing ``<meta property="media:duration">`` element is removed.
    Then, for each entry in ``duration_map``, a new meta element refining the
    given manifest item id is inserted, followed by one un-refined meta
    element holding the sum of all durations.

    :param duration_map: Mapping of manifest item id to duration in seconds.
        An empty mapping or ``None`` only removes the existing duration
        metadata and adds nothing back.
    '''
    self.dirty(self.opf_name)
    for meta in self.opf_xpath('//opf:meta[@property="media:duration"]'):
        self.remove_from_xml(meta)
    if not duration_map:
        # Nothing has a media overlay, so do not emit a bogus total
        # duration of 00:00:00.
        return
    metadata = self.opf_xpath('//opf:metadata')[0]
    total_duration = 0
    for item_id, duration in duration_map.items():
        meta = metadata.makeelement(OPF('meta'), property="media:duration", refines="#" + item_id)
        meta.text = seconds_to_timestamp(duration)
        self.insert_into_xml(metadata, meta)
        total_duration += duration
    # The un-refined entry carries the duration of the whole publication
    meta = metadata.makeelement(OPF('meta'), property="media:duration")
    meta.text = seconds_to_timestamp(total_duration)
    self.insert_into_xml(metadata, meta)
def dirty(self, name): def dirty(self, name):
''' Mark the parsed object corresponding to name as dirty. See also: :meth:`parsed`. ''' ''' Mark the parsed object corresponding to name as dirty. See also: :meth:`parsed`. '''
self.dirtied.add(name) self.dirtied.add(name)
@ -951,11 +987,13 @@ class Container(ContainerBase): # {{{
href = self.name_to_href(name, self.opf_name) href = self.name_to_href(name, self.opf_name)
base, ext = href.rpartition('.')[0::2] base, ext = href.rpartition('.')[0::2]
all_ids = {x.get('id') for x in self.opf_xpath('//*[@id]')} all_ids = {x.get('id') for x in self.opf_xpath('//*[@id]')}
if id_prefix.endswith('-'):
all_ids.add(id_prefix)
c = 0 c = 0
item_id = id_prefix item_id = id_prefix
while item_id in all_ids: while item_id in all_ids:
c += 1 c += 1
item_id = id_prefix + '%d'%c item_id = f'{id_prefix}{c}'
manifest = self.opf_xpath('//opf:manifest')[0] manifest = self.opf_xpath('//opf:manifest')[0]
item = manifest.makeelement(OPF('item'), item = manifest.makeelement(OPF('item'),

View File

@ -20,3 +20,7 @@ class DRMError(_DRMError):
class MalformedMarkup(ValueError):
    ' A ValueError subclass; presumably raised for markup that cannot be processed (confirm against callers) '
class UnsupportedContainerType(Exception):
    ' Raised when an operation is attempted on a book container of a type that does not support it '

View File

@ -1,7 +1,10 @@
#!/usr/bin/env python #!/usr/bin/env python
# License: GPLv3 Copyright: 2024, Kovid Goyal <kovid at kovidgoyal.net> # License: GPLv3 Copyright: 2024, Kovid Goyal <kovid at kovidgoyal.net>
import io
import json import json
import os
import sys
from collections import defaultdict from collections import defaultdict
from contextlib import suppress from contextlib import suppress
from typing import NamedTuple from typing import NamedTuple
@ -10,7 +13,10 @@ from lxml.etree import ElementBase as Element
from lxml.etree import tostring as _tostring from lxml.etree import tostring as _tostring
from calibre.ebooks.html_transform_rules import unwrap_tag from calibre.ebooks.html_transform_rules import unwrap_tag
from calibre.ebooks.oeb.base import barename from calibre.ebooks.oeb.base import EPUB, EPUB_NS, SMIL_NS, barename
from calibre.ebooks.oeb.polish.container import OEB_DOCS, seconds_to_timestamp
from calibre.ebooks.oeb.polish.errors import UnsupportedContainerType
from calibre.ebooks.oeb.polish.upgrade import upgrade_book
from calibre.spell.break_iterator import sentence_positions from calibre.spell.break_iterator import sentence_positions
from calibre.utils.localization import canonicalize_lang, get_lang from calibre.utils.localization import canonicalize_lang, get_lang
@ -346,3 +352,171 @@ def mark_sentences_in_html(root, lang: str = '', voice: str = '') -> list[Senten
if (p := clone.getparent()) is not None: if (p := clone.getparent()) is not None:
p.remove(clone) p.remove(clone)
return ans return ans
class PerFileData:
    ' Working state kept for a single HTML file while speech audio is generated for it '

    def __init__(self, name: str):
        # Name of the HTML file this record describes
        self.name = name
        # Parsed document root, filled in later by the caller
        self.root = None
        # All sentences found in the file
        self.sentences: list[Sentence] = []
        # The same sentences grouped by (lang, voice)
        self.key_map: dict[tuple[str, str], list[Sentence]] = defaultdict(list)
        # Names of the generated audio and SMIL files, empty until they are created
        self.audio_file_name = ''
        self.smil_file_name = ''
class ReportProgress:
    '''
    Simple progress reporter that prints to stdout.

    Instances are callable as ``reporter(stage, item, count, total)``. When
    the stage changes, the new stage name is printed on its own line and
    nothing else is output for that call. Otherwise a single line showing the
    completed percentage and the current item is redrawn in place.
    '''

    def __init__(self):
        self.current_stage = ''

    def __call__(self, stage: str, item: str, count: int, total: int) -> None:
        if stage != self.current_stage:
            self.current_stage = stage
            print()
            print(self.current_stage)
            return
        # A total of zero means there is no work in this stage, report 0%
        # rather than failing with a ZeroDivisionError.
        frac = count / total if total > 0 else 0.0
        # The line has no trailing newline, so it must be flushed explicitly
        # to become visible on a line buffered stdout.
        print(f'\r{frac:4.0%} {item}', end='', flush=True)
def make_par(container, seq, html_href, audio_href, elem_id, pos, duration) -> None:
    '''
    Append a ``<par>`` element to ``seq`` that pairs one marked-up HTML
    element with the clip of audio that speaks it.

    :param container: Unused here, kept for interface compatibility.
    :param seq: The SMIL ``<seq>`` element to append to. Its ``epub:textref``
        attribute is set to ``html_href``.
    :param html_href: href of the HTML file, as written into the SMIL file.
    :param audio_href: href of the audio file, as written into the SMIL file.
    :param elem_id: id of the element in the HTML file holding the text.
    :param pos: Start of the clip in the audio file, in seconds.
    :param duration: Length of the clip, in seconds.
    '''
    seq.set(EPUB('textref'), html_href)
    par = seq.makeelement('par')
    par.tail = '\n '
    # Number pars from 1, based on how many children seq already has
    par.set('id', f'par-{len(seq) + 1}')
    seq.append(par)
    text = par.makeelement('text')
    text.set('src', f'{html_href}#{elem_id}')
    par.append(text)
    audio = par.makeelement('audio')
    audio.set('src', audio_href)
    audio.set('clipBegin', seconds_to_timestamp(pos))
    audio.set('clipEnd', seconds_to_timestamp(pos + duration))
    # Without this the audio element is discarded and the par has no audio
    par.append(audio)
def remove_embedded_tts(container):
    '''
    Remove media overlays from the book in ``container``.

    For every manifest item that has a ``media-overlay`` attribute: the
    sentence markup is removed from the HTML file, the attribute is dropped,
    and the referenced SMIL file together with the audio files it uses are
    removed from the container. All ``media:duration`` metadata is removed
    from the OPF as well.

    :param container: The book container to modify in place.
    '''
    manifest_items = container.manifest_items
    id_map = {item.get('id'): item for item in manifest_items}
    # This also marks the OPF as dirty, which covers the attribute changes below
    container.set_media_overlay_durations({})
    names_to_remove = set()
    for item in manifest_items:
        smil_id = item.get('media-overlay')
        href = item.get('href')
        if not (href and smil_id):
            continue
        name = container.href_to_name(href, container.opf_name)
        root = container.parsed(name)
        unmark_sentences_in_html(root)
        container.dirty(name)
        # The overlay is going away, do not leave a dangling reference to it
        del item.attrib['media-overlay']
        smil_item = id_map.get(smil_id)
        # Must compare against None: the truth value of an lxml element
        # depends on whether it has children, and manifest items have none.
        if smil_item is None:
            continue
        smil_href = smil_item.get('href')
        if not smil_href:
            continue
        # Manifest hrefs are relative to the OPF file
        smil_name = container.href_to_name(smil_href, container.opf_name)
        smil_root = container.parsed(smil_name)
        # Only collect the audio files. The src of <text> elements points at
        # the HTML content documents, which must not be deleted.
        for ahref in smil_root.xpath('//*[local-name()="audio"]/@src'):
            aname = container.href_to_name(ahref, smil_name)
            if aname:
                names_to_remove.add(aname)
        names_to_remove.add(smil_name)
    # remove_item() is relied upon to also drop the manifest entries, as the
    # audio files were never removed from the manifest explicitly either.
    for aname in names_to_remove:
        container.remove_item(aname)
def embed_tts(container, report_progress=None, parent_widget=None):
    '''
    Generate speech audio for the text of the book in ``container`` and
    embed it as media overlays.

    Books with an OPF version lower than 3 are upgraded first and any
    existing overlays are removed. Then, for every HTML file in the spine
    that contains sentences, one audio file and one SMIL file are added to
    the container, the HTML manifest item is pointed at the SMIL file via
    its ``media-overlay`` attribute and the overlay durations are recorded in
    the OPF metadata.

    :param container: The book container to modify in place.
    :param report_progress: Optional callable invoked as
        ``report_progress(stage, item, count, total)``. Defaults to a
        :class:`ReportProgress` instance.
    :param parent_widget: Passed as ``parent`` when ensuring the needed
        voices are downloaded.
    :raises UnsupportedContainerType: If the book is not an EPUB.
    '''
    report_progress = report_progress or ReportProgress()
    if container.book_type != 'epub':
        raise UnsupportedContainerType(_('Only the EPUB format has support for embedding speech overlay audio'))
    if container.opf_version_parsed[0] < 3:
        report_progress(_('Updating book internals'), '', 0, 0)
        upgrade_book(container, print)
    remove_embedded_tts(container)

    from calibre.gui2.tts.piper import HIGH_QUALITY_SAMPLE_RATE, PiperEmbedded
    from calibre_extensions.ffmpeg import transcode_single_audio_stream, wav_header_for_pcm_data
    piper = PiperEmbedded()
    language = container.mi.language
    name_map = {}
    for name, is_linear in container.spine_names:
        if container.mime_map.get(name) in OEB_DOCS:
            name_map[name] = PerFileData(name)

    stage = _('Processing HTML')
    report_progress(stage, '', 0, len(name_map))
    all_voices = set()
    total_num_sentences = 0
    for i, (name, pfd) in enumerate(name_map.items()):
        pfd.root = container.parsed(name)
        pfd.sentences = mark_sentences_in_html(pfd.root, lang=language)
        total_num_sentences += len(pfd.sentences)
        # Group by (lang, voice) so each group is synthesized in one batch
        for s in pfd.sentences:
            key = s.lang, s.voice
            pfd.key_map[key].append(s)
            all_voices.add(key)
        container.dirty(name)
        report_progress(stage, name, i+1, len(name_map))
    piper.ensure_voices_downloaded(iter(all_voices), parent=parent_widget)

    stage = _('Converting text to speech')
    report_progress(stage, '', 0, total_num_sentences)
    snum = 0
    mmap = {container.href_to_name(item.get('href'), container.opf_name):item for item in container.manifest_items}
    duration_map = {}
    for name, pfd in name_map.items():
        if not pfd.sentences:
            # Nothing to speak in this file, so do not create an empty audio
            # file and a SMIL file with an empty <seq> for it.
            continue
        audio_map: dict[Sentence, tuple[bytes, float]] = {}
        # The size is per audio file, as it goes into that file's WAV header
        size_of_audio_data = 0
        for (lang, voice), sentences in pfd.key_map.items():
            texts = tuple(s.text for s in sentences)
            for i, (audio_data, duration) in enumerate(piper.text_to_raw_audio_data(texts, lang, voice, sample_rate=HIGH_QUALITY_SAMPLE_RATE)):
                s = sentences[i]
                audio_map[s] = audio_data, duration
                size_of_audio_data += len(audio_data)
                snum += 1
                report_progress(stage, _('Sentence number: {}').format(snum), snum, total_num_sentences)
        wav = io.BytesIO()
        wav.write(wav_header_for_pcm_data(size_of_audio_data, HIGH_QUALITY_SAMPLE_RATE))
        afitem = container.generate_item(name + '.mp4', id_prefix='tts-')
        pfd.audio_file_name = container.href_to_name(afitem.get('href'), container.opf_name)
        smilitem = container.generate_item(name + '.smil', id_prefix='smil-')
        pfd.smil_file_name = container.href_to_name(smilitem.get('href'), container.opf_name)
        with container.open(pfd.smil_file_name, 'w') as sf:
            sf.write(f'''
<smil xmlns="{SMIL_NS}" xmlns:epub="{EPUB_NS}" version="3.0">
<body>
<seq id="generated-by-calibre">
</seq>
</body>
</smil>''')
        smil_root = container.parsed(pfd.smil_file_name)
        seq = smil_root[0][0]
        # hrefs inside the SMIL file are relative to the SMIL file itself
        audio_href = container.name_to_href(pfd.audio_file_name, pfd.smil_file_name)
        html_href = container.name_to_href(pfd.name, pfd.smil_file_name)
        # Sentences are written in document order. pos is the offset of the
        # current sentence in the audio file and must advance with every
        # sentence, otherwise all clips would start at zero.
        pos = 0
        for s in pfd.sentences:
            audio_data, duration = audio_map[s]
            wav.write(audio_data)
            make_par(container, seq, html_href, audio_href, s.elem_id, pos, duration)
            pos += duration
        wav.seek(0)
        with container.open(pfd.audio_file_name, 'wb') as mp4:
            transcode_single_audio_stream(wav, mp4)
        container.pretty_print.add(pfd.smil_file_name)
        container.dirty(pfd.smil_file_name)
        container.serialize_item(pfd.smil_file_name)
        html_item = mmap[name]
        html_item.set('media-overlay', smilitem.get('id'))
        # After the last sentence pos is the total duration of this file
        duration_map[smilitem.get('id')] = pos
    container.set_media_overlay_durations(duration_map)
def develop():
    ' Development helper: embed speech audio into the book named by the last command line argument and save the result next to it '
    from calibre.ebooks.oeb.polish.container import get_container
    src = sys.argv[-1]
    book = get_container(src, tweak_mode=True)
    embed_tts(book)
    # Output goes next to the input, with -tts inserted before the extension
    stem, suffix = os.path.splitext(src)
    dest = stem + '-tts' + suffix
    book.commit(dest)
    print('Output saved to:', dest)
# Run the development helper when this module is executed as a script
if __name__ == '__main__':
    develop()