Update tracker

This commit is contained in:
Zoe Roux 2024-08-02 21:05:29 +02:00
parent e3fdf0af45
commit 59010797a8
4 changed files with 99 additions and 69 deletions

View File

@ -55,8 +55,12 @@ func (h *Handler) GetMaster(c echo.Context) error {
// This route can take a few seconds to respond since it will way for at least one segment to be
// available.
//
// Path: /:path/:quality/index.m3u8
// Path: /:path/:video/:quality/index.m3u8
func (h *Handler) GetVideoIndex(c echo.Context) error {
video, err := strconv.ParseInt(c.Param("video"), 10, 32)
if err != nil {
return err
}
quality, err := src.QualityFromString(c.Param("quality"))
if err != nil {
return err
@ -70,7 +74,7 @@ func (h *Handler) GetVideoIndex(c echo.Context) error {
return err
}
ret, err := h.transcoder.GetVideoIndex(path, quality, client, sha)
ret, err := h.transcoder.GetVideoIndex(path, int32(video), quality, client, sha)
if err != nil {
return err
}
@ -109,8 +113,12 @@ func (h *Handler) GetAudioIndex(c echo.Context) error {
//
// Retrieve a chunk of a transmuxed video.
//
// Path: /:path/:quality/segments-:chunk.ts
// Path: /:path/:video/:quality/segments-:chunk.ts
func (h *Handler) GetVideoSegment(c echo.Context) error {
video, err := strconv.ParseInt(c.Param("video"), 10, 32)
if err != nil {
return err
}
quality, err := src.QualityFromString(c.Param("quality"))
if err != nil {
return err
@ -128,7 +136,14 @@ func (h *Handler) GetVideoSegment(c echo.Context) error {
return err
}
ret, err := h.transcoder.GetVideoSegment(path, quality, segment, client, sha)
ret, err := h.transcoder.GetVideoSegment(
path,
int32(video),
quality,
segment,
client,
sha,
)
if err != nil {
return err
}
@ -285,7 +300,7 @@ func main() {
e.GET("/:path/direct", DirectStream)
e.GET("/:path/master.m3u8", h.GetMaster)
e.GET("/:path/:quality/index.m3u8", h.GetVideoIndex)
e.GET("/:path/:video/:quality/index.m3u8", h.GetVideoIndex)
e.GET("/:path/audio/:audio/index.m3u8", h.GetAudioIndex)
e.GET("/:path/:quality/:chunk", h.GetVideoSegment)
e.GET("/:path/audio/:audio/:chunk", h.GetAudioSegment)

View File

@ -121,7 +121,7 @@ func toSegmentStr(segments []float64) string {
func (ts *Stream) run(start int32) error {
// Start the transcode up to the 100th segment (or less)
length, is_done := ts.file.Keyframes.Length()
length, is_done := ts.keyframes.Length()
end := min(start+100, length)
// if keyframes analysys is not finished, always have a 1-segment padding
// for the extra segment needed for precise split (look comment before -to flag)
@ -151,7 +151,7 @@ func (ts *Stream) run(start int32) error {
log.Printf(
"Starting transcode %d for %s (from %d to %d out of %d segments)",
encoder_id,
ts.file.Path,
ts.file.Info.Path,
start,
end,
length,
@ -169,7 +169,7 @@ func (ts *Stream) run(start int32) error {
// the previous segment is played another time. the -segment_times is way more precise so it does not do the same with this one
start_segment = start - 1
if ts.handle.getFlags()&AudioF != 0 {
start_ref = ts.file.Keyframes.Get(start_segment)
start_ref = ts.keyframes.Get(start_segment)
} else {
// the param for the -ss takes the keyframe before the specificed time
// (if the specified time is a keyframe, it either takes that keyframe or the one before)
@ -178,9 +178,9 @@ func (ts *Stream) run(start int32) error {
// this can't be used with audio since we need to have context before the start-time
// without this context, the cut loses a bit of audio (audio gap of ~100ms)
if start_segment+1 == length {
start_ref = (ts.file.Keyframes.Get(start_segment) + float64(ts.file.Info.Duration)) / 2
start_ref = (ts.keyframes.Get(start_segment) + float64(ts.file.Info.Duration)) / 2
} else {
start_ref = (ts.file.Keyframes.Get(start_segment) + ts.file.Keyframes.Get(start_segment+1)) / 2
start_ref = (ts.keyframes.Get(start_segment) + ts.keyframes.Get(start_segment+1)) / 2
}
}
}
@ -188,7 +188,7 @@ func (ts *Stream) run(start int32) error {
if end == length {
end_padding = 0
}
segments := ts.file.Keyframes.Slice(start_segment+1, end+end_padding)
segments := ts.keyframes.Slice(start_segment+1, end+end_padding)
if len(segments) == 0 {
// we can't leave that empty else ffmpeg errors out.
segments = []float64{9999999}
@ -222,17 +222,17 @@ func (ts *Stream) run(start int32) error {
if end+1 < length {
// sometimes, the duration is shorter than expected (only during transcode it seems)
// always include more and use the -f segment to split the file where we want
end_ref := ts.file.Keyframes.Get(end + 1)
end_ref := ts.keyframes.Get(end + 1)
// it seems that the -to is confused when -ss seek before the given time (because it searches for a keyframe)
// add back the time that would be lost otherwise
// this only appens when -to is before -i but having -to after -i gave a bug (not sure, don't remember)
end_ref += start_ref - ts.file.Keyframes.Get(start_segment)
end_ref += start_ref - ts.keyframes.Get(start_segment)
args = append(args,
"-to", fmt.Sprintf("%.6f", end_ref),
)
}
args = append(args,
"-i", ts.file.Path,
"-i", ts.file.Info.Path,
// this makes behaviors consistent between soft and hardware decodes.
// this also means that after a -ss 50, the output video will start at 50s
"-start_at_zero",
@ -258,7 +258,7 @@ func (ts *Stream) run(start int32) error {
// segment_times want durations, not timestamps so we must substract the -ss param
// since we give a greater value to -ss to prevent wrong seeks but -segment_times
// needs precise segments, we use the keyframe we want to seek to as a reference.
return seg - ts.file.Keyframes.Get(start_segment)
return seg - ts.keyframes.Get(start_segment)
})),
"-segment_list_type", "flat",
"-segment_list", "pipe:1",
@ -361,16 +361,16 @@ func (ts *Stream) GetIndex() (string, error) {
#EXT-X-MEDIA-SEQUENCE:0
#EXT-X-INDEPENDENT-SEGMENTS
`
length, is_done := ts.file.Keyframes.Length()
length, is_done := ts.keyframes.Length()
for segment := int32(0); segment < length-1; segment++ {
index += fmt.Sprintf("#EXTINF:%.6f\n", ts.file.Keyframes.Get(segment+1)-ts.file.Keyframes.Get(segment))
index += fmt.Sprintf("#EXTINF:%.6f\n", ts.keyframes.Get(segment+1)-ts.keyframes.Get(segment))
index += fmt.Sprintf("segment-%d.ts\n", segment)
}
// do not forget to add the last segment between the last keyframe and the end of the file
// if the keyframes extraction is not done, do not bother to add it, it will be retrived on the next index retrival
if is_done {
index += fmt.Sprintf("#EXTINF:%.6f\n", float64(ts.file.Info.Duration)-ts.file.Keyframes.Get(length-1))
index += fmt.Sprintf("#EXTINF:%.6f\n", float64(ts.file.Info.Duration)-ts.keyframes.Get(length-1))
index += fmt.Sprintf("segment-%d.ts\n", length-1)
index += `#EXT-X-ENDLIST`
}
@ -442,13 +442,13 @@ func (ts *Stream) prerareNextSegements(segment int32) {
}
func (ts *Stream) getMinEncoderDistance(segment int32) float64 {
time := ts.file.Keyframes.Get(segment)
time := ts.keyframes.Get(segment)
distances := Map(ts.heads, func(head Head, _ int) float64 {
// ignore killed heads or heads after the current time
if head.segment < 0 || ts.file.Keyframes.Get(head.segment) > time || segment >= head.end {
if head.segment < 0 || ts.keyframes.Get(head.segment) > time || segment >= head.end {
return math.Inf(1)
}
return time - ts.file.Keyframes.Get(head.segment)
return time - ts.keyframes.Get(head.segment)
})
if len(distances) == 0 {
return math.Inf(1)

View File

@ -6,11 +6,12 @@ import (
)
type ClientInfo struct {
client string
path string
quality *Quality
audio int32
head int32
client string
path string
video *VideoKey
audio int32
vhead int32
ahead int32
}
type Tracker struct {
@ -56,14 +57,17 @@ func (t *Tracker) start() {
old, ok := t.clients[info.client]
// First fixup the info. Most routes ruturn partial infos
if ok && old.path == info.path {
if info.quality == nil {
info.quality = old.quality
if info.video == nil {
info.video = old.video
}
if info.audio == -1 {
info.audio = old.audio
}
if info.head == -1 {
info.head = old.head
if info.vhead == -1 {
info.vhead = old.vhead
}
if info.ahead == -1 {
info.ahead = old.ahead
}
}
@ -76,11 +80,14 @@ func (t *Tracker) start() {
if old.audio != info.audio && old.audio != -1 {
t.KillAudioIfDead(old.path, old.audio)
}
if old.quality != info.quality && old.quality != nil {
t.KillQualityIfDead(old.path, *old.quality)
if old.video != info.video && old.video != nil {
t.KillVideoIfDead(old.path, *old.video)
}
if old.head != -1 && Abs(info.head-old.head) > 100 {
t.KillOrphanedHeads(old.path, old.quality, old.audio)
if old.vhead != -1 && Abs(info.vhead-old.vhead) > 100 {
t.KillOrphanedHeads(old.path, old.video, -1)
}
if old.ahead != -1 && Abs(info.ahead-old.ahead) > 100 {
t.KillOrphanedHeads(old.path, nil, old.audio)
}
} else if ok {
t.KillStreamIfDead(old.path)
@ -100,9 +107,9 @@ func (t *Tracker) start() {
if !t.KillStreamIfDead(info.path) {
audio_cleanup := info.audio != -1 && t.KillAudioIfDead(info.path, info.audio)
video_cleanup := info.quality != nil && t.KillQualityIfDead(info.path, *info.quality)
video_cleanup := info.video != nil && t.KillVideoIfDead(info.path, *info.video)
if !audio_cleanup || !video_cleanup {
t.KillOrphanedHeads(info.path, info.quality, info.audio)
t.KillOrphanedHeads(info.path, info.video, info.audio)
}
}
}
@ -163,19 +170,19 @@ func (t *Tracker) KillAudioIfDead(path string, audio int32) bool {
return true
}
func (t *Tracker) KillQualityIfDead(path string, quality Quality) bool {
func (t *Tracker) KillVideoIfDead(path string, video VideoKey) bool {
for _, stream := range t.clients {
if stream.path == path && stream.quality != nil && *stream.quality == quality {
if stream.path == path && stream.video != nil && *stream.video == video {
return false
}
}
log.Printf("Nobody is watching quality %s of %s. Killing it", quality, path)
log.Printf("Nobody is watching %s video %d quality %s. Killing it", path, video.idx, video.quality)
stream, ok := t.transcoder.streams.Get(path)
if !ok {
return false
}
vstream, vok := stream.videos.Get(quality)
vstream, vok := stream.videos.Get(video)
if !vok {
return false
}
@ -183,27 +190,27 @@ func (t *Tracker) KillQualityIfDead(path string, quality Quality) bool {
return true
}
func (t *Tracker) KillOrphanedHeads(path string, quality *Quality, audio int32) {
func (t *Tracker) KillOrphanedHeads(path string, video *VideoKey, audio int32) {
stream, ok := t.transcoder.streams.Get(path)
if !ok {
return
}
if quality != nil {
vstream, vok := stream.videos.Get(*quality)
if video != nil {
vstream, vok := stream.videos.Get(*video)
if vok {
t.killOrphanedeheads(&vstream.Stream)
t.killOrphanedeheads(&vstream.Stream, true)
}
}
if audio != -1 {
astream, aok := stream.audios.Get(audio)
if aok {
t.killOrphanedeheads(&astream.Stream)
t.killOrphanedeheads(&astream.Stream, false)
}
}
}
func (t *Tracker) killOrphanedeheads(stream *Stream) {
func (t *Tracker) killOrphanedeheads(stream *Stream, is_video bool) {
stream.lock.Lock()
defer stream.lock.Unlock()
@ -214,13 +221,14 @@ func (t *Tracker) killOrphanedeheads(stream *Stream) {
distance := int32(99999)
for _, info := range t.clients {
if info.head == -1 {
continue
ihead := info.vhead
if is_video {
ihead = info.ahead
}
distance = min(Abs(info.head-head.segment), distance)
distance = min(Abs(ihead-head.segment), distance)
}
if distance > 20 {
log.Printf("Killing orphaned head %s %d", stream.file.Path, encoder_id)
log.Printf("Killing orphaned head %s %d", stream.file.Info.Path, encoder_id)
stream.KillHead(encoder_id)
}
}

View File

@ -53,17 +53,19 @@ func (t *Transcoder) GetMaster(path string, client string, sha string) (string,
return "", err
}
t.clientChan <- ClientInfo{
client: client,
path: path,
quality: nil,
audio: -1,
head: -1,
client: client,
path: path,
video: nil,
audio: -1,
vhead: -1,
ahead: -1,
}
return stream.GetMaster(), nil
}
func (t *Transcoder) GetVideoIndex(
path string,
video int32,
quality Quality,
client string,
sha string,
@ -73,13 +75,14 @@ func (t *Transcoder) GetVideoIndex(
return "", err
}
t.clientChan <- ClientInfo{
client: client,
path: path,
quality: &quality,
audio: -1,
head: -1,
client: client,
path: path,
video: &VideoKey{video, quality},
audio: -1,
vhead: -1,
ahead: -1,
}
return stream.GetVideoIndex(quality)
return stream.GetVideoIndex(video, quality)
}
func (t *Transcoder) GetAudioIndex(
@ -96,13 +99,15 @@ func (t *Transcoder) GetAudioIndex(
client: client,
path: path,
audio: audio,
head: -1,
vhead: -1,
ahead: -1,
}
return stream.GetAudioIndex(audio)
}
func (t *Transcoder) GetVideoSegment(
path string,
video int32,
quality Quality,
segment int32,
client string,
@ -113,13 +118,14 @@ func (t *Transcoder) GetVideoSegment(
return "", err
}
t.clientChan <- ClientInfo{
client: client,
path: path,
quality: &quality,
audio: -1,
head: segment,
client: client,
path: path,
video: &VideoKey{video, quality},
vhead: segment,
audio: -1,
ahead: -1,
}
return stream.GetVideoSegment(quality, segment)
return stream.GetVideoSegment(video, quality, segment)
}
func (t *Transcoder) GetAudioSegment(
@ -137,7 +143,8 @@ func (t *Transcoder) GetAudioSegment(
client: client,
path: path,
audio: audio,
head: segment,
ahead: segment,
vhead: -1,
}
return stream.GetAudioSegment(audio, segment)
}