Add admin page to list current streams (#1484)

This commit is contained in:
Zoe Roux
2026-04-29 00:44:10 +02:00
committed by GitHub
18 changed files with 1106 additions and 76 deletions
+1 -19
View File
@@ -2,7 +2,7 @@ import { TypeCompiler } from "@sinclair/typebox/compiler";
import { Value } from "@sinclair/typebox/value";
import Elysia, { t } from "elysia";
import { createRemoteJWKSet, jwtVerify } from "jose";
import { KError } from "./models/error";
import { UserC } from "~/models/user";
import type { Prettify } from "./utils";
const jwtSecret = process.env.JWT_SECRET
@@ -91,24 +91,6 @@ export const auth = new Elysia({ name: "auth" })
})
.as("scoped");
const User = t.Object({
id: t.String({ format: "uuid" }),
username: t.String(),
email: t.String({ format: "email" }),
createdDate: t.Date(),
lastSeen: t.Date(),
claims: t.Record(t.String(), t.Any()),
oidc: t.Record(
t.String(),
t.Object({
id: t.String(),
username: t.String(),
profileUrl: t.Nullable(t.String({ format: "url" })),
}),
),
});
const UserC = TypeCompiler.Compile(t.Union([User, KError]));
export async function getUserInfo(
id: string,
headers: { authorization: string },
+2
View File
@@ -15,6 +15,7 @@ import { movies } from "./controllers/shows/movies";
import { series } from "./controllers/shows/series";
import { showsH } from "./controllers/shows/shows";
import { staffH } from "./controllers/staff";
import { streamsH } from "./controllers/streams";
import { studiosH } from "./controllers/studios";
import { videosMetadata } from "./controllers/video-metadata";
import { videosReadH } from "./controllers/videos";
@@ -115,6 +116,7 @@ export const handlers = new Elysia({ prefix })
},
(app) =>
app
.use(streamsH)
.use(showsH)
.use(movies)
.use(series)
+223
View File
@@ -0,0 +1,223 @@
import { TypeCompiler } from "@sinclair/typebox/compiler";
import { and, desc, eq, sql } from "drizzle-orm";
import { Elysia, t } from "elysia";
import { auth } from "~/auth";
import { db } from "~/db";
import { history, profiles, videos } from "~/db/schema";
import { sqlarr } from "~/db/utils";
import { Entry } from "~/models/entry";
import { KError } from "~/models/error";
import { Show } from "~/models/show";
import { User } from "~/models/user";
import { AcceptLanguage, processLanguages } from "~/models/utils";
import { uniq } from "~/utils";
import { getVideos } from "./videos";
const TranscodeStatus = t.Object({
index: t.Integer(),
quality: t.String(),
heads: t.Array(
t.Object({
start: t.Number(),
end: t.Number(),
startHead: t.Integer(),
endHead: t.Integer(),
isRunning: t.Boolean(),
}),
),
});
const ViewerTrack = t.Object({
index: t.Integer(),
quality: t.String(),
head: t.Integer(),
});
const TranscoderViewer = t.Object({
clientId: t.String(),
profileId: t.Nullable(t.String({ format: "uuid" })),
sessionId: t.Nullable(t.String()),
video: t.Nullable(ViewerTrack),
audio: t.Nullable(ViewerTrack),
});
const TranscoderStream = t.Object({
path: t.String(),
sha: t.String(),
duration: t.Number(),
videos: t.Array(TranscodeStatus),
audios: t.Array(TranscodeStatus),
viewers: t.Array(TranscoderViewer),
});
type TranscoderStream = typeof TranscoderStream.static;
const TranscoderStreamListC = TypeCompiler.Compile(t.Array(TranscoderStream));
const UserPageC = TypeCompiler.Compile(t.Object({ items: t.Array(User) }));
const RunningViewerTrack = t.Object({
index: t.Integer(),
quality: t.String(),
head: t.Integer(),
});
const RunningStream = t.Object({
id: t.String({ format: "uuid" }),
path: t.String(),
duration: t.Number(),
videos: t.Array(TranscodeStatus),
audios: t.Array(TranscodeStatus),
viewers: t.Array(
t.Object({
user: t.Nullable(User),
progress: t.Nullable(t.Number()),
video: t.Nullable(RunningViewerTrack),
audio: t.Nullable(RunningViewerTrack),
}),
),
entries: t.Array(t.Omit(Entry, ["videos", "progress"])),
show: t.Nullable(Show),
});
export const streamsH = new Elysia({ tags: ["videos"] }).use(auth).get(
"videos/streams",
// @ts-expect-error idk
async ({
headers: { authorization, "accept-language": langs },
jwt: { sub, settings },
status,
}) => {
let streams: TranscoderStream[];
try {
const response = await fetch(
new URL(
"/video/streams",
process.env.TRANSCODER_SERVER ?? "http://transcoder:7666",
),
{
headers: authorization ? { authorization } : undefined,
},
);
if (!response.ok) {
return status(502, {
status: 502,
message: "Cannot fetch running streams from transcoder.",
details: await response.text(),
});
}
streams = TranscoderStreamListC.Decode(await response.json());
} catch (e) {
return status(502, {
status: 502,
message: "Cannot reach transcoder service.",
details: e,
});
}
if (!streams.length) return [];
const usersById = new Map<string, User>();
try {
const resp: Response = await fetch(
new URL("/auth/users", process.env.AUTH_SERVER ?? "http://auth:4568"),
{
headers: authorization ? { authorization } : undefined,
},
);
if (!resp.ok) {
return status(502, {
status: 502,
message: "Cannot fetch users from auth service.",
details: await resp.text(),
});
}
const { items } = UserPageC.Decode(await resp.json());
for (const user of items) {
usersById.set(user.id, user);
}
} catch (e) {
return status(502, {
status: 502,
message: "Cannot reach auth service.",
details: e,
});
}
const paths = streams.map((x) => x.path);
const items = await getVideos({
limit: paths.length,
filter: eq(videos.path, sql`any(${sqlarr(paths)})`),
languages: processLanguages(langs),
preferOriginal: settings.preferOriginal,
relations: ["show"],
userId: sub,
});
const profileIds = uniq(
streams
.flatMap((x) => x.viewers.map((v) => v.profileId))
.filter((x): x is string => !!x),
);
const videoIds = items.map((x) => x.id);
const progress = new Map<string, number>();
if (profileIds.length > 0 && videoIds.length > 0) {
const progressRows = await db
.selectDistinctOn([profiles.id, videos.id], {
profileId: profiles.id,
videoId: videos.id,
time: history.time,
})
.from(history)
.innerJoin(profiles, eq(history.profilePk, profiles.pk))
.innerJoin(videos, eq(history.videoPk, videos.pk))
.where(
and(
eq(profiles.id, sql`any(${sqlarr(profileIds)}::uuid[])`),
eq(videos.id, sql`any(${sqlarr(videoIds)}::uuid[])`),
),
)
.orderBy(profiles.id, videos.id, desc(history.playedDate));
for (const row of progressRows) {
progress.set(`${row.profileId}:${row.videoId}`, row.time);
}
}
const videosByPath = new Map(items.map((x) => [x.path, x]));
return streams.map((stream) => {
const video = videosByPath.get(stream.path);
return {
id: video!.id,
path: stream.path,
duration: stream.duration,
videos: stream.videos,
audios: stream.audios,
viewers: stream.viewers.map((viewer) => ({
user: usersById.get(viewer.profileId ?? ""),
progress: progress.get(`${viewer.profileId}:${video?.id}`) ?? null,
video: viewer.video,
audio: viewer.audio,
})),
entries: video?.entries ?? [],
show: video?.show ?? null,
};
});
},
{
detail: {
description: "List currently running streams",
},
headers: t.Object({
"accept-language": AcceptLanguage({ autoFallback: true }),
}),
response: {
200: t.Array(RunningStream),
422: KError,
502: KError,
},
},
);
+24
View File
@@ -0,0 +1,24 @@
import { TypeCompiler } from "@sinclair/typebox/compiler";
import { t } from "elysia";
import { KError } from "./error";
export const User = t.Object({
id: t.String({ format: "uuid" }),
username: t.String(),
email: t.String({ format: "email" }),
createdDate: t.Date(),
lastSeen: t.Date(),
claims: t.Record(t.String(), t.Any()),
oidc: t.Record(
t.String(),
t.Object({
id: t.String(),
username: t.String(),
profileUrl: t.Nullable(t.String({ format: "url" })),
}),
),
});
export type User = typeof User.static;
export const UserC = TypeCompiler.Compile(t.Union([User, KError]));
+1 -1
View File
@@ -25,7 +25,7 @@ type User struct {
// List of custom claims JWT created via get /jwt will have
Claims jwt.MapClaims `json:"claims" example:"isAdmin: true"`
// List of other login method available for this user. Access tokens wont be returned here.
Oidc map[string]OidcHandle `json:"oidc,omitempty"`
Oidc map[string]OidcHandle `json:"oidc"`
}
type OidcHandle struct {
+16
View File
@@ -388,6 +388,22 @@
"progress-pending": "{{count}} pending",
"progress-failed": "{{count}} failed"
},
"streams": {
"title": "Streams",
"subtitle": "Currently playing videos and active transcodes.",
"empty": "No stream is running right now.",
"guest": "Guest",
"viewers": "Viewers",
"noActiveViewer": "No active viewer",
"watching": "Watching {{quality}}",
"none": "None",
"runningVideoTranscodes": "Video transcodes",
"runningAudioTranscodes": "Audio transcodes",
"progress": {
"available": "Available",
"transcoding": "Transcoding"
}
},
"add": {
"title": "Add to library",
"searchPlaceholder": "Search for a movie or series...",
@@ -72,6 +72,12 @@ export default function AdminTabsLayout() {
tabBarLabel: t("admin.unmatched.label"),
}}
/>
<TopTabs.Screen
name="streams"
options={{
tabBarLabel: t("admin.streams.title"),
}}
/>
<TopTabs.Screen
name="users"
options={{
@@ -0,0 +1,5 @@
import { AdminStreamsPage } from "~/ui/admin/streams";
export { ErrorBoundary } from "~/ui/error-boundary";
export default AdminStreamsPage;
+6 -10
View File
@@ -2,14 +2,10 @@ import { View, type ViewProps } from "react-native";
import { cn } from "~/utils";
export const Container = ({ className, ...props }: ViewProps) => {
return (
<View
className={cn(
"flex w-full self-center px-4",
"sm:w-xl md:w-3xl lg:w-5xl xl:w-7xl",
className,
)}
{...props}
/>
);
return <View className={cn(Container.className, className)} {...props} />;
};
Container.className = cn(
"flex w-full self-center px-4",
"sm:w-xl md:w-3xl lg:w-5xl xl:w-7xl",
);
+1
View File
@@ -1,6 +1,7 @@
declare module "*.svg" {
import type React from "react";
import type { SvgProps } from "react-native-svg";
const content: React.FC<SvgProps>;
export default content;
}
+1
View File
@@ -1,3 +1,4 @@
export * from "./remap";
export * from "./streams";
export * from "./users";
export * from "./videos-modal";
+426
View File
@@ -0,0 +1,426 @@
import PlayArrow from "@material-symbols/svg-400/rounded/play_arrow-fill.svg";
import { useTranslation } from "react-i18next";
import { FlatList, View } from "react-native";
import { z } from "zod/v4";
import { entryDisplayNumber } from "~/components/entries";
import {
Episode,
type KImage,
MovieEntry,
Show,
Special,
User,
} from "~/models";
import {
Avatar,
Container,
H2,
Heading,
HR,
IconButton,
Image,
Link,
P,
Skeleton,
SubP,
useBreakpointValue,
} from "~/primitives";
import { type QueryIdentifier, useFetch } from "~/query";
import { cn, uniq } from "~/utils";
import { EmptyView } from "../empty-view";
import { toTimerString } from "../player/controls/progress";
const Track = z.object({
index: z.number(),
quality: z.string(),
heads: z.array(
z.object({
start: z.number(),
end: z.number(),
startHead: z.number(),
endHead: z.number(),
isRunning: z.boolean(),
}),
),
});
type Track = z.infer<typeof Track>;
const ViewerTrack = z.object({
index: z.number(),
quality: z.string(),
head: z.number(),
});
type ViewerTrack = z.infer<typeof ViewerTrack>;
const Stream = z.object({
id: z.string(),
path: z.string(),
duration: z.number(),
show: Show.nullable(),
entries: z.array(
z.discriminatedUnion("kind", [
Episode.omit({ progress: true, videos: true }),
MovieEntry.omit({ progress: true, videos: true }),
Special.omit({ progress: true, videos: true }),
]),
),
viewers: z.array(
z.object({
user: User.nullable(),
progress: z.number().nullable(),
video: ViewerTrack.nullable(),
audio: ViewerTrack.nullable(),
}),
),
videos: z.array(Track),
audios: z.array(Track),
});
type Stream = z.infer<typeof Stream>;
const StreamViewer = ({
username,
logo,
progress,
duration,
video,
audio,
}: {
username: string;
logo?: string;
progress: number | null;
duration: number;
video: ViewerTrack | null;
audio: ViewerTrack | null;
}) => {
const { t } = useTranslation();
return (
<Link
className={cn(
"flex-row items-center gap-2 rounded-4xl p-1",
"hover:bg-gray-400/50 focus-visible:bg-gray-400/50",
)}
href={`/profiles/${username}`}
>
<Avatar src={logo} placeholder={username} className="h-7 w-7" />
<View className="min-w-0 flex-1">
<P numberOfLines={1} className="font-semibold text-sm">
{username}
</P>
<SubP numberOfLines={1}>
{t("admin.streams.watching", {
quality: uniq([video?.quality, audio?.quality])
.filter((x) => x)
.join(" / "),
})}
</SubP>
</View>
{progress && (
<SubP>
{`${toTimerString(progress, duration)}/${toTimerString(duration)}`}
</SubP>
)}
</Link>
);
};
const StreamProgressBar = ({
index,
quality,
duration,
heads,
viewers,
}: {
index: number;
quality: string;
duration: number;
heads: Track["heads"];
viewers: { id: string; username: string; logo?: string; progress: number }[];
}) => {
return (
<View className="gap-2">
<SubP className="font-semibold">
#{index} {quality}
</SubP>
<View className="relative h-6 rounded bg-slate-800">
{heads.map((head, headIndex) => (
<View
key={`${headIndex}-${head.start}-${head.end}`}
className={cn(
"absolute inset-y-0",
head.isRunning ? "bg-amber-500/70" : "bg-emerald-500/60",
)}
style={{
left: `${(head.start / duration) * 100}%`,
width: `${Math.max(((head.end - head.start) / duration) * 100, 0.75)}%`,
}}
/>
))}
{viewers.map((viewer) => (
<View
key={viewer.id}
className="absolute -top-2"
style={{ left: `${(viewer.progress / duration) * 100}%` }}
>
<Avatar
src={viewer.logo}
placeholder={viewer.username}
className="h-5 w-5 -translate-x-1/2 ring-1 ring-slate-950"
/>
</View>
))}
</View>
</View>
);
};
const StreamCard = ({
id,
path,
name,
thumbnail,
duration,
viewers,
videos,
audios,
}: {
id: string;
path: string;
name: string | null;
thumbnail: KImage | null;
duration: number;
viewers: Stream["viewers"];
videos: Stream["videos"];
audios: Stream["videos"];
}) => {
const { t } = useTranslation();
return (
<View
className={cn(
"group rounded-md bg-card p-4 outline-0",
"ring-accent hover:ring-3 focus-visible:ring-3",
)}
>
<Image
src={thumbnail}
quality="low"
className="mb-3 aspect-video w-full rounded"
/>
<View className="mb-3 flex-row items-center gap-2">
<IconButton
as={Link}
icon={PlayArrow}
href={`/watch/${id}`}
iconClassName="fill-accent dark:fill-accent"
/>
<Heading className="flex-1 font-semibold">{name}</Heading>
</View>
<P numberOfLines={2} className="wrap-anywhere mb-3 text-sm">
{path}
</P>
<HR />
<View className="mt-3 gap-2">
<SubP className="font-semibold uppercase">
{t("admin.streams.viewers")}
</SubP>
{viewers.length === 0 ? (
<SubP>{t("admin.streams.noActiveViewer")}</SubP>
) : (
viewers.map((x, i) => (
<StreamViewer
key={x.user?.id ?? i}
username={x.user?.username ?? t("admin.streams.guest")}
logo={x.user?.logo}
progress={x.progress}
duration={duration}
video={x.video}
audio={x.audio}
/>
))
)}
</View>
<HR className="my-3" />
<View className="gap-2">
<SubP className="font-semibold uppercase">
{t("admin.streams.runningVideoTranscodes")}
</SubP>
<View className="gap-3">
{videos.length === 0 ? (
<SubP>{t("admin.streams.none")}</SubP>
) : (
videos.map((video) => (
<StreamProgressBar
key={`${video.index}-${video.quality}`}
index={video.index}
quality={video.quality}
duration={duration}
heads={video.heads}
viewers={viewers
.filter(
(x) =>
x.progress &&
x.video?.quality === video.quality &&
x.video?.index === video.index,
)
.map((x, i) => ({
id: x.user?.id ?? i.toString(),
username: x.user?.username ?? t("admin.streams.guest"),
logo: x.user?.logo,
progress: x.progress!,
}))}
/>
))
)}
</View>
<SubP className="font-semibold uppercase">
{t("admin.streams.runningAudioTranscodes")}
</SubP>
<View className="gap-3">
{audios.length === 0 ? (
<SubP>{t("admin.streams.none")}</SubP>
) : (
audios.map((audio) => (
<StreamProgressBar
key={`${audio.index}-${audio.quality}`}
index={audio.index}
quality={audio.quality}
duration={duration}
heads={audio.heads}
viewers={viewers
.filter(
(x) =>
x.progress &&
x.audio?.quality === audio.quality &&
x.audio?.index === audio.index,
)
.map((x, i) => ({
id: x.user?.id ?? i.toString(),
username: x.user?.username ?? t("admin.streams.guest"),
logo: x.user?.logo,
progress: x.progress!,
}))}
/>
))
)}
</View>
<View className="mb-1 flex-row items-center gap-3">
<View className="flex-row items-center gap-1">
<View className="h-2 w-2 rounded-sm bg-emerald-500/60" />
<SubP>{t("admin.streams.progress.available")}</SubP>
</View>
<View className="flex-row items-center gap-1">
<View className="h-2 w-2 rounded-sm bg-amber-500/70" />
<SubP>{t("admin.streams.progress.transcoding")}</SubP>
</View>
</View>
</View>
</View>
);
};
StreamCard.Loader = () => {
return (
<View className="rounded-md border border-slate-700 bg-slate-900/40 p-4">
<Skeleton className="mb-2 h-5 w-3/4" />
<Skeleton className="mb-2 h-4 w-2/3" />
<Skeleton className="mb-3 h-9" />
<HR />
<View className="mt-3 gap-2">
<Skeleton className="h-4 w-1/4" />
<View className="flex-row items-center gap-2">
<Avatar.Loader className="h-7 w-7" />
<Skeleton className="h-4 flex-1" />
</View>
</View>
</View>
);
};
export const AdminStreamsPage = () => {
const { t } = useTranslation();
const { data } = useFetch(AdminStreamsPage.query());
const columns = useBreakpointValue({ xs: 1, md: 2, xl: 3 });
if (!data) {
return (
<Container className="py-4">
<H2 className="mb-2">{t("admin.streams.title")}</H2>
<SubP className="mb-4">{t("admin.streams.subtitle")}</SubP>
<View className="flex-row flex-wrap">
{Array.from({ length: 6 }).map((_, index) => (
<View
key={index}
className={cn(
"p-1",
columns === 1 ? "w-full" : "w-1/2",
columns > 2 && "xl:w-1/3",
)}
>
<StreamCard.Loader />
</View>
))}
</View>
</Container>
);
}
if (data.length === 0) {
return (
<Container className="py-4">
<H2 className="mb-2">{t("admin.streams.title")}</H2>
<SubP className="mb-4">{t("admin.streams.subtitle")}</SubP>
<EmptyView message={t("admin.streams.empty")} />
</Container>
);
}
return (
<FlatList
key={columns}
data={data}
numColumns={columns}
ListHeaderComponent={
<View>
<H2 className="mb-2">{t("admin.streams.title")}</H2>
<SubP className="mb-4">{t("admin.streams.subtitle")}</SubP>
</View>
}
contentContainerClassName={Container.className}
keyExtractor={(item) => item.id}
renderItem={({ item }) => (
<View
className={cn(
"p-1",
columns === 1 && "w-full",
columns === 2 && "w-1/2",
columns === 3 && "xl:w-1/3",
)}
>
<StreamCard
id={item.id}
path={item.path}
thumbnail={
item.entries[0]?.thumbnail ?? item.show?.thumbnail ?? null
}
name={
(item.entries[0] && item.show?.kind === "serie"
? `${item.show.name} - ${entryDisplayNumber(item.entries[0])}`
: item.show?.name) ?? item.path
}
duration={item.duration}
viewers={item.viewers}
videos={item.videos}
audios={item.audios}
/>
</View>
)}
/>
);
};
AdminStreamsPage.query = (): QueryIdentifier<Stream[]> => ({
parser: z.array(Stream),
path: ["api", "videos", "streams"],
refetchInterval: 5000,
});
+6
View File
@@ -5,6 +5,7 @@ import Close from "@material-symbols/svg-400/rounded/close.svg";
import Login from "@material-symbols/svg-400/rounded/login.svg";
import Logout from "@material-symbols/svg-400/rounded/logout.svg";
import Person from "@material-symbols/svg-400/rounded/person-fill.svg";
import Play from "@material-symbols/svg-400/rounded/play_arrow-fill.svg";
import Search from "@material-symbols/svg-400/rounded/search-fill.svg";
import Settings from "@material-symbols/svg-400/rounded/settings.svg";
import { useIsFocused } from "@react-navigation/native";
@@ -71,6 +72,11 @@ export const NavbarLeft = () => {
icon={Search}
href="/admin/unmatched"
/>
<Menu.Item
label={t("admin.streams.title")}
icon={Play}
href="/admin/streams"
/>
<Menu.Item label="Users" icon={Admin} href="/admin/users" />
</Menu>
)}
+4
View File
@@ -81,6 +81,10 @@ export function shuffle<T>(array: T[]): T[] {
return array;
}
export function uniq<T>(a: T[]): T[] {
return uniqBy(a, (x) => x as string);
}
export function uniqBy<T>(a: T[], key: (val: T) => string | number): T[] {
const seen: Record<string, boolean> = {};
return a.filter((item) => {
+68 -4
View File
@@ -6,6 +6,7 @@ import (
"strconv"
"strings"
"github.com/golang-jwt/jwt/v5"
"github.com/labstack/echo/v5"
"github.com/zoriya/kyoo/transcoder/src"
)
@@ -17,6 +18,7 @@ type shandler struct {
func RegisterStreamHandlers(e *echo.Group, transcoder *src.Transcoder) {
h := shandler{transcoder}
e.GET("/streams", h.GetStreams)
e.GET("/:path/direct", DirectStream)
e.GET("/:path/direct/:identifier", DirectStream)
e.GET("/:path/master.m3u8", h.GetMaster)
@@ -61,12 +63,13 @@ func (h *shandler) GetMaster(c *echo.Context) error {
if err != nil {
return err
}
profileId, sessionId := getIdentity(c)
path, sha, err := getPath(c)
if err != nil {
return err
}
ret, err := h.transcoder.GetMaster(c.Request().Context(), path, client, sha)
ret, err := h.transcoder.GetMaster(c.Request().Context(), path, client, profileId, sessionId, sha)
if err != nil {
return err
}
@@ -97,12 +100,22 @@ func (h *shandler) GetVideoIndex(c *echo.Context) error {
if err != nil {
return err
}
profileId, sessionId := getIdentity(c)
path, sha, err := getPath(c)
if err != nil {
return err
}
ret, err := h.transcoder.GetVideoIndex(c.Request().Context(), path, uint32(video), quality, client, sha)
ret, err := h.transcoder.GetVideoIndex(
c.Request().Context(),
path,
uint32(video),
quality,
client,
profileId,
sessionId,
sha,
)
if err != nil {
return err
}
@@ -133,12 +146,22 @@ func (h *shandler) GetAudioIndex(c *echo.Context) error {
if err != nil {
return err
}
profileId, sessionId := getIdentity(c)
path, sha, err := getPath(c)
if err != nil {
return err
}
ret, err := h.transcoder.GetAudioIndex(c.Request().Context(), path, uint32(audio), quality, client, sha)
ret, err := h.transcoder.GetAudioIndex(
c.Request().Context(),
path,
uint32(audio),
quality,
client,
profileId,
sessionId,
sha,
)
if err != nil {
return err
}
@@ -171,6 +194,7 @@ func (h *shandler) GetVideoSegment(c *echo.Context) error {
if err != nil {
return err
}
profileId, sessionId := getIdentity(c)
path, sha, err := getPath(c)
if err != nil {
return err
@@ -183,6 +207,8 @@ func (h *shandler) GetVideoSegment(c *echo.Context) error {
quality,
segment,
client,
profileId,
sessionId,
sha,
)
if err != nil {
@@ -216,18 +242,33 @@ func (h *shandler) GetAudioSegment(c *echo.Context) error {
if err != nil {
return err
}
profileId, sessionId := getIdentity(c)
path, sha, err := getPath(c)
if err != nil {
return err
}
ret, err := h.transcoder.GetAudioSegment(c.Request().Context(), path, uint32(audio), quality, segment, client, sha)
ret, err := h.transcoder.GetAudioSegment(
c.Request().Context(),
path,
uint32(audio),
quality,
segment,
client,
profileId,
sessionId,
sha,
)
if err != nil {
return err
}
return c.File(strings.TrimLeft(ret, "/"))
}
func (h *shandler) GetStreams(c *echo.Context) error {
return c.JSON(http.StatusOK, h.transcoder.ListRunningStreams())
}
func getClientId(c *echo.Context) (string, error) {
key := c.QueryParam("clientId")
if key == "" {
@@ -239,6 +280,29 @@ func getClientId(c *echo.Context) (string, error) {
return key, nil
}
func getIdentity(c *echo.Context) (*string, *string) {
token, ok := c.Get("user").(*jwt.Token)
if !ok || token == nil {
return nil, nil
}
claims, ok := token.Claims.(jwt.MapClaims)
if !ok {
return nil, nil
}
profileId := normalizeOptionalId(claims["sub"])
sessionId := normalizeOptionalId(claims["sid"])
return profileId, sessionId
}
func normalizeOptionalId(value any) *string {
id, ok := value.(string)
if !ok || id == "" || id == "00000000-0000-0000-0000-000000000000" {
return nil
}
return &id
}
func parseSegment(segment string) (int32, error) {
var ret int32
_, err := fmt.Sscanf(segment, "segment-%d.ts", &ret)
+42 -8
View File
@@ -7,13 +7,15 @@ import (
)
type ClientInfo struct {
client string
sha string
path string
video *VideoKey
audio *AudioKey
vhead int32
ahead int32
client string
profileId *string
sessionId *string
sha string
path string
video *VideoKey
audio *AudioKey
vhead int32
ahead int32
}
type Tracker struct {
@@ -25,6 +27,7 @@ type Tracker struct {
lastUsage map[string]time.Time
transcoder *Transcoder
deletedStream chan string
snapshotReq chan chan map[string]ClientInfo
}
func NewTracker(t *Transcoder) *Tracker {
@@ -33,6 +36,7 @@ func NewTracker(t *Transcoder) *Tracker {
visitDate: make(map[string]time.Time),
lastUsage: make(map[string]time.Time),
deletedStream: make(chan string),
snapshotReq: make(chan chan map[string]ClientInfo),
transcoder: t,
}
go ret.start()
@@ -59,6 +63,12 @@ func (t *Tracker) start() {
old, ok := t.clients[info.client]
// First fixup the info. Most routes ruturn partial infos
if ok && old.sha == info.sha {
if info.profileId == nil {
info.profileId = old.profileId
}
if info.sessionId == nil {
info.sessionId = old.sessionId
}
if info.video == nil {
info.video = old.video
}
@@ -98,6 +108,7 @@ func (t *Tracker) start() {
case <-timer:
timer = time.After(inactive_time)
// Purge old clients
stale := make([]ClientInfo, 0)
for client, date := range t.visitDate {
if time.Since(date) < inactive_time {
continue
@@ -106,7 +117,10 @@ func (t *Tracker) start() {
info := t.clients[client]
delete(t.clients, client)
delete(t.visitDate, client)
stale = append(stale, info)
}
for _, info := range stale {
if !t.KillStreamIfDead(info.sha, info.path) {
audio_cleanup := info.audio != nil && t.KillAudioIfDead(info.sha, info.path, *info.audio)
video_cleanup := info.video != nil && t.KillVideoIfDead(info.sha, info.path, *info.video)
@@ -117,6 +131,8 @@ func (t *Tracker) start() {
}
case path := <-t.deletedStream:
t.DestroyStreamIfOld(path)
case reply := <-t.snapshotReq:
reply <- t.cloneClients()
}
}
}
@@ -143,7 +159,11 @@ func (t *Tracker) KillStreamIfDead(sha string, path string) bool {
}
func (t *Tracker) DestroyStreamIfOld(sha string) {
if time.Since(t.lastUsage[sha]) < 4*time.Hour {
lastUsage, ok := t.lastUsage[sha]
if !ok {
return
}
if time.Since(lastUsage) < 4*time.Hour {
return
}
stream, ok := t.transcoder.streams.GetAndRemove(sha)
@@ -239,3 +259,17 @@ func (t *Tracker) killOrphanedeheads(stream *Stream, is_video bool) {
}
}
}
func (t *Tracker) SnapshotClients() map[string]ClientInfo {
ret := make(chan map[string]ClientInfo)
t.snapshotReq <- ret
return <-ret
}
func (t *Tracker) cloneClients() map[string]ClientInfo {
out := make(map[string]ClientInfo, len(t.clients))
for clientId, info := range t.clients {
out[clientId] = info
}
return out
}
+222
View File
@@ -0,0 +1,222 @@
package src
import (
"sort"
)
type StreamStatus struct {
Path string `json:"path"`
Sha string `json:"sha"`
Duration float64 `json:"duration"`
Videos []TranscodeStatus `json:"videos"`
Audios []TranscodeStatus `json:"audios"`
Viewers []ClientStatus `json:"viewers"`
}
type TranscodeStatus struct {
Index uint32 `json:"index"`
Quality string `json:"quality"`
Heads []HeadRange `json:"heads"`
}
type HeadRange struct {
Start float64 `json:"start"`
End float64 `json:"end"`
StartHead int32 `json:"startHead"`
EndHead int32 `json:"endHead"`
IsRunning bool `json:"isRunning"`
}
type ViewerTrack struct {
Index uint32 `json:"index"`
Quality string `json:"quality"`
Head int32 `json:"head"`
}
type ClientStatus struct {
ClientId string `json:"clientId"`
ProfileId *string `json:"profileId"`
SessionId *string `json:"sessionId"`
Video *ViewerTrack `json:"video"`
Audio *ViewerTrack `json:"audio"`
}
func (t *Transcoder) ListRunningStreams() []StreamStatus {
clients := t.tracker.SnapshotClients()
t.streams.lock.RLock()
streams := make(map[string]*FileStream, len(t.streams.data))
for sha, stream := range t.streams.data {
streams[sha] = stream
}
t.streams.lock.RUnlock()
clientBySha := make(map[string][]ClientInfo)
for _, client := range clients {
clientBySha[client.sha] = append(clientBySha[client.sha], client)
}
ret := make([]StreamStatus, 0, len(streams))
for sha, stream := range streams {
if stream == nil || stream.Info == nil {
continue
}
status := StreamStatus{
Path: stream.Info.Path,
Sha: sha,
Duration: stream.Info.Duration,
Videos: listVideoStatuses(stream),
Audios: listAudioStatuses(stream),
Viewers: listClientStatuses(stream, clientBySha[sha]),
}
if len(status.Videos) == 0 && len(status.Audios) == 0 && len(status.Viewers) == 0 {
continue
}
ret = append(ret, status)
}
sort.Slice(ret, func(i int, j int) bool {
return ret[i].Path < ret[j].Path
})
return ret
}
func listVideoStatuses(stream *FileStream) []TranscodeStatus {
stream.videos.lock.RLock()
ret := make([]TranscodeStatus, 0, len(stream.videos.data))
for key, video := range stream.videos.data {
if video == nil {
continue
}
ret = append(ret, TranscodeStatus{
Index: key.idx,
Quality: string(key.quality),
Heads: listHeadRanges(stream, &video.Stream, true, key.idx),
})
}
stream.videos.lock.RUnlock()
sort.Slice(ret, func(i int, j int) bool {
if ret[i].Index == ret[j].Index {
return ret[i].Quality < ret[j].Quality
}
return ret[i].Index < ret[j].Index
})
return ret
}
func listAudioStatuses(stream *FileStream) []TranscodeStatus {
stream.audios.lock.RLock()
ret := make([]TranscodeStatus, 0, len(stream.audios.data))
for key, audio := range stream.audios.data {
if audio == nil {
continue
}
ret = append(ret, TranscodeStatus{
Index: key.idx,
Quality: string(key.quality),
Heads: listHeadRanges(stream, &audio.Stream, false, key.idx),
})
}
stream.audios.lock.RUnlock()
sort.Slice(ret, func(i int, j int) bool {
if ret[i].Index == ret[j].Index {
return ret[i].Quality < ret[j].Quality
}
return ret[i].Index < ret[j].Index
})
return ret
}
func listClientStatuses(stream *FileStream, clients []ClientInfo) []ClientStatus {
ret := make([]ClientStatus, 0, len(clients))
for _, client := range clients {
var video *ViewerTrack
if client.video != nil {
video = &ViewerTrack{
Index: client.video.idx,
Quality: string(client.video.quality),
Head: client.vhead,
}
}
var audio *ViewerTrack
if client.audio != nil {
audio = &ViewerTrack{
Index: client.audio.idx,
Quality: string(client.audio.quality),
Head: client.ahead,
}
}
ret = append(ret, ClientStatus{
ClientId: client.client,
ProfileId: client.profileId,
SessionId: client.sessionId,
Video: video,
Audio: audio,
})
}
sort.Slice(ret, func(i int, j int) bool {
return ret[i].ClientId < ret[j].ClientId
})
return ret
}
func listHeadRanges(file *FileStream, stream *Stream, isVideo bool, index uint32) []HeadRange {
stream.lock.RLock()
defer stream.lock.RUnlock()
ret := make([]HeadRange, 0, len(stream.heads))
for _, head := range stream.heads {
if head == DeletedHead {
continue
}
end := stream.file.Info.Duration
length, _ := stream.keyframes.Length()
if head.end <= length {
end = stream.keyframes.Get(head.end)
}
ret = append(ret, HeadRange{
Start: stream.keyframes.Get(head.segment),
End: end,
StartHead: head.segment,
EndHead: head.end,
IsRunning: head.command != nil && head.command.ProcessState == nil,
})
}
for i := int32(0); i < int32(len(stream.segments)); i++ {
if !stream.isSegmentReady(i) {
continue
}
start := i
for i < int32(len(stream.segments)) && stream.isSegmentReady(i) {
i++
}
end := i
ret = append(ret, HeadRange{
Start: stream.keyframes.Get(start),
End: stream.keyframes.Get(end),
StartHead: start,
EndHead: end,
IsRunning: false,
})
i--
}
sort.Slice(ret, func(i int, j int) bool {
if ret[i].StartHead == ret[j].StartHead {
return ret[i].EndHead < ret[j].EndHead
}
return ret[i].StartHead < ret[j].StartHead
})
return ret
}
+52 -34
View File
@@ -50,20 +50,22 @@ func (t *Transcoder) getFileStream(ctx context.Context, path string, sha string)
return ret, nil
}
func (t *Transcoder) GetMaster(ctx context.Context, path string, client string, sha string) (string, error) {
func (t *Transcoder) GetMaster(ctx context.Context, path string, client string, profileId *string, sessionId *string, sha string) (string, error) {
ctx = context.WithoutCancel(ctx)
stream, err := t.getFileStream(ctx, path, sha)
if err != nil {
return "", err
}
t.clientChan <- ClientInfo{
client: client,
sha: sha,
path: path,
video: nil,
audio: nil,
vhead: -1,
ahead: -1,
client: client,
profileId: profileId,
sessionId: sessionId,
sha: sha,
path: path,
video: nil,
audio: nil,
vhead: -1,
ahead: -1,
}
return stream.GetMaster(ctx, client), nil
}
@@ -74,6 +76,8 @@ func (t *Transcoder) GetVideoIndex(
video uint32,
quality VideoQuality,
client string,
profileId *string,
sessionId *string,
sha string,
) (string, error) {
ctx = context.WithoutCancel(ctx)
@@ -82,13 +86,15 @@ func (t *Transcoder) GetVideoIndex(
return "", err
}
t.clientChan <- ClientInfo{
client: client,
sha: sha,
path: path,
video: &VideoKey{video, quality},
audio: nil,
vhead: -1,
ahead: -1,
client: client,
profileId: profileId,
sessionId: sessionId,
sha: sha,
path: path,
video: &VideoKey{video, quality},
audio: nil,
vhead: -1,
ahead: -1,
}
return stream.GetVideoIndex(ctx, video, quality, client)
}
@@ -99,6 +105,8 @@ func (t *Transcoder) GetAudioIndex(
audio uint32,
quality AudioQuality,
client string,
profileId *string,
sessionId *string,
sha string,
) (string, error) {
ctx = context.WithoutCancel(ctx)
@@ -107,12 +115,14 @@ func (t *Transcoder) GetAudioIndex(
return "", err
}
t.clientChan <- ClientInfo{
client: client,
sha: sha,
path: path,
audio: &AudioKey{audio, quality},
vhead: -1,
ahead: -1,
client: client,
profileId: profileId,
sessionId: sessionId,
sha: sha,
path: path,
audio: &AudioKey{audio, quality},
vhead: -1,
ahead: -1,
}
return stream.GetAudioIndex(ctx, audio, quality, client)
}
@@ -124,6 +134,8 @@ func (t *Transcoder) GetVideoSegment(
quality VideoQuality,
segment int32,
client string,
profileId *string,
sessionId *string,
sha string,
) (string, error) {
ctx = context.WithoutCancel(ctx)
@@ -132,13 +144,15 @@ func (t *Transcoder) GetVideoSegment(
return "", err
}
t.clientChan <- ClientInfo{
client: client,
sha: sha,
path: path,
video: &VideoKey{video, quality},
vhead: segment,
audio: nil,
ahead: -1,
client: client,
profileId: profileId,
sessionId: sessionId,
sha: sha,
path: path,
video: &VideoKey{video, quality},
vhead: segment,
audio: nil,
ahead: -1,
}
return stream.GetVideoSegment(ctx, video, quality, segment)
}
@@ -150,6 +164,8 @@ func (t *Transcoder) GetAudioSegment(
quality AudioQuality,
segment int32,
client string,
profileId *string,
sessionId *string,
sha string,
) (string, error) {
ctx = context.WithoutCancel(ctx)
@@ -158,12 +174,14 @@ func (t *Transcoder) GetAudioSegment(
return "", err
}
t.clientChan <- ClientInfo{
client: client,
sha: sha,
path: path,
audio: &AudioKey{audio, quality},
ahead: segment,
vhead: -1,
client: client,
profileId: profileId,
sessionId: sessionId,
sha: sha,
path: path,
audio: &AudioKey{audio, quality},
ahead: segment,
vhead: -1,
}
return stream.GetAudioSegment(ctx, audio, quality, segment)
}