Add intro/credits detection (#1463)

This commit is contained in:
Zoe Roux 2026-04-18 15:32:30 +02:00 committed by GitHub
commit ed91da2bc3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
46 changed files with 1939 additions and 294 deletions

View File

@ -24,6 +24,8 @@ Kyoo does not have a plugin system and aim to have every features built-in (see
- **Video Preview Thumbnails:** Simply hover the video's progress bar and see a preview of the video.
- **Intro/Credit detection:** Automatically detect intro/credits with audio fingerprinting (or chapter title matching).
- **Enhanced Subtitle Support:** Subtitles are important, Kyoo supports PGS/VODSUB and SSA/ASS and uses the video's embedded fonts when available.
- **Anime Name Parsing**: Kyoo will match weird anime names (like `[Some-Stuffs] Jojo's Bizarre Adventure Stone Ocean 24 (1920x1080 Blu-Ray Opus) [2750810F].mkv`) without issue.

View File

@ -1,12 +1,16 @@
import { eq } from "drizzle-orm";
import { getLogger } from "@logtape/logtape";
import { and, eq } from "drizzle-orm";
import { Elysia, t } from "elysia";
import slugify from "slugify";
import { auth } from "~/auth";
import { db } from "~/db";
import { entryVideoJoin, videos } from "~/db/schema";
import { entries, entryVideoJoin, videos } from "~/db/schema";
import { KError } from "~/models/error";
import { isUuid } from "~/models/utils";
import { Video } from "~/models/video";
import { toQueryStr } from "~/utils";
const logger = getLogger();
export const videosMetadata = new Elysia({
prefix: "/videos",
@ -188,4 +192,85 @@ export const videosMetadata = new Elysia({
},
},
},
)
.get(
":id/prepare",
async ({ params: { id }, headers: { authorization }, status }) => {
const ret = await prepareVideo(id, authorization!);
if ("kyoo" in ret) return status(ret.status, ret as any);
return { status: ret.status, body: await ret.json() };
},
{
detail: { description: "Prepare a video for playback" },
params: t.Object({
id: t.String({
description: "The id or slug of the video to watch.",
example: "made-in-abyss-s1e13",
}),
}),
response: {
200: t.Any({
description:
"Prepare said video for playback (compute everything possible and cache it)",
}),
404: {
...KError,
description: "No video found with the given id or slug.",
},
},
},
);
export const prepareVideo = async (slug: string, auth: string) => {
const [vid] = await db
.select({ path: videos.path, show: entries.showPk, order: entries.order })
.from(videos)
.innerJoin(entryVideoJoin, eq(videos.pk, entryVideoJoin.videoPk))
.leftJoin(entries, eq(entries.pk, entryVideoJoin.entryPk))
.where(eq(entryVideoJoin.slug, slug))
.limit(1);
if (!vid) {
return {
kyoo: true,
status: 404,
message: `No video found with slug ${slug}`,
} as const;
}
const related = vid.show
? await db
.select({ order: entries.order, path: videos.path })
.from(entries)
.innerJoin(entryVideoJoin, eq(entries.pk, entryVideoJoin.entryPk))
.innerJoin(videos, eq(videos.pk, entryVideoJoin.videoPk))
.where(and(eq(entries.showPk, vid.show), eq(entries.kind, "episode")))
.orderBy(entries.order)
: [];
const idx = related.findIndex((x) => x.order === vid.order);
const prev = related[idx - 1]?.path;
const next = related[idx + 1]?.path;
logger.info("Preparing next video {slug} (near episodes: {near})", {
slug,
near: [prev, next],
});
const path = Buffer.from(vid.path, "utf8").toString("base64url");
const params = {
prev: prev ? Buffer.from(prev, "utf8").toString("base64url") : null,
next: next ? Buffer.from(next, "utf8").toString("base64url") : null,
};
return await fetch(
new URL(
`/video/${path}/prepare${toQueryStr(params)}`,
process.env.TRANSCODER_SERVER ?? "http://transcoder:7666",
),
{
headers: {
authorization: auth,
"content-type": "application/json",
},
},
);
};

View File

@ -19,6 +19,7 @@ import type { LogRecordExporter } from "@opentelemetry/sdk-logs";
import {
BatchLogRecordProcessor,
LoggerProvider,
type LogRecordProcessor,
} from "@opentelemetry/sdk-logs";
import {
MeterProvider,
@ -45,6 +46,20 @@ const resource = resourceFromAttributes({
const logger = getLogger();
// all logs in kyoo are in uppercase by default, also make it uppercase here.
function upperCaseSeverityTextProcessor(): LogRecordProcessor {
return {
onEmit(logRecord) {
const record = logRecord as { severityText?: string };
if (typeof record.severityText === "string") {
record.severityText = record.severityText.toUpperCase();
}
},
forceFlush: async () => {},
shutdown: async () => {},
};
}
export function setupOtel() {
logger.info("Configuring OTEL");
const protocol = (
@ -99,7 +114,10 @@ export function setupOtel() {
if (le) {
lp = new LoggerProvider({
resource,
processors: [new BatchLogRecordProcessor(le)],
processors: [
upperCaseSeverityTextProcessor(),
new BatchLogRecordProcessor(le),
],
});
}

View File

@ -5,6 +5,7 @@ import Elysia, { type TSchema, t } from "elysia";
import { auth } from "./auth";
import { updateProgress } from "./controllers/profiles/history";
import { getOrCreateProfile } from "./controllers/profiles/profile";
import { prepareVideo } from "./controllers/video-metadata";
import { getVideos } from "./controllers/videos";
import { videos } from "./db/schema";
@ -61,23 +62,14 @@ const actionMap = {
languages: ["*"],
userId: ws.data.jwt.sub,
});
if (!vid) return;
logger.info("Preparing next video {videoId}", {
videoId: vid.id,
});
const path = Buffer.from(vid.path, "utf8").toString("base64url");
await fetch(
new URL(
`/video/${path}/prepare`,
process.env.TRANSCODER_SERVER ?? "http://transcoder:7666",
),
{
headers: {
authorization: ws.data.headers.authorization!,
},
},
);
const next = vid?.next?.video;
if (!next) {
logger.info("No next video to prepare for ${slug}", {
slug: vid.path,
});
return;
}
await prepareVideo(next, ws.data.headers.authorization!);
}
},
}),

View File

@ -188,6 +188,27 @@
"label": "Subtitle language",
"description": "The default subtitle language used",
"none": "None"
},
"chapterSkip": {
"label": "Chapter skip",
"behaviors": {
"autoSkip": "Auto skip",
"autoSkipExceptFirstAppearance": "Auto skip except first appearance",
"showSkipButton": "Show skip button",
"disabled": "Do nothing"
},
"types": {
"recap": "Recap",
"intro": "Intro",
"credits": "Credits",
"preview": "Preview"
},
"descriptions": {
"recap": "Control what happens when a recap chapter starts",
"intro": "Control what happens when an intro chapter starts",
"credits": "Control what happens when a credits chapter starts",
"preview": "Control what happens when a preview chapter starts"
}
}
},
"account": {
@ -256,7 +277,14 @@
"unsupportedError": "Video codec not supported, transcoding in progress...",
"not-available": "{{entry}} is not available on kyoo yet, ask your server admins about it",
"fatal": "Fatal playback error",
"entry-list": "Entry list"
"entry-list": "Entry list",
"chapters": {
"skip": "Skip {{type}}",
"intro": "intro",
"credits": "credits",
"recap": "recap",
"preview": "preview"
}
},
"search": {
"empty": "No result found. Try a different query."

View File

@ -3,7 +3,7 @@ pkgs.mkShell {
packages = with pkgs; [
bun
biome
nodePackages.eas-cli
eas-cli
];
}

View File

@ -1,5 +1,14 @@
import { z } from "zod/v4";
const ChapterSkipBehavior = z
.enum([
"autoSkip",
"autoSkipExceptFirstAppearance",
"showSkipButton",
"disabled",
])
.catch("showSkipButton");
export const User = z
.object({
id: z.string(),
@ -28,11 +37,30 @@ export const User = z
.catch("original"),
audioLanguage: z.string().catch("default"),
subtitleLanguage: z.string().nullable().catch(null),
chapterSkip: z
.object({
recap: ChapterSkipBehavior,
intro: ChapterSkipBehavior,
credits: ChapterSkipBehavior,
preview: ChapterSkipBehavior,
})
.catch({
recap: "showSkipButton",
intro: "showSkipButton",
credits: "showSkipButton",
preview: "showSkipButton",
}),
})
.default({
downloadQuality: "original",
audioLanguage: "default",
subtitleLanguage: null,
chapterSkip: {
recap: "showSkipButton",
intro: "showSkipButton",
credits: "showSkipButton",
preview: "showSkipButton",
},
}),
}),
oidc: z

View File

@ -65,6 +65,8 @@ export const Chapter = z.object({
endTime: z.number(),
name: z.string(),
type: z.enum(["content", "recap", "intro", "credits", "preview"]),
firstAppearance: z.boolean().optional(),
matchAccuracy: z.number().optional(),
});
export type Chapter = z.infer<typeof Chapter>;

View File

@ -7,6 +7,7 @@ import { useIsTouch } from "~/primitives";
import { Back } from "./back";
import { BottomControls } from "./bottom-controls";
import { MiddleControls } from "./middle-controls";
import { SkipChapterButton } from "./skip-chapter";
import { TouchControls } from "./touch";
export const Controls = ({
@ -20,6 +21,7 @@ export const Controls = ({
chapters,
playPrev,
playNext,
seekEnd,
onOpenEntriesMenu,
forceShow,
}: {
@ -33,6 +35,7 @@ export const Controls = ({
chapters: Chapter[];
playPrev: (() => boolean) | null;
playNext: (() => boolean) | null;
seekEnd: () => void;
onOpenEntriesMenu?: () => void;
forceShow?: boolean;
}) => {
@ -40,6 +43,7 @@ export const Controls = ({
const [hover, setHover] = useState(false);
const [menuOpened, setMenuOpened] = useState(false);
const [controlsVisible, setControlsVisible] = useState(false);
const hoverControls = {
onPointerEnter: (e) => {
@ -61,6 +65,7 @@ export const Controls = ({
<TouchControls
player={player}
forceShow={hover || menuOpened || forceShow}
onVisibilityChange={setControlsVisible}
className="absolute inset-0"
>
<Back
@ -91,6 +96,12 @@ export const Controls = ({
{...hoverControls}
/>
</TouchControls>
<SkipChapterButton
player={player}
chapters={chapters}
isVisible={controlsVisible}
seekEnd={seekEnd}
/>
</View>
);
};

View File

@ -0,0 +1,87 @@
import { useCallback, useEffect, useRef, useState } from "react";
import { useTranslation } from "react-i18next";
import { useEvent, type VideoPlayer } from "react-native-video";
import type { Chapter } from "~/models";
import { Button } from "~/primitives";
import { useAccount } from "~/providers/account-context";
import { useFetch } from "~/query";
import { Info } from "~/ui/info";
import { cn, useQueryState } from "~/utils";
export const SkipChapterButton = ({
player,
seekEnd,
chapters,
isVisible,
}: {
player: VideoPlayer;
seekEnd: () => void;
chapters: Chapter[];
isVisible: boolean;
}) => {
const { t } = useTranslation();
const account = useAccount();
const [slug] = useQueryState<string>("slug", undefined!);
const { data } = useFetch(Info.infoQuery(slug));
const lastAutoSkippedChapter = useRef<number | null>(null);
const [progress, setProgress] = useState(player.currentTime || 0);
useEvent(player, "onProgress", ({ currentTime }) => {
setProgress(currentTime);
});
const chapter = chapters.find(
(chapter) => chapter.startTime <= progress && progress < chapter.endTime,
);
const behavior =
(chapter &&
chapter.type !== "content" &&
account?.claims.settings.chapterSkip[chapter.type]) ||
"showSkipButton";
const shouldAutoSkip =
behavior === "autoSkip" ||
(behavior === "autoSkipExceptFirstAppearance" && !chapter!.firstAppearance);
// delay credits appearance by a few seconds, we want to make sure it doesn't
// show on top of the end of the serie. it's common for the end credits music
// to start playing on top of the episode also.
const start = chapter
? chapter.startTime + +(chapter.type === "credits") * 4
: Infinity;
const skipChapter = useCallback(() => {
if (!chapter) return;
if (data?.durationSeconds && data.durationSeconds <= chapter.endTime + 3) {
return seekEnd();
}
player.seekTo(chapter.endTime);
}, [player, chapter, data?.durationSeconds, seekEnd]);
useEffect(() => {
if (
chapter &&
shouldAutoSkip &&
progress >= start &&
lastAutoSkippedChapter.current !== chapter.startTime
) {
lastAutoSkippedChapter.current = chapter.startTime;
skipChapter();
}
}, [chapter, progress, shouldAutoSkip, start, skipChapter]);
if (!chapter || chapter.type === "content" || behavior === "disabled")
return null;
if (!isVisible && progress >= start + 8) return null;
return (
<Button
text={t(`player.chapters.skip`, { type: chapter.type })}
onPress={skipChapter}
className={cn(
"absolute right-safe bottom-2/10 m-8",
"z-20 bg-slate-900/70 px-4 py-2",
)}
/>
);
};

View File

@ -16,10 +16,12 @@ export const TouchControls = ({
player,
children,
forceShow = false,
onVisibilityChange,
...props
}: {
player: VideoPlayer;
forceShow?: boolean;
onVisibilityChange?: (isVisible: boolean) => void;
} & ViewProps) => {
const isTouch = useIsTouch();
@ -31,6 +33,11 @@ export const TouchControls = ({
const [_show, setShow] = useState(false);
const hideTimeout = useRef<NodeJS.Timeout | number | null>(null);
const shouldShow = forceShow || _show || !playing;
useEffect(() => {
onVisibilityChange?.(shouldShow);
}, [onVisibilityChange, shouldShow]);
const show = useCallback((val: boolean = true) => {
setShow(val);
if (hideTimeout.current) clearTimeout(hideTimeout.current);

View File

@ -139,6 +139,10 @@ export const Player = () => {
setSlug(data.next.video);
return true;
}, [data?.next, setSlug, setStart, t]);
const onEnd = useCallback(() => {
const hasNext = playNext();
if (!hasNext && data?.show?.href) router.navigate(data.show.href);
}, [data?.show?.href, playNext, router]);
useProgressObserver(
player,
@ -146,10 +150,7 @@ export const Player = () => {
);
useLanguagePreference(player, slug);
useEvent(player, "onEnd", () => {
const hasNext = playNext();
if (!hasNext && data?.show) router.navigate(data.show.href);
});
useEvent(player, "onEnd", onEnd);
// TODO: add the equivalent of this for android
useEffect(() => {
@ -225,6 +226,7 @@ export const Player = () => {
chapters={info?.chapters ?? []}
playPrev={data?.previous ? playPrev : null}
playNext={data?.next ? playNext : null}
seekEnd={onEnd}
onOpenEntriesMenu={
data?.show?.kind === "serie"
? () => setEntriesMenuOpen(true)

View File

@ -1,8 +1,9 @@
import { useMemo, useState } from "react";
import { useTranslation } from "react-i18next";
import { View } from "react-native";
import { useEvent, type VideoPlayer } from "react-native-video";
import type { Chapter } from "~/models";
import { P, Sprite } from "~/primitives";
import { P, Sprite, SubP } from "~/primitives";
import { useToken } from "~/providers/account-context";
import { type QueryIdentifier, useFetch } from "~/query";
import { useQueryState } from "~/utils";
@ -95,6 +96,7 @@ export const ScrubberTooltip = ({
seconds: number;
}) => {
const { info, stats } = useScrubber(videoSlug);
const { t } = useTranslation();
const current =
info.findLast((x) => x.from <= seconds * 1000 && seconds * 1000 < x.to) ??
@ -118,8 +120,11 @@ export const ScrubberTooltip = ({
/>
)}
<P className="text-center">
{toTimerString(seconds)} {chapter && `- ${chapter.name}`}
{toTimerString(seconds)} {chapter?.name && `- ${chapter.name}`}
</P>
{chapter && chapter.type !== "content" && (
<SubP>{t(`player.chapters.${chapter.type}`)}</SubP>
)}
</View>
);
};
@ -135,6 +140,7 @@ export const BottomScrubber = ({
}) => {
const [slug] = useQueryState<string>("slug", undefined!);
const { info, stats } = useScrubber(slug);
const { t } = useTranslation();
const [duration, setDuration] = useState(player.duration);
useEvent(player, "onLoad", (info) => {
@ -177,6 +183,9 @@ export const BottomScrubber = ({
{toTimerString(seek)}
{chapter && `\n${chapter.name}`}
</P>
{chapter && chapter.type !== "content" && (
<SubP>{t(`player.chapters.${chapter.type}`)}</SubP>
)}
</View>
</View>
);

View File

@ -3,7 +3,7 @@ import { useAccount } from "~/providers/account-context";
import { AccountSettings } from "./account";
import { About, GeneralSettings } from "./general";
import { OidcSettings } from "./oidc";
import { PlaybackSettings } from "./playback";
import { ChapterSkipSettings, PlaybackSettings } from "./playback";
import { SessionsSettings } from "./sessions";
export const SettingsPage = () => {
@ -12,6 +12,7 @@ export const SettingsPage = () => {
<ScrollView contentContainerClassName="gap-8 pb-8">
<GeneralSettings />
{account && <PlaybackSettings />}
{account && <ChapterSkipSettings />}
{account && <AccountSettings />}
{account && <SessionsSettings />}
{account && <OidcSettings />}

View File

@ -23,6 +23,8 @@ export const OidcSettings = () => {
invalidate: ["auth", "users", "me"],
});
if (data && Object.keys(data.oidc).length === 0) return null;
return (
<SettingsContainer title={t("settings.oidc.label")}>
{unlinkError && <P className="text-red-500">{unlinkError}</P>}

View File

@ -1,6 +1,10 @@
import SubtitleLanguage from "@material-symbols/svg-400/rounded/closed_caption-fill.svg";
import PlayModeI from "@material-symbols/svg-400/rounded/display_settings-fill.svg";
import AudioLanguage from "@material-symbols/svg-400/rounded/music_note-fill.svg";
import SubtitleLanguage from "@material-symbols/svg-400/rounded/closed_caption.svg";
import PlayModeI from "@material-symbols/svg-400/rounded/display_settings.svg";
import MovieInfo from "@material-symbols/svg-400/rounded/movie_info.svg";
import AudioLanguage from "@material-symbols/svg-400/rounded/music_note.svg";
import PlayArrow from "@material-symbols/svg-400/rounded/play_arrow.svg";
import Replay from "@material-symbols/svg-400/rounded/replay.svg";
import Theaters from "@material-symbols/svg-400/rounded/theaters.svg";
import langmap from "langmap";
import { useTranslation } from "react-i18next";
import { Select } from "~/primitives";
@ -85,3 +89,62 @@ export const PlaybackSettings = () => {
</SettingsContainer>
);
};
const defaultChapterSkipBehaviors = [
"autoSkip",
"showSkipButton",
"disabled",
] as const;
const introCreditsChapterSkipBehaviors = [
"autoSkip",
"autoSkipExceptFirstAppearance",
"showSkipButton",
"disabled",
] as const;
const chapterTypes = [
{ type: "recap", icon: Replay },
{ type: "intro", icon: PlayArrow },
{ type: "credits", icon: Theaters },
{ type: "preview", icon: MovieInfo },
] as const;
export const ChapterSkipSettings = () => {
const { t } = useTranslation();
const [chapterSkip, setChapterSkip] = useSetting("chapterSkip")!;
return (
<SettingsContainer title={t("settings.playback.chapterSkip.label")}>
{chapterTypes.map(({ type, icon }) => {
const values =
type === "intro" || type === "credits"
? introCreditsChapterSkipBehaviors
: defaultChapterSkipBehaviors;
return (
<Preference
key={type}
icon={icon}
label={t(`settings.playback.chapterSkip.types.${type}`)}
description={t(
`settings.playback.chapterSkip.descriptions.${type}`,
)}
>
<Select
label={t(`settings.playback.chapterSkip.types.${type}`)}
value={chapterSkip[type]}
onValueChange={(value) =>
setChapterSkip({ ...chapterSkip, [type]: value })
}
values={[...values]}
getLabel={(key) =>
t(`settings.playback.chapterSkip.behaviors.${key}`)
}
/>
</Preference>
);
})}
</SettingsContainer>
);
};

View File

@ -1,6 +1,6 @@
module github.com/zoriya/kyoo/transcoder
go 1.25.0
go 1.26.0
require (
github.com/MicahParks/keyfunc/v3 v3.8.0
@ -37,7 +37,7 @@ require (
require (
github.com/KyleBanks/depth v1.2.1 // indirect
github.com/MicahParks/jwkset v0.11.0 // indirect
github.com/asticode/go-astikit v0.58.0 // indirect
github.com/asticode/go-astikit v0.59.0 // indirect
github.com/asticode/go-astits v1.15.0 // indirect
github.com/aws/aws-sdk-go-v2/service/signin v1.0.9 // indirect
github.com/cenkalti/backoff/v5 v5.0.3 // indirect
@ -54,6 +54,7 @@ require (
github.com/go-openapi/swag/stringutils v0.25.5 // indirect
github.com/go-openapi/swag/typeutils v0.25.5 // indirect
github.com/go-openapi/swag/yamlutils v0.25.5 // indirect
github.com/go-slog/otelslog v0.3.0 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.28.0 // indirect
github.com/jackc/pgerrcode v0.0.0-20250907135507-afb5586c32a6 // indirect
@ -69,10 +70,10 @@ require (
go.opentelemetry.io/proto/otlp v1.10.0 // indirect
go.yaml.in/yaml/v2 v2.4.4 // indirect
go.yaml.in/yaml/v3 v3.0.4 // indirect
golang.org/x/mod v0.34.0 // indirect
golang.org/x/tools v0.43.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20260401024825-9d38bb4040a9 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20260401024825-9d38bb4040a9 // indirect
golang.org/x/mod v0.35.0 // indirect
golang.org/x/tools v0.44.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20260414002931-afd174a4e478 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20260414002931-afd174a4e478 // indirect
google.golang.org/grpc v1.80.0 // indirect
google.golang.org/protobuf v1.36.11 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
@ -95,11 +96,11 @@ require (
github.com/aws/aws-sdk-go-v2/service/sso v1.30.15 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.19 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.41.10 // indirect
github.com/aws/smithy-go v1.24.2 // indirect
github.com/aws/smithy-go v1.24.3 // indirect
github.com/golang-jwt/jwt/v5 v5.3.1
golang.org/x/image v0.38.0 // indirect
golang.org/x/net v0.52.0 // indirect
golang.org/x/sys v0.42.0 // indirect
golang.org/x/text v0.35.0
golang.org/x/image v0.39.0 // indirect
golang.org/x/net v0.53.0 // indirect
golang.org/x/sys v0.43.0 // indirect
golang.org/x/text v0.36.0
golang.org/x/time v0.15.0 // indirect
)

View File

@ -12,6 +12,8 @@ github.com/asticode/go-astikit v0.20.0/go.mod h1:h4ly7idim1tNhaVkdVBeXQZEE3L0xbl
github.com/asticode/go-astikit v0.30.0/go.mod h1:h4ly7idim1tNhaVkdVBeXQZEE3L0xblP7fCWbgwipF0=
github.com/asticode/go-astikit v0.58.0 h1:WXNpaxCPNFReikHiXvzyDv49NpV/GMD6PV80iem6WGo=
github.com/asticode/go-astikit v0.58.0/go.mod h1:fV43j20UZYfXzP9oBn33udkvCvDvCDhzjVqoLFuuYZE=
github.com/asticode/go-astikit v0.59.0 h1:tjbwDym+MTSxqkAhJoHRZmHMXK6Jv4vGx+97FptKH6k=
github.com/asticode/go-astikit v0.59.0/go.mod h1:fV43j20UZYfXzP9oBn33udkvCvDvCDhzjVqoLFuuYZE=
github.com/asticode/go-astisub v0.39.0 h1:j1/rFLRUH0TT2CW9YCtBek9lRdMp96oxaZm6vbgE96M=
github.com/asticode/go-astisub v0.39.0/go.mod h1:WTkuSzFB+Bp7wezuSf2Oxulj5A8zu2zLRVFf6bIFQK8=
github.com/asticode/go-astits v1.8.0/go.mod h1:DkOWmBNQpnr9mv24KfZjq4JawCFX1FCqjLVGvO0DygQ=
@ -55,6 +57,8 @@ github.com/aws/aws-sdk-go-v2/service/sts v1.41.10 h1:p8ogvvLugcR/zLBXTXrTkj0RYBU
github.com/aws/aws-sdk-go-v2/service/sts v1.41.10/go.mod h1:60dv0eZJfeVXfbT1tFJinbHrDfSJ2GZl4Q//OSSNAVw=
github.com/aws/smithy-go v1.24.2 h1:FzA3bu/nt/vDvmnkg+R8Xl46gmzEDam6mZ1hzmwXFng=
github.com/aws/smithy-go v1.24.2/go.mod h1:YE2RhdIuDbA5E5bTdciG9KrW3+TiEONeUWCqxX9i1Fc=
github.com/aws/smithy-go v1.24.3 h1:XgOAaUgx+HhVBoP4v8n6HCQoTRDhoMghKqw4LNHsDNg=
github.com/aws/smithy-go v1.24.3/go.mod h1:YE2RhdIuDbA5E5bTdciG9KrW3+TiEONeUWCqxX9i1Fc=
github.com/cenkalti/backoff/v5 v5.0.3 h1:ZN+IMa753KfX5hd8vVaMixjnqRZ3y8CuJKRKj1xcsSM=
github.com/cenkalti/backoff/v5 v5.0.3/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
@ -114,6 +118,8 @@ github.com/go-openapi/testify/enable/yaml/v2 v2.4.0 h1:7SgOMTvJkM8yWrQlU8Jm18VeD
github.com/go-openapi/testify/enable/yaml/v2 v2.4.0/go.mod h1:14iV8jyyQlinc9StD7w1xVPW3CO3q1Gj04Jy//Kw4VM=
github.com/go-openapi/testify/v2 v2.4.0 h1:8nsPrHVCWkQ4p8h1EsRVymA2XABB4OT40gcvAu+voFM=
github.com/go-openapi/testify/v2 v2.4.0/go.mod h1:HCPmvFFnheKK2BuwSA0TbbdxJ3I16pjwMkYkP4Ywn54=
github.com/go-slog/otelslog v0.3.0 h1:O/ettamJL9sJKCmtEa/xHF5dqHvWF6xION/NIiZJ6gk=
github.com/go-slog/otelslog v0.3.0/go.mod h1:TxQTymq11rhMaNLE4yMe3kXuUf9ksoGyN9ivieHFWu8=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang-jwt/jwt/v5 v5.3.1 h1:kYf81DTWFe7t+1VvL7eS+jKFVWaUnK9cB1qbwn63YCY=
@ -235,12 +241,18 @@ golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPh
golang.org/x/image v0.0.0-20191009234506-e7c1f5e7dbb8/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
golang.org/x/image v0.38.0 h1:5l+q+Y9JDC7mBOMjo4/aPhMDcxEptsX+Tt3GgRQRPuE=
golang.org/x/image v0.38.0/go.mod h1:/3f6vaXC+6CEanU4KJxbcUZyEePbyKbaLoDOe4ehFYY=
golang.org/x/image v0.39.0 h1:skVYidAEVKgn8lZ602XO75asgXBgLj9G/FE3RbuPFww=
golang.org/x/image v0.39.0/go.mod h1:sIbmppfU+xFLPIG0FoVUTvyBMmgng1/XAMhQ2ft0hpA=
golang.org/x/mod v0.34.0 h1:xIHgNUUnW6sYkcM5Jleh05DvLOtwc6RitGHbDk4akRI=
golang.org/x/mod v0.34.0/go.mod h1:ykgH52iCZe79kzLLMhyCUzhMci+nQj+0XkbXpNYtVjY=
golang.org/x/mod v0.35.0 h1:Ww1D637e6Pg+Zb2KrWfHQUnH2dQRLBQyAtpr/haaJeM=
golang.org/x/mod v0.35.0/go.mod h1:+GwiRhIInF8wPm+4AoT6L0FA1QWAad3OMdTRx4tFYlU=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20200904194848-62affa334b73/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.52.0 h1:He/TN1l0e4mmR3QqHMT2Xab3Aj3L9qjbhRm78/6jrW0=
golang.org/x/net v0.52.0/go.mod h1:R1MAz7uMZxVMualyPXb+VaqGSa3LIaUqk0eEt3w36Sw=
golang.org/x/net v0.53.0 h1:d+qAbo5L0orcWAr0a9JweQpjXF19LMXJE8Ey7hwOdUA=
golang.org/x/net v0.53.0/go.mod h1:JvMuJH7rrdiCfbeHoo3fCQU24Lf5JJwT9W3sJFulfgs=
golang.org/x/sync v0.20.0 h1:e0PTpb7pjO8GAtTs2dQ6jYa5BWYlMuX047Dco/pItO4=
golang.org/x/sync v0.20.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
@ -248,21 +260,31 @@ golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.42.0 h1:omrd2nAlyT5ESRdCLYdm3+fMfNFE/+Rf4bDIQImRJeo=
golang.org/x/sys v0.42.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw=
golang.org/x/sys v0.43.0 h1:Rlag2XtaFTxp19wS8MXlJwTvoh8ArU6ezoyFsMyCTNI=
golang.org/x/sys v0.43.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.35.0 h1:JOVx6vVDFokkpaq1AEptVzLTpDe9KGpj5tR4/X+ybL8=
golang.org/x/text v0.35.0/go.mod h1:khi/HExzZJ2pGnjenulevKNX1W67CUy0AsXcNubPGCA=
golang.org/x/text v0.36.0 h1:JfKh3XmcRPqZPKevfXVpI1wXPTqbkE5f7JA92a55Yxg=
golang.org/x/text v0.36.0/go.mod h1:NIdBknypM8iqVmPiuco0Dh6P5Jcdk8lJL0CUebqK164=
golang.org/x/time v0.15.0 h1:bbrp8t3bGUeFOx08pvsMYRTCVSMk89u4tKbNOZbp88U=
golang.org/x/time v0.15.0/go.mod h1:Y4YMaQmXwGQZoFaVFk4YpCt4FLQMYKZe9oeV/f4MSno=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.43.0 h1:12BdW9CeB3Z+J/I/wj34VMl8X+fEXBxVR90JeMX5E7s=
golang.org/x/tools v0.43.0/go.mod h1:uHkMso649BX2cZK6+RpuIPXS3ho2hZo4FVwfoy1vIk0=
golang.org/x/tools v0.44.0 h1:UP4ajHPIcuMjT1GqzDWRlalUEoY+uzoZKnhOjbIPD2c=
golang.org/x/tools v0.44.0/go.mod h1:KA0AfVErSdxRZIsOVipbv3rQhVXTnlU6UhKxHd1seDI=
gonum.org/v1/gonum v0.17.0 h1:VbpOemQlsSMrYmn7T2OUvQ4dqxQXU+ouZFQsZOx50z4=
gonum.org/v1/gonum v0.17.0/go.mod h1:El3tOrEuMpv2UdMrbNlKEh9vd86bmQ6vqIcDwxEOc1E=
google.golang.org/genproto/googleapis/api v0.0.0-20260401024825-9d38bb4040a9 h1:VPWxll4HlMw1Vs/qXtN7BvhZqsS9cdAittCNvVENElA=
google.golang.org/genproto/googleapis/api v0.0.0-20260401024825-9d38bb4040a9/go.mod h1:7QBABkRtR8z+TEnmXTqIqwJLlzrZKVfAUm7tY3yGv0M=
google.golang.org/genproto/googleapis/api v0.0.0-20260414002931-afd174a4e478 h1:yQugLulqltosq0B/f8l4w9VryjV+N/5gcW0jQ3N8Qec=
google.golang.org/genproto/googleapis/api v0.0.0-20260414002931-afd174a4e478/go.mod h1:C6ADNqOxbgdUUeRTU+LCHDPB9ttAMCTff6auwCVa4uc=
google.golang.org/genproto/googleapis/rpc v0.0.0-20260401024825-9d38bb4040a9 h1:m8qni9SQFH0tJc1X0vmnpw/0t+AImlSvp30sEupozUg=
google.golang.org/genproto/googleapis/rpc v0.0.0-20260401024825-9d38bb4040a9/go.mod h1:4Hqkh8ycfw05ld/3BWL7rJOSfebL2Q+DVDeRgYgxUU8=
google.golang.org/genproto/googleapis/rpc v0.0.0-20260414002931-afd174a4e478 h1:RmoJA1ujG+/lRGNfUnOMfhCy5EipVMyvUE+KNbPbTlw=
google.golang.org/genproto/googleapis/rpc v0.0.0-20260414002931-afd174a4e478/go.mod h1:4Hqkh8ycfw05ld/3BWL7rJOSfebL2Q+DVDeRgYgxUU8=
google.golang.org/grpc v1.80.0 h1:Xr6m2WmWZLETvUNvIUmeD5OAagMw3FiKmMlTdViWsHM=
google.golang.org/grpc v1.80.0/go.mod h1:ho/dLnxwi3EDJA4Zghp7k2Ec1+c2jqup0bFkw07bwF4=
google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE=

View File

@ -100,12 +100,12 @@ func main() {
_, err := SetupLogger(ctx)
if err != nil {
slog.Error("logger init", "err", err)
slog.ErrorContext(ctx, "logger init", "err", err)
}
cleanup, err := setupOtel(ctx)
if err != nil {
slog.Error("Failed to setup otel: ", "err", err)
slog.ErrorContext(ctx, "Failed to setup otel: ", "err", err)
return
}
defer cleanup(ctx)
@ -121,7 +121,10 @@ func main() {
"/video/health",
"/video/ready",
}
slog.Info("Skipping request logging for these paths", "paths", func() string { sort.Strings(ignorepath); return strings.Join(ignorepath, ",") }())
slog.InfoContext(ctx, "Skipping request logging for these paths", "paths", func() string {
sort.Strings(ignorepath)
return strings.Join(ignorepath, ",")
}())
// using example from https://echo.labstack.com/docs/middleware/logger#examples
// full configs https://github.com/labstack/echo/blob/master/middleware/request_logger.go
@ -183,7 +186,7 @@ func main() {
g := e.Group("/video")
if src.Settings.JwksUrl != "" {
ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(ctx)
defer cancel()
k, err := keyfunc.NewDefaultCtx(ctx, []string{src.Settings.JwksUrl})

View File

@ -0,0 +1,49 @@
begin;
-- chapters
alter table gocoder.chapters add column sha varchar(40);
update gocoder.chapters c set sha = i.sha from gocoder.info i where c.id = i.id;
alter table gocoder.chapters alter column sha set not null;
alter table gocoder.chapters drop constraint chapter_pk;
alter table gocoder.chapters drop constraint chapters_info_fk;
alter table gocoder.chapters drop column id;
alter table gocoder.chapters add constraint chapter_pk primary key (sha, start_time);
alter table gocoder.chapters add foreign key (sha) references gocoder.info(sha) on delete cascade;
-- subtitles
alter table gocoder.subtitles add column sha varchar(40);
update gocoder.subtitles s set sha = i.sha from gocoder.info i where s.id = i.id;
alter table gocoder.subtitles alter column sha set not null;
alter table gocoder.subtitles drop constraint subtitle_pk;
alter table gocoder.subtitles drop constraint subtitles_info_fk;
alter table gocoder.subtitles drop column id;
alter table gocoder.subtitles add constraint subtitle_pk primary key (sha, idx);
alter table gocoder.subtitles add foreign key (sha) references gocoder.info(sha) on delete cascade;
-- audios
alter table gocoder.audios add column sha varchar(40);
update gocoder.audios a set sha = i.sha from gocoder.info i where a.id = i.id;
alter table gocoder.audios alter column sha set not null;
alter table gocoder.audios drop constraint audios_pk;
alter table gocoder.audios drop constraint audios_info_fk;
alter table gocoder.audios drop column id;
alter table gocoder.audios add constraint audios_pk primary key (sha, idx);
alter table gocoder.audios add foreign key (sha) references gocoder.info(sha) on delete cascade;
-- videos
alter table gocoder.videos add column sha varchar(40);
update gocoder.videos v set sha = i.sha from gocoder.info i where v.id = i.id;
alter table gocoder.videos alter column sha set not null;
alter table gocoder.videos drop constraint videos_pk;
alter table gocoder.videos drop constraint videos_info_fk;
alter table gocoder.videos drop column id;
alter table gocoder.videos add constraint videos_pk primary key (sha, idx);
alter table gocoder.videos add foreign key (sha) references gocoder.info(sha) on delete cascade;
alter table gocoder.info drop column id;
commit;

View File

@ -0,0 +1,55 @@
begin;
alter table gocoder.info add column id serial unique not null;
alter table gocoder.info drop constraint info_pkey;
alter table gocoder.info add constraint info_pkey primary key(id);
-- videos
alter table gocoder.videos add column id integer;
update gocoder.videos v set id = i.id from gocoder.info i where v.sha = i.sha;
alter table gocoder.videos alter column id set not null;
alter table gocoder.videos drop constraint videos_pk;
alter table gocoder.videos drop constraint videos_sha_fkey;
alter table gocoder.videos drop column sha;
alter table gocoder.videos add constraint videos_info_fk
foreign key (id) references gocoder.info(id) on delete cascade;
alter table gocoder.videos add constraint videos_pk primary key (id, idx);
-- audios
alter table gocoder.audios add column id integer;
update gocoder.audios a set id = i.id from gocoder.info i where a.sha = i.sha;
alter table gocoder.audios alter column id set not null;
alter table gocoder.audios drop constraint audios_pk;
alter table gocoder.audios drop constraint audios_sha_fkey;
alter table gocoder.audios drop column sha;
alter table gocoder.audios add constraint audios_info_fk
foreign key (id) references gocoder.info(id) on delete cascade;
alter table gocoder.audios add constraint audios_pk primary key (id, idx);
-- subtitles
alter table gocoder.subtitles add column id integer;
update gocoder.subtitles s set id = i.id from gocoder.info i where s.sha = i.sha;
alter table gocoder.subtitles alter column id set not null;
alter table gocoder.subtitles drop constraint subtitle_pk;
alter table gocoder.subtitles drop constraint subtitles_sha_fkey;
alter table gocoder.subtitles drop column sha;
alter table gocoder.subtitles add constraint subtitles_info_fk
foreign key (id) references gocoder.info(id) on delete cascade;
alter table gocoder.subtitles add constraint subtitle_pk primary key (id, idx);
-- chapters
alter table gocoder.chapters add column id integer;
update gocoder.chapters c set id = i.id from gocoder.info i where c.sha = i.sha;
alter table gocoder.chapters alter column id set not null;
alter table gocoder.chapters drop constraint chapter_pk;
alter table gocoder.chapters drop constraint chapters_sha_fkey;
alter table gocoder.chapters drop column sha;
alter table gocoder.chapters add constraint chapters_info_fk
foreign key (id) references gocoder.info(id) on delete cascade;
alter table gocoder.chapters add constraint chapter_pk primary key (id, start_time);
commit;

View File

@ -0,0 +1,8 @@
begin;
alter table gocoder.chapters drop column match_accuracy;
alter table gocoder.chapters drop column first_appearance;
drop table gocoder.fingerprints;
alter table gocoder.info drop column ver_fingerprint;
commit;

View File

@ -0,0 +1,15 @@
begin;
alter table gocoder.info add column ver_fingerprint integer not null default 0;
alter table gocoder.info add column ver_fp_with text[] not null default '{}';
create table gocoder.fingerprints(
id integer not null primary key references gocoder.info(id) on delete cascade,
start_data text not null,
end_data text not null
);
alter table gocoder.chapters add column match_accuracy integer;
alter table gocoder.chapters add column first_appearance boolean;
commit;

View File

@ -3,8 +3,10 @@ package main
import (
"context"
"errors"
"fmt"
"log/slog"
"os"
"strconv"
"strings"
echootel "github.com/labstack/echo-opentelemetry"
@ -29,8 +31,127 @@ import (
semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
traceotel "go.opentelemetry.io/otel/trace"
traceotelnoop "go.opentelemetry.io/otel/trace/noop"
logotelbridge "go.opentelemetry.io/contrib/bridges/otelslog"
attributeotel "go.opentelemetry.io/otel/attribute"
)
func newOtelBridgeHandler() slog.Handler {
bridgeHandler := logotelbridge.NewHandler(
"slog",
logotelbridge.WithLoggerProvider(logotelglobal.GetLoggerProvider()),
logotelbridge.WithAttributes(attributeotel.String("source", "slog")),
)
return &fullTextMessageHandler{next: bridgeHandler}
}
type fullTextMessageHandler struct {
next slog.Handler
attrs []slog.Attr
groups []string
}
func (h *fullTextMessageHandler) Enabled(ctx context.Context, level slog.Level) bool {
return h.next.Enabled(ctx, level)
}
func (h *fullTextMessageHandler) Handle(ctx context.Context, r slog.Record) error {
parts := make([]string, 0)
for _, attr := range h.attrs {
parts = appendAttrParts(parts, h.groups, attr)
}
r.Attrs(func(attr slog.Attr) bool {
parts = appendAttrParts(parts, h.groups, attr)
return true
})
message := r.Message
if len(parts) > 0 {
message = fmt.Sprintf("%s %s", message, strings.Join(parts, " "))
}
record := slog.NewRecord(r.Time, r.Level, message, r.PC)
r.Attrs(func(attr slog.Attr) bool {
record.AddAttrs(attr)
return true
})
return h.next.Handle(ctx, record)
}
func (h *fullTextMessageHandler) WithAttrs(attrs []slog.Attr) slog.Handler {
combined := make([]slog.Attr, 0, len(h.attrs)+len(attrs))
combined = append(combined, h.attrs...)
combined = append(combined, attrs...)
groups := make([]string, len(h.groups))
copy(groups, h.groups)
return &fullTextMessageHandler{
next: h.next.WithAttrs(attrs),
attrs: combined,
groups: groups,
}
}
func (h *fullTextMessageHandler) WithGroup(name string) slog.Handler {
attrs := make([]slog.Attr, len(h.attrs))
copy(attrs, h.attrs)
groups := make([]string, 0, len(h.groups)+1)
groups = append(groups, h.groups...)
groups = append(groups, name)
return &fullTextMessageHandler{
next: h.next.WithGroup(name),
attrs: attrs,
groups: groups,
}
}
func appendAttrParts(parts []string, groups []string, attr slog.Attr) []string {
attr.Value = attr.Value.Resolve()
if attr.Equal(slog.Attr{}) {
return parts
}
if attr.Value.Kind() == slog.KindGroup {
nextGroups := groups
if attr.Key != "" {
nextGroups = append(append([]string(nil), groups...), attr.Key)
}
for _, nestedAttr := range attr.Value.Group() {
parts = appendAttrParts(parts, nextGroups, nestedAttr)
}
return parts
}
keyParts := groups
if attr.Key != "" {
keyParts = append(append([]string(nil), groups...), attr.Key)
}
key := strings.Join(keyParts, ".")
if key == "" {
return parts
}
return append(parts, fmt.Sprintf("%s=%s", key, formatAttrValue(attr.Value)))
}
func formatAttrValue(value slog.Value) string {
switch value.Kind() {
case slog.KindString:
stringValue := value.String()
if strings.ContainsAny(stringValue, " \t\n\r\"=") {
return strconv.Quote(stringValue)
}
return stringValue
default:
return value.String()
}
}
func setupOtel(ctx context.Context) (func(context.Context) error, error) {
res, err := resource.New(
ctx,
@ -45,7 +166,7 @@ func setupOtel(ctx context.Context) (func(context.Context) error, error) {
return nil, err
}
slog.Info("Configuring OTEL")
slog.InfoContext(ctx, "Configuring OTEL")
otel.SetTextMapPropagator(
propagation.NewCompositeTextMapPropagator(
@ -59,42 +180,42 @@ func setupOtel(ctx context.Context) (func(context.Context) error, error) {
var te tracesdk.SpanExporter
switch {
case strings.TrimSpace(os.Getenv("OTEL_EXPORTER_OTLP_ENDPOINT")) == "":
slog.Info("Using OLTP type", "type", "noop")
slog.InfoContext(ctx, "Using OLTP type", "type", "noop")
le = nil
me = nil
te = nil
case strings.ToLower(strings.TrimSpace(os.Getenv("OTEL_EXPORTER_OTLP_PROTOCOL"))) == "grpc":
slog.Info("Using OLTP type", "type", "grpc")
slog.InfoContext(ctx, "Using OLTP type", "type", "grpc")
le, err = otlploggrpc.New(ctx)
if err != nil {
slog.Error("Failed setting up OLTP", "err", err)
slog.ErrorContext(ctx, "Failed setting up OLTP", "err", err)
return nil, err
}
me, err = otlpmetricgrpc.New(ctx)
if err != nil {
slog.Error("Failed setting up OLTP", "err", err)
slog.ErrorContext(ctx, "Failed setting up OLTP", "err", err)
return nil, err
}
te, err = otlptracegrpc.New(ctx)
if err != nil {
slog.Error("Failed setting up OLTP", "err", err)
slog.ErrorContext(ctx, "Failed setting up OLTP", "err", err)
return nil, err
}
default:
slog.Info("Using OLTP type", "type", "http")
slog.InfoContext(ctx, "Using OLTP type", "type", "http")
le, err = otlploghttp.New(ctx)
if err != nil {
slog.Error("Failed setting up OLTP", "err", err)
slog.ErrorContext(ctx, "Failed setting up OLTP", "err", err)
return nil, err
}
me, err = otlpmetrichttp.New(ctx)
if err != nil {
slog.Error("Failed setting up OLTP", "err", err)
slog.ErrorContext(ctx, "Failed setting up OLTP", "err", err)
return nil, err
}
te, err = otlptracehttp.New(ctx)
if err != nil {
slog.Error("Failed setting up OLTP", "err", err)
slog.ErrorContext(ctx, "Failed setting up OLTP", "err", err)
return nil, err
}
}
@ -160,7 +281,7 @@ func setupOtel(ctx context.Context) (func(context.Context) error, error) {
}
return func(ctx context.Context) error {
slog.Info("Shutting down OTEL")
slog.InfoContext(ctx, "Shutting down OTEL")
// run shutdowns and collect errors
var errs []error

View File

@ -5,9 +5,6 @@ import (
"log/slog"
"os"
"strings"
logotelbridge "go.opentelemetry.io/contrib/bridges/otelslog"
logotelglobal "go.opentelemetry.io/otel/log/global"
)
type tee struct {
@ -64,7 +61,7 @@ func SetupLogger(ctx context.Context) (func(context.Context) error, error) {
return a
},
})
otelHandler := logotelbridge.NewHandler("slog", logotelbridge.WithLoggerProvider(logotelglobal.GetLoggerProvider()))
otelHandler := newOtelBridgeHandler()
minStdout := parseLogLevel(os.Getenv("STDOUT_LOG_LEVEL"))
minOtel := parseLogLevel(os.Getenv("OTEL_LOG_LEVEL"))

View File

@ -4,10 +4,12 @@ import (
"context"
"fmt"
"io"
"log/slog"
"mime"
"net/http"
"os"
"path/filepath"
"slices"
"github.com/asticode/go-astisub"
"github.com/labstack/echo/v5"
@ -62,10 +64,11 @@ func (h *mhandler) GetInfo(c *echo.Context) error {
Container: nil,
MimeCodec: nil,
Versions: src.Versions{
Info: -1,
Extract: 0,
Thumbs: 0,
Keyframes: 0,
Info: -1,
Extract: 0,
Thumbs: 0,
Keyframes: 0,
Fingerprint: 0,
},
Videos: make([]src.Video, 0),
Audios: make([]src.Audio, 0),
@ -79,41 +82,78 @@ func (h *mhandler) GetInfo(c *echo.Context) error {
// @Summary Prepare metadata
//
// @Description Starts metadata preparation in background (info, extract, thumbs, keyframes).
// @Description Starts metadata preparation in background (info, extract, thumbs, keyframes, chapter identification).
//
// @Tags metadata
// @Param path path string true "Base64 of a video's path" format(base64) example(L3ZpZGVvL2J1YmJsZS5ta3YK)
// @Param prev query false "Previous episode path (base64)"
// @Param next query false "Next episode path (base64)"
//
// @Success 202 "Preparation started"
// @Router /:path/prepare [get]
// @Router /:path/prepare [post]
func (h *mhandler) Prepare(c *echo.Context) error {
path, sha, err := getPath(c)
if err != nil {
return err
}
p := c.QueryParam("prev")
prev, psha, err := getPathS(p)
if p != "" && err != nil {
return err
}
n := c.QueryParam("next")
next, nsha, err := getPathS(n)
if n != "" && err != nil {
return err
}
go func(path string, sha string) {
bgCtx := context.Background()
go func() {
ctx := context.WithoutCancel(c.Request().Context())
info, err := h.metadata.GetMetadata(bgCtx, path, sha)
info, err := h.metadata.GetMetadata(ctx, path, sha)
if err != nil {
fmt.Printf("failed to prepare metadata for %s: %v\n", path, err)
slog.ErrorContext(ctx, "failed to prepare metadata", "path", path, "err", err)
return
}
// thumb & subs are already extracted in `GetMetadata`
for _, video := range info.Videos {
if _, err := h.metadata.GetKeyframes(info, true, video.Index); err != nil {
fmt.Printf("failed to extract video keyframes for %s (stream %d): %v\n", path, video.Index, err)
if _, err := h.metadata.GetKeyframes(ctx, info, true, video.Index); err != nil {
slog.WarnContext(ctx, "failed to extract video keyframes", "path", path, "stream", video.Index, "err", err)
}
}
for _, audio := range info.Audios {
if _, err := h.metadata.GetKeyframes(info, false, audio.Index); err != nil {
fmt.Printf("failed to extract audio keyframes for %s (stream %d): %v\n", path, audio.Index, err)
if _, err := h.metadata.GetKeyframes(ctx, info, false, audio.Index); err != nil {
slog.WarnContext(ctx, "failed to extract audio keyframes", "path", path, "stream", audio.Index, "err", err)
}
}
}(path, sha)
fpWith := make([]string, 0, 2)
if prev != "" {
fpWith = append(fpWith, psha)
}
if next != "" {
fpWith = append(fpWith, nsha)
}
if slices.Compare(info.Versions.FpWith, fpWith) != 0 {
err = h.metadata.IdentifyChapters(ctx, info, prev, next)
if err != nil {
return
}
_, err = h.metadata.Database.Exec(
ctx,
"update gocoder.info set ver_fp_with = $2 where id = $1",
info.Id,
fpWith,
)
if err != nil {
slog.WarnContext(ctx, "failed to save chapter identify info", "path", path, "err", err)
}
} else {
slog.InfoContext(ctx, "chapter detection up to date", "path", path)
}
}()
return c.NoContent(http.StatusAccepted)
}

View File

@ -1,11 +1,8 @@
package api
import (
"crypto/sha1"
"encoding/base64"
"encoding/hex"
"net/http"
"os"
"path/filepath"
"strings"
@ -14,7 +11,10 @@ import (
)
func getPath(c *echo.Context) (string, string, error) {
key := c.Param("path")
return getPathS(c.Param("path"))
}
func getPathS(key string) (string, string, error) {
if key == "" {
return "", "", echo.NewHTTPError(http.StatusBadRequest, "Missing resouce path.")
}
@ -38,15 +38,7 @@ func getPath(c *echo.Context) (string, string, error) {
}
func getHash(path string) (string, error) {
info, err := os.Stat(path)
if err != nil {
return "", err
}
h := sha1.New()
h.Write([]byte(path))
h.Write([]byte(info.ModTime().String()))
sha := hex.EncodeToString(h.Sum(nil))
return sha, nil
return src.ComputeSha(path)
}
func sanitizePath(path string) error {

View File

@ -1,8 +1,9 @@
package src
import (
"context"
"fmt"
"log"
"log/slog"
)
type AudioStream struct {
@ -11,10 +12,11 @@ type AudioStream struct {
quality AudioQuality
}
func (t *Transcoder) NewAudioStream(file *FileStream, idx uint32, quality AudioQuality) (*AudioStream, error) {
log.Printf("Creating a audio stream %d for %s", idx, file.Info.Path)
func (t *Transcoder) NewAudioStream(ctx context.Context, file *FileStream, idx uint32, quality AudioQuality) (*AudioStream, error) {
ctx = context.WithoutCancel(ctx)
slog.InfoContext(ctx, "creating an audio stream", "idx", idx, "path", file.Info.Path)
keyframes, err := t.metadataService.GetKeyframes(file.Info, false, idx)
keyframes, err := t.metadataService.GetKeyframes(ctx, file.Info, false, idx)
if err != nil {
return nil, err
}

275
transcoder/src/chapters.go Normal file
View File

@ -0,0 +1,275 @@
package src
import (
"context"
"fmt"
"log/slog"
"math"
"github.com/zoriya/kyoo/transcoder/src/utils"
)
const (
// MergeWindowSec is the maximum gap (in seconds) between a detected chapter
// boundary and an existing chapter for them to be merged.
MergeWindowSec float32 = 3.0
)
func (s *MetadataService) IdentifyChapters(
ctx context.Context,
info *MediaInfo,
prev string,
next string,
) error {
defer utils.PrintExecTime(ctx, "identify chapters for %s", info.Path)()
fingerprint, err := s.ComputeFingerprint(ctx, info)
if err != nil {
slog.ErrorContext(ctx, "failed to compute fingerprint", "path", info.Path, "err", err)
return err
}
candidates := make([]Chapter, 0)
for _, otherPath := range []string{prev, next} {
if otherPath == "" {
continue
}
nc, err := s.compareWithOther(ctx, info, fingerprint, otherPath)
if err != nil {
slog.WarnContext(ctx, "failed to compare episodes", "path", info.Path, "otherPath", otherPath, "err", err)
continue
}
if otherPath == next {
for i := range nc {
nc[i].FirstAppearance = new(true)
}
}
candidates = append(candidates, nc...)
}
chapters := mergeChapters(info, candidates)
err = s.saveChapters(ctx, info.Id, chapters)
if err != nil {
slog.ErrorContext(ctx, "failed to save chapters", "path", info.Path, "err", err)
return err
}
return nil
}
func (s *MetadataService) compareWithOther(
ctx context.Context,
info *MediaInfo,
fingerprint *Fingerprint,
otherPath string,
) ([]Chapter, error) {
otherSha, err := ComputeSha(otherPath)
if err != nil {
return nil, fmt.Errorf("failed to compute sha for %s: %w", otherPath, err)
}
otherInfo, err := s.GetMetadata(ctx, otherPath, otherSha)
if err != nil {
return nil, fmt.Errorf("failed to get metadata for %s: %w", otherPath, err)
}
otherPrint, err := s.ComputeFingerprint(ctx, otherInfo)
if err != nil {
return nil, fmt.Errorf("failed to compute fingerprint for %s: %w", otherInfo.Path, err)
}
if err := s.StoreFingerprint(ctx, otherInfo.Id, otherPrint); err != nil {
slog.WarnContext(ctx, "failed to store fingerprint", "path", otherInfo.Path, "err", err)
}
intros, err := FpFindOverlap(ctx, fingerprint.Start, otherPrint.Start)
if err != nil {
return nil, fmt.Errorf("failed to find intro overlaps: %w", err)
}
credits, err := FpFindOverlap(ctx, fingerprint.End, otherPrint.End)
if err != nil {
return nil, fmt.Errorf("failed to find credit overlaps: %w", err)
}
var candidates []Chapter
for _, intro := range intros {
slog.InfoContext(ctx, "Identified intro", "start", intro.StartFirst, "duration", intro.Duration)
candidates = append(candidates, Chapter{
Id: info.Id,
StartTime: float32(intro.StartFirst),
EndTime: float32(intro.StartFirst + intro.Duration),
Name: "",
Type: Intro,
MatchAccuracy: new(int32(intro.Accuracy)),
})
}
for _, cred := range credits {
endOffset := info.Duration - samplesToSec(len(fingerprint.End))
slog.InfoContext(ctx, "Identified credits", "start", endOffset+cred.StartFirst, "duration", cred.Duration, "end_offset", endOffset)
candidates = append(candidates, Chapter{
Id: info.Id,
StartTime: float32(endOffset + cred.StartFirst),
EndTime: float32(endOffset + cred.StartFirst + cred.Duration),
Name: "",
Type: Credits,
MatchAccuracy: new(int32(cred.Accuracy)),
})
}
return candidates, nil
}
func mergeChapters(info *MediaInfo, candidates []Chapter) []Chapter {
if len(candidates) == 0 {
return info.Chapters
}
chapters := make([]Chapter, 0, len(info.Chapters))
for _, c := range info.Chapters {
// ignore pre-generated chapters
if c.Name != "" {
chapters = append(chapters, c)
}
}
for _, cand := range candidates {
if cand.Type == Content {
continue
}
merged := false
for i := range chapters {
if absF32(chapters[i].StartTime-cand.StartTime) < MergeWindowSec &&
absF32(chapters[i].EndTime-cand.EndTime) < MergeWindowSec {
if chapters[i].Type == Content {
chapters[i].Type = cand.Type
}
if chapters[i].MatchAccuracy != nil {
chapters[i].MatchAccuracy = new(max(*chapters[i].MatchAccuracy, *cand.MatchAccuracy))
} else {
chapters[i].MatchAccuracy = cand.MatchAccuracy
chapters[i].FirstAppearance = cand.FirstAppearance
}
if chapters[i].Name != "" {
chapters[i].FirstAppearance = cand.FirstAppearance
}
merged = true
break
}
}
if !merged {
if cand.StartTime < MergeWindowSec {
cand.StartTime = 0
}
if absF32(float32(info.Duration)-cand.EndTime) < MergeWindowSec {
cand.EndTime = float32(info.Duration)
}
chapters = insertChapter(chapters, Chapter{
Id: info.Id,
StartTime: cand.StartTime,
EndTime: cand.EndTime,
Name: "",
Type: cand.Type,
MatchAccuracy: cand.MatchAccuracy,
FirstAppearance: cand.FirstAppearance,
}, info.Duration)
}
}
return chapters
}
// insertChapter adds a new chapter into the chapter list, adjusting adjacent
// chapters so there are no gaps or overlaps.
func insertChapter(chapters []Chapter, ch Chapter, duration float64) []Chapter {
var ret []Chapter
if len(chapters) == 0 {
if ch.StartTime > 0 {
ret = append(ret, Chapter{
Id: ch.Id,
StartTime: 0,
EndTime: ch.StartTime,
Name: "",
Type: Content,
})
}
ret = append(ret, ch)
if ch.EndTime < float32(duration) {
ret = append(ret, Chapter{
Id: ch.Id,
StartTime: ch.EndTime,
EndTime: float32(duration),
Name: "",
Type: Content,
})
}
return ret
}
inserted := false
for _, existing := range chapters {
if !inserted && ch.StartTime < existing.EndTime {
if ch.StartTime > existing.StartTime {
before := existing
before.EndTime = ch.StartTime
ret = append(ret, before)
}
ret = append(ret, ch)
inserted = true
if ch.EndTime < existing.EndTime {
after := existing
after.StartTime = ch.EndTime
ret = append(ret, after)
}
continue
}
if inserted && existing.StartTime < ch.EndTime {
if existing.EndTime > ch.EndTime {
existing.StartTime = ch.EndTime
ret = append(ret, existing)
}
continue
}
ret = append(ret, existing)
}
if !inserted {
ret = append(ret, ch)
}
return ret
}
func (s *MetadataService) saveChapters(ctx context.Context, infoId int32, chapters []Chapter) error {
tx, err := s.Database.Begin(ctx)
if err != nil {
return fmt.Errorf("failed to begin transaction: %w", err)
}
defer tx.Rollback(ctx)
_, err = tx.Exec(ctx, `delete from gocoder.chapters where id = $1`, infoId)
if err != nil {
return fmt.Errorf("failed to delete existing chapters: %w", err)
}
for _, c := range chapters {
_, err = tx.Exec(ctx,
`insert into gocoder.chapters(id, start_time, end_time, name, type, match_accuracy, first_appearance)
values ($1, $2, $3, $4, $5, $6, $7)`,
infoId, c.StartTime, c.EndTime, c.Name, c.Type, c.MatchAccuracy, c.FirstAppearance,
)
if err != nil {
return fmt.Errorf("failed to insert chapter: %w", err)
}
}
return tx.Commit(ctx)
}
func absF32(v float32) float32 {
return float32(math.Abs(float64(v)))
}

View File

@ -1,8 +1,9 @@
package src
import (
"context"
"fmt"
"log"
"log/slog"
"strings"
"gopkg.in/vansante/go-ffprobe.v2"
@ -143,7 +144,7 @@ func GetMimeCodec(stream *ffprobe.Stream) *string {
return &ret
default:
log.Printf("No known mime format for: %s", stream.CodecName)
slog.WarnContext(context.WithoutCancel(context.Background()), "no known mime format", "codec", stream.CodecName)
return nil
}
}

145
transcoder/src/exec/exec.go Normal file
View File

@ -0,0 +1,145 @@
package exec
import (
"context"
"errors"
"fmt"
osexec "os/exec"
"path/filepath"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
traceotel "go.opentelemetry.io/otel/trace"
)
var (
ErrDot = osexec.ErrDot
ErrNotFound = osexec.ErrNotFound
ErrWaitDelay = osexec.ErrWaitDelay
)
type Error = osexec.Error
type ExitError = osexec.ExitError
var tracer = otel.Tracer("kyoo.transcoder.cli")
type Cmd struct {
*osexec.Cmd
span traceotel.Span
spanCtx context.Context
spanned bool
spanDone bool
}
func LookPath(file string) (string, error) {
return osexec.LookPath(file)
}
func Command(name string, arg ...string) *Cmd {
return wrap(context.Background(), osexec.Command(name, arg...))
}
func CommandContext(ctx context.Context, name string, arg ...string) *Cmd {
if ctx == nil {
ctx = context.Background()
}
return wrap(ctx, osexec.CommandContext(ctx, name, arg...))
}
func wrap(ctx context.Context, cmd *osexec.Cmd) *Cmd {
if ctx == nil {
ctx = context.Background()
}
return &Cmd{Cmd: cmd, spanCtx: ctx}
}
func (c *Cmd) Run() error {
c.startSpan()
err := c.Cmd.Run()
c.endSpan(err)
return err
}
func (c *Cmd) Output() ([]byte, error) {
c.startSpan()
output, err := c.Cmd.Output()
c.endSpan(err)
return output, err
}
func (c *Cmd) CombinedOutput() ([]byte, error) {
c.startSpan()
output, err := c.Cmd.CombinedOutput()
c.endSpan(err)
return output, err
}
func (c *Cmd) Start() error {
c.startSpan()
err := c.Cmd.Start()
if err != nil {
c.endSpan(err)
}
return err
}
func (c *Cmd) Wait() error {
err := c.Cmd.Wait()
c.endSpan(err)
return err
}
func (c *Cmd) startSpan() {
if c == nil || c.spanned {
return
}
ctx := c.spanCtx
if ctx == nil {
ctx = context.Background()
}
attrs := []attribute.KeyValue{
attribute.String("process.command", c.Path),
}
if len(c.Args) > 1 {
attrs = append(attrs, attribute.StringSlice("process.command_args", c.Args[1:]))
}
if c.Dir != "" {
attrs = append(attrs, attribute.String("process.working_directory", c.Dir))
}
_, span := tracer.Start(
ctx,
fmt.Sprintf("exec %s", filepath.Base(c.Path)),
traceotel.WithAttributes(attrs...),
)
c.span = span
c.spanned = true
c.spanDone = false
}
func (c *Cmd) endSpan(err error) {
if c == nil || !c.spanned || c.spanDone || c.span == nil {
return
}
if err == nil {
c.span.SetAttributes(attribute.Int("process.exit.code", 0))
c.span.SetStatus(codes.Ok, "")
c.span.End()
c.spanDone = true
return
}
c.span.RecordError(err)
if exitErr, ok := errors.AsType[*osexec.ExitError](err); ok {
c.span.SetAttributes(attribute.Int("process.exit.code", exitErr.ExitCode()))
}
c.span.SetStatus(codes.Error, err.Error())
c.span.End()
c.spanDone = true
}

View File

@ -4,12 +4,12 @@ import (
"context"
"fmt"
"io"
"log"
"log/slog"
"os"
"os/exec"
"path/filepath"
"strings"
"github.com/zoriya/kyoo/transcoder/src/exec"
"github.com/zoriya/kyoo/transcoder/src/storage"
"github.com/zoriya/kyoo/transcoder/src/utils"
)
@ -24,10 +24,10 @@ func (s *MetadataService) ExtractSubs(ctx context.Context, info *MediaInfo) (any
err := s.extractSubs(ctx, info)
if err != nil {
log.Printf("Couldn't extract subs: %v", err)
slog.ErrorContext(ctx, "couldn't extract subs", "err", err)
return set(nil, err)
}
_, err = s.Database.Exec(ctx, `update gocoder.info set ver_extract = $2 where sha = $1`, info.Sha, ExtractVersion)
_, err = s.Database.Exec(ctx, `update gocoder.info set ver_extract = $2 where id = $1`, info.Id, ExtractVersion)
return set(nil, err)
}
@ -73,7 +73,7 @@ func (s *MetadataService) GetSubtitle(
}
func (s *MetadataService) extractSubs(ctx context.Context, info *MediaInfo) (err error) {
defer utils.PrintExecTime("extraction of %s", info.Path)()
defer utils.PrintExecTime(ctx, "extraction of %s", info.Path)()
// If there are no supported, embedded subtitles, there is nothing to extract.
hasSupportedSubtitle := false
@ -129,11 +129,11 @@ func (s *MetadataService) extractSubs(ctx context.Context, info *MediaInfo) (err
)
}
}
log.Printf("Starting extraction with the command: %s", cmd)
slog.InfoContext(ctx, "starting extraction", "command", cmd.String())
cmd.Stdout = nil
if err := cmd.Run(); err != nil {
fmt.Println("Error starting ffmpeg extract:", err)
slog.ErrorContext(ctx, "error starting ffmpeg extract", "err", err)
return err
}
@ -142,7 +142,7 @@ func (s *MetadataService) extractSubs(ctx context.Context, info *MediaInfo) (err
return fmt.Errorf("failed while saving files to backend: %w", err)
}
log.Printf("Extraction finished for %s", info.Path)
slog.InfoContext(ctx, "extraction finished", "path", info.Path)
return nil
}

View File

@ -3,7 +3,7 @@ package src
import (
"context"
"fmt"
"log"
"log/slog"
"math"
"os"
"slices"
@ -33,7 +33,7 @@ type VideoKey struct {
quality VideoQuality
}
func (t *Transcoder) newFileStream(path string, sha string) *FileStream {
func (t *Transcoder) newFileStream(ctx context.Context, path string, sha string) *FileStream {
ret := &FileStream{
transcoder: t,
Out: fmt.Sprintf("%s/%s", Settings.Outpath, sha),
@ -42,14 +42,15 @@ func (t *Transcoder) newFileStream(path string, sha string) *FileStream {
}
ret.ready.Add(1)
go func() {
go func(ctx context.Context) {
defer ret.ready.Done()
info, err := t.metadataService.GetMetadata(context.Background(), path, sha)
ctx = context.WithoutCancel(ctx)
info, err := t.metadataService.GetMetadata(ctx, path, sha)
ret.Info = info
if err != nil {
ret.err = err
}
}()
}(ctx)
return ret
}
@ -68,13 +69,15 @@ func (fs *FileStream) Kill() {
}
}
func (fs *FileStream) Destroy() {
log.Printf("Removing all transcode cache files for %s", fs.Info.Path)
func (fs *FileStream) Destroy(ctx context.Context) {
ctx = context.WithoutCancel(ctx)
slog.InfoContext(ctx, "removing all transcode cache files", "path", fs.Info.Path)
fs.Kill()
_ = os.RemoveAll(fs.Out)
}
func (fs *FileStream) GetMaster(client string) string {
func (fs *FileStream) GetMaster(ctx context.Context, client string) string {
ctx = context.WithoutCancel(ctx)
master := "#EXTM3U\n"
// codec is the prefix + the level, the level is not part of the codec we want to compare for the same_codec check bellow
@ -233,54 +236,60 @@ func (fs *FileStream) GetMaster(client string) string {
return master
}
func (fs *FileStream) getVideoStream(idx uint32, quality VideoQuality) (*VideoStream, error) {
func (fs *FileStream) getVideoStream(ctx context.Context, idx uint32, quality VideoQuality) (*VideoStream, error) {
ctx = context.WithoutCancel(ctx)
stream, _ := fs.videos.GetOrCreate(VideoKey{idx, quality}, func() *VideoStream {
ret, _ := fs.transcoder.NewVideoStream(fs, idx, quality)
ret, _ := fs.transcoder.NewVideoStream(ctx, fs, idx, quality)
return ret
})
stream.ready.Wait()
return stream, nil
}
func (fs *FileStream) GetVideoIndex(idx uint32, quality VideoQuality, client string) (string, error) {
stream, err := fs.getVideoStream(idx, quality)
func (fs *FileStream) GetVideoIndex(ctx context.Context, idx uint32, quality VideoQuality, client string) (string, error) {
ctx = context.WithoutCancel(ctx)
stream, err := fs.getVideoStream(ctx, idx, quality)
if err != nil {
return "", err
}
return stream.GetIndex(client)
return stream.GetIndex(ctx, client)
}
func (fs *FileStream) GetVideoSegment(idx uint32, quality VideoQuality, segment int32) (string, error) {
stream, err := fs.getVideoStream(idx, quality)
func (fs *FileStream) GetVideoSegment(ctx context.Context, idx uint32, quality VideoQuality, segment int32) (string, error) {
ctx = context.WithoutCancel(ctx)
stream, err := fs.getVideoStream(ctx, idx, quality)
if err != nil {
return "", err
}
return stream.GetSegment(segment)
return stream.GetSegment(ctx, segment)
}
func (fs *FileStream) getAudioStream(idx uint32, quality AudioQuality) (*AudioStream, error) {
func (fs *FileStream) getAudioStream(ctx context.Context, idx uint32, quality AudioQuality) (*AudioStream, error) {
ctx = context.WithoutCancel(ctx)
stream, _ := fs.audios.GetOrCreate(AudioKey{idx, quality}, func() *AudioStream {
ret, _ := fs.transcoder.NewAudioStream(fs, idx, quality)
ret, _ := fs.transcoder.NewAudioStream(ctx, fs, idx, quality)
return ret
})
stream.ready.Wait()
return stream, nil
}
func (fs *FileStream) GetAudioIndex(idx uint32, quality AudioQuality, client string) (string, error) {
stream, err := fs.getAudioStream(idx, quality)
func (fs *FileStream) GetAudioIndex(ctx context.Context, idx uint32, quality AudioQuality, client string) (string, error) {
ctx = context.WithoutCancel(ctx)
stream, err := fs.getAudioStream(ctx, idx, quality)
if err != nil {
return "", err
}
return stream.GetIndex(client)
return stream.GetIndex(ctx, client)
}
func (fs *FileStream) GetAudioSegment(idx uint32, quality AudioQuality, segment int32) (string, error) {
stream, err := fs.getAudioStream(idx, quality)
func (fs *FileStream) GetAudioSegment(ctx context.Context, idx uint32, quality AudioQuality, segment int32) (string, error) {
ctx = context.WithoutCancel(ctx)
stream, err := fs.getAudioStream(ctx, idx, quality)
if err != nil {
return "", err
}
return stream.GetSegment(segment)
return stream.GetSegment(ctx, segment)
}
func matchAudioQuality(q VideoQuality) AudioQuality {

View File

@ -0,0 +1,160 @@
package src
import (
"context"
"encoding/binary"
"errors"
"fmt"
"log/slog"
"github.com/jackc/pgx/v5"
"github.com/zoriya/kyoo/transcoder/src/exec"
"github.com/zoriya/kyoo/transcoder/src/utils"
)
const (
FingerprintVersion = 1
FpStartPercent = 0.20
FpStartDuration = 10 * 60
FpEndDuration = 5 * 60
)
type Fingerprint struct {
Start []uint32
End []uint32
}
func (s *MetadataService) ComputeFingerprint(ctx context.Context, info *MediaInfo) (*Fingerprint, error) {
if info.Versions.Fingerprint == FingerprintVersion {
var startData string
var endData string
err := s.Database.QueryRow(ctx,
`select start_data, end_data from gocoder.fingerprints where id = $1`,
info.Id,
).Scan(&startData, &endData)
if err == nil {
startFingerprint, err := DecompressFingerprint(startData)
if err != nil {
return nil, fmt.Errorf("failed to decompress start fingerprint: %w", err)
}
endFingerprint, err := DecompressFingerprint(endData)
if err != nil {
return nil, fmt.Errorf("failed to decompress end fingerprint: %w", err)
}
return &Fingerprint{
Start: startFingerprint,
End: endFingerprint,
}, nil
}
if !errors.Is(err, pgx.ErrNoRows) {
return nil, fmt.Errorf("failed to query fingerprint: %w", err)
}
}
get_running, set := s.fingerprintLock.Start(info.Sha)
if get_running != nil {
return get_running()
}
defer utils.PrintExecTime(ctx, "chromaprint for %s", info.Path)()
startFingerprint, err := computeChromaprint(
ctx,
info.Path,
0,
min(info.Duration*FpStartPercent, FpStartDuration),
)
if err != nil {
return set(nil, fmt.Errorf("failed to compute start fingerprint: %w", err))
}
endFingerprint, err := computeChromaprint(
ctx,
info.Path,
max(info.Duration-FpEndDuration, 0),
-1,
)
if err != nil {
return set(nil, fmt.Errorf("failed to compute end fingerprint: %w", err))
}
_, err = s.Database.Exec(ctx,
`update gocoder.info set ver_fingerprint = $2 where id = $1`,
info.Id,
FingerprintVersion,
)
if err != nil {
slog.ErrorContext(ctx, "failed to update fingerprint version", "path", info.Path, "err", err)
}
return set(&Fingerprint{
Start: startFingerprint,
End: endFingerprint,
}, nil)
}
func computeChromaprint(
ctx context.Context,
path string,
start float64,
duration float64,
) ([]uint32, error) {
ctx = context.WithoutCancel(ctx)
defer utils.PrintExecTime(ctx, "chromaprint for %s (between %f and %f)", path, start, duration)()
args := []string{
"-v", "error",
}
if start > 0 {
args = append(args, "-ss", fmt.Sprintf("%.6f", start))
}
if duration > 0 {
args = append(args, "-t", fmt.Sprintf("%.6f", duration))
}
args = append(args,
"-i", path,
"-ac", "2",
// this algorithm allows silence detection
"-algorithm", "3",
"-f", "chromaprint",
"-fp_format", "raw",
"-",
)
cmd := exec.CommandContext(
ctx,
"ffmpeg",
args...,
)
output, err := cmd.Output()
if err != nil {
return nil, fmt.Errorf("ffmpeg failed: %w", err)
}
if len(output)%4 != 0 {
return nil, fmt.Errorf("invalid binary fingerprint size: %d", len(output))
}
result := make([]uint32, len(output)/4)
for i := range result {
result[i] = binary.LittleEndian.Uint32(output[i*4:])
}
return result, nil
}
func (s *MetadataService) StoreFingerprint(ctx context.Context, infoID int32, fingerprint *Fingerprint) error {
startCompressed, err := CompressFingerprint(fingerprint.Start)
if err != nil {
return fmt.Errorf("failed to compress start fingerprint: %w", err)
}
endCompressed, err := CompressFingerprint(fingerprint.End)
if err != nil {
return fmt.Errorf("failed to compress end fingerprint: %w", err)
}
_, err = s.Database.Exec(ctx,
`insert into gocoder.fingerprints(id, start_data, end_data) values ($1, $2, $3)
on conflict (id) do update set start_data = excluded.start_data, end_data = excluded.end_data`,
infoID, startCompressed, endCompressed,
)
return err
}

View File

@ -0,0 +1,244 @@
package src
import (
"context"
"log/slog"
"math/bits"
)
/// See how acoustid handles comparision:
//// https://bitbucket.org/acoustid/acoustid-server/src/cb303c2a3588ff055b7669cf6f1711a224ab9183/postgresql/acoustid_compare.c?at=master
const (
MinOverlapDuration = 15.0
// Correlation threshold (0.0-1.0) above which a match is considered valid.
// Uses the AcoustID-style formula: 1.0 - 2.0 * biterror / (32 * length),
// where random noise scores ~0.0 and identical audio scores 1.0.
MatchThreshold = 0.1
// Number of most-significant bits used as a hash key for offset voting.
// Matches AcoustID's MATCH_BITS. The top bits of a chromaprint value are
// the most discriminative (classifiers are ordered by importance).
MatchBits = 14
// Chromaprint encodes silence as this specific value.
// We skip it during offset voting to avoid false matches.
SilenceValue = 627964279
// Number of samples per correlation block (~2 seconds at 7.8125 samples/s).
// Segments are evaluated in blocks of this size to find contiguous matching runs.
CorrBlockSize = 16
)
type Overlap struct {
StartFirst float64
StartSecond float64
Duration float64
Accuracy int
}
type Match struct {
Start float64
Duration float64
Accuracy int
}
func hammingDistance(a, b uint32) int {
return bits.OnesCount32(a ^ b)
}
// segmentCorrelation computes a similarity score between two aligned
// fingerprint slices using the AcoustID formula.
// Returns a value in [0.0, 1.0] where 0.0 means completely different
// (or random noise) and 1.0 means identical.
func segmentCorrelation(fp1 []uint32, fp2 []uint32) float64 {
length := min(len(fp1), len(fp2))
if length == 0 {
return 0
}
biterror := 0
for i := range length {
biterror += hammingDistance(fp1[i], fp2[i])
}
score := 1.0 - 2.0*float64(biterror)/float64(32*length)
return max(0, score)
}
func matchStrip(v uint32) uint16 {
return uint16(v >> (32 - MatchBits))
}
// findBestOffset discovers the time offset that best aligns two fingerprints.
//
// It follows AcoustID's match_fingerprints2 approach:
// 1. Hash each fingerprint value by its top 14 bits into a fixed-size table,
// storing the last seen position for each hash bucket.
// 2. For each hash bucket present in both tables, vote for the offset
// (position_in_fp1 - position_in_fp2).
// 3. The offset with the most votes wins.
// 4. A diversity check rejects matches caused by repetitive/silent audio.
func findBestOffset(ctx context.Context, fp1, fp2 []uint32) *int {
offsets1 := make(map[uint16]int)
offsets2 := make(map[uint16]int)
for i, v := range fp1 {
if v == SilenceValue {
continue
}
key := matchStrip(v)
offsets1[key] = i + 1
}
for i, v := range fp2 {
if v == SilenceValue {
continue
}
key := matchStrip(v)
offsets2[key] = i + 1
}
if len(offsets1) == 0 || len(offsets2) == 0 {
return nil
}
votes := make(map[int]int)
topCount := 0
topOffset := 0
for key, a := range offsets1 {
b, ok := offsets2[key]
if !ok {
continue
}
offset := a - b
votes[offset]++
if votes[offset] > topCount {
topCount = votes[offset]
topOffset = offset
}
}
// Diversity check: reject if the top offset got very few votes relative
// to the number of unique values. This filters out repetitive audio
// (silence, static noise) that would produce spurious matches.
// (at least 2% of values must match with said offset)
percent := float64(topCount) / float64(min(len(offsets1), len(offsets2)))
if percent < 2./100 {
slog.WarnContext(
ctx,
"Diversity check failed, ignoring potential offset",
"offset", topOffset,
"percent", percent,
"vote_count", topCount,
)
return nil
}
slog.DebugContext(ctx, "Identified offset", "offset", topOffset, "percent", percent)
return new(topOffset)
}
// alignFingerprints returns the sub-slices of fp1 and fp2 that overlap
// when fp1 is shifted by `offset` positions relative to fp2.
// offset = position_in_fp1 - position_in_fp2.
// Also returns the starting indices in fp1 and fp2.
func alignFingerprints(fp1, fp2 []uint32, offset int) ([]uint32, []uint32, int, int) {
start1 := 0
start2 := 0
if offset > 0 {
start1 = offset
} else {
start2 = -offset
}
length := min(len(fp1)-start1, len(fp2)-start2)
if length <= 0 {
return nil, nil, 0, 0
}
return fp1[start1 : start1+length], fp2[start2 : start2+length], start1, start2
}
// findMatchingRuns divides the aligned fingerprints into fixed-size blocks,
// computes the correlation of each block, and finds contiguous runs of
// blocks whose correlation exceeds MatchThreshold. Each run that is at least
// MinOverlapDuration long is returned as an Overlap.
func findMatchingRuns(fp1, fp2 []uint32, start1, start2 int) []Overlap {
length := min(len(fp1), len(fp2))
minSamples := secToSamples(MinOverlapDuration)
if length < minSamples {
return nil
}
nblocks := length / CorrBlockSize
blockCorr := make([]float64, nblocks)
for b := range nblocks {
lo := b * CorrBlockSize
hi := lo + CorrBlockSize
blockCorr[b] = segmentCorrelation(fp1[lo:hi], fp2[lo:hi])
}
// Find contiguous runs of blocks above threshold.
var overlaps []Overlap
inRun := false
runStart := 0
// Handle a run that extends to the last block.
nblocks++
blockCorr = append(blockCorr, 0)
for b := range nblocks {
if blockCorr[b] >= MatchThreshold {
if !inRun {
runStart = b
}
inRun = true
continue
}
if !inRun {
continue
}
inRun = false
start := runStart * CorrBlockSize
// the current `b` doesn't match, don't include it.
// also remove the previous ones to be sure we don't skip content.
end := (b - 3) * CorrBlockSize
if end-start >= minSamples {
corr := segmentCorrelation(fp1[start:end], fp2[start:end])
overlaps = append(overlaps, Overlap{
StartFirst: samplesToSec(start1 + start),
StartSecond: samplesToSec(start2 + start),
Duration: samplesToSec(end - start),
Accuracy: max(0, min(int(corr*100), 100)),
})
}
}
return overlaps
}
// FpFindOverlap finds all similar segments (like shared intro music) between
// two chromaprint fingerprints.
//
// 1. Hash each fingerprint value by its top 14 bits to find the best
// time-offset alignment between the two fingerprints (like
// AcoustID's match_fingerprints2)
// 2. Align the fingerprints at that offset.
// 3. Divide the aligned region into ~2-second blocks and compute correlation
// per block using the AcoustID scoring formula.
// 4. Find contiguous runs of high-correlation blocks that are at least
// MinOverlapDuration long.
func FpFindOverlap(ctx context.Context, fp1 []uint32, fp2 []uint32) ([]Overlap, error) {
offset := findBestOffset(ctx, fp1, fp2)
if offset == nil {
return nil, nil
}
a1, a2, s1, s2 := alignFingerprints(fp1, fp2, *offset)
if len(a1) == 0 {
return nil, nil
}
runs := findMatchingRuns(a1, a2, s1, s2)
return runs, nil
}

View File

@ -0,0 +1,77 @@
package src
import (
"bytes"
"compress/zlib"
"encoding/base64"
"encoding/binary"
"fmt"
"io"
"math"
)
// Number of fingerprint items per second (chromaprint default sample rate).
// Chromaprint uses ~8000 Hz sample rate with 4096-sample frames and 4096/3 overlap,
// producing roughly 7.8 items/s. We use the conventional approximation.
const FingerprintSampleRate = 7.8125
func secToSamples(sec float64) int {
return int(math.Round(sec * FingerprintSampleRate))
}
func samplesToSec(samples int) float64 {
return float64(samples) / FingerprintSampleRate
}
func CompressFingerprint(fp []uint32) (string, error) {
if len(fp) == 0 {
return "", nil
}
raw := make([]byte, len(fp)*4)
for i, v := range fp {
binary.LittleEndian.PutUint32(raw[i*4:], v)
}
var compressed bytes.Buffer
zw := zlib.NewWriter(&compressed)
if _, err := zw.Write(raw); err != nil {
_ = zw.Close()
return "", fmt.Errorf("failed to compress fingerprint: %w", err)
}
if err := zw.Close(); err != nil {
return "", fmt.Errorf("failed to finalize compressed fingerprint: %w", err)
}
return base64.StdEncoding.EncodeToString(compressed.Bytes()), nil
}
func DecompressFingerprint(compressed string) ([]uint32, error) {
data, err := base64.StdEncoding.DecodeString(compressed)
if err != nil {
return nil, fmt.Errorf("failed to base64 decode fingerprint: %w", err)
}
zr, err := zlib.NewReader(bytes.NewReader(data))
if err != nil {
return nil, fmt.Errorf("failed to create zlib reader: %w", err)
}
defer zr.Close()
raw, err := io.ReadAll(zr)
if err != nil {
return nil, fmt.Errorf("failed to decompress fingerprint: %w", err)
}
if len(raw)%4 != 0 {
return nil, fmt.Errorf("invalid raw fingerprint size: %d", len(raw))
}
numItems := len(raw) / 4
result := make([]uint32, numItems)
for i := range numItems {
result[i] = binary.LittleEndian.Uint32(raw[i*4:])
}
return result, nil
}

View File

@ -1,16 +1,18 @@
package src
import (
"log"
"context"
"log/slog"
"os"
)
func DetectHardwareAccel() HwAccelT {
ctx := context.WithoutCancel(context.Background())
name := GetEnvOr("GOCODER_HWACCEL", "disabled")
if name == "disabled" {
name = GetEnvOr("GOTRANSCODER_HWACCEL", "disabled")
}
log.Printf("Using hardware acceleration: %s", name)
slog.InfoContext(ctx, "using hardware acceleration", "name", name)
// superfast or ultrafast would produce a file extremely big so we prefer to ignore them. Fast is available on all hwaccel modes
// so we use that by default.
@ -107,7 +109,7 @@ func DetectHardwareAccel() HwAccelT {
NoResizeFilter: "format=nv12|cuda,hwupload,scale_cuda=format=nv12",
}
default:
log.Printf("No hardware accelerator named: %s", name)
slog.ErrorContext(ctx, "no hardware accelerator named", "name", name)
os.Exit(2)
panic("unreachable")
}

View File

@ -3,10 +3,15 @@ package src
import (
"cmp"
"context"
"crypto/sha1"
"encoding/base64"
"encoding/hex"
"fmt"
"log/slog"
"mime"
"os"
"path/filepath"
"regexp"
"strconv"
"strings"
"sync"
@ -20,13 +25,18 @@ import (
const InfoVersion = 4
type Versions struct {
Info int32 `json:"info" db:"ver_info"`
Extract int32 `json:"extract" db:"ver_extract"`
Thumbs int32 `json:"thumbs" db:"ver_thumbs"`
Keyframes int32 `json:"keyframes" db:"ver_keyframes"`
Info int32 `json:"info" db:"ver_info"`
Extract int32 `json:"extract" db:"ver_extract"`
Thumbs int32 `json:"thumbs" db:"ver_thumbs"`
Keyframes int32 `json:"keyframes" db:"ver_keyframes"`
Fingerprint int32 `json:"fingerprint" db:"ver_fingerprint"`
/// List of sha this was fingerprinted with
FpWith []string `json:"fpWith" db:"ver_fp_with"`
}
type MediaInfo struct {
// Auto-increment id used as foreign key for related tables.
Id int32 `json:"id" db:"id"`
// The sha1 of the video file.
Sha string `json:"sha" db:"sha"`
/// The internal path of the video file.
@ -60,7 +70,7 @@ type MediaInfo struct {
}
type Video struct {
Sha string `json:"-" db:"sha"`
Id int32 `json:"-" db:"id"`
/// The index of this track on the media.
Index uint32 `json:"index" db:"idx"`
@ -86,7 +96,7 @@ type Video struct {
}
type Audio struct {
Sha string `json:"-" db:"sha"`
Id int32 `json:"-" db:"id"`
/// The index of this track on the media.
Index uint32 `json:"index" db:"idx"`
@ -110,7 +120,7 @@ type Audio struct {
}
type Subtitle struct {
Sha string `json:"-" db:"sha"`
Id int32 `json:"-" db:"id"`
/// The index of this track on the media.
Index *uint32 `json:"index" db:"idx"`
@ -137,7 +147,7 @@ type Subtitle struct {
}
type Chapter struct {
Sha string `json:"-" db:"sha"`
Id int32 `json:"-" db:"id"`
/// The start time of the chapter (in second from the start of the episode).
StartTime float32 `json:"startTime" db:"start_time"`
@ -145,8 +155,12 @@ type Chapter struct {
EndTime float32 `json:"endTime" db:"end_time"`
/// The name of this chapter. This should be a human-readable name that could be presented to the user.
Name string `json:"name" db:"name"`
/// The type value is used to mark special chapters (openning/credits...)
/// The type value is used to mark special chapters (opening/credits...)
Type ChapterType `json:"type" db:"type"`
// true only for introductions where the audio track is new (first time we'we heard this one in the serie)
FirstAppearance *bool `json:"firstAppearance,omitempty" db:"first_appearance"`
/// Accuracy of the fingerprint match (0-100).
MatchAccuracy *int32 `json:"matchAccuracy,omitempty" db:"match_accuracy"`
}
type ChapterType string
@ -159,6 +173,26 @@ const (
Preview ChapterType = "preview"
)
// regex stolen from https://github.com/intro-skipper/intro-skipper/wiki/Chapter-Detection-Patterns
var chapterTypePatterns = []struct {
kind ChapterType
pattern *regexp.Regexp
}{
{kind: Recap, pattern: regexp.MustCompile(`(?i)\b(re?cap|sum+ary|prev(ious(ly)?)?|(last|earlier)(\b\w+)?|catch\bup)\b`)},
{kind: Intro, pattern: regexp.MustCompile(`(?i)\b(intro|introduction|op|opening)\b`)},
{kind: Credits, pattern: regexp.MustCompile(`(?i)\b(credits?|ed|ending|outro)\b`)},
{kind: Preview, pattern: regexp.MustCompile(`(?i)\b(preview|pv|sneak\b?peek|coming\b?(up|soon)|next\b+(time|on|episode)|extra|teaser|trailer)\b`)},
}
func identifyChapterType(name string) ChapterType {
for _, matcher := range chapterTypePatterns {
if matcher.pattern.MatchString(name) {
return matcher.kind
}
}
return Content
}
func ParseFloat(str string) float32 {
f, err := strconv.ParseFloat(str, 32)
if err != nil {
@ -170,7 +204,7 @@ func ParseFloat(str string) float32 {
func ParseUint(str string) uint32 {
i, err := strconv.ParseUint(str, 10, 32)
if err != nil {
println(str)
slog.WarnContext(context.WithoutCancel(context.Background()), "failed to parse uint", "value", str, "err", err)
return 0
}
return uint32(i)
@ -179,7 +213,7 @@ func ParseUint(str string) uint32 {
func ParseInt64(str string) int64 {
i, err := strconv.ParseInt(str, 10, 64)
if err != nil {
println(str)
slog.WarnContext(context.WithoutCancel(context.Background()), "failed to parse int64", "value", str, "err", err)
return 0
}
return i
@ -233,11 +267,10 @@ var SubtitleExtensions = map[string]string{
"hdmv_pgs_subtitle": "sup",
}
func RetriveMediaInfo(path string, sha string) (*MediaInfo, error) {
defer utils.PrintExecTime("mediainfo for %s", path)()
ctx, cancelFn := context.WithTimeout(context.Background(), 30*time.Second)
func RetriveMediaInfo(ctx context.Context, path string, sha string) (*MediaInfo, error) {
ctx, cancelFn := context.WithTimeout(context.WithoutCancel(ctx), 30*time.Second)
defer cancelFn()
defer utils.PrintExecTime(ctx, "mediainfo for %s", path)()
mi, err := ffprobe.ProbeURL(ctx, path)
if err != nil {
@ -253,10 +286,11 @@ func RetriveMediaInfo(path string, sha string) (*MediaInfo, error) {
Duration: mi.Format.DurationSeconds,
Container: OrNull(mi.Format.FormatName),
Versions: Versions{
Info: InfoVersion,
Extract: 0,
Thumbs: 0,
Keyframes: 0,
Info: InfoVersion,
Extract: 0,
Thumbs: 0,
Keyframes: 0,
Fingerprint: 0,
},
Videos: MapStream(mi.Streams, ffprobe.StreamVideo, func(stream *ffprobe.Stream, i uint32) Video {
lang, _ := language.Parse(stream.Tags.Language)
@ -312,8 +346,7 @@ func RetriveMediaInfo(path string, sha string) (*MediaInfo, error) {
Name: c.Title(),
StartTime: float32(c.StartTimeSeconds),
EndTime: float32(c.EndTimeSeconds),
// TODO: detect content type
Type: Content,
Type: identifyChapterType(c.Title()),
}
}),
Fonts: MapStream(mi.Streams, ffprobe.StreamAttachment, func(stream *ffprobe.Stream, i uint32) string {
@ -340,3 +373,16 @@ func RetriveMediaInfo(path string, sha string) (*MediaInfo, error) {
}
return &ret, nil
}
// ComputeSha computes a SHA1 hash of the file path and its modification time.
// This is used as a cache key to detect when a file has changed.
func ComputeSha(path string) (string, error) {
info, err := os.Stat(path)
if err != nil {
return "", err
}
h := sha1.New()
h.Write([]byte(path))
h.Write([]byte(info.ModTime().String()))
return hex.EncodeToString(h.Sum(nil)), nil
}

View File

@ -5,13 +5,13 @@ import (
"context"
"errors"
"fmt"
"log"
"os/exec"
"log/slog"
"strconv"
"strings"
"sync"
"github.com/jackc/pgx/v5/pgtype"
"github.com/zoriya/kyoo/transcoder/src/exec"
"github.com/zoriya/kyoo/transcoder/src/utils"
)
@ -102,7 +102,7 @@ type KeyframeKey struct {
Index uint32
}
func (s *MetadataService) GetKeyframes(info *MediaInfo, isVideo bool, idx uint32) (*Keyframe, error) {
func (s *MetadataService) GetKeyframes(ctx context.Context, info *MediaInfo, isVideo bool, idx uint32) (*Keyframe, error) {
info.lock.Lock()
var ret *Keyframe
if isVideo && info.Videos[idx].Keyframes != nil {
@ -139,20 +139,20 @@ func (s *MetadataService) GetKeyframes(info *MediaInfo, isVideo bool, idx uint32
}
info.lock.Unlock()
go func() {
ctx := context.Background()
go func(ctx context.Context) {
ctx = context.WithoutCancel(ctx)
var table string
var err error
if isVideo {
table = "gocoder.videos"
err = getVideoKeyframes(info.Path, idx, kf)
err = getVideoKeyframes(ctx, info.Path, idx, kf)
} else {
table = "gocoder.audios"
err = getAudioKeyframes(info, idx, kf)
err = getAudioKeyframes(ctx, info, idx, kf)
}
if err != nil {
log.Printf("Couldn't retrieve keyframes for %s %s %d: %v", info.Path, table, idx, err)
slog.ErrorContext(ctx, "couldn't retrieve keyframes", "path", info.Path, "table", table, "idx", idx, "err", err)
return
}
@ -160,30 +160,31 @@ func (s *MetadataService) GetKeyframes(info *MediaInfo, isVideo bool, idx uint32
tx, _ := s.Database.Begin(ctx)
tx.Exec(
ctx,
fmt.Sprintf(`update %s set keyframes = $3 where sha = $1 and idx = $2`, table),
info.Sha,
fmt.Sprintf(`update %s set keyframes = $3 where id = $1 and idx = $2`, table),
info.Id,
idx,
kf.Keyframes,
)
tx.Exec(ctx, `update gocoder.info set ver_keyframes = $2 where sha = $1`, info.Sha, KeyframeVersion)
tx.Exec(ctx, `update gocoder.info set ver_keyframes = $2 where id = $1`, info.Id, KeyframeVersion)
err = tx.Commit(ctx)
if err != nil {
log.Printf("Couldn't store keyframes on database: %v", err)
slog.ErrorContext(ctx, "couldn't store keyframes on database", "err", err)
}
}()
}(ctx)
return set(kf, nil)
}
// Retrive video's keyframes and store them inside the kf var.
// Returns when all key frames are retrived (or an error occurs)
// info.ready.Done() is called when more than 100 are retrived (or extraction is done)
func getVideoKeyframes(path string, video_idx uint32, kf *Keyframe) error {
defer utils.PrintExecTime("ffprobe keyframe analysis for %s video n%d", path, video_idx)()
func getVideoKeyframes(ctx context.Context, path string, video_idx uint32, kf *Keyframe) error {
defer utils.PrintExecTime(ctx, "ffprobe keyframe analysis for %s video n%d", path, video_idx)()
// run ffprobe to return all IFrames, IFrames are points where we can split the video in segments.
// We ask ffprobe to return the time of each frame and it's flags
// We could ask it to return only i-frames (keyframes) with the -skip_frame nokey but using it is extremly slow
// since ffmpeg parses every frames when this flag is set.
cmd := exec.Command(
cmd := exec.CommandContext(
ctx,
"ffprobe",
"-loglevel", "error",
"-select_streams", fmt.Sprintf("V:%d", video_idx),
@ -201,6 +202,8 @@ func getVideoKeyframes(path string, video_idx uint32, kf *Keyframe) error {
if err != nil {
return err
}
// we don't care about the result but await it for tracess.
go cmd.Wait()
scanner := bufio.NewScanner(stdout)
@ -276,8 +279,8 @@ func getVideoKeyframes(path string, video_idx uint32, kf *Keyframe) error {
const DummyKeyframeDuration = float64(4)
// we can pretty much cut audio at any point so no need to get specific frames, just cut every 4s
func getAudioKeyframes(info *MediaInfo, audio_idx uint32, kf *Keyframe) error {
defer utils.PrintExecTime("ffprobe keyframe analysis for %s audio n%d", info.Path, audio_idx)()
func getAudioKeyframes(ctx context.Context, info *MediaInfo, audio_idx uint32, kf *Keyframe) error {
defer utils.PrintExecTime(ctx, "ffprobe keyframe analysis for %s audio n%d", info.Path, audio_idx)()
// Format's duration CAN be different than audio's duration. To make sure we do not
// miss a segment or make one more, we need to check the audio's duration.
//
@ -302,7 +305,8 @@ func getAudioKeyframes(info *MediaInfo, audio_idx uint32, kf *Keyframe) error {
//
// We could use the same command to retrieve all packets and know when we can cut PRECISELY
// but since packets always contain only a few ms we don't need this precision.
cmd := exec.Command(
cmd := exec.CommandContext(
ctx,
"ffprobe",
"-select_streams", fmt.Sprintf("a:%d", audio_idx),
"-show_entries", "packet=pts_time",
@ -322,6 +326,8 @@ func getAudioKeyframes(info *MediaInfo, audio_idx uint32, kf *Keyframe) error {
if err != nil {
return err
}
// we don't care about the result but await it for tracess.
go cmd.Wait()
scanner := bufio.NewScanner(stdout)
var duration float64

View File

@ -5,6 +5,7 @@ import (
"encoding/base64"
"errors"
"fmt"
"log/slog"
"os"
"github.com/aws/aws-sdk-go-v2/config"
@ -20,22 +21,24 @@ import (
)
type MetadataService struct {
Database *pgxpool.Pool
lock RunLock[string, *MediaInfo]
thumbLock RunLock[string, any]
extractLock RunLock[string, any]
keyframeLock RunLock[KeyframeKey, *Keyframe]
storage storage.StorageBackend
Database *pgxpool.Pool
lock RunLock[string, *MediaInfo]
thumbLock RunLock[string, any]
extractLock RunLock[string, any]
keyframeLock RunLock[KeyframeKey, *Keyframe]
fingerprintLock RunLock[string, *Fingerprint]
storage storage.StorageBackend
}
func NewMetadataService() (*MetadataService, error) {
ctx := context.TODO()
s := &MetadataService{
lock: NewRunLock[string, *MediaInfo](),
thumbLock: NewRunLock[string, any](),
extractLock: NewRunLock[string, any](),
keyframeLock: NewRunLock[KeyframeKey, *Keyframe](),
lock: NewRunLock[string, *MediaInfo](),
thumbLock: NewRunLock[string, any](),
extractLock: NewRunLock[string, any](),
keyframeLock: NewRunLock[KeyframeKey, *Keyframe](),
fingerprintLock: NewRunLock[string, *Fingerprint](),
}
db, err := s.setupDb()
@ -97,11 +100,11 @@ func (s *MetadataService) setupDb() (*pgxpool.Pool, error) {
db, err := pgxpool.NewWithConfig(ctx, config)
if err != nil {
fmt.Printf("Could not connect to database, check your env variables!\n")
slog.ErrorContext(ctx, "could not connect to database, check your env variables", "err", err)
return nil, err
}
fmt.Println("Migrating database")
slog.InfoContext(ctx, "migrating database")
dbi := stdlib.OpenDBFromPool(db)
defer dbi.Close()
@ -117,7 +120,7 @@ func (s *MetadataService) setupDb() (*pgxpool.Pool, error) {
return nil, err
}
m.Up()
fmt.Println("Migrating finished")
slog.InfoContext(ctx, "migrating finished")
return db, nil
}
@ -152,7 +155,7 @@ func (s *MetadataService) GetMetadata(ctx context.Context, path string, sha stri
return nil, err
}
bgCtx := context.Background()
bgCtx := context.WithoutCancel(ctx)
if ret.Versions.Thumbs < ThumbsVersion {
go s.ExtractThumbs(bgCtx, path, sha)
@ -171,12 +174,25 @@ func (s *MetadataService) GetMetadata(ctx context.Context, path string, sha stri
if err != nil {
return nil, err
}
tx.Exec(bgCtx, `update gocoder.videos set keyframes = null where sha = $1`, sha)
tx.Exec(bgCtx, `update gocoder.audios set keyframes = null where sha = $1`, sha)
tx.Exec(bgCtx, `update gocoder.info set ver_keyframes = 0 where sha = $1`, sha)
tx.Exec(bgCtx, `update gocoder.videos set keyframes = null where id = $1`, ret.Id)
tx.Exec(bgCtx, `update gocoder.audios set keyframes = null where id = $1`, ret.Id)
tx.Exec(bgCtx, `update gocoder.info set ver_keyframes = 0 where id = $1`, ret.Id)
err = tx.Commit(bgCtx)
if err != nil {
fmt.Printf("error deleting old keyframes from database: %v", err)
slog.ErrorContext(bgCtx, "error deleting old keyframes from database", "err", err)
}
}
if ret.Versions.Fingerprint < FingerprintVersion && ret.Versions.Fingerprint != 0 {
tx, err := s.Database.Begin(bgCtx)
if err != nil {
return nil, err
}
tx.Exec(bgCtx, `delete from gocoder.fingerprints where id = $1`, ret.Id)
tx.Exec(bgCtx, `update gocoder.info set ver_fingerprint = 0 where id = $1`, ret.Id)
err = tx.Commit(bgCtx)
if err != nil {
slog.ErrorContext(bgCtx, "error deleting old fingerprints from database", "err", err)
}
}
@ -187,12 +203,14 @@ func (s *MetadataService) getMetadata(ctx context.Context, path string, sha stri
rows, _ := s.Database.Query(
ctx,
`select
i.sha, i.path, i.extension, i.mime_codec, i.size, i.duration, i.container, i.fonts,
i.id, i.sha, i.path, i.extension, i.mime_codec, i.size, i.duration, i.container, i.fonts,
jsonb_build_object(
'info', i.ver_info,
'extract', i.ver_extract,
'thumbs', i.ver_thumbs,
'keyframes', i.ver_keyframes
'keyframes', i.ver_keyframes,
'fingerprint', i.ver_fingerprint,
'fpWith', i.ver_fp_with
) as versions
from gocoder.info as i
where i.sha=$1 limit 1`,
@ -201,7 +219,7 @@ func (s *MetadataService) getMetadata(ctx context.Context, path string, sha stri
ret, err := pgx.CollectOneRow(rows, pgx.RowToStructByName[MediaInfo])
if errors.Is(err, pgx.ErrNoRows) || (ret.Versions.Info < InfoVersion && ret.Versions.Info != 0) {
return s.storeFreshMetadata(context.Background(), path, sha)
return s.storeFreshMetadata(context.WithoutCancel(ctx), path, sha)
}
if err != nil {
return nil, err
@ -209,8 +227,8 @@ func (s *MetadataService) getMetadata(ctx context.Context, path string, sha stri
rows, _ = s.Database.Query(
ctx,
`select * from gocoder.videos as v where v.sha=$1`,
sha,
`select * from gocoder.videos as v where v.id=$1`,
ret.Id,
)
ret.Videos, err = pgx.CollectRows(rows, pgx.RowToStructByName[Video])
if err != nil {
@ -219,8 +237,8 @@ func (s *MetadataService) getMetadata(ctx context.Context, path string, sha stri
rows, _ = s.Database.Query(
ctx,
`select * from gocoder.audios as a where a.sha=$1`,
sha,
`select * from gocoder.audios as a where a.id=$1`,
ret.Id,
)
ret.Audios, err = pgx.CollectRows(rows, pgx.RowToStructByName[Audio])
if err != nil {
@ -229,8 +247,8 @@ func (s *MetadataService) getMetadata(ctx context.Context, path string, sha stri
rows, _ = s.Database.Query(
ctx,
`select * from gocoder.subtitles as s where s.sha=$1`,
sha,
`select * from gocoder.subtitles as s where s.id=$1`,
ret.Id,
)
ret.Subtitles, err = pgx.CollectRows(rows, pgx.RowToStructByName[Subtitle])
if err != nil {
@ -249,13 +267,13 @@ func (s *MetadataService) getMetadata(ctx context.Context, path string, sha stri
}
err = ret.SearchExternalSubtitles()
if err != nil {
fmt.Printf("Couldn't find external subtitles: %v", err)
slog.WarnContext(ctx, "couldn't find external subtitles", "err", err)
}
rows, _ = s.Database.Query(
ctx,
`select * from gocoder.chapters as c where c.sha=$1`,
sha,
`select * from gocoder.chapters as c where c.id=$1`,
ret.Id,
)
ret.Chapters, err = pgx.CollectRows(rows, pgx.RowToStructByName[Chapter])
if err != nil {
@ -270,7 +288,7 @@ func (s *MetadataService) storeFreshMetadata(ctx context.Context, path string, s
return get_running()
}
ret, err := RetriveMediaInfo(path, sha)
ret, err := RetriveMediaInfo(ctx, path, sha)
if err != nil {
return set(nil, err)
}
@ -280,27 +298,32 @@ func (s *MetadataService) storeFreshMetadata(ctx context.Context, path string, s
return set(ret, err)
}
// it needs to be a delete instead of a on conflict do update because we want to trigger delete casquade for
// it needs to be a delete instead of a on conflict do update because we want to trigger delete cascade for
// videos/audios & co.
tx.Exec(ctx, `delete from gocoder.info where path = $1`, path)
tx.Exec(ctx,
err = tx.QueryRow(ctx,
`
insert into gocoder.info(sha, path, extension, mime_codec, size, duration, container,
fonts, ver_info, ver_extract, ver_thumbs, ver_keyframes)
values ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
fonts, ver_info, ver_extract, ver_thumbs, ver_keyframes, ver_fingerprint)
values ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)
returning id
`,
// on conflict do not update versions of extract/thumbs/keyframes
ret.Sha, ret.Path, ret.Extension, ret.MimeCodec, ret.Size, ret.Duration, ret.Container,
ret.Fonts, ret.Versions.Info, ret.Versions.Extract, ret.Versions.Thumbs, ret.Versions.Keyframes,
)
ret.Versions.Fingerprint,
).Scan(&ret.Id)
if err != nil {
return set(ret, fmt.Errorf("failed to insert info: %w", err))
}
for _, v := range ret.Videos {
tx.Exec(
ctx,
`
insert into gocoder.videos(sha, idx, title, language, codec, mime_codec, width, height, is_default, bitrate)
insert into gocoder.videos(id, idx, title, language, codec, mime_codec, width, height, is_default, bitrate)
values ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
on conflict (sha, idx) do update set
sha = excluded.sha,
on conflict (id, idx) do update set
id = excluded.id,
idx = excluded.idx,
title = excluded.title,
language = excluded.language,
@ -311,17 +334,17 @@ func (s *MetadataService) storeFreshMetadata(ctx context.Context, path string, s
is_default = excluded.is_default,
bitrate = excluded.bitrate
`,
ret.Sha, v.Index, v.Title, v.Language, v.Codec, v.MimeCodec, v.Width, v.Height, v.IsDefault, v.Bitrate,
ret.Id, v.Index, v.Title, v.Language, v.Codec, v.MimeCodec, v.Width, v.Height, v.IsDefault, v.Bitrate,
)
}
for _, a := range ret.Audios {
tx.Exec(
ctx,
`
insert into gocoder.audios(sha, idx, title, language, codec, mime_codec, channels, is_default, bitrate)
insert into gocoder.audios(id, idx, title, language, codec, mime_codec, channels, is_default, bitrate)
values ($1, $2, $3, $4, $5, $6, $7, $8, $9)
on conflict (sha, idx) do update set
sha = excluded.sha,
on conflict (id, idx) do update set
id = excluded.id,
idx = excluded.idx,
title = excluded.title,
language = excluded.language,
@ -331,17 +354,17 @@ func (s *MetadataService) storeFreshMetadata(ctx context.Context, path string, s
is_default = excluded.is_default,
bitrate = excluded.bitrate
`,
ret.Sha, a.Index, a.Title, a.Language, a.Codec, a.MimeCodec, a.Channels, a.IsDefault, a.Bitrate,
ret.Id, a.Index, a.Title, a.Language, a.Codec, a.MimeCodec, a.Channels, a.IsDefault, a.Bitrate,
)
}
for _, s := range ret.Subtitles {
tx.Exec(
ctx,
`
insert into gocoder.subtitles(sha, idx, title, language, codec, extension, is_default, is_forced, is_hearing_impaired)
insert into gocoder.subtitles(id, idx, title, language, codec, extension, is_default, is_forced, is_hearing_impaired)
values ($1, $2, $3, $4, $5, $6, $7, $8, $9)
on conflict (sha, idx) do update set
sha = excluded.sha,
on conflict (id, idx) do update set
id = excluded.id,
idx = excluded.idx,
title = excluded.title,
language = excluded.language,
@ -351,23 +374,23 @@ func (s *MetadataService) storeFreshMetadata(ctx context.Context, path string, s
is_forced = excluded.is_forced,
is_hearing_impaired = excluded.is_hearing_impaired
`,
ret.Sha, s.Index, s.Title, s.Language, s.Codec, s.Extension, s.IsDefault, s.IsForced, s.IsHearingImpaired,
ret.Id, s.Index, s.Title, s.Language, s.Codec, s.Extension, s.IsDefault, s.IsForced, s.IsHearingImpaired,
)
}
for _, c := range ret.Chapters {
tx.Exec(
ctx,
`
insert into gocoder.chapters(sha, start_time, end_time, name, type)
insert into gocoder.chapters(id, start_time, end_time, name, type)
values ($1, $2, $3, $4, $5)
on conflict (sha, start_time) do update set
sha = excluded.sha,
on conflict (id, start_time) do update set
id = excluded.id,
start_time = excluded.start_time,
end_time = excluded.end_time,
name = excluded.name,
type = excluded.type
`,
ret.Sha, c.StartTime, c.EndTime, c.Name, c.Type,
ret.Id, c.StartTime, c.EndTime, c.Name, c.Type,
)
}
err = tx.Commit(ctx)

View File

@ -2,17 +2,19 @@ package src
import (
"bufio"
"context"
"errors"
"fmt"
"log"
"log/slog"
"math"
"os"
"os/exec"
"path/filepath"
"slices"
"strings"
"sync"
"time"
"github.com/zoriya/kyoo/transcoder/src/exec"
)
type Flags int32
@ -125,7 +127,8 @@ func toSegmentStr(segments []float64) string {
}), ",")
}
func (ts *Stream) run(start int32) error {
func (ts *Stream) run(ctx context.Context, start int32) error {
ctx = context.WithoutCancel(ctx)
// Start the transcode up to the 100th segment (or less)
length, is_done := ts.keyframes.Length()
end := min(start+100, length)
@ -154,14 +157,7 @@ func (ts *Stream) run(start int32) error {
ts.heads = append(ts.heads, Head{segment: start, end: end, command: nil})
ts.lock.Unlock()
log.Printf(
"Starting transcode %d for %s (from %d to %d out of %d segments)",
encoder_id,
ts.file.Info.Path,
start,
end,
length,
)
slog.InfoContext(ctx, "starting transcode", "encoderId", encoder_id, "path", ts.file.Info.Path, "start", start, "end", end, "length", length)
// Include both the start and end delimiter because -ss and -to are not accurate
// Having an extra segment allows us to cut precisely the segments we want with the
@ -277,8 +273,8 @@ func (ts *Stream) run(start int32) error {
outpath,
)
cmd := exec.Command("ffmpeg", args...)
log.Printf("Running %s", strings.Join(cmd.Args, " "))
cmd := exec.CommandContext(ctx, "ffmpeg", args...)
slog.InfoContext(ctx, "running ffmpeg", "args", strings.Join(cmd.Args, " "))
stdout, err := cmd.StdoutPipe()
if err != nil {
@ -295,7 +291,7 @@ func (ts *Stream) run(start int32) error {
ts.heads[encoder_id].command = cmd
ts.lock.Unlock()
go func() {
go func(ctx context.Context) {
scanner := bufio.NewScanner(stdout)
format := filepath.Base(outpath)
should_stop := false
@ -311,11 +307,11 @@ func (ts *Stream) run(start int32) error {
}
ts.lock.Lock()
ts.heads[encoder_id].segment = segment
log.Printf("Segment %d got ready (%d)", segment, encoder_id)
slog.InfoContext(ctx, "segment got ready", "segment", segment, "encoderId", encoder_id)
if ts.isSegmentReady(segment) {
// the current segment is already marked at done so another process has already gone up to here.
cmd.Process.Signal(os.Interrupt)
log.Printf("Killing ffmpeg because segment %d is already ready", segment)
slog.InfoContext(ctx, "killing ffmpeg because segment already ready", "segment", segment, "encoderId", encoder_id)
should_stop = true
} else {
ts.segments[segment].encoder = encoder_id
@ -325,7 +321,7 @@ func (ts *Stream) run(start int32) error {
should_stop = true
} else if ts.isSegmentReady(segment + 1) {
cmd.Process.Signal(os.Interrupt)
log.Printf("Killing ffmpeg because next segment %d is ready", segment)
slog.InfoContext(ctx, "killing ffmpeg because next segment is ready", "segment", segment, "encoderId", encoder_id)
should_stop = true
}
}
@ -338,30 +334,30 @@ func (ts *Stream) run(start int32) error {
}
if err := scanner.Err(); err != nil {
log.Println("Error reading stdout of ffmpeg", err)
slog.WarnContext(ctx, "error reading ffmpeg stdout", "err", err)
}
}()
}(ctx)
go func() {
go func(ctx context.Context) {
err := cmd.Wait()
if exiterr, ok := err.(*exec.ExitError); ok && exiterr.ExitCode() == 255 {
log.Printf("ffmpeg %d was killed by us", encoder_id)
slog.InfoContext(ctx, "ffmpeg was killed by us", "encoderId", encoder_id)
} else if err != nil {
log.Printf("ffmpeg %d occured an error: %s: %s", encoder_id, err, stderr.String())
slog.ErrorContext(ctx, "ffmpeg occured an error", "encoderId", encoder_id, "err", err, "stderr", stderr.String())
} else {
log.Printf("ffmpeg %d finished successfully", encoder_id)
slog.InfoContext(ctx, "ffmpeg finished successfully", "encoderId", encoder_id)
}
ts.lock.Lock()
defer ts.lock.Unlock()
// we can't delete the head directly because it would invalidate the others encoder_id
ts.heads[encoder_id] = DeletedHead
}()
}(ctx)
return nil
}
func (ts *Stream) GetIndex(client string) (string, error) {
func (ts *Stream) GetIndex(_ context.Context, client string) (string, error) {
// playlist type is event since we can append to the list if Keyframe.IsDone is false.
// start time offset makes the stream start at 0s instead of ~3segments from the end (requires version 6 of hls)
index := `#EXTM3U
@ -388,7 +384,8 @@ func (ts *Stream) GetIndex(client string) (string, error) {
return index, nil
}
func (ts *Stream) GetSegment(segment int32) (string, error) {
func (ts *Stream) GetSegment(ctx context.Context, segment int32) (string, error) {
ctx = context.WithoutCancel(ctx)
ts.lock.RLock()
ready := ts.isSegmentReady(segment)
// we want to calculate distance in the same lock else it can be funky
@ -409,13 +406,13 @@ func (ts *Stream) GetSegment(segment int32) (string, error) {
if !ready {
// Only start a new encode if there is too big a distance between the current encoder and the segment.
if distance > 60 || !is_scheduled {
log.Printf("Creating new head for %d since closest head is %fs aways", segment, distance)
err := ts.run(segment)
slog.InfoContext(ctx, "creating new head", "segment", segment, "distance", distance)
err := ts.run(ctx, segment)
if err != nil {
return "", err
}
} else {
log.Printf("Waiting for segment %d since encoder head is %fs aways", segment, distance)
slog.InfoContext(ctx, "waiting for segment", "segment", segment, "distance", distance)
}
select {
@ -424,11 +421,12 @@ func (ts *Stream) GetSegment(segment int32) (string, error) {
return "", errors.New("could not retrive the selected segment (timeout)")
}
}
ts.prerareNextSegements(segment)
ts.prerareNextSegements(ctx, segment)
return fmt.Sprintf(ts.handle.getOutPath(ts.segments[segment].encoder), segment), nil
}
func (ts *Stream) prerareNextSegements(segment int32) {
func (ts *Stream) prerareNextSegements(ctx context.Context, segment int32) {
ctx = context.WithoutCancel(ctx)
// Audio is way cheaper to create than video so we don't need to run them in advance
// Running it in advance might actually slow down the video encode since less compute
// power can be used so we simply disable that.
@ -446,8 +444,8 @@ func (ts *Stream) prerareNextSegements(segment int32) {
if ts.getMinEncoderDistance(i) < 60+(5*float64(i-segment)) {
continue
}
log.Printf("Creating new head for future segment (%d)", i)
go ts.run(i)
slog.InfoContext(ctx, "creating new head for future segment", "segment", i)
go ts.run(ctx, i)
return
}
}

View File

@ -7,7 +7,7 @@ import (
"image"
"image/color"
"io"
"log"
"log/slog"
"math"
"strings"
"sync"
@ -40,7 +40,7 @@ func getThumbVttPath(sha string) string {
}
func (s *MetadataService) GetThumbVtt(ctx context.Context, path string, sha string) (io.ReadCloser, error) {
_, err := s.ExtractThumbs(context.Background(), path, sha)
_, err := s.ExtractThumbs(context.WithoutCancel(ctx), path, sha)
if err != nil {
return nil, err
}
@ -54,7 +54,7 @@ func (s *MetadataService) GetThumbVtt(ctx context.Context, path string, sha stri
}
func (s *MetadataService) GetThumbSprite(ctx context.Context, path string, sha string) (io.ReadCloser, error) {
_, err := s.ExtractThumbs(context.Background(), path, sha)
_, err := s.ExtractThumbs(context.WithoutCancel(ctx), path, sha)
if err != nil {
return nil, err
}
@ -82,7 +82,7 @@ func (s *MetadataService) ExtractThumbs(ctx context.Context, path string, sha st
}
func (s *MetadataService) extractThumbnail(ctx context.Context, path string, sha string) (err error) {
defer utils.PrintExecTime("extracting thumbnails for %s", path)()
defer utils.PrintExecTime(ctx, "extracting thumbnails for %s", path)()
vttPath := getThumbVttPath(sha)
spritePath := getThumbPath(sha)
@ -95,7 +95,7 @@ func (s *MetadataService) extractThumbnail(ctx context.Context, path string, sha
gen, err := screengen.NewGenerator(path)
if err != nil {
log.Printf("Error reading video file: %v", err)
slog.ErrorContext(ctx, "error reading video file", "path", path, "err", err)
return err
}
defer gen.Close()
@ -120,13 +120,13 @@ func (s *MetadataService) extractThumbnail(ctx context.Context, path string, sha
sprite := imaging.New(width*columns, height*rows, color.Black)
vtt := "WEBVTT\n\n"
log.Printf("Extracting %d thumbnails for %s (interval of %d).", numcaps, path, interval)
slog.InfoContext(ctx, "extracting thumbnails", "count", numcaps, "path", path, "interval", interval)
ts := 0
for i := 0; i < numcaps; i++ {
img, err := gen.ImageWxH(int64(ts*1000), width, height)
if err != nil {
log.Printf("Could not generate screenshot %s", err)
slog.ErrorContext(ctx, "could not generate screenshot", "err", err)
return err
}

View File

@ -1,7 +1,8 @@
package src
import (
"log"
"context"
"log/slog"
"time"
)
@ -121,12 +122,13 @@ func (t *Tracker) start() {
}
func (t *Tracker) KillStreamIfDead(sha string, path string) bool {
ctx := context.WithoutCancel(context.Background())
for _, stream := range t.clients {
if stream.sha == sha {
return false
}
}
log.Printf("Nobody is watching %s. Killing it", path)
slog.InfoContext(ctx, "nobody is watching stream, killing it", "path", path)
stream, ok := t.transcoder.streams.Get(sha)
if !ok {
@ -148,16 +150,17 @@ func (t *Tracker) DestroyStreamIfOld(sha string) {
if !ok {
return
}
stream.Destroy()
stream.Destroy(context.WithoutCancel(context.Background()))
}
func (t *Tracker) KillAudioIfDead(sha string, path string, audio AudioKey) bool {
ctx := context.WithoutCancel(context.Background())
for _, stream := range t.clients {
if stream.sha == sha && stream.audio != nil && *stream.audio == audio {
return false
}
}
log.Printf("Nobody is listening audio %d of %s. Killing it", audio.idx, path)
slog.InfoContext(ctx, "nobody is listening audio, killing it", "audioIdx", audio.idx, "path", path)
stream, ok := t.transcoder.streams.Get(sha)
if !ok {
@ -172,12 +175,13 @@ func (t *Tracker) KillAudioIfDead(sha string, path string, audio AudioKey) bool
}
func (t *Tracker) KillVideoIfDead(sha string, path string, video VideoKey) bool {
ctx := context.WithoutCancel(context.Background())
for _, stream := range t.clients {
if stream.sha == sha && stream.video != nil && *stream.video == video {
return false
}
}
log.Printf("Nobody is watching %s video %d quality %s. Killing it", path, video.idx, video.quality)
slog.InfoContext(ctx, "nobody is watching video quality, killing it", "path", path, "videoIdx", video.idx, "quality", video.quality)
stream, ok := t.transcoder.streams.Get(sha)
if !ok {
@ -212,6 +216,7 @@ func (t *Tracker) KillOrphanedHeads(sha string, video *VideoKey, audio *AudioKey
}
func (t *Tracker) killOrphanedeheads(stream *Stream, is_video bool) {
ctx := context.WithoutCancel(context.Background())
stream.lock.Lock()
defer stream.lock.Unlock()
@ -229,7 +234,7 @@ func (t *Tracker) killOrphanedeheads(stream *Stream, is_video bool) {
distance = min(Abs(ihead-head.segment), distance)
}
if distance > 20 {
log.Printf("Killing orphaned head %s %d", stream.file.Info.Path, encoder_id)
slog.InfoContext(ctx, "killing orphaned head", "path", stream.file.Info.Path, "encoderId", encoder_id)
stream.KillHead(encoder_id)
}
}

View File

@ -37,9 +37,10 @@ func NewTranscoder(metadata *MetadataService) (*Transcoder, error) {
return ret, nil
}
func (t *Transcoder) getFileStream(path string, sha string) (*FileStream, error) {
func (t *Transcoder) getFileStream(ctx context.Context, path string, sha string) (*FileStream, error) {
ctx = context.WithoutCancel(ctx)
ret, _ := t.streams.GetOrCreate(sha, func() *FileStream {
return t.newFileStream(path, sha)
return t.newFileStream(ctx, path, sha)
})
ret.ready.Wait()
if ret.err != nil {
@ -50,7 +51,8 @@ func (t *Transcoder) getFileStream(path string, sha string) (*FileStream, error)
}
func (t *Transcoder) GetMaster(ctx context.Context, path string, client string, sha string) (string, error) {
stream, err := t.getFileStream(path, sha)
ctx = context.WithoutCancel(ctx)
stream, err := t.getFileStream(ctx, path, sha)
if err != nil {
return "", err
}
@ -63,7 +65,7 @@ func (t *Transcoder) GetMaster(ctx context.Context, path string, client string,
vhead: -1,
ahead: -1,
}
return stream.GetMaster(client), nil
return stream.GetMaster(ctx, client), nil
}
func (t *Transcoder) GetVideoIndex(
@ -74,7 +76,8 @@ func (t *Transcoder) GetVideoIndex(
client string,
sha string,
) (string, error) {
stream, err := t.getFileStream(path, sha)
ctx = context.WithoutCancel(ctx)
stream, err := t.getFileStream(ctx, path, sha)
if err != nil {
return "", err
}
@ -87,7 +90,7 @@ func (t *Transcoder) GetVideoIndex(
vhead: -1,
ahead: -1,
}
return stream.GetVideoIndex(video, quality, client)
return stream.GetVideoIndex(ctx, video, quality, client)
}
func (t *Transcoder) GetAudioIndex(
@ -98,7 +101,8 @@ func (t *Transcoder) GetAudioIndex(
client string,
sha string,
) (string, error) {
stream, err := t.getFileStream(path, sha)
ctx = context.WithoutCancel(ctx)
stream, err := t.getFileStream(ctx, path, sha)
if err != nil {
return "", err
}
@ -110,7 +114,7 @@ func (t *Transcoder) GetAudioIndex(
vhead: -1,
ahead: -1,
}
return stream.GetAudioIndex(audio, quality, client)
return stream.GetAudioIndex(ctx, audio, quality, client)
}
func (t *Transcoder) GetVideoSegment(
@ -122,7 +126,8 @@ func (t *Transcoder) GetVideoSegment(
client string,
sha string,
) (string, error) {
stream, err := t.getFileStream(path, sha)
ctx = context.WithoutCancel(ctx)
stream, err := t.getFileStream(ctx, path, sha)
if err != nil {
return "", err
}
@ -135,7 +140,7 @@ func (t *Transcoder) GetVideoSegment(
audio: nil,
ahead: -1,
}
return stream.GetVideoSegment(video, quality, segment)
return stream.GetVideoSegment(ctx, video, quality, segment)
}
func (t *Transcoder) GetAudioSegment(
@ -147,7 +152,8 @@ func (t *Transcoder) GetAudioSegment(
client string,
sha string,
) (string, error) {
stream, err := t.getFileStream(path, sha)
ctx = context.WithoutCancel(ctx)
stream, err := t.getFileStream(ctx, path, sha)
if err != nil {
return "", err
}
@ -159,5 +165,5 @@ func (t *Transcoder) GetAudioSegment(
ahead: segment,
vhead: -1,
}
return stream.GetAudioSegment(audio, quality, segment)
return stream.GetAudioSegment(ctx, audio, quality, segment)
}

View File

@ -1,19 +1,28 @@
package utils
import (
"context"
"errors"
"fmt"
"log"
"log/slog"
"time"
"go.opentelemetry.io/otel"
)
func PrintExecTime(message string, args ...any) func() {
var tracer = otel.Tracer("")
func PrintExecTime(ctx context.Context, message string, args ...any) func() {
msg := fmt.Sprintf(message, args...)
if ctx == nil {
ctx = context.Background()
}
_, span := tracer.Start(ctx, msg)
start := time.Now()
log.Printf("Running %s", msg)
return func() {
log.Printf("%s finished in %s", msg, time.Since(start))
slog.InfoContext(ctx, fmt.Sprintf("finished %s", msg), "duration", time.Since(start))
span.End()
}
}

View File

@ -1,8 +1,9 @@
package src
import (
"context"
"fmt"
"log"
"log/slog"
)
type VideoStream struct {
@ -11,15 +12,11 @@ type VideoStream struct {
quality VideoQuality
}
func (t *Transcoder) NewVideoStream(file *FileStream, idx uint32, quality VideoQuality) (*VideoStream, error) {
log.Printf(
"Creating a new video stream for %s (n %d) in quality %s",
file.Info.Path,
idx,
quality,
)
func (t *Transcoder) NewVideoStream(ctx context.Context, file *FileStream, idx uint32, quality VideoQuality) (*VideoStream, error) {
ctx = context.WithoutCancel(ctx)
slog.InfoContext(ctx, "creating a new video stream", "path", file.Info.Path, "idx", idx, "quality", quality)
keyframes, err := t.metadataService.GetKeyframes(file.Info, true, idx)
keyframes, err := t.metadataService.GetKeyframes(ctx, file.Info, true, idx)
if err != nil {
return nil, err
}