diff --git a/transcoder/src/audiostream.go b/transcoder/src/audiostream.go index f32872cf..2dd51afa 100644 --- a/transcoder/src/audiostream.go +++ b/transcoder/src/audiostream.go @@ -39,6 +39,9 @@ func (as *AudioStream) getOutPath(encoder_id int) string { } func (as *AudioStream) getFlags() Flags { + if as.quality == AOriginal { + return AudioF | CopyF + } return AudioF } diff --git a/transcoder/src/stream.go b/transcoder/src/stream.go index cca727e4..c6c1f820 100644 --- a/transcoder/src/stream.go +++ b/transcoder/src/stream.go @@ -22,6 +22,7 @@ type Flags int32 const ( AudioF Flags = 1 << 0 VideoF Flags = 1 << 1 + CopyF Flags = 1 << 2 Transmux Flags = 1 << 3 ) @@ -159,6 +160,8 @@ func (ts *Stream) run(ctx context.Context, start int32) error { slog.InfoContext(ctx, "starting transcode", "encoderId", encoder_id, "path", ts.file.Info.Path, "start", start, "end", end, "length", length) + copy_audio := ts.handle.getFlags()&(AudioF|CopyF) == (AudioF | CopyF) + // Include both the start and end delimiter because -ss and -to are not accurate // Having an extra segment allows us to cut precisely the segments we want with the // -f segment that does cut the begining and the end at the keyframe like asked @@ -169,7 +172,11 @@ func (ts *Stream) run(ctx context.Context, start int32) error { // - Audio: we need context before the starting point, without that ffmpeg doesnt know what to do and leave ~100ms of silence // - Video: if a segment is really short (between 20 and 100ms), the padding given in the else block bellow is not enough and // the previous segment is played another time. the -segment_times is way more precise so it does not do the same with this one - start_segment = start - 1 + if !copy_audio { + // For copied audio we rely on output-side seeking to avoid drift between + // independent invocations. Start exactly at the requested segment. + start_segment = start - 1 + } if ts.handle.getFlags()&AudioF != 0 { start_ref = ts.keyframes.Get(start_segment) } else { @@ -210,7 +217,9 @@ func (ts *Stream) run(ctx context.Context, start int32) error { args = append(args, Settings.HwAccel.DecodeFlags...) } - if start_ref != 0 { + // in `copy_audio` mode, we use `-ss` and `-t` as an output flag. + // without this, seeking is NOT precise and we can have an overlap/gap. + if start_ref != 0 && !copy_audio { if ts.handle.getFlags()&VideoF != 0 { // This is the default behavior in transmux mode and needed to force pre/post segment to work // This must be disabled when processing only audio because it creates gaps in audio @@ -221,7 +230,7 @@ func (ts *Stream) run(ctx context.Context, start int32) error { ) } // do not include -to if we want the file to go to the end - if end+1 < length { + if end+1 < length && !copy_audio { // sometimes, the duration is shorter than expected (only during transcode it seems) // always include more and use the -f segment to split the file where we want end_ref := ts.keyframes.Get(end + 1) @@ -240,9 +249,6 @@ func (ts *Stream) run(ctx context.Context, start int32) error { // since this is better than errorring or not supporting transmux at all, i'll keep it here for now. "-fflags", "+genpts", "-i", ts.file.Info.Path, - // this makes behaviors consistent between soft and hardware decodes. - // this also means that after a -ss 50, the output video will start at 50s - "-start_at_zero", // for hls streams, -copyts is mandatory "-copyts", // this makes output file start at 0s instead of a random delay + the -ss value @@ -252,6 +258,31 @@ func (ts *Stream) run(ctx context.Context, start int32) error { // to keep in mind when debugging "-muxdelay", "0", ) + if copy_audio { + if start_ref != 0 { + args = append(args, + "-ss", fmt.Sprintf("%.6f", start_ref), + ) + } + if end+1 < length { + duration := ts.keyframes.Get(end+1) - ts.keyframes.Get(start_segment) + args = append(args, + "-t", fmt.Sprintf("%.6f", duration), + ) + } + args = append(args, "-copytb", "1") + if start_ref != 0 { + // output-side seek on copied audio can rebase timestamps at 0 for each invocation. + // reapply an absolute offset so all lazy windows stay in a single timeline. + args = append(args, + "-output_ts_offset", fmt.Sprintf("%.6f", start_ref), + ) + } + } else { + // this makes behaviors consistent between soft and hardware decodes. + // this also means that after a -ss 50, the output video will start at 50s + args = append(args, "-start_at_zero") + } args = append(args, ts.handle.getTranscodeArgs(toSegmentStr(segments))...) args = append(args, "-f", "segment", diff --git a/transcoder/tests/hls_validation/conftest.py b/transcoder/tests/hls_validation/conftest.py index 8b30f891..48ea0a6f 100644 --- a/transcoder/tests/hls_validation/conftest.py +++ b/transcoder/tests/hls_validation/conftest.py @@ -32,7 +32,9 @@ class TestConfig: def test_config() -> TestConfig: media_path = os.getenv("TRANSCODER_MEDIA_PATH", "").strip() if not media_path: - raise pytest.UsageError("TRANSCODER_MEDIA_PATH is required (absolute path as seen by transcoder)") + raise pytest.UsageError( + "TRANSCODER_MEDIA_PATH is required (absolute path as seen by transcoder)" + ) api_key = os.getenv("TRANSCODER_API_KEY", "").strip() headers: dict[str, str] = {} @@ -40,12 +42,18 @@ def test_config() -> TestConfig: headers["X-Api-Key"] = api_key return TestConfig( - base_url=os.getenv("TRANSCODER_BASE_URL", "http://localhost:7666/video").strip(), + base_url=os.getenv( + "TRANSCODER_BASE_URL", "http://localhost:7666/video" + ).strip(), media_path=media_path, client_prefix=os.getenv("TRANSCODER_CLIENT_ID_PREFIX", "hls-test").strip(), max_segments=max(3, int(os.getenv("TRANSCODER_MAX_SEGMENTS", "14"))), - max_variant_playlists=max(1, int(os.getenv("TRANSCODER_MAX_VARIANT_PLAYLISTS", "3"))), - max_audio_playlists=max(0, int(os.getenv("TRANSCODER_MAX_AUDIO_PLAYLISTS", "2"))), + max_variant_playlists=max( + 1, int(os.getenv("TRANSCODER_MAX_VARIANT_PLAYLISTS", "3")) + ), + max_audio_playlists=max( + 0, int(os.getenv("TRANSCODER_MAX_AUDIO_PLAYLISTS", "2")) + ), timeout_seconds=float(os.getenv("TRANSCODER_TIMEOUT_SECONDS", "40")), headers=headers, ) @@ -69,13 +77,17 @@ def client_id(test_config: TestConfig) -> str: @pytest.fixture def master_context(test_config: TestConfig, client_id: str) -> dict: - master_url = build_master_url(test_config.base_url, test_config.media_path, client_id) + master_url = build_master_url( + test_config.base_url, test_config.media_path, client_id + ) master_text = fetch_text( master_url, timeout_seconds=test_config.timeout_seconds, headers=test_config.headers, ) - variants, audios = parse_master_playlist(master_text, master_url=master_url, client_id=client_id) + variants, audios = parse_master_playlist( + master_text, master_url=master_url, client_id=client_id + ) if not variants: raise AssertionError(f"No variants discovered in master playlist: {master_url}") @@ -100,7 +112,9 @@ def media_playlists(master_context: dict, test_config: TestConfig) -> dict: timeout_seconds=test_config.timeout_seconds, headers=test_config.headers, ) - variant_playlists.append(parse_media_playlist(text, variant.url, master_context["client_id"])) + variant_playlists.append( + parse_media_playlist(text, variant.url, master_context["client_id"]) + ) audio_playlists = [] for audio in master_context["audios"][: test_config.max_audio_playlists]: @@ -109,7 +123,9 @@ def media_playlists(master_context: dict, test_config: TestConfig) -> dict: timeout_seconds=test_config.timeout_seconds, headers=test_config.headers, ) - audio_playlists.append(parse_media_playlist(text, audio.url, master_context["client_id"])) + audio_playlists.append( + parse_media_playlist(text, audio.url, master_context["client_id"]) + ) return { "variants": variant_playlists, diff --git a/transcoder/tests/hls_validation/hls_utils.py b/transcoder/tests/hls_validation/hls_utils.py index ffffac98..bb98397f 100644 --- a/transcoder/tests/hls_validation/hls_utils.py +++ b/transcoder/tests/hls_validation/hls_utils.py @@ -31,6 +31,7 @@ class AudioRendition: class MediaPlaylist: url: str segment_urls: list[str] + segment_durations: list[float] map_url: str | None @@ -51,7 +52,16 @@ def with_query_param(url: str, key: str, value: str) -> str: query = dict(parse_qsl(parsed.query, keep_blank_values=True)) query[key] = value new_query = urlencode(query) - return urlunparse((parsed.scheme, parsed.netloc, parsed.path, parsed.params, new_query, parsed.fragment)) + return urlunparse( + ( + parsed.scheme, + parsed.netloc, + parsed.path, + parsed.params, + new_query, + parsed.fragment, + ) + ) def build_master_url(base_url: str, media_path: str, client_id: str) -> str: @@ -61,7 +71,9 @@ def build_master_url(base_url: str, media_path: str, client_id: str) -> str: return with_query_param(master, "clientId", client_id) -def fetch_text(url: str, timeout_seconds: float, headers: dict[str, str] | None = None) -> str: +def fetch_text( + url: str, timeout_seconds: float, headers: dict[str, str] | None = None +) -> str: req_headers = {"Accept": "application/vnd.apple.mpegurl,text/plain,*/*"} if headers: req_headers.update(headers) @@ -70,7 +82,9 @@ def fetch_text(url: str, timeout_seconds: float, headers: dict[str, str] | None return response.read().decode("utf-8") -def fetch_binary(url: str, timeout_seconds: float, headers: dict[str, str] | None = None) -> bytes: +def fetch_binary( + url: str, timeout_seconds: float, headers: dict[str, str] | None = None +) -> bytes: req = Request(url, headers=headers or {}) with urlopen(req, timeout=timeout_seconds) as response: return response.read() @@ -103,7 +117,9 @@ def _parse_attrs(line: str) -> dict[str, str]: return out -def parse_master_playlist(text: str, master_url: str, client_id: str) -> tuple[list[PlaylistVariant], list[AudioRendition]]: +def parse_master_playlist( + text: str, master_url: str, client_id: str +) -> tuple[list[PlaylistVariant], list[AudioRendition]]: variants: list[PlaylistVariant] = [] audios: list[AudioRendition] = [] @@ -123,7 +139,9 @@ def parse_master_playlist(text: str, master_url: str, client_id: str) -> tuple[l elif line.startswith("#EXT-X-MEDIA:"): attrs = _parse_attrs(line) if attrs.get("TYPE") == "AUDIO" and "URI" in attrs: - url = with_query_param(urljoin(master_url, attrs["URI"]), "clientId", client_id) + url = with_query_param( + urljoin(master_url, attrs["URI"]), "clientId", client_id + ) audios.append(AudioRendition(url=url, attrs=attrs)) i += 1 @@ -133,20 +151,43 @@ def parse_master_playlist(text: str, master_url: str, client_id: str) -> tuple[l def parse_media_playlist(text: str, playlist_url: str, client_id: str) -> MediaPlaylist: map_url: str | None = None segment_urls: list[str] = [] + segment_durations: list[float] = [] + pending_duration: float | None = None lines = [line.strip() for line in text.splitlines() if line.strip()] for line in lines: + if line.startswith("#EXTINF:"): + payload = line.split(":", 1)[1] + duration_raw = payload.split(",", 1)[0].strip() + try: + pending_duration = float(duration_raw) + except ValueError: + pending_duration = None + continue if line.startswith("#EXT-X-MAP:"): attrs = _parse_attrs(line) map_uri = attrs.get("URI") if map_uri: - map_url = with_query_param(urljoin(playlist_url, map_uri), "clientId", client_id) + map_url = with_query_param( + urljoin(playlist_url, map_uri), "clientId", client_id + ) continue if line.startswith("#"): continue - segment_urls.append(with_query_param(urljoin(playlist_url, line), "clientId", client_id)) + segment_urls.append( + with_query_param(urljoin(playlist_url, line), "clientId", client_id) + ) + segment_durations.append( + pending_duration if pending_duration is not None else 0.0 + ) + pending_duration = None - return MediaPlaylist(url=playlist_url, segment_urls=segment_urls, map_url=map_url) + return MediaPlaylist( + url=playlist_url, + segment_urls=segment_urls, + segment_durations=segment_durations, + map_url=map_url, + ) class ByteCache: @@ -154,7 +195,9 @@ class ByteCache: self._cache: dict[str, bytes] = {} self._lock = threading.Lock() - def get(self, url: str, timeout_seconds: float, headers: dict[str, str] | None = None) -> bytes: + def get( + self, url: str, timeout_seconds: float, headers: dict[str, str] | None = None + ) -> bytes: with self._lock: if url in self._cache: return self._cache[url] @@ -183,7 +226,9 @@ def _probe_packets(path: Path, stream_selector: str) -> list[dict]: return data.get("packets", []) -def _timeline_from_packets(segment_url: str, stream_kind: str, packets: list[dict]) -> SegmentTimeline: +def _timeline_from_packets( + segment_url: str, stream_kind: str, packets: list[dict] +) -> SegmentTimeline: pts_values = [float(p["pts_time"]) for p in packets if "pts_time" in p] dts_values = [float(p["dts_time"]) for p in packets if "dts_time" in p] starts = pts_values or dts_values @@ -193,12 +238,18 @@ def _timeline_from_packets(segment_url: str, stream_kind: str, packets: list[dic start = min(starts) end_pts = max(starts) - durations = [float(p["duration_time"]) for p in packets if "duration_time" in p and float(p["duration_time"]) > 0] + durations = [ + float(p["duration_time"]) + for p in packets + if "duration_time" in p and float(p["duration_time"]) > 0 + ] default_step = median(durations) if durations else 0.04 tail = durations[-1] if durations else default_step end = end_pts + max(tail, default_step) - return SegmentTimeline(segment_url=segment_url, stream_kind=stream_kind, start=start, end=end) + return SegmentTimeline( + segment_url=segment_url, stream_kind=stream_kind, start=start, end=end + ) def probe_segment_timeline( @@ -220,7 +271,9 @@ def probe_segment_timeline( if attempt == retries: break time.sleep(retry_delay_seconds * (attempt + 1)) - raise AssertionError(f"Failed to fetch segment after retries: {url} ({last_err})") + raise AssertionError( + f"Failed to fetch segment after retries: {url} ({last_err})" + ) segment_bytes = get_with_retry(segment_url) prefix = get_with_retry(map_url) if map_url else b"" diff --git a/transcoder/tests/hls_validation/test_audio_boundaries.py b/transcoder/tests/hls_validation/test_audio_boundaries.py index f61141bc..2128c296 100644 --- a/transcoder/tests/hls_validation/test_audio_boundaries.py +++ b/transcoder/tests/hls_validation/test_audio_boundaries.py @@ -3,7 +3,9 @@ from __future__ import annotations from .hls_utils import fetch_text, parse_media_playlist, probe_segment_timeline -def test_audio_segment_100_boundary_is_continuous(master_context: dict, test_config, byte_cache) -> None: +def test_audio_segment_100_boundary_is_continuous( + master_context: dict, test_config, byte_cache +) -> None: if not master_context["audios"]: return @@ -48,4 +50,57 @@ def test_audio_segment_100_boundary_is_continuous(master_context: dict, test_con f" delta={delta:.6f}s" ) - assert not failures, "audio discontinuity around lazy head boundary: " + ", ".join(failures) + assert not failures, "audio discontinuity around lazy head boundary: " + ", ".join( + failures + ) + + +def test_audio_pts_never_resets_across_lazy_windows( + master_context: dict, test_config, byte_cache +) -> None: + if not master_context["audios"]: + return + + failures: list[str] = [] + max_allowed_backward_jump_seconds = 0.25 + + for audio in master_context["audios"]: + text = fetch_text( + audio.url, + timeout_seconds=test_config.timeout_seconds, + headers=test_config.headers, + ) + playlist = parse_media_playlist(text, audio.url, master_context["client_id"]) + if not playlist.segment_urls: + continue + + total_extinf = 0.0 + # sample enough segments to cross several lazy windows + sample_count = min(len(playlist.segment_urls), 340) + for idx in range(sample_count): + timeline = probe_segment_timeline( + segment_url=playlist.segment_urls[idx], + map_url=playlist.map_url, + timeout_seconds=test_config.timeout_seconds, + byte_cache=byte_cache, + headers=test_config.headers, + ) + expected = total_extinf + drift = timeline.start - expected + + if drift < -max_allowed_backward_jump_seconds: + failures.append( + f"{audio.attrs.get('GROUP-ID', audio.url)} seg={idx}" + f" expected_start~{expected:.3f}s actual={timeline.start:.3f}s" + f" backward_jump={-drift:.3f}s" + ) + break + + if idx < len(playlist.segment_durations): + dur = playlist.segment_durations[idx] + if dur > 0: + total_extinf += dur + + assert not failures, ( + "audio timeline reset detected across lazy windows: " + ", ".join(failures) + ) diff --git a/transcoder/tests/hls_validation/test_continuity.py b/transcoder/tests/hls_validation/test_continuity.py index 8727f481..0b0ab378 100644 --- a/transcoder/tests/hls_validation/test_continuity.py +++ b/transcoder/tests/hls_validation/test_continuity.py @@ -3,7 +3,9 @@ from __future__ import annotations from .hls_utils import assert_no_large_gaps_or_overlaps, probe_segment_timeline -def test_variant_playlists_have_continuous_timeline(media_playlists: dict, test_config, byte_cache) -> None: +def test_variant_playlists_have_continuous_timeline( + media_playlists: dict, test_config, byte_cache +) -> None: playlists = media_playlists["variants"] + media_playlists["audios"] for playlist in playlists: diff --git a/transcoder/tests/hls_validation/test_playback_patterns.py b/transcoder/tests/hls_validation/test_playback_patterns.py index 73dbe0df..27ea2473 100644 --- a/transcoder/tests/hls_validation/test_playback_patterns.py +++ b/transcoder/tests/hls_validation/test_playback_patterns.py @@ -13,13 +13,17 @@ from .hls_utils import ( def _load_variant_playlists_for_client(test_config, client_id: str): - master_url = build_master_url(test_config.base_url, test_config.media_path, client_id) + master_url = build_master_url( + test_config.base_url, test_config.media_path, client_id + ) master_text = fetch_text( master_url, timeout_seconds=test_config.timeout_seconds, headers=test_config.headers, ) - variants, _ = parse_master_playlist(master_text, master_url=master_url, client_id=client_id) + variants, _ = parse_master_playlist( + master_text, master_url=master_url, client_id=client_id + ) unique_variants = list({v.url: v for v in variants}.values()) playlists = [] for variant in unique_variants: @@ -32,7 +36,9 @@ def _load_variant_playlists_for_client(test_config, client_id: str): return playlists -def test_abr_switching_has_no_timeline_holes(master_context: dict, test_config, byte_cache) -> None: +def test_abr_switching_has_no_timeline_holes( + master_context: dict, test_config, byte_cache +) -> None: playlists = [] for variant in master_context["variants"]: text = fetch_text( @@ -40,7 +46,9 @@ def test_abr_switching_has_no_timeline_holes(master_context: dict, test_config, timeout_seconds=test_config.timeout_seconds, headers=test_config.headers, ) - playlists.append(parse_media_playlist(text, variant.url, master_context["client_id"])) + playlists.append( + parse_media_playlist(text, variant.url, master_context["client_id"]) + ) if len(playlists) < 2: return @@ -71,7 +79,9 @@ def test_abr_switching_has_no_timeline_holes(master_context: dict, test_config, ) -def test_seek_storm_contiguous_windows_stay_continuous(media_playlists: dict, test_config, byte_cache) -> None: +def test_seek_storm_contiguous_windows_stay_continuous( + media_playlists: dict, test_config, byte_cache +) -> None: playlist = media_playlists["variants"][0] count = len(playlist.segment_urls) assert count >= 8, f"Need at least 8 segments for seek test, got {count}" @@ -106,7 +116,9 @@ def test_seek_storm_contiguous_windows_stay_continuous(media_playlists: dict, te ) -def test_concurrent_clients_can_seek_without_breaking_timeline(test_config, byte_cache) -> None: +def test_concurrent_clients_can_seek_without_breaking_timeline( + test_config, byte_cache +) -> None: worker_count = 3 def worker(idx: int) -> None: @@ -118,7 +130,9 @@ def test_concurrent_clients_can_seek_without_breaking_timeline(test_config, byte playlist = playlists[0] count = len(playlist.segment_urls) if count < 6: - raise AssertionError(f"Need at least 6 segments for concurrent worker, got {count}") + raise AssertionError( + f"Need at least 6 segments for concurrent worker, got {count}" + ) pattern = [0, count // 3, (2 * count) // 3, 1, 2, 3] timelines = []