From 08b10373a044c38435be96a28ea742b8fef7ed17 Mon Sep 17 00:00:00 2001 From: Kovid Goyal Date: Wed, 16 Oct 2024 08:49:59 +0530 Subject: [PATCH] Code to generate TTS based SMIL overlays in EPUB files --- src/calibre/ebooks/oeb/polish/container.py | 44 ++++- src/calibre/ebooks/oeb/polish/errors.py | 4 + src/calibre/ebooks/oeb/polish/tts.py | 178 ++++++++++++++++++++- 3 files changed, 221 insertions(+), 5 deletions(-) diff --git a/src/calibre/ebooks/oeb/polish/container.py b/src/calibre/ebooks/oeb/polish/container.py index fd8204cb8c..67e95b0754 100644 --- a/src/calibre/ebooks/oeb/polish/container.py +++ b/src/calibre/ebooks/oeb/polish/container.py @@ -128,6 +128,18 @@ def href_to_name(href, root, base=None): return None +def seconds_to_timestamp(duration: float) -> str: + seconds = int(duration) + float_part = int((duration - seconds) * 1000) + hours = seconds // 3600 + minutes = (seconds % 3600) // 60 + seconds = seconds % 60 + ans = f'{hours:02d}:{minutes:02d}:{seconds:02d}' + if float_part: + ans += f'.{float_part}' + return ans + + class ContainerBase: # {{{ ''' A base class that implements just the parsing methods. Useful to create @@ -667,11 +679,14 @@ class Container(ContainerBase): # {{{ ' The version set on the OPF\'s element as a tuple of integers ' return parse_opf_version(self.opf_version) + @property + def manifest_items(self): + return self.opf_xpath('//opf:manifest/opf:item[@href and @id]') + @property def manifest_id_map(self): ' Mapping of manifest id to canonical names ' - return {item.get('id'):self.href_to_name(item.get('href'), self.opf_name) - for item in self.opf_xpath('//opf:manifest/opf:item[@href and @id]')} + return {item.get('id'):self.href_to_name(item.get('href'), self.opf_name) for item in self.manifest_items} @property def manifest_type_map(self): @@ -869,6 +884,12 @@ class Container(ContainerBase): # {{{ self.remove_from_xml(meta) self.dirty(self.opf_name) + for meta in self.opf_xpath('//opf:meta[@refines]'): + q = meta.get('refines') + if q.startswith('#') and q[1:] in removed: + self.remove_from_xml(meta) + self.dirty(self.opf_name) + if remove_from_guide: for item in self.opf_xpath('//opf:guide/opf:reference[@href]'): if self.href_to_name(item.get('href'), self.opf_name) == name: @@ -882,6 +903,21 @@ class Container(ContainerBase): # {{{ self.parsed_cache.pop(name, None) self.dirtied.discard(name) + def set_media_overlay_durations(self, duration_map): + self.dirty(self.opf_name) + for meta in self.opf_xpath('//opf:meta[@property="media:duration"]'): + self.remove_from_xml(meta) + metadata = self.opf_xpath('//opf:metadata')[0] + total_duration = 0 + for item_id, duration in duration_map.items(): + meta = metadata.makeelement(OPF('meta'), property="media:duration", refines="#" + item_id) + meta.text = seconds_to_timestamp(duration) + self.insert_into_xml(metadata, meta) + total_duration += duration + meta = metadata.makeelement(OPF('meta'), property="media:duration") + meta.text = seconds_to_timestamp(total_duration) + self.insert_into_xml(metadata, meta) + def dirty(self, name): ''' Mark the parsed object corresponding to name as dirty. See also: :meth:`parsed`. ''' self.dirtied.add(name) @@ -951,11 +987,13 @@ class Container(ContainerBase): # {{{ href = self.name_to_href(name, self.opf_name) base, ext = href.rpartition('.')[0::2] all_ids = {x.get('id') for x in self.opf_xpath('//*[@id]')} + if id_prefix.endswith('-'): + all_ids.add(id_prefix) c = 0 item_id = id_prefix while item_id in all_ids: c += 1 - item_id = id_prefix + '%d'%c + item_id = f'{id_prefix}{c}' manifest = self.opf_xpath('//opf:manifest')[0] item = manifest.makeelement(OPF('item'), diff --git a/src/calibre/ebooks/oeb/polish/errors.py b/src/calibre/ebooks/oeb/polish/errors.py index 4c1de5830a..9878d1e1e9 100644 --- a/src/calibre/ebooks/oeb/polish/errors.py +++ b/src/calibre/ebooks/oeb/polish/errors.py @@ -20,3 +20,7 @@ class DRMError(_DRMError): class MalformedMarkup(ValueError): pass + + +class UnsupportedContainerType(Exception): + pass diff --git a/src/calibre/ebooks/oeb/polish/tts.py b/src/calibre/ebooks/oeb/polish/tts.py index 6a14d306de..3fa922c202 100644 --- a/src/calibre/ebooks/oeb/polish/tts.py +++ b/src/calibre/ebooks/oeb/polish/tts.py @@ -1,7 +1,10 @@ #!/usr/bin/env python # License: GPLv3 Copyright: 2024, Kovid Goyal +import io import json +import os +import sys from collections import defaultdict from contextlib import suppress from typing import NamedTuple @@ -10,7 +13,10 @@ from lxml.etree import ElementBase as Element from lxml.etree import tostring as _tostring from calibre.ebooks.html_transform_rules import unwrap_tag -from calibre.ebooks.oeb.base import barename +from calibre.ebooks.oeb.base import EPUB, EPUB_NS, SMIL_NS, barename +from calibre.ebooks.oeb.polish.container import OEB_DOCS, seconds_to_timestamp +from calibre.ebooks.oeb.polish.errors import UnsupportedContainerType +from calibre.ebooks.oeb.polish.upgrade import upgrade_book from calibre.spell.break_iterator import sentence_positions from calibre.utils.localization import canonicalize_lang, get_lang @@ -19,7 +25,7 @@ class Sentence(NamedTuple): elem_id: str text: str lang: str - voice : str + voice: str def tostring(x) -> str: @@ -346,3 +352,171 @@ def mark_sentences_in_html(root, lang: str = '', voice: str = '') -> list[Senten if (p := clone.getparent()) is not None: p.remove(clone) return ans + + +class PerFileData: + + def __init__(self, name: str): + self.name = name + self.root = None + self.sentences: list[Sentence] = [] + self.key_map: dict[tuple[str, str], list[Sentence]] = defaultdict(list) + self.audio_file_name = self.smil_file_name = '' + + +class ReportProgress: + + def __init__(self): + self.current_stage = '' + + def __call__(self, stage: str, item: str, count: int, total: int) -> None: + if stage != self.current_stage: + self.current_stage = stage + print() + print(self.current_stage) + return + frac = count / total + print(f'\r{frac:4.0%} {item}', end='') + + +def make_par(container, seq, html_href, audio_href, elem_id, pos, duration) -> None: + seq.set(EPUB('textref'), html_href) + par = seq.makeelement('par') + par.tail = '\n ' + par.set('id', f'par-{len(seq) + 1}') + seq.append(par) + text = par.makeelement('text') + text.set('src', f'{html_href}#{elem_id}') + par.append(text) + audio = par.makeelement('audio') + audio.set('src', audio_href) + audio.set('clipBegin', seconds_to_timestamp(pos)) + audio.set('clipEnd', seconds_to_timestamp(pos + duration)) + + +def remove_embedded_tts(container): + manifest_items = container.manifest_items + id_map = {item.get('id'): item for item in manifest_items} + container.set_media_overlay_durations({}) + media_files = set() + for item in manifest_items: + smil_id = item.get('media-overlay') + href = item.get('href') + if href and smil_id: + name = container.href_to_name(href, container.opf_name) + root = container.parsed(name) + unmark_sentences_in_html(root) + container.dirty(name) + smil_item = id_map.get(smil_id) + if smil_item: + smil_href = smil_item.get('href') + if smil_href: + smil_name = container.href_to_name(smil_item.get('href')) + smil_root = container.parsed(smil_name) + for ahref in smil_root.xpath('//@src'): + aname = container.href_to_name(ahref, smil_name) + media_files.add(aname) + container.remove_from_xml(smil_item) + for aname in media_files: + container.remove_item(aname) + + +def embed_tts(container, report_progress=None, parent_widget=None): + report_progress = report_progress or ReportProgress() + if container.book_type != 'epub': + raise UnsupportedContainerType(_('Only the EPUB format has support for embedding speech overlay audio')) + if container.opf_version_parsed[0] < 3: + report_progress(_('Updating book internals'), '', 0, 0) + upgrade_book(container, print) + remove_embedded_tts(container) + + from calibre.gui2.tts.piper import HIGH_QUALITY_SAMPLE_RATE, PiperEmbedded + from calibre_extensions.ffmpeg import transcode_single_audio_stream, wav_header_for_pcm_data + + piper = PiperEmbedded() + language = container.mi.language + name_map = {} + for name, is_linear in container.spine_names: + if container.mime_map.get(name) in OEB_DOCS: + name_map[name] = PerFileData(name) + stage = _('Processing HTML') + report_progress(stage, '', 0, len(name_map)) + all_voices = set() + total_num_sentences = 0 + for i, (name, pfd) in enumerate(name_map.items()): + pfd.root = container.parsed(name) + pfd.sentences = mark_sentences_in_html(pfd.root, lang=language) + total_num_sentences += len(pfd.sentences) + for s in pfd.sentences: + key = s.lang, s.voice + pfd.key_map[key].append(s) + all_voices.add(key) + container.dirty(name) + report_progress(stage, name, i+1, len(name_map)) + piper.ensure_voices_downloaded(iter(all_voices), parent=parent_widget) + stage = _('Converting text to speech') + report_progress(stage, '', 0, total_num_sentences) + snum = 0 + size_of_audio_data = 0 + mmap = {container.href_to_name(item.get('href'), container.opf_name):item for item in container.manifest_items} + duration_map = {} + for name, pfd in name_map.items(): + audio_map: dict[Sentence, tuple[bytes, float]] = {} + for (lang, voice), sentences in pfd.key_map.items(): + texts = tuple(s.text for s in sentences) + for i, (audio_data, duration) in enumerate(piper.text_to_raw_audio_data(texts, lang, voice, sample_rate=HIGH_QUALITY_SAMPLE_RATE)): + s = sentences[i] + audio_map[s] = audio_data, duration + size_of_audio_data += len(audio_data) + snum += 1 + report_progress(stage, _('Sentence number: {}').format(snum), snum, total_num_sentences) + pos = 0 + wav = io.BytesIO() + wav.write(wav_header_for_pcm_data(size_of_audio_data, HIGH_QUALITY_SAMPLE_RATE)) + afitem = container.generate_item(name + '.mp4', id_prefix='tts-') + pfd.audio_file_name = container.href_to_name(afitem.get('href'), container.opf_name) + smilitem = container.generate_item(name + '.smil', id_prefix='smil-') + pfd.smil_file_name = container.href_to_name(smilitem.get('href'), container.opf_name) + with container.open(pfd.smil_file_name, 'w') as sf: + sf.write(f''' + + + + + +''') + smil_root = container.parsed(pfd.smil_file_name) + seq = smil_root[0][0] + audio_href = container.name_to_href(pfd.audio_file_name, pfd.smil_file_name) + html_href = container.name_to_href(pfd.name, pfd.smil_file_name) + file_duration = 0 + for i, s in enumerate(pfd.sentences): + audio_data, duration = audio_map[s] + file_duration += duration + wav.write(audio_data) + make_par(container, seq, html_href, audio_href, s.elem_id, pos, duration) + wav.seek(0) + with container.open(pfd.audio_file_name, 'wb') as mp4: + transcode_single_audio_stream(wav, mp4) + container.pretty_print.add(pfd.smil_file_name) + container.dirty(pfd.smil_file_name) + container.serialize_item(pfd.smil_file_name) + html_item = mmap[name] + html_item.set('media-overlay', smilitem.get('id')) + duration_map[smilitem.get('id')] = file_duration + container.set_media_overlay_durations(duration_map) + + +def develop(): + from calibre.ebooks.oeb.polish.container import get_container + path = sys.argv[-1] + container = get_container(path, tweak_mode=True) + embed_tts(container) + b, e = os.path.splitext(path) + outpath = b + '-tts' + e + container.commit(outpath) + print('Output saved to:', outpath) + + +if __name__ == '__main__': + develop()