From 4fd25ce5acce15cac9eaa495dd1e94e471b5bb97 Mon Sep 17 00:00:00 2001 From: Zoe Roux Date: Mon, 1 Jul 2024 15:39:24 +0000 Subject: [PATCH] Switch to -f hls instead of -f segment For now, disabled all audio variants since their handling will be entirely different. Found out that audio and video segments don't need to line up (same number/duration). As long as the whole file stays long enough it's fine. Video handling now fails when there are too many keyframes close enough (like 0.01, 0.3, 0.4, 2, 4). It would only output 3 segments instead of the 5 we would want. We might get around this by using fragments containing more than 1 keyframe if we handle things right --- transcoder/go.mod | 2 +- transcoder/src/audiostream.go | 4 +- transcoder/src/filestream.go | 36 +++++----- transcoder/src/stream.go | 121 ++++++++++++++++++++++++---------- transcoder/src/videostream.go | 4 +- transcoder/utils.go | 3 + 6 files changed, 114 insertions(+), 56 deletions(-) diff --git a/transcoder/go.mod b/transcoder/go.mod index 3a34c4b0..c01716e5 100644 --- a/transcoder/go.mod +++ b/transcoder/go.mod @@ -1,6 +1,6 @@ module github.com/zoriya/kyoo/transcoder -go 1.21 +go 1.22 require github.com/labstack/echo/v4 v4.12.0 // direct diff --git a/transcoder/src/audiostream.go b/transcoder/src/audiostream.go index 9f230983..23452a4f 100644 --- a/transcoder/src/audiostream.go +++ b/transcoder/src/audiostream.go @@ -18,8 +18,8 @@ func NewAudioStream(file *FileStream, idx int32) *AudioStream { return ret } -func (as *AudioStream) getOutPath(encoder_id int) string { - return fmt.Sprintf("%s/segment-a%d-%d-%%d.m4s", as.file.Out, as.index, encoder_id) +func (as *AudioStream) getSegmentName() string { + return fmt.Sprintf("segment-a-%d-%%d.m4s", as.index) } func (as *AudioStream) getFlags() Flags { diff --git a/transcoder/src/filestream.go b/transcoder/src/filestream.go index 4c4660b9..08c81664 100644 --- a/transcoder/src/filestream.go +++ b/transcoder/src/filestream.go @@ -114,24 +114,24 @@ func (fs 
*FileStream) GetMaster() string { } } } - for _, audio := range fs.Info.Audios { - master += "#EXT-X-MEDIA:TYPE=AUDIO," - master += "GROUP-ID=\"audio\"," - if audio.Language != nil { - master += fmt.Sprintf("LANGUAGE=\"%s\",", *audio.Language) - } - if audio.Title != nil { - master += fmt.Sprintf("NAME=\"%s\",", *audio.Title) - } else if audio.Language != nil { - master += fmt.Sprintf("NAME=\"%s\",", *audio.Language) - } else { - master += fmt.Sprintf("NAME=\"Audio %d\",", audio.Index) - } - if audio.IsDefault { - master += "DEFAULT=YES," - } - master += fmt.Sprintf("URI=\"./audio/%d/index.m3u8\"\n", audio.Index) - } + // for _, audio := range fs.Info.Audios { + // master += "#EXT-X-MEDIA:TYPE=AUDIO," + // master += "GROUP-ID=\"audio\"," + // if audio.Language != nil { + // master += fmt.Sprintf("LANGUAGE=\"%s\",", *audio.Language) + // } + // if audio.Title != nil { + // master += fmt.Sprintf("NAME=\"%s\",", *audio.Title) + // } else if audio.Language != nil { + // master += fmt.Sprintf("NAME=\"%s\",", *audio.Language) + // } else { + // master += fmt.Sprintf("NAME=\"Audio %d\",", audio.Index) + // } + // if audio.IsDefault { + // master += "DEFAULT=YES," + // } + // master += fmt.Sprintf("URI=\"./audio/%d/index.m3u8\"\n", audio.Index) + // } return master } diff --git a/transcoder/src/stream.go b/transcoder/src/stream.go index 3e324506..43761c1c 100644 --- a/transcoder/src/stream.go +++ b/transcoder/src/stream.go @@ -8,7 +8,6 @@ import ( "math" "os" "os/exec" - "path/filepath" "slices" "strings" "sync" @@ -25,7 +24,7 @@ const ( type StreamHandle interface { getTranscodeArgs(segments string) []string - getOutPath(encoder_id int) string + getSegmentName() string getFlags() Flags } @@ -33,7 +32,9 @@ type Stream struct { handle StreamHandle file *FileStream segments []Segment - heads []Head + // An init.mp4 reference. 
Only one exists per stream + init Segment + heads []Head // the lock used for the heads lock sync.RWMutex } @@ -67,12 +68,17 @@ func NewStream(file *FileStream, handle StreamHandle, ret *Stream) { ret.file = file ret.heads = make([]Head, 0) + ret.init.channel = make(chan struct{}) + length, is_done := file.Keyframes.Length() ret.segments = make([]Segment, length, max(length, 2000)) for seg := range ret.segments { ret.segments[seg].channel = make(chan struct{}) } + // Try to encode asap, the client will first require the init.mp4 anyways so we can't know where to start. + ret.run(0) + if !is_done { file.Keyframes.AddListener(func(keyframes []float64) { ret.lock.Lock() @@ -86,10 +92,25 @@ func NewStream(file *FileStream, handle StreamHandle, ret *Stream) { for seg := old_length; seg < len(keyframes); seg++ { ret.segments[seg].channel = make(chan struct{}) } + + // if we still haven't created the init.mp4 create it now. + if len(ret.heads) == 0 { + ret.run(0) + } }) } } +func (ts *Stream) isInitReady() bool { + select { + case <-ts.init.channel: + // if the channel returned, it means it was closed + return true + default: + return false + } +} + // Remember to lock before calling this. 
func (ts *Stream) isSegmentReady(segment int32) bool { select { @@ -181,8 +202,8 @@ func (ts *Stream) run(start int32) error { } } } - end_padding := int32(1) - if end == length { + end_padding := int32(2) + if end == length+1 { end_padding = 0 } segments := ts.file.Keyframes.Slice(start_segment+1, end+end_padding) @@ -191,8 +212,9 @@ func (ts *Stream) run(start int32) error { segments = []float64{9999999} } - outpath := ts.handle.getOutPath(encoder_id) - err := os.MkdirAll(filepath.Dir(outpath), 0o755) + outpath := fmt.Sprintf("%s/%d", ts.file.Out, encoder_id) + fmt.Print(outpath) + err := os.MkdirAll(outpath, 0o755) if err != nil { return err } @@ -232,7 +254,7 @@ func (ts *Stream) run(start int32) error { "-i", ts.file.Path, // this makes behaviors consistent between soft and hardware decodes. // this also means that after a -ss 50, the output video will start at 50s - // "-start_at_zero", + "-start_at_zero", // for hls streams, -copyts is mandatory "-copyts", // this makes output file start at 0s instead of a random delay + the -ss value @@ -244,25 +266,18 @@ func (ts *Stream) run(start int32) error { ) args = append(args, ts.handle.getTranscodeArgs(toSegmentStr(segments))...) 
args = append(args, - "-f", "segment", - // needed for rounding issues when forcing keyframes - // recommended value is 1/(2*frame_rate), which for a 24fps is ~0.021 - // we take a little bit more than that to be extra safe but too much can be harmful - // when segments are short (can make the video repeat itself) - "-segment_time_delta", "0.05", - "-segment_format", "mp4", - "-segment_format_options", "movflags=frag_keyframe+empty_moov+omit_tfhd_offset", - "-output_ts_offset", fmt.Sprint(ts.file.Keyframes.Get(start_segment)), - "-segment_times", toSegmentStr(Map(segments, func(seg float64, _ int) float64 { - // segment_times want durations, not timestamps so we must subtract the -ss param - // since we give a greater value to -ss to prevent wrong seeks but -segment_times - // needs precise segments, we use the keyframe we want to seek to as a reference. - return seg - ts.file.Keyframes.Get(start_segment) - })), - "-segment_list_type", "flat", - "-segment_list", "pipe:1", - "-segment_start_number", fmt.Sprint(start_segment), - outpath, + "-f", "hls", + // Cut at every keyframes. + "-hls_time", "0", + "-start_number", fmt.Sprint(start_segment), + "-hls_segment_type", "fmp4", + "-hls_fmp4_init_filename", fmt.Sprintf("%s/init.mp4", outpath), + "-hls_segment_filename", fmt.Sprintf("%s/%s", outpath, ts.handle.getSegmentName()), + // Make the playlist easier to parse in our program by only outputing 1 segment and no endlist marker + // anyways this list is only read once and we generate our own. + "-hls_list_size", "1", + "-hls_flags", "omit_endlist", + "-", ) cmd := exec.Command("ffmpeg", args...) @@ -285,10 +300,17 @@ func (ts *Stream) run(start int32) error { go func() { scanner := bufio.NewScanner(stdout) - format := filepath.Base(outpath) + format := ts.handle.getSegmentName() should_stop := false + is_init_ready:= false for scanner.Scan() { + line := scanner.Text() + // ignore m3u8 infos, we only want to know when segments are ready. 
+ if line[0] == '#' { + continue + } + var segment int32 _, _ = fmt.Sscanf(scanner.Text(), format, &segment) @@ -298,12 +320,19 @@ func (ts *Stream) run(start int32) error { continue } ts.lock.Lock() + + if !is_init_ready && !ts.isInitReady() { + ts.init.encoder = encoder_id + close(ts.init.channel) + is_init_ready = true + } + ts.heads[encoder_id].segment = segment log.Printf("Segment %d got ready (%d)", segment, encoder_id) if ts.isSegmentReady(segment) { // the current segment is already marked at done so another process has already gone up to here. cmd.Process.Signal(os.Interrupt) - log.Printf("Killing ffmpeg because segment %d is already ready", segment) + log.Printf("Killing ffmpeg %d because segment %d is already ready", encoder_id, segment) should_stop = true } else { ts.segments[segment].encoder = encoder_id @@ -313,7 +342,7 @@ func (ts *Stream) run(start int32) error { should_stop = true } else if ts.isSegmentReady(segment + 1) { cmd.Process.Signal(os.Interrupt) - log.Printf("Killing ffmpeg because next segment %d is ready", segment) + log.Printf("Killing ffmpeg %d because next segment %d is ready", encoder_id, segment) should_stop = true } } @@ -335,9 +364,9 @@ func (ts *Stream) run(start int32) error { if exiterr, ok := err.(*exec.ExitError); ok && exiterr.ExitCode() == 255 { log.Printf("ffmpeg %d was killed by us", encoder_id) } else if err != nil { - log.Printf("ffmpeg %d occurred an error: %s: %s", encoder_id, err, stderr.String()) + log.Printf("ffmpeg %d occurred an error: %v: %s", encoder_id, err, stderr.String()) } else { - log.Printf("ffmpeg %d finished successfully", encoder_id) + log.Printf("ffmpeg %d finished successfully, last segment: %d", encoder_id, ts.heads[encoder_id].segment) } ts.lock.Lock() @@ -359,6 +388,7 @@ func (ts *Stream) GetIndex() (string, error) { #EXT-X-TARGETDURATION:4 #EXT-X-MEDIA-SEQUENCE:0 #EXT-X-INDEPENDENT-SEGMENTS +#EXT-X-MAP:URI="init.mp4" ` length, is_done := ts.file.Keyframes.Length() @@ -376,7 +406,27 @@ func (ts 
*Stream) GetIndex() (string, error) { return index, nil } +func (ts *Stream) GetInit() (string, error) { + // No need to lock, the channel won't change. + select { + case <-ts.init.channel: + return fmt.Sprintf( + "%s/%d/%s", + ts.file.Out, + ts.init.encoder, + "init.mp4", + ), nil + case <-time.After(60 * time.Second): + return "", errors.New("could not retrieve the selected segment (timeout)") + } +} + func (ts *Stream) GetSegment(segment int32) (string, error) { + // I was too lazy to put this appart. + if segment == -1 { + return ts.GetInit() + } + ts.lock.RLock() ready := ts.isSegmentReady(segment) // we want to calculate distance in the same lock else it can be funky @@ -413,7 +463,12 @@ func (ts *Stream) GetSegment(segment int32) (string, error) { } } ts.prerareNextSegements(segment) - return fmt.Sprintf(ts.handle.getOutPath(ts.segments[segment].encoder), segment), nil + return fmt.Sprintf( + "%s/%d/%s", + ts.file.Out, + ts.segments[segment].encoder, + fmt.Sprintf(ts.handle.getSegmentName(), segment), + ), nil } func (ts *Stream) prerareNextSegements(segment int32) { diff --git a/transcoder/src/videostream.go b/transcoder/src/videostream.go index 2f32d9bb..80ea8b7a 100644 --- a/transcoder/src/videostream.go +++ b/transcoder/src/videostream.go @@ -25,8 +25,8 @@ func (vs *VideoStream) getFlags() Flags { return VideoF } -func (vs *VideoStream) getOutPath(encoder_id int) string { - return fmt.Sprintf("%s/segment-%s-%d-%%d.m4s", vs.file.Out, vs.quality, encoder_id) +func (vs *VideoStream) getSegmentName() string { + return fmt.Sprintf("segment-%s-%%d.m4s", vs.quality) } func closestMultiple(n int32, x int32) int32 { diff --git a/transcoder/utils.go b/transcoder/utils.go index df498967..f5c587f7 100644 --- a/transcoder/utils.go +++ b/transcoder/utils.go @@ -71,6 +71,9 @@ func GetClientId(c echo.Context) (string, error) { } func ParseSegment(segment string) (int32, error) { + if segment == "init.mp4" { + return -1, nil + } var ret int32 _, err := 
fmt.Sscanf(segment, "segment-%d.m4s", &ret) if err != nil {