diff --git a/api/src/auth.ts b/api/src/auth.ts index e0cc3e6d..d2be7c8e 100644 --- a/api/src/auth.ts +++ b/api/src/auth.ts @@ -2,7 +2,7 @@ import { TypeCompiler } from "@sinclair/typebox/compiler"; import { Value } from "@sinclair/typebox/value"; import Elysia, { t } from "elysia"; import { createRemoteJWKSet, jwtVerify } from "jose"; -import { KError } from "./models/error"; +import { UserC } from "~/models/user"; import type { Prettify } from "./utils"; const jwtSecret = process.env.JWT_SECRET @@ -91,24 +91,6 @@ export const auth = new Elysia({ name: "auth" }) }) .as("scoped"); -const User = t.Object({ - id: t.String({ format: "uuid" }), - username: t.String(), - email: t.String({ format: "email" }), - createdDate: t.Date(), - lastSeen: t.Date(), - claims: t.Record(t.String(), t.Any()), - oidc: t.Record( - t.String(), - t.Object({ - id: t.String(), - username: t.String(), - profileUrl: t.Nullable(t.String({ format: "url" })), - }), - ), -}); -const UserC = TypeCompiler.Compile(t.Union([User, KError])); - export async function getUserInfo( id: string, headers: { authorization: string }, diff --git a/api/src/base.ts b/api/src/base.ts index d6db13a7..73215c9f 100644 --- a/api/src/base.ts +++ b/api/src/base.ts @@ -15,6 +15,7 @@ import { movies } from "./controllers/shows/movies"; import { series } from "./controllers/shows/series"; import { showsH } from "./controllers/shows/shows"; import { staffH } from "./controllers/staff"; +import { streamsH } from "./controllers/streams"; import { studiosH } from "./controllers/studios"; import { videosMetadata } from "./controllers/video-metadata"; import { videosReadH } from "./controllers/videos"; @@ -115,6 +116,7 @@ export const handlers = new Elysia({ prefix }) }, (app) => app + .use(streamsH) .use(showsH) .use(movies) .use(series) diff --git a/api/src/controllers/streams.ts b/api/src/controllers/streams.ts new file mode 100644 index 00000000..ad5ccc5d --- /dev/null +++ b/api/src/controllers/streams.ts @@ -0,0 +1,223 @@ +import { TypeCompiler } from "@sinclair/typebox/compiler"; +import { and, desc, eq, sql } from "drizzle-orm"; +import { Elysia, t } from "elysia"; +import { auth } from "~/auth"; +import { db } from "~/db"; +import { history, profiles, videos } from "~/db/schema"; +import { sqlarr } from "~/db/utils"; +import { Entry } from "~/models/entry"; +import { KError } from "~/models/error"; +import { Show } from "~/models/show"; +import { User } from "~/models/user"; +import { AcceptLanguage, processLanguages } from "~/models/utils"; +import { uniq } from "~/utils"; +import { getVideos } from "./videos"; + +const TranscodeStatus = t.Object({ + index: t.Integer(), + quality: t.String(), + heads: t.Array( + t.Object({ + start: t.Number(), + end: t.Number(), + startHead: t.Integer(), + endHead: t.Integer(), + isRunning: t.Boolean(), + }), + ), +}); + +const ViewerTrack = t.Object({ + index: t.Integer(), + quality: t.String(), + head: t.Integer(), +}); + +const TranscoderViewer = t.Object({ + clientId: t.String(), + profileId: t.Nullable(t.String({ format: "uuid" })), + sessionId: t.Nullable(t.String()), + video: t.Nullable(ViewerTrack), + audio: t.Nullable(ViewerTrack), +}); + +const TranscoderStream = t.Object({ + path: t.String(), + sha: t.String(), + duration: t.Number(), + videos: t.Array(TranscodeStatus), + audios: t.Array(TranscodeStatus), + viewers: t.Array(TranscoderViewer), +}); +type TranscoderStream = typeof TranscoderStream.static; +const TranscoderStreamListC = TypeCompiler.Compile(t.Array(TranscoderStream)); + +const UserPageC = TypeCompiler.Compile(t.Object({ items: t.Array(User) })); + +const RunningViewerTrack = t.Object({ + index: t.Integer(), + quality: t.String(), + head: t.Integer(), +}); + +const RunningStream = t.Object({ + id: t.String({ format: "uuid" }), + path: t.String(), + duration: t.Number(), + videos: t.Array(TranscodeStatus), + audios: t.Array(TranscodeStatus), + viewers: t.Array( + t.Object({ + user: t.Nullable(User), + progress: t.Nullable(t.Number()), + video: t.Nullable(RunningViewerTrack), + audio: t.Nullable(RunningViewerTrack), + }), + ), + entries: t.Array(t.Omit(Entry, ["videos", "progress"])), + show: t.Nullable(Show), +}); + +export const streamsH = new Elysia({ tags: ["videos"] }).use(auth).get( + "videos/streams", + // @ts-expect-error idk + async ({ + headers: { authorization, "accept-language": langs }, + jwt: { sub, settings }, + status, + }) => { + let streams: TranscoderStream[]; + + try { + const response = await fetch( + new URL( + "/video/streams", + process.env.TRANSCODER_SERVER ?? "http://transcoder:7666", + ), + { + headers: authorization ? { authorization } : undefined, + }, + ); + + if (!response.ok) { + return status(502, { + status: 502, + message: "Cannot fetch running streams from transcoder.", + details: await response.text(), + }); + } + + streams = TranscoderStreamListC.Decode(await response.json()); + } catch (e) { + return status(502, { + status: 502, + message: "Cannot reach transcoder service.", + details: e, + }); + } + + if (!streams.length) return []; + + const usersById = new Map(); + try { + const resp: Response = await fetch( + new URL("/auth/users", process.env.AUTH_SERVER ?? "http://auth:4568"), + { + headers: authorization ? { authorization } : undefined, + }, + ); + if (!resp.ok) { + return status(502, { + status: 502, + message: "Cannot fetch users from auth service.", + details: await resp.text(), + }); + } + + const { items } = UserPageC.Decode(await resp.json()); + + for (const user of items) { + usersById.set(user.id, user); + } + } catch (e) { + return status(502, { + status: 502, + message: "Cannot reach auth service.", + details: e, + }); + } + + const paths = streams.map((x) => x.path); + const items = await getVideos({ + limit: paths.length, + filter: eq(videos.path, sql`any(${sqlarr(paths)})`), + languages: processLanguages(langs), + preferOriginal: settings.preferOriginal, + relations: ["show"], + userId: sub, + }); + + const profileIds = uniq( + streams + .flatMap((x) => x.viewers.map((v) => v.profileId)) + .filter((x): x is string => !!x), + ); + const videoIds = items.map((x) => x.id); + const progress = new Map(); + if (profileIds.length > 0 && videoIds.length > 0) { + const progressRows = await db + .selectDistinctOn([profiles.id, videos.id], { + profileId: profiles.id, + videoId: videos.id, + time: history.time, + }) + .from(history) + .innerJoin(profiles, eq(history.profilePk, profiles.pk)) + .innerJoin(videos, eq(history.videoPk, videos.pk)) + .where( + and( + eq(profiles.id, sql`any(${sqlarr(profileIds)}::uuid[])`), + eq(videos.id, sql`any(${sqlarr(videoIds)}::uuid[])`), + ), + ) + .orderBy(profiles.id, videos.id, desc(history.playedDate)); + + for (const row of progressRows) { + progress.set(`${row.profileId}:${row.videoId}`, row.time); + } + } + + const videosByPath = new Map(items.map((x) => [x.path, x])); + return streams.map((stream) => { + const video = videosByPath.get(stream.path); + return { + id: video!.id, + path: stream.path, + duration: stream.duration, + videos: stream.videos, + audios: stream.audios, + viewers: stream.viewers.map((viewer) => ({ + user: usersById.get(viewer.profileId ?? ""), + progress: progress.get(`${viewer.profileId}:${video?.id}`) ?? null, + video: viewer.video, + audio: viewer.audio, + })), + entries: video?.entries ?? [], + show: video?.show ?? null, + }; + }); + }, + { + detail: { + description: "List currently running streams", + }, + headers: t.Object({ + "accept-language": AcceptLanguage({ autoFallback: true }), + }), + response: { + 200: t.Array(RunningStream), + 422: KError, + 502: KError, + }, + }, +); diff --git a/api/src/models/user.ts b/api/src/models/user.ts new file mode 100644 index 00000000..d8cb7d0e --- /dev/null +++ b/api/src/models/user.ts @@ -0,0 +1,24 @@ +import { TypeCompiler } from "@sinclair/typebox/compiler"; +import { t } from "elysia"; +import { KError } from "./error"; + +export const User = t.Object({ + id: t.String({ format: "uuid" }), + username: t.String(), + email: t.String({ format: "email" }), + createdDate: t.Date(), + lastSeen: t.Date(), + claims: t.Record(t.String(), t.Any()), + oidc: t.Record( + t.String(), + t.Object({ + id: t.String(), + username: t.String(), + profileUrl: t.Nullable(t.String({ format: "url" })), + }), + ), +}); + +export type User = typeof User.static; + +export const UserC = TypeCompiler.Compile(t.Union([User, KError])); diff --git a/auth/models/users.go b/auth/models/users.go index de704348..a047d214 100644 --- a/auth/models/users.go +++ b/auth/models/users.go @@ -25,7 +25,7 @@ type User struct { // List of custom claims JWT created via get /jwt will have Claims jwt.MapClaims `json:"claims" example:"isAdmin: true"` // List of other login method available for this user. Access tokens wont be returned here. - Oidc map[string]OidcHandle `json:"oidc,omitempty"` + Oidc map[string]OidcHandle `json:"oidc"` } type OidcHandle struct { diff --git a/front/public/translations/en.json b/front/public/translations/en.json index ba92ce8b..17585e9d 100644 --- a/front/public/translations/en.json +++ b/front/public/translations/en.json @@ -388,6 +388,22 @@ "progress-pending": "{{count}} pending", "progress-failed": "{{count}} failed" }, + "streams": { + "title": "Streams", + "subtitle": "Currently playing videos and active transcodes.", + "empty": "No stream is running right now.", + "guest": "Guest", + "viewers": "Viewers", + "noActiveViewer": "No active viewer", + "watching": "Watching {{quality}}", + "none": "None", + "runningVideoTranscodes": "Video transcodes", + "runningAudioTranscodes": "Audio transcodes", + "progress": { + "available": "Available", + "transcoding": "Transcoding" + } + }, "add": { "title": "Add to library", "searchPlaceholder": "Search for a movie or series...", diff --git a/front/src/app/(app)/(tabs)/admin/_layout.tsx b/front/src/app/(app)/(tabs)/admin/_layout.tsx index 7943079b..3bf2b4a0 100644 --- a/front/src/app/(app)/(tabs)/admin/_layout.tsx +++ b/front/src/app/(app)/(tabs)/admin/_layout.tsx @@ -72,6 +72,12 @@ export default function AdminTabsLayout() { tabBarLabel: t("admin.unmatched.label"), }} /> + { - return ( - - ); + return ; }; + +Container.className = cn( + "flex w-full self-center px-4", + "sm:w-xl md:w-3xl lg:w-5xl xl:w-7xl", +); diff --git a/front/src/primitives/svg.d.ts b/front/src/primitives/svg.d.ts index 6f73d5fc..500e371f 100644 --- a/front/src/primitives/svg.d.ts +++ b/front/src/primitives/svg.d.ts @@ -1,6 +1,7 @@ declare module "*.svg" { import type React from "react"; import type { SvgProps } from "react-native-svg"; + const content: React.FC; export default content; } diff --git a/front/src/ui/admin/index.tsx b/front/src/ui/admin/index.tsx index 0abc8f6f..abd65760 100644 --- a/front/src/ui/admin/index.tsx +++ b/front/src/ui/admin/index.tsx @@ -1,3 +1,4 @@ export * from "./remap"; +export * from "./streams"; export * from "./users"; export * from "./videos-modal"; diff --git a/front/src/ui/admin/streams.tsx b/front/src/ui/admin/streams.tsx new file mode 100644 index 00000000..4a5b718e --- /dev/null +++ b/front/src/ui/admin/streams.tsx @@ -0,0 +1,426 @@ +import PlayArrow from "@material-symbols/svg-400/rounded/play_arrow-fill.svg"; +import { useTranslation } from "react-i18next"; +import { FlatList, View } from "react-native"; +import { z } from "zod/v4"; +import { entryDisplayNumber } from "~/components/entries"; +import { + Episode, + type KImage, + MovieEntry, + Show, + Special, + User, +} from "~/models"; +import { + Avatar, + Container, + H2, + Heading, + HR, + IconButton, + Image, + Link, + P, + Skeleton, + SubP, + useBreakpointValue, +} from "~/primitives"; +import { type QueryIdentifier, useFetch } from "~/query"; +import { cn, uniq } from "~/utils"; +import { EmptyView } from "../empty-view"; +import { toTimerString } from "../player/controls/progress"; + +const Track = z.object({ + index: z.number(), + quality: z.string(), + heads: z.array( + z.object({ + start: z.number(), + end: z.number(), + startHead: z.number(), + endHead: z.number(), + isRunning: z.boolean(), + }), + ), +}); +type Track = z.infer; + +const ViewerTrack = z.object({ + index: z.number(), + quality: z.string(), + head: z.number(), +}); +type ViewerTrack = z.infer; + +const Stream = z.object({ + id: z.string(), + path: z.string(), + duration: z.number(), + show: Show.nullable(), + entries: z.array( + z.discriminatedUnion("kind", [ + Episode.omit({ progress: true, videos: true }), + MovieEntry.omit({ progress: true, videos: true }), + Special.omit({ progress: true, videos: true }), + ]), + ), + viewers: z.array( + z.object({ + user: User.nullable(), + progress: z.number().nullable(), + video: ViewerTrack.nullable(), + audio: ViewerTrack.nullable(), + }), + ), + videos: z.array(Track), + audios: z.array(Track), +}); +type Stream = z.infer; + +const StreamViewer = ({ + username, + logo, + progress, + duration, + video, + audio, +}: { + username: string; + logo?: string; + progress: number | null; + duration: number; + video: ViewerTrack | null; + audio: ViewerTrack | null; +}) => { + const { t } = useTranslation(); + + return ( + + + +

+ {username} +

+ + {t("admin.streams.watching", { + quality: uniq([video?.quality, audio?.quality]) + .filter((x) => x) + .join(" / "), + })} + +
+ {progress && ( + + {`${toTimerString(progress, duration)}/${toTimerString(duration)}`} + + )} + + ); +}; + +const StreamProgressBar = ({ + index, + quality, + duration, + heads, + viewers, +}: { + index: number; + quality: string; + duration: number; + heads: Track["heads"]; + viewers: { id: string; username: string; logo?: string; progress: number }[]; +}) => { + return ( + + + #{index} {quality} + + + {heads.map((head, headIndex) => ( + + ))} + {viewers.map((viewer) => ( + + + + ))} + + + ); +}; + +const StreamCard = ({ + id, + path, + name, + thumbnail, + duration, + viewers, + videos, + audios, +}: { + id: string; + path: string; + name: string | null; + thumbnail: KImage | null; + duration: number; + viewers: Stream["viewers"]; + videos: Stream["videos"]; + audios: Stream["videos"]; +}) => { + const { t } = useTranslation(); + + return ( + + + + + {name} + +

+ {path} +

+
+ + + {t("admin.streams.viewers")} + + {viewers.length === 0 ? ( + {t("admin.streams.noActiveViewer")} + ) : ( + viewers.map((x, i) => ( + + )) + )} + +
+ + + {t("admin.streams.runningVideoTranscodes")} + + + {videos.length === 0 ? ( + {t("admin.streams.none")} + ) : ( + videos.map((video) => ( + + x.progress && + x.video?.quality === video.quality && + x.video?.index === video.index, + ) + .map((x, i) => ({ + id: x.user?.id ?? i.toString(), + username: x.user?.username ?? t("admin.streams.guest"), + logo: x.user?.logo, + progress: x.progress!, + }))} + /> + )) + )} + + + {t("admin.streams.runningAudioTranscodes")} + + + {audios.length === 0 ? ( + {t("admin.streams.none")} + ) : ( + audios.map((audio) => ( + + x.progress && + x.audio?.quality === audio.quality && + x.audio?.index === audio.index, + ) + .map((x, i) => ({ + id: x.user?.id ?? i.toString(), + username: x.user?.username ?? t("admin.streams.guest"), + logo: x.user?.logo, + progress: x.progress!, + }))} + /> + )) + )} + + + + + {t("admin.streams.progress.available")} + + + + {t("admin.streams.progress.transcoding")} + + + + + ); +}; + +StreamCard.Loader = () => { + return ( + + + + +
+ + + + + + + +
+ ); +}; + +export const AdminStreamsPage = () => { + const { t } = useTranslation(); + const { data } = useFetch(AdminStreamsPage.query()); + const columns = useBreakpointValue({ xs: 1, md: 2, xl: 3 }); + + if (!data) { + return ( + +

{t("admin.streams.title")}

+ {t("admin.streams.subtitle")} + + {Array.from({ length: 6 }).map((_, index) => ( + 2 && "xl:w-1/3", + )} + > + + + ))} + +
+ ); + } + + if (data.length === 0) { + return ( + +

{t("admin.streams.title")}

+ {t("admin.streams.subtitle")} + +
+ ); + } + + return ( + +

{t("admin.streams.title")}

+ {t("admin.streams.subtitle")} +
+ } + contentContainerClassName={Container.className} + keyExtractor={(item) => item.id} + renderItem={({ item }) => ( + + + + )} + /> + ); +}; + +AdminStreamsPage.query = (): QueryIdentifier => ({ + parser: z.array(Stream), + path: ["api", "videos", "streams"], + refetchInterval: 5000, +}); diff --git a/front/src/ui/navbar.tsx b/front/src/ui/navbar.tsx index ec089f93..3ede1fde 100644 --- a/front/src/ui/navbar.tsx +++ b/front/src/ui/navbar.tsx @@ -5,6 +5,7 @@ import Close from "@material-symbols/svg-400/rounded/close.svg"; import Login from "@material-symbols/svg-400/rounded/login.svg"; import Logout from "@material-symbols/svg-400/rounded/logout.svg"; import Person from "@material-symbols/svg-400/rounded/person-fill.svg"; +import Play from "@material-symbols/svg-400/rounded/play_arrow-fill.svg"; import Search from "@material-symbols/svg-400/rounded/search-fill.svg"; import Settings from "@material-symbols/svg-400/rounded/settings.svg"; import { useIsFocused } from "@react-navigation/native"; @@ -71,6 +72,11 @@ export const NavbarLeft = () => { icon={Search} href="/admin/unmatched" /> + )} diff --git a/front/src/utils.ts b/front/src/utils.ts index 77d18d3d..1f13454e 100644 --- a/front/src/utils.ts +++ b/front/src/utils.ts @@ -81,6 +81,10 @@ export function shuffle(array: T[]): T[] { return array; } +export function uniq(a: T[]): T[] { + return uniqBy(a, (x) => x as string); +} + export function uniqBy(a: T[], key: (val: T) => string | number): T[] { const seen: Record = {}; return a.filter((item) => { diff --git a/transcoder/src/api/streams.go b/transcoder/src/api/streams.go index 16dbd115..33a1b5ee 100644 --- a/transcoder/src/api/streams.go +++ b/transcoder/src/api/streams.go @@ -6,6 +6,7 @@ import ( "strconv" "strings" + "github.com/golang-jwt/jwt/v5" "github.com/labstack/echo/v5" "github.com/zoriya/kyoo/transcoder/src" ) @@ -17,6 +18,7 @@ type shandler struct { func RegisterStreamHandlers(e *echo.Group, transcoder *src.Transcoder) { h := shandler{transcoder} + e.GET("/streams", h.GetStreams) e.GET("/:path/direct", DirectStream) e.GET("/:path/direct/:identifier", DirectStream) e.GET("/:path/master.m3u8", h.GetMaster) @@ -61,12 +63,13 @@ func (h *shandler) GetMaster(c *echo.Context) error { if err != nil { return err } + profileId, sessionId := getIdentity(c) path, sha, err := getPath(c) if err != nil { return err } - ret, err := h.transcoder.GetMaster(c.Request().Context(), path, client, sha) + ret, err := h.transcoder.GetMaster(c.Request().Context(), path, client, profileId, sessionId, sha) if err != nil { return err } @@ -97,12 +100,22 @@ func (h *shandler) GetVideoIndex(c *echo.Context) error { if err != nil { return err } + profileId, sessionId := getIdentity(c) path, sha, err := getPath(c) if err != nil { return err } - ret, err := h.transcoder.GetVideoIndex(c.Request().Context(), path, uint32(video), quality, client, sha) + ret, err := h.transcoder.GetVideoIndex( + c.Request().Context(), + path, + uint32(video), + quality, + client, + profileId, + sessionId, + sha, + ) if err != nil { return err } @@ -133,12 +146,22 @@ func (h *shandler) GetAudioIndex(c *echo.Context) error { if err != nil { return err } + profileId, sessionId := getIdentity(c) path, sha, err := getPath(c) if err != nil { return err } - ret, err := h.transcoder.GetAudioIndex(c.Request().Context(), path, uint32(audio), quality, client, sha) + ret, err := h.transcoder.GetAudioIndex( + c.Request().Context(), + path, + uint32(audio), + quality, + client, + profileId, + sessionId, + sha, + ) if err != nil { return err } @@ -171,6 +194,7 @@ func (h *shandler) GetVideoSegment(c *echo.Context) error { if err != nil { return err } + profileId, sessionId := getIdentity(c) path, sha, err := getPath(c) if err != nil { return err @@ -183,6 +207,8 @@ func (h *shandler) GetVideoSegment(c *echo.Context) error { quality, segment, client, + profileId, + sessionId, sha, ) if err != nil { @@ -216,18 +242,33 @@ func (h *shandler) GetAudioSegment(c *echo.Context) error { if err != nil { return err } + profileId, sessionId := getIdentity(c) path, sha, err := getPath(c) if err != nil { return err } - ret, err := h.transcoder.GetAudioSegment(c.Request().Context(), path, uint32(audio), quality, segment, client, sha) + ret, err := h.transcoder.GetAudioSegment( + c.Request().Context(), + path, + uint32(audio), + quality, + segment, + client, + profileId, + sessionId, + sha, + ) if err != nil { return err } return c.File(strings.TrimLeft(ret, "/")) } +func (h *shandler) GetStreams(c *echo.Context) error { + return c.JSON(http.StatusOK, h.transcoder.ListRunningStreams()) +} + func getClientId(c *echo.Context) (string, error) { key := c.QueryParam("clientId") if key == "" { @@ -239,6 +280,29 @@ func getClientId(c *echo.Context) (string, error) { return key, nil } +func getIdentity(c *echo.Context) (*string, *string) { + token, ok := c.Get("user").(*jwt.Token) + if !ok || token == nil { + return nil, nil + } + claims, ok := token.Claims.(jwt.MapClaims) + if !ok { + return nil, nil + } + + profileId := normalizeOptionalId(claims["sub"]) + sessionId := normalizeOptionalId(claims["sid"]) + return profileId, sessionId +} + +func normalizeOptionalId(value any) *string { + id, ok := value.(string) + if !ok || id == "" || id == "00000000-0000-0000-0000-000000000000" { + return nil + } + return &id +} + func parseSegment(segment string) (int32, error) { var ret int32 _, err := fmt.Sscanf(segment, "segment-%d.ts", &ret) diff --git a/transcoder/src/tracker.go b/transcoder/src/tracker.go index a6009b3c..2b48ea94 100644 --- a/transcoder/src/tracker.go +++ b/transcoder/src/tracker.go @@ -7,13 +7,15 @@ import ( ) type ClientInfo struct { - client string - sha string - path string - video *VideoKey - audio *AudioKey - vhead int32 - ahead int32 + client string + profileId *string + sessionId *string + sha string + path string + video *VideoKey + audio *AudioKey + vhead int32 + ahead int32 } type Tracker struct { @@ -25,6 +27,7 @@ type Tracker struct { lastUsage map[string]time.Time transcoder *Transcoder deletedStream chan string + snapshotReq chan chan map[string]ClientInfo } func NewTracker(t *Transcoder) *Tracker { @@ -33,6 +36,7 @@ func NewTracker(t *Transcoder) *Tracker { visitDate: make(map[string]time.Time), lastUsage: make(map[string]time.Time), deletedStream: make(chan string), + snapshotReq: make(chan chan map[string]ClientInfo), transcoder: t, } go ret.start() @@ -59,6 +63,12 @@ func (t *Tracker) start() { old, ok := t.clients[info.client] // First fixup the info. Most routes ruturn partial infos if ok && old.sha == info.sha { + if info.profileId == nil { + info.profileId = old.profileId + } + if info.sessionId == nil { + info.sessionId = old.sessionId + } if info.video == nil { info.video = old.video } @@ -98,6 +108,7 @@ func (t *Tracker) start() { case <-timer: timer = time.After(inactive_time) // Purge old clients + stale := make([]ClientInfo, 0) for client, date := range t.visitDate { if time.Since(date) < inactive_time { continue @@ -106,7 +117,10 @@ func (t *Tracker) start() { info := t.clients[client] delete(t.clients, client) delete(t.visitDate, client) + stale = append(stale, info) + } + for _, info := range stale { if !t.KillStreamIfDead(info.sha, info.path) { audio_cleanup := info.audio != nil && t.KillAudioIfDead(info.sha, info.path, *info.audio) video_cleanup := info.video != nil && t.KillVideoIfDead(info.sha, info.path, *info.video) @@ -117,6 +131,8 @@ func (t *Tracker) start() { } case path := <-t.deletedStream: t.DestroyStreamIfOld(path) + case reply := <-t.snapshotReq: + reply <- t.cloneClients() } } } @@ -143,7 +159,11 @@ func (t *Tracker) KillStreamIfDead(sha string, path string) bool { } func (t *Tracker) DestroyStreamIfOld(sha string) { - if time.Since(t.lastUsage[sha]) < 4*time.Hour { + lastUsage, ok := t.lastUsage[sha] + if !ok { + return + } + if time.Since(lastUsage) < 4*time.Hour { return } stream, ok := t.transcoder.streams.GetAndRemove(sha) @@ -239,3 +259,17 @@ func (t *Tracker) killOrphanedeheads(stream *Stream, is_video bool) { } } } + +func (t *Tracker) SnapshotClients() map[string]ClientInfo { + ret := make(chan map[string]ClientInfo) + t.snapshotReq <- ret + return <-ret +} + +func (t *Tracker) cloneClients() map[string]ClientInfo { + out := make(map[string]ClientInfo, len(t.clients)) + for clientId, info := range t.clients { + out[clientId] = info + } + return out +} diff --git a/transcoder/src/transcode_status.go b/transcoder/src/transcode_status.go new file mode 100644 index 00000000..4d3b9f60 --- /dev/null +++ b/transcoder/src/transcode_status.go @@ -0,0 +1,222 @@ +package src + +import ( + "sort" +) + +type StreamStatus struct { + Path string `json:"path"` + Sha string `json:"sha"` + Duration float64 `json:"duration"` + Videos []TranscodeStatus `json:"videos"` + Audios []TranscodeStatus `json:"audios"` + Viewers []ClientStatus `json:"viewers"` +} + +type TranscodeStatus struct { + Index uint32 `json:"index"` + Quality string `json:"quality"` + Heads []HeadRange `json:"heads"` +} + +type HeadRange struct { + Start float64 `json:"start"` + End float64 `json:"end"` + StartHead int32 `json:"startHead"` + EndHead int32 `json:"endHead"` + IsRunning bool `json:"isRunning"` +} + +type ViewerTrack struct { + Index uint32 `json:"index"` + Quality string `json:"quality"` + Head int32 `json:"head"` +} + +type ClientStatus struct { + ClientId string `json:"clientId"` + ProfileId *string `json:"profileId"` + SessionId *string `json:"sessionId"` + Video *ViewerTrack `json:"video"` + Audio *ViewerTrack `json:"audio"` +} + +func (t *Transcoder) ListRunningStreams() []StreamStatus { + clients := t.tracker.SnapshotClients() + + t.streams.lock.RLock() + streams := make(map[string]*FileStream, len(t.streams.data)) + for sha, stream := range t.streams.data { + streams[sha] = stream + } + t.streams.lock.RUnlock() + + clientBySha := make(map[string][]ClientInfo) + for _, client := range clients { + clientBySha[client.sha] = append(clientBySha[client.sha], client) + } + + ret := make([]StreamStatus, 0, len(streams)) + for sha, stream := range streams { + if stream == nil || stream.Info == nil { + continue + } + + status := StreamStatus{ + Path: stream.Info.Path, + Sha: sha, + Duration: stream.Info.Duration, + Videos: listVideoStatuses(stream), + Audios: listAudioStatuses(stream), + Viewers: listClientStatuses(stream, clientBySha[sha]), + } + if len(status.Videos) == 0 && len(status.Audios) == 0 && len(status.Viewers) == 0 { + continue + } + ret = append(ret, status) + } + + sort.Slice(ret, func(i int, j int) bool { + return ret[i].Path < ret[j].Path + }) + return ret +} + +func listVideoStatuses(stream *FileStream) []TranscodeStatus { + stream.videos.lock.RLock() + ret := make([]TranscodeStatus, 0, len(stream.videos.data)) + for key, video := range stream.videos.data { + if video == nil { + continue + } + ret = append(ret, TranscodeStatus{ + Index: key.idx, + Quality: string(key.quality), + Heads: listHeadRanges(stream, &video.Stream, true, key.idx), + }) + } + stream.videos.lock.RUnlock() + + sort.Slice(ret, func(i int, j int) bool { + if ret[i].Index == ret[j].Index { + return ret[i].Quality < ret[j].Quality + } + return ret[i].Index < ret[j].Index + }) + return ret +} + +func listAudioStatuses(stream *FileStream) []TranscodeStatus { + stream.audios.lock.RLock() + ret := make([]TranscodeStatus, 0, len(stream.audios.data)) + for key, audio := range stream.audios.data { + if audio == nil { + continue + } + ret = append(ret, TranscodeStatus{ + Index: key.idx, + Quality: string(key.quality), + Heads: listHeadRanges(stream, &audio.Stream, false, key.idx), + }) + } + stream.audios.lock.RUnlock() + + sort.Slice(ret, func(i int, j int) bool { + if ret[i].Index == ret[j].Index { + return ret[i].Quality < ret[j].Quality + } + return ret[i].Index < ret[j].Index + }) + return ret +} + +func listClientStatuses(stream *FileStream, clients []ClientInfo) []ClientStatus { + ret := make([]ClientStatus, 0, len(clients)) + for _, client := range clients { + var video *ViewerTrack + if client.video != nil { + video = &ViewerTrack{ + Index: client.video.idx, + Quality: string(client.video.quality), + Head: client.vhead, + } + } + + var audio *ViewerTrack + if client.audio != nil { + audio = &ViewerTrack{ + Index: client.audio.idx, + Quality: string(client.audio.quality), + Head: client.ahead, + } + } + + ret = append(ret, ClientStatus{ + ClientId: client.client, + ProfileId: client.profileId, + SessionId: client.sessionId, + Video: video, + Audio: audio, + }) + } + + sort.Slice(ret, func(i int, j int) bool { + return ret[i].ClientId < ret[j].ClientId + }) + return ret +} + +func listHeadRanges(file *FileStream, stream *Stream, isVideo bool, index uint32) []HeadRange { + stream.lock.RLock() + defer stream.lock.RUnlock() + + ret := make([]HeadRange, 0, len(stream.heads)) + for _, head := range stream.heads { + if head == DeletedHead { + continue + } + + end := stream.file.Info.Duration + length, _ := stream.keyframes.Length() + if head.end <= length { + end = stream.keyframes.Get(head.end) + } + + ret = append(ret, HeadRange{ + Start: stream.keyframes.Get(head.segment), + End: end, + StartHead: head.segment, + EndHead: head.end, + IsRunning: head.command != nil && head.command.ProcessState == nil, + }) + } + + for i := int32(0); i < int32(len(stream.segments)); i++ { + if !stream.isSegmentReady(i) { + continue + } + + start := i + for i < int32(len(stream.segments)) && stream.isSegmentReady(i) { + i++ + } + end := i + + ret = append(ret, HeadRange{ + Start: stream.keyframes.Get(start), + End: stream.keyframes.Get(end), + StartHead: start, + EndHead: end, + IsRunning: false, + }) + i-- + } + + sort.Slice(ret, func(i int, j int) bool { + if ret[i].StartHead == ret[j].StartHead { + return ret[i].EndHead < ret[j].EndHead + } + return ret[i].StartHead < ret[j].StartHead + }) + return ret +} diff --git a/transcoder/src/transcoder.go b/transcoder/src/transcoder.go index 4a0e44e0..034c3b1b 100644 --- a/transcoder/src/transcoder.go +++ b/transcoder/src/transcoder.go @@ -50,20 +50,22 @@ func (t *Transcoder) getFileStream(ctx context.Context, path string, sha string) return ret, nil } -func (t *Transcoder) GetMaster(ctx context.Context, path string, client string, sha string) (string, error) { +func (t *Transcoder) GetMaster(ctx context.Context, path string, client string, profileId *string, sessionId *string, sha string) (string, error) { ctx = context.WithoutCancel(ctx) stream, err := t.getFileStream(ctx, path, sha) if err != nil { return "", err } t.clientChan <- ClientInfo{ - client: client, - sha: sha, - path: path, - video: nil, - audio: nil, - vhead: -1, - ahead: -1, + client: client, + profileId: profileId, + sessionId: sessionId, + sha: sha, + path: path, + video: nil, + audio: nil, + vhead: -1, + ahead: -1, } return stream.GetMaster(ctx, client), nil } @@ -74,6 +76,8 @@ func (t *Transcoder) GetVideoIndex( video uint32, quality VideoQuality, client string, + profileId *string, + sessionId *string, sha string, ) (string, error) { ctx = context.WithoutCancel(ctx) @@ -82,13 +86,15 @@ func (t *Transcoder) GetVideoIndex( return "", err } t.clientChan <- ClientInfo{ - client: client, - sha: sha, - path: path, - video: &VideoKey{video, quality}, - audio: nil, - vhead: -1, - ahead: -1, + client: client, + profileId: profileId, + sessionId: sessionId, + sha: sha, + path: path, + video: &VideoKey{video, quality}, + audio: nil, + vhead: -1, + ahead: -1, } return stream.GetVideoIndex(ctx, video, quality, client) } @@ -99,6 +105,8 @@ func (t *Transcoder) GetAudioIndex( audio uint32, quality AudioQuality, client string, + profileId *string, + sessionId *string, sha string, ) (string, error) { ctx = context.WithoutCancel(ctx) @@ -107,12 +115,14 @@ func (t *Transcoder) GetAudioIndex( return "", err } t.clientChan <- ClientInfo{ - client: client, - sha: sha, - path: path, - audio: &AudioKey{audio, quality}, - vhead: -1, - ahead: -1, + client: client, + profileId: profileId, + sessionId: sessionId, + sha: sha, + path: path, + audio: &AudioKey{audio, quality}, + vhead: -1, + ahead: -1, } return stream.GetAudioIndex(ctx, audio, quality, client) } @@ -124,6 +134,8 @@ func (t *Transcoder) GetVideoSegment( quality VideoQuality, segment int32, client string, + profileId *string, + sessionId *string, sha string, ) (string, error) { ctx = context.WithoutCancel(ctx) @@ -132,13 +144,15 @@ func (t *Transcoder) GetVideoSegment( return "", err } t.clientChan <- ClientInfo{ - client: client, - sha: sha, - path: path, - video: &VideoKey{video, quality}, - vhead: segment, - audio: nil, - ahead: -1, + client: client, + profileId: profileId, + sessionId: sessionId, + sha: sha, + path: path, + video: &VideoKey{video, quality}, + vhead: segment, + audio: nil, + ahead: -1, } return stream.GetVideoSegment(ctx, video, quality, segment) } @@ -150,6 +164,8 @@ func (t *Transcoder) GetAudioSegment( quality AudioQuality, segment int32, client string, + profileId *string, + sessionId *string, sha string, ) (string, error) { ctx = context.WithoutCancel(ctx) @@ -158,12 +174,14 @@ func (t *Transcoder) GetAudioSegment( return "", err } t.clientChan <- ClientInfo{ - client: client, - sha: sha, - path: path, - audio: &AudioKey{audio, quality}, - ahead: segment, - vhead: -1, + client: client, + profileId: profileId, + sessionId: sessionId, + sha: sha, + path: path, + audio: &AudioKey{audio, quality}, + ahead: segment, + vhead: -1, } return stream.GetAudioSegment(ctx, audio, quality, segment) }