Switch to -f hls instead of -f segment

For now, all audio variants are disabled since their handling will be
entirely different.
Found out that audio and video segments don't need to line up (same
number/duration). As long as the whole file stays long enough, it's fine.

Video handling now fails when there are too many keyframes close
together (like 0.01, 0.3, 0.4, 2, 4). It would only output 3 segments
instead of the 5 we would want. We might get around this by using fragments
containing more than 1 keyframe if we handle things right.
This commit is contained in:
Zoe Roux 2024-07-01 15:39:24 +00:00
parent 59264bd42f
commit 4fd25ce5ac
No known key found for this signature in database
6 changed files with 114 additions and 56 deletions

View File

@ -1,6 +1,6 @@
module github.com/zoriya/kyoo/transcoder module github.com/zoriya/kyoo/transcoder
go 1.21 go 1.22
require github.com/labstack/echo/v4 v4.12.0 // direct require github.com/labstack/echo/v4 v4.12.0 // direct

View File

@ -18,8 +18,8 @@ func NewAudioStream(file *FileStream, idx int32) *AudioStream {
return ret return ret
} }
func (as *AudioStream) getOutPath(encoder_id int) string { func (as *AudioStream) getSegmentName() string {
return fmt.Sprintf("%s/segment-a%d-%d-%%d.m4s", as.file.Out, as.index, encoder_id) return fmt.Sprintf("segment-a-%d-%%d.m4s", as.index)
} }
func (as *AudioStream) getFlags() Flags { func (as *AudioStream) getFlags() Flags {

View File

@ -114,24 +114,24 @@ func (fs *FileStream) GetMaster() string {
} }
} }
} }
for _, audio := range fs.Info.Audios { // for _, audio := range fs.Info.Audios {
master += "#EXT-X-MEDIA:TYPE=AUDIO," // master += "#EXT-X-MEDIA:TYPE=AUDIO,"
master += "GROUP-ID=\"audio\"," // master += "GROUP-ID=\"audio\","
if audio.Language != nil { // if audio.Language != nil {
master += fmt.Sprintf("LANGUAGE=\"%s\",", *audio.Language) // master += fmt.Sprintf("LANGUAGE=\"%s\",", *audio.Language)
} // }
if audio.Title != nil { // if audio.Title != nil {
master += fmt.Sprintf("NAME=\"%s\",", *audio.Title) // master += fmt.Sprintf("NAME=\"%s\",", *audio.Title)
} else if audio.Language != nil { // } else if audio.Language != nil {
master += fmt.Sprintf("NAME=\"%s\",", *audio.Language) // master += fmt.Sprintf("NAME=\"%s\",", *audio.Language)
} else { // } else {
master += fmt.Sprintf("NAME=\"Audio %d\",", audio.Index) // master += fmt.Sprintf("NAME=\"Audio %d\",", audio.Index)
} // }
if audio.IsDefault { // if audio.IsDefault {
master += "DEFAULT=YES," // master += "DEFAULT=YES,"
} // }
master += fmt.Sprintf("URI=\"./audio/%d/index.m3u8\"\n", audio.Index) // master += fmt.Sprintf("URI=\"./audio/%d/index.m3u8\"\n", audio.Index)
} // }
return master return master
} }

View File

@ -8,7 +8,6 @@ import (
"math" "math"
"os" "os"
"os/exec" "os/exec"
"path/filepath"
"slices" "slices"
"strings" "strings"
"sync" "sync"
@ -25,7 +24,7 @@ const (
type StreamHandle interface { type StreamHandle interface {
getTranscodeArgs(segments string) []string getTranscodeArgs(segments string) []string
getOutPath(encoder_id int) string getSegmentName() string
getFlags() Flags getFlags() Flags
} }
@ -33,7 +32,9 @@ type Stream struct {
handle StreamHandle handle StreamHandle
file *FileStream file *FileStream
segments []Segment segments []Segment
heads []Head // An init.mp4 reference. Only one exists per stream
init Segment
heads []Head
// the lock used for the heads // the lock used for the heads
lock sync.RWMutex lock sync.RWMutex
} }
@ -67,12 +68,17 @@ func NewStream(file *FileStream, handle StreamHandle, ret *Stream) {
ret.file = file ret.file = file
ret.heads = make([]Head, 0) ret.heads = make([]Head, 0)
ret.init.channel = make(chan struct{})
length, is_done := file.Keyframes.Length() length, is_done := file.Keyframes.Length()
ret.segments = make([]Segment, length, max(length, 2000)) ret.segments = make([]Segment, length, max(length, 2000))
for seg := range ret.segments { for seg := range ret.segments {
ret.segments[seg].channel = make(chan struct{}) ret.segments[seg].channel = make(chan struct{})
} }
// Try to encode asap, the client will first require the init.mp4 anyways so we can't know where to start.
ret.run(0)
if !is_done { if !is_done {
file.Keyframes.AddListener(func(keyframes []float64) { file.Keyframes.AddListener(func(keyframes []float64) {
ret.lock.Lock() ret.lock.Lock()
@ -86,10 +92,25 @@ func NewStream(file *FileStream, handle StreamHandle, ret *Stream) {
for seg := old_length; seg < len(keyframes); seg++ { for seg := old_length; seg < len(keyframes); seg++ {
ret.segments[seg].channel = make(chan struct{}) ret.segments[seg].channel = make(chan struct{})
} }
// if we still haven't created the init.mp4 create it now.
if len(ret.heads) == 0 {
ret.run(0)
}
}) })
} }
} }
func (ts *Stream) isInitReady() bool {
select {
case <-ts.init.channel:
// if the channel returned, it means it was closed
return true
default:
return false
}
}
// Remember to lock before calling this. // Remember to lock before calling this.
func (ts *Stream) isSegmentReady(segment int32) bool { func (ts *Stream) isSegmentReady(segment int32) bool {
select { select {
@ -181,8 +202,8 @@ func (ts *Stream) run(start int32) error {
} }
} }
} }
end_padding := int32(1) end_padding := int32(2)
if end == length { if end == length+1 {
end_padding = 0 end_padding = 0
} }
segments := ts.file.Keyframes.Slice(start_segment+1, end+end_padding) segments := ts.file.Keyframes.Slice(start_segment+1, end+end_padding)
@ -191,8 +212,9 @@ func (ts *Stream) run(start int32) error {
segments = []float64{9999999} segments = []float64{9999999}
} }
outpath := ts.handle.getOutPath(encoder_id) outpath := fmt.Sprintf("%s/%d", ts.file.Out, encoder_id)
err := os.MkdirAll(filepath.Dir(outpath), 0o755) fmt.Print(outpath)
err := os.MkdirAll(outpath, 0o755)
if err != nil { if err != nil {
return err return err
} }
@ -232,7 +254,7 @@ func (ts *Stream) run(start int32) error {
"-i", ts.file.Path, "-i", ts.file.Path,
// this makes behaviors consistent between soft and hardware decodes. // this makes behaviors consistent between soft and hardware decodes.
// this also means that after a -ss 50, the output video will start at 50s // this also means that after a -ss 50, the output video will start at 50s
// "-start_at_zero", "-start_at_zero",
// for hls streams, -copyts is mandatory // for hls streams, -copyts is mandatory
"-copyts", "-copyts",
// this makes output file start at 0s instead of a random delay + the -ss value // this makes output file start at 0s instead of a random delay + the -ss value
@ -244,25 +266,18 @@ func (ts *Stream) run(start int32) error {
) )
args = append(args, ts.handle.getTranscodeArgs(toSegmentStr(segments))...) args = append(args, ts.handle.getTranscodeArgs(toSegmentStr(segments))...)
args = append(args, args = append(args,
"-f", "segment", "-f", "hls",
// needed for rounding issues when forcing keyframes // Cut at every keyframes.
// recommended value is 1/(2*frame_rate), which for a 24fps is ~0.021 "-hls_time", "0",
// we take a little bit more than that to be extra safe but too much can be harmful "-start_number", fmt.Sprint(start_segment),
// when segments are short (can make the video repeat itself) "-hls_segment_type", "fmp4",
"-segment_time_delta", "0.05", "-hls_fmp4_init_filename", fmt.Sprintf("%s/init.mp4", outpath),
"-segment_format", "mp4", "-hls_segment_filename", fmt.Sprintf("%s/%s", outpath, ts.handle.getSegmentName()),
"-segment_format_options", "movflags=frag_keyframe+empty_moov+omit_tfhd_offset", // Make the playlist easier to parse in our program by only outputing 1 segment and no endlist marker
"-output_ts_offset", fmt.Sprint(ts.file.Keyframes.Get(start_segment)), // anyways this list is only read once and we generate our own.
"-segment_times", toSegmentStr(Map(segments, func(seg float64, _ int) float64 { "-hls_list_size", "1",
// segment_times want durations, not timestamps so we must subtract the -ss param "-hls_flags", "omit_endlist",
// since we give a greater value to -ss to prevent wrong seeks but -segment_times "-",
// needs precise segments, we use the keyframe we want to seek to as a reference.
return seg - ts.file.Keyframes.Get(start_segment)
})),
"-segment_list_type", "flat",
"-segment_list", "pipe:1",
"-segment_start_number", fmt.Sprint(start_segment),
outpath,
) )
cmd := exec.Command("ffmpeg", args...) cmd := exec.Command("ffmpeg", args...)
@ -285,10 +300,17 @@ func (ts *Stream) run(start int32) error {
go func() { go func() {
scanner := bufio.NewScanner(stdout) scanner := bufio.NewScanner(stdout)
format := filepath.Base(outpath) format := ts.handle.getSegmentName()
should_stop := false should_stop := false
is_init_ready:= false
for scanner.Scan() { for scanner.Scan() {
line := scanner.Text()
// ignore m3u8 infos, we only want to know when segments are ready.
if line[0] == '#' {
continue
}
var segment int32 var segment int32
_, _ = fmt.Sscanf(scanner.Text(), format, &segment) _, _ = fmt.Sscanf(scanner.Text(), format, &segment)
@ -298,12 +320,19 @@ func (ts *Stream) run(start int32) error {
continue continue
} }
ts.lock.Lock() ts.lock.Lock()
if !is_init_ready && !ts.isInitReady() {
ts.init.encoder = encoder_id
close(ts.init.channel)
is_init_ready = true
}
ts.heads[encoder_id].segment = segment ts.heads[encoder_id].segment = segment
log.Printf("Segment %d got ready (%d)", segment, encoder_id) log.Printf("Segment %d got ready (%d)", segment, encoder_id)
if ts.isSegmentReady(segment) { if ts.isSegmentReady(segment) {
// the current segment is already marked at done so another process has already gone up to here. // the current segment is already marked at done so another process has already gone up to here.
cmd.Process.Signal(os.Interrupt) cmd.Process.Signal(os.Interrupt)
log.Printf("Killing ffmpeg because segment %d is already ready", segment) log.Printf("Killing ffmpeg %d because segment %d is already ready", encoder_id, segment)
should_stop = true should_stop = true
} else { } else {
ts.segments[segment].encoder = encoder_id ts.segments[segment].encoder = encoder_id
@ -313,7 +342,7 @@ func (ts *Stream) run(start int32) error {
should_stop = true should_stop = true
} else if ts.isSegmentReady(segment + 1) { } else if ts.isSegmentReady(segment + 1) {
cmd.Process.Signal(os.Interrupt) cmd.Process.Signal(os.Interrupt)
log.Printf("Killing ffmpeg because next segment %d is ready", segment) log.Printf("Killing ffmpeg %d because next segment %d is ready", encoder_id, segment)
should_stop = true should_stop = true
} }
} }
@ -335,9 +364,9 @@ func (ts *Stream) run(start int32) error {
if exiterr, ok := err.(*exec.ExitError); ok && exiterr.ExitCode() == 255 { if exiterr, ok := err.(*exec.ExitError); ok && exiterr.ExitCode() == 255 {
log.Printf("ffmpeg %d was killed by us", encoder_id) log.Printf("ffmpeg %d was killed by us", encoder_id)
} else if err != nil { } else if err != nil {
log.Printf("ffmpeg %d occurred an error: %s: %s", encoder_id, err, stderr.String()) log.Printf("ffmpeg %d occurred an error: %v: %s", encoder_id, err, stderr.String())
} else { } else {
log.Printf("ffmpeg %d finished successfully", encoder_id) log.Printf("ffmpeg %d finished successfully, last segment: %d", encoder_id, ts.heads[encoder_id].segment)
} }
ts.lock.Lock() ts.lock.Lock()
@ -359,6 +388,7 @@ func (ts *Stream) GetIndex() (string, error) {
#EXT-X-TARGETDURATION:4 #EXT-X-TARGETDURATION:4
#EXT-X-MEDIA-SEQUENCE:0 #EXT-X-MEDIA-SEQUENCE:0
#EXT-X-INDEPENDENT-SEGMENTS #EXT-X-INDEPENDENT-SEGMENTS
#EXT-X-MAP:URI="init.mp4"
` `
length, is_done := ts.file.Keyframes.Length() length, is_done := ts.file.Keyframes.Length()
@ -376,7 +406,27 @@ func (ts *Stream) GetIndex() (string, error) {
return index, nil return index, nil
} }
func (ts *Stream) GetInit() (string, error) {
// No need to lock, the channel won't change.
select {
case <-ts.init.channel:
return fmt.Sprintf(
"%s/%d/%s",
ts.file.Out,
ts.init.encoder,
"init.mp4",
), nil
case <-time.After(60 * time.Second):
return "", errors.New("could not retrieve the selected segment (timeout)")
}
}
func (ts *Stream) GetSegment(segment int32) (string, error) { func (ts *Stream) GetSegment(segment int32) (string, error) {
// I was too lazy to put this appart.
if segment == -1 {
return ts.GetInit()
}
ts.lock.RLock() ts.lock.RLock()
ready := ts.isSegmentReady(segment) ready := ts.isSegmentReady(segment)
// we want to calculate distance in the same lock else it can be funky // we want to calculate distance in the same lock else it can be funky
@ -413,7 +463,12 @@ func (ts *Stream) GetSegment(segment int32) (string, error) {
} }
} }
ts.prerareNextSegements(segment) ts.prerareNextSegements(segment)
return fmt.Sprintf(ts.handle.getOutPath(ts.segments[segment].encoder), segment), nil return fmt.Sprintf(
"%s/%d/%s",
ts.file.Out,
ts.segments[segment].encoder,
fmt.Sprintf(ts.handle.getSegmentName(), segment),
), nil
} }
func (ts *Stream) prerareNextSegements(segment int32) { func (ts *Stream) prerareNextSegements(segment int32) {

View File

@ -25,8 +25,8 @@ func (vs *VideoStream) getFlags() Flags {
return VideoF return VideoF
} }
func (vs *VideoStream) getOutPath(encoder_id int) string { func (vs *VideoStream) getSegmentName() string {
return fmt.Sprintf("%s/segment-%s-%d-%%d.m4s", vs.file.Out, vs.quality, encoder_id) return fmt.Sprintf("segment-%s-%%d.m4s", vs.quality)
} }
func closestMultiple(n int32, x int32) int32 { func closestMultiple(n int32, x int32) int32 {

View File

@ -71,6 +71,9 @@ func GetClientId(c echo.Context) (string, error) {
} }
func ParseSegment(segment string) (int32, error) { func ParseSegment(segment string) (int32, error) {
if segment == "init.mp4" {
return -1, nil
}
var ret int32 var ret int32
_, err := fmt.Sscanf(segment, "segment-%d.m4s", &ret) _, err := fmt.Sscanf(segment, "segment-%d.m4s", &ret)
if err != nil { if err != nil {