diff --git a/transcoder/go.mod b/transcoder/go.mod
index 3a34c4b0..c01716e5 100644
--- a/transcoder/go.mod
+++ b/transcoder/go.mod
@@ -1,6 +1,6 @@
 module github.com/zoriya/kyoo/transcoder
 
-go 1.21
+go 1.22
 
 require github.com/labstack/echo/v4 v4.12.0 // direct
 
diff --git a/transcoder/src/audiostream.go b/transcoder/src/audiostream.go
index 9f230983..23452a4f 100644
--- a/transcoder/src/audiostream.go
+++ b/transcoder/src/audiostream.go
@@ -18,8 +18,8 @@ func NewAudioStream(file *FileStream, idx int32) *AudioStream {
 	return ret
 }
 
-func (as *AudioStream) getOutPath(encoder_id int) string {
-	return fmt.Sprintf("%s/segment-a%d-%d-%%d.m4s", as.file.Out, as.index, encoder_id)
+func (as *AudioStream) getSegmentName() string {
+	return fmt.Sprintf("segment-a-%d-%%d.m4s", as.index)
 }
 
 func (as *AudioStream) getFlags() Flags {
diff --git a/transcoder/src/filestream.go b/transcoder/src/filestream.go
index 4c4660b9..08c81664 100644
--- a/transcoder/src/filestream.go
+++ b/transcoder/src/filestream.go
@@ -114,24 +114,24 @@ func (fs *FileStream) GetMaster() string {
 			}
 		}
 	}
-	for _, audio := range fs.Info.Audios {
-		master += "#EXT-X-MEDIA:TYPE=AUDIO,"
-		master += "GROUP-ID=\"audio\","
-		if audio.Language != nil {
-			master += fmt.Sprintf("LANGUAGE=\"%s\",", *audio.Language)
-		}
-		if audio.Title != nil {
-			master += fmt.Sprintf("NAME=\"%s\",", *audio.Title)
-		} else if audio.Language != nil {
-			master += fmt.Sprintf("NAME=\"%s\",", *audio.Language)
-		} else {
-			master += fmt.Sprintf("NAME=\"Audio %d\",", audio.Index)
-		}
-		if audio.IsDefault {
-			master += "DEFAULT=YES,"
-		}
-		master += fmt.Sprintf("URI=\"./audio/%d/index.m3u8\"\n", audio.Index)
-	}
+	// for _, audio := range fs.Info.Audios {
+	// 	master += "#EXT-X-MEDIA:TYPE=AUDIO,"
+	// 	master += "GROUP-ID=\"audio\","
+	// 	if audio.Language != nil {
+	// 		master += fmt.Sprintf("LANGUAGE=\"%s\",", *audio.Language)
+	// 	}
+	// 	if audio.Title != nil {
+	// 		master += fmt.Sprintf("NAME=\"%s\",", *audio.Title)
+	// 	} else if audio.Language != nil {
+	// 		master += fmt.Sprintf("NAME=\"%s\",", *audio.Language)
+	// 	} else {
+	// 		master += fmt.Sprintf("NAME=\"Audio %d\",", audio.Index)
+	// 	}
+	// 	if audio.IsDefault {
+	// 		master += "DEFAULT=YES,"
+	// 	}
+	// 	master += fmt.Sprintf("URI=\"./audio/%d/index.m3u8\"\n", audio.Index)
+	// }
 	return master
 }
 
diff --git a/transcoder/src/stream.go b/transcoder/src/stream.go
index 3e324506..43761c1c 100644
--- a/transcoder/src/stream.go
+++ b/transcoder/src/stream.go
@@ -8,7 +8,6 @@ import (
 	"math"
 	"os"
 	"os/exec"
-	"path/filepath"
 	"slices"
 	"strings"
 	"sync"
@@ -25,7 +24,7 @@ const (
 
 type StreamHandle interface {
 	getTranscodeArgs(segments string) []string
-	getOutPath(encoder_id int) string
+	getSegmentName() string
 	getFlags() Flags
 }
 
@@ -33,7 +32,9 @@ type Stream struct {
 	handle   StreamHandle
 	file     *FileStream
 	segments []Segment
-	heads    []Head
+	// An init.mp4 reference. Only one exists per stream
+	init  Segment
+	heads []Head
 	// the lock used for the heads
 	lock sync.RWMutex
 }
@@ -67,12 +68,17 @@ func NewStream(file *FileStream, handle StreamHandle, ret *Stream) {
 	ret.file = file
 	ret.heads = make([]Head, 0)
 
+	ret.init.channel = make(chan struct{})
+
 	length, is_done := file.Keyframes.Length()
 	ret.segments = make([]Segment, length, max(length, 2000))
 	for seg := range ret.segments {
 		ret.segments[seg].channel = make(chan struct{})
 	}
 
+	// Try to encode asap, the client will first require the init.mp4 anyways so we can't know where to start.
+	ret.run(0)
+
 	if !is_done {
 		file.Keyframes.AddListener(func(keyframes []float64) {
 			ret.lock.Lock()
@@ -86,10 +92,25 @@ func NewStream(file *FileStream, handle StreamHandle, ret *Stream) {
 			for seg := old_length; seg < len(keyframes); seg++ {
 				ret.segments[seg].channel = make(chan struct{})
 			}
+
+			// if we still haven't created the init.mp4 create it now.
+			if len(ret.heads) == 0 {
+				ret.run(0)
+			}
 		})
 	}
 }
 
+func (ts *Stream) isInitReady() bool {
+	select {
+	case <-ts.init.channel:
+		// if the channel returned, it means it was closed
+		return true
+	default:
+		return false
+	}
+}
+
 // Remember to lock before calling this.
 func (ts *Stream) isSegmentReady(segment int32) bool {
 	select {
@@ -181,8 +202,8 @@ func (ts *Stream) run(start int32) error {
 			}
 		}
 	}
-	end_padding := int32(1)
-	if end == length {
+	end_padding := int32(2)
+	if end == length+1 {
 		end_padding = 0
 	}
 	segments := ts.file.Keyframes.Slice(start_segment+1, end+end_padding)
@@ -191,8 +212,9 @@ func (ts *Stream) run(start int32) error {
 		segments = []float64{9999999}
 	}
 
-	outpath := ts.handle.getOutPath(encoder_id)
-	err := os.MkdirAll(filepath.Dir(outpath), 0o755)
+	outpath := fmt.Sprintf("%s/%d", ts.file.Out, encoder_id)
+	log.Printf("ffmpeg %d will write its segments to %s", encoder_id, outpath)
+	err := os.MkdirAll(outpath, 0o755)
 	if err != nil {
 		return err
 	}
@@ -232,7 +254,7 @@ func (ts *Stream) run(start int32) error {
 		"-i", ts.file.Path,
 		// this makes behaviors consistent between soft and hardware decodes.
 		// this also means that after a -ss 50, the output video will start at 50s
-		// "-start_at_zero",
+		"-start_at_zero",
 		// for hls streams, -copyts is mandatory
 		"-copyts",
 		// this makes output file start at 0s instead of a random delay + the -ss value
@@ -244,25 +266,18 @@ func (ts *Stream) run(start int32) error {
 	)
 	args = append(args, ts.handle.getTranscodeArgs(toSegmentStr(segments))...)
 	args = append(args,
-		"-f", "segment",
-		// needed for rounding issues when forcing keyframes
-		// recommended value is 1/(2*frame_rate), which for a 24fps is ~0.021
-		// we take a little bit more than that to be extra safe but too much can be harmful
-		// when segments are short (can make the video repeat itself)
-		"-segment_time_delta", "0.05",
-		"-segment_format", "mp4",
-		"-segment_format_options", "movflags=frag_keyframe+empty_moov+omit_tfhd_offset",
-		"-output_ts_offset", fmt.Sprint(ts.file.Keyframes.Get(start_segment)),
-		"-segment_times", toSegmentStr(Map(segments, func(seg float64, _ int) float64 {
-			// segment_times want durations, not timestamps so we must subtract the -ss param
-			// since we give a greater value to -ss to prevent wrong seeks but -segment_times
-			// needs precise segments, we use the keyframe we want to seek to as a reference.
-			return seg - ts.file.Keyframes.Get(start_segment)
-		})),
-		"-segment_list_type", "flat",
-		"-segment_list", "pipe:1",
-		"-segment_start_number", fmt.Sprint(start_segment),
-		outpath,
+		"-f", "hls",
+		// Cut at every keyframe.
+		"-hls_time", "0",
+		"-start_number", fmt.Sprint(start_segment),
+		"-hls_segment_type", "fmp4",
+		"-hls_fmp4_init_filename", fmt.Sprintf("%s/init.mp4", outpath),
+		"-hls_segment_filename", fmt.Sprintf("%s/%s", outpath, ts.handle.getSegmentName()),
+		// Make the playlist easier to parse in our program by only outputting 1 segment and no endlist marker
+		// anyways this list is only read once and we generate our own.
+		"-hls_list_size", "1",
+		"-hls_flags", "omit_endlist",
+		"-",
 	)
 
 	cmd := exec.Command("ffmpeg", args...)
@@ -285,10 +300,17 @@ func (ts *Stream) run(start int32) error {
 
 	go func() {
 		scanner := bufio.NewScanner(stdout)
-		format := filepath.Base(outpath)
+		format := ts.handle.getSegmentName()
 		should_stop := false
+		is_init_ready := false
 
 		for scanner.Scan() {
+			line := scanner.Text()
+			// ignore blank lines and m3u8 infos, we only want to know when segments are ready.
+			if len(line) == 0 || line[0] == '#' {
+				continue
+			}
+
 			var segment int32
 			_, _ = fmt.Sscanf(scanner.Text(), format, &segment)
 
@@ -298,12 +320,19 @@ func (ts *Stream) run(start int32) error {
 				continue
 			}
 			ts.lock.Lock()
+
+			if !is_init_ready && !ts.isInitReady() {
+				ts.init.encoder = encoder_id
+				close(ts.init.channel)
+				is_init_ready = true
+			}
+
 			ts.heads[encoder_id].segment = segment
 			log.Printf("Segment %d got ready (%d)", segment, encoder_id)
 			if ts.isSegmentReady(segment) {
 				// the current segment is already marked at done so another process has already gone up to here.
 				cmd.Process.Signal(os.Interrupt)
-				log.Printf("Killing ffmpeg because segment %d is already ready", segment)
+				log.Printf("Killing ffmpeg %d because segment %d is already ready", encoder_id, segment)
 				should_stop = true
 			} else {
 				ts.segments[segment].encoder = encoder_id
@@ -313,7 +342,7 @@ func (ts *Stream) run(start int32) error {
 					should_stop = true
 				} else if ts.isSegmentReady(segment + 1) {
 					cmd.Process.Signal(os.Interrupt)
-					log.Printf("Killing ffmpeg because next segment %d is ready", segment)
+					log.Printf("Killing ffmpeg %d because next segment %d is ready", encoder_id, segment)
 					should_stop = true
 				}
 			}
@@ -335,9 +364,9 @@ func (ts *Stream) run(start int32) error {
 		if exiterr, ok := err.(*exec.ExitError); ok && exiterr.ExitCode() == 255 {
 			log.Printf("ffmpeg %d was killed by us", encoder_id)
 		} else if err != nil {
-			log.Printf("ffmpeg %d occurred an error: %s: %s", encoder_id, err, stderr.String())
+			log.Printf("ffmpeg %d occurred an error: %v: %s", encoder_id, err, stderr.String())
 		} else {
-			log.Printf("ffmpeg %d finished successfully", encoder_id)
+			log.Printf("ffmpeg %d finished successfully, last segment: %d", encoder_id, ts.heads[encoder_id].segment)
 		}
 
 		ts.lock.Lock()
@@ -359,6 +388,7 @@ func (ts *Stream) GetIndex() (string, error) {
 #EXT-X-TARGETDURATION:4
 #EXT-X-MEDIA-SEQUENCE:0
 #EXT-X-INDEPENDENT-SEGMENTS
+#EXT-X-MAP:URI="init.mp4"
 `
 	length, is_done := ts.file.Keyframes.Length()
 
@@ -376,7 +406,27 @@ func (ts *Stream) GetIndex() (string, error) {
 	return index, nil
 }
 
+func (ts *Stream) GetInit() (string, error) {
+	// No need to lock, the channel won't change.
+	select {
+	case <-ts.init.channel:
+		return fmt.Sprintf(
+			"%s/%d/%s",
+			ts.file.Out,
+			ts.init.encoder,
+			"init.mp4",
+		), nil
+	case <-time.After(60 * time.Second):
+		return "", errors.New("could not retrieve the selected segment (timeout)")
+	}
+}
+
 func (ts *Stream) GetSegment(segment int32) (string, error) {
+	// ParseSegment maps "init.mp4" to -1, so that sentinel is served from GetInit instead.
+	if segment == -1 {
+		return ts.GetInit()
+	}
+
 	ts.lock.RLock()
 	ready := ts.isSegmentReady(segment)
 	// we want to calculate distance in the same lock else it can be funky
@@ -413,7 +463,12 @@ func (ts *Stream) GetSegment(segment int32) (string, error) {
 		}
 	}
 	ts.prerareNextSegements(segment)
-	return fmt.Sprintf(ts.handle.getOutPath(ts.segments[segment].encoder), segment), nil
+	return fmt.Sprintf(
+		"%s/%d/%s",
+		ts.file.Out,
+		ts.segments[segment].encoder,
+		fmt.Sprintf(ts.handle.getSegmentName(), segment),
+	), nil
 }
 
 func (ts *Stream) prerareNextSegements(segment int32) {
diff --git a/transcoder/src/videostream.go b/transcoder/src/videostream.go
index 2f32d9bb..80ea8b7a 100644
--- a/transcoder/src/videostream.go
+++ b/transcoder/src/videostream.go
@@ -25,8 +25,8 @@ func (vs *VideoStream) getFlags() Flags {
 	return VideoF
 }
 
-func (vs *VideoStream) getOutPath(encoder_id int) string {
-	return fmt.Sprintf("%s/segment-%s-%d-%%d.m4s", vs.file.Out, vs.quality, encoder_id)
+func (vs *VideoStream) getSegmentName() string {
+	return fmt.Sprintf("segment-%s-%%d.m4s", vs.quality)
 }
 
 func closestMultiple(n int32, x int32) int32 {
diff --git a/transcoder/utils.go b/transcoder/utils.go
index df498967..f5c587f7 100644
--- a/transcoder/utils.go
+++ b/transcoder/utils.go
@@ -71,6 +71,9 @@ func GetClientId(c echo.Context) (string, error) {
 }
 
 func ParseSegment(segment string) (int32, error) {
+	if segment == "init.mp4" {
+		return -1, nil
+	}
 	var ret int32
 	_, err := fmt.Sscanf(segment, "segment-%d.m4s", &ret)
 	if err != nil {