Distrust -to and -ss to be precise, use the segment splitter for that (wip)

This commit is contained in:
Zoe Roux 2024-02-11 02:25:17 +01:00
parent 5b27eab680
commit d27cf2afc8
3 changed files with 61 additions and 44 deletions

View File

@ -18,8 +18,8 @@ func NewAudioStream(file *FileStream, idx int32) *AudioStream {
return ret return ret
} }
func (as *AudioStream) getOutPath() string { func (as *AudioStream) getOutPath(encoder_id int) string {
return fmt.Sprintf("%s/segment-a%d-%%06d.ts", as.file.Out, as.index) return fmt.Sprintf("%s/segment-a%d-%d-%%d.ts", as.file.Out, as.index, encoder_id)
} }
func (as *AudioStream) getTranscodeArgs(segments string) []string { func (as *AudioStream) getTranscodeArgs(segments string) []string {

View File

@ -17,12 +17,19 @@ import (
type StreamHandle interface { type StreamHandle interface {
getTranscodeArgs(segments string) []string getTranscodeArgs(segments string) []string
getOutPath() string getOutPath(encoder_id int) string
} }
type Stream struct { type Stream struct {
handle StreamHandle handle StreamHandle
file *FileStream file *FileStream
segments []Segment
heads []Head
// the lock used for the the heads
lock sync.RWMutex
}
type Segment struct {
// channel open if the segment is not ready. closed if ready. // channel open if the segment is not ready. closed if ready.
// one can check if segment 1 is open by doing: // one can check if segment 1 is open by doing:
// //
@ -30,10 +37,8 @@ type Stream struct {
// //
// You can also wait for it to be ready (non-blocking if already ready) by doing: // You can also wait for it to be ready (non-blocking if already ready) by doing:
// <-ts.segments[i] // <-ts.segments[i]
segments []chan (struct{}) channel chan (struct{})
heads []Head encoder int
// the lock used for the segments array and the heads
lock sync.RWMutex
} }
type Head struct { type Head struct {
@ -52,11 +57,11 @@ func NewStream(file *FileStream, handle StreamHandle) Stream {
ret := Stream{ ret := Stream{
handle: handle, handle: handle,
file: file, file: file,
segments: make([]chan struct{}, len(file.Keyframes)), segments: make([]Segment, len(file.Keyframes)),
heads: make([]Head, 0), heads: make([]Head, 0),
} }
for seg := range ret.segments { for seg := range ret.segments {
ret.segments[seg] = make(chan struct{}) ret.segments[seg].channel = make(chan struct{})
} }
// Copy default value before use is safe. Next warning can be safely ignored // Copy default value before use is safe. Next warning can be safely ignored
return ret return ret
@ -65,7 +70,7 @@ func NewStream(file *FileStream, handle StreamHandle) Stream {
// Remember to lock before calling this. // Remember to lock before calling this.
func (ts *Stream) isSegmentReady(segment int32) bool { func (ts *Stream) isSegmentReady(segment int32) bool {
select { select {
case <-ts.segments[segment]: case <-ts.segments[segment].channel:
// if the channel returned, it means it was closed // if the channel returned, it means it was closed
return true return true
default: default:
@ -85,7 +90,7 @@ func (ts *Stream) isSegmentTranscoding(segment int32) bool {
func (ts *Stream) run(start int32) error { func (ts *Stream) run(start int32) error {
// Start the transcode up to the 100th segment (or less) // Start the transcode up to the 100th segment (or less)
// Stop at the first finished segment // Stop at the first finished segment
end := min(start+100, int32(len(ts.file.Keyframes))-1) end := min(start+3, int32(len(ts.file.Keyframes)))
ts.lock.Lock() ts.lock.Lock()
for i := start; i < end; i++ { for i := start; i < end; i++ {
if ts.isSegmentReady(i) { if ts.isSegmentReady(i) {
@ -105,26 +110,32 @@ func (ts *Stream) run(start int32) error {
len(ts.file.Keyframes), len(ts.file.Keyframes),
) )
var segments_str string start_padding := int32(0)
if end-start-1 > 0 { if start == 0 {
// We do not need the first value (start of the transcode) start_padding = 1
// nor the last one (end of the -to argument)
// if we specify one of those, ffmpeg creates a really small segment and
// we miss a segment's worth of data
segments := make([]string, end-start-1)
for i := range segments {
time := ts.file.Keyframes[int(start)+i+1] - ts.file.Keyframes[start]
segments[i] = fmt.Sprintf("%.6f", time)
} }
segments_str = strings.Join(segments, ",") end_padding := int32(1)
} else { if end == int32(len(ts.file.Keyframes)) {
// we set a dummy value when we want to have a single segment. end_padding = 0
// this can't be left empty else ffmpeg errors out }
// this can't be the length of the file else ffmpeg risk creating a second segment segments := ts.file.Keyframes[start+start_padding : end+end_padding]
segments_str = "9999999999" segments_str := strings.Join(Map(segments, func(seg float64, _ int) string {
seg = seg - ts.file.Keyframes[start]
return fmt.Sprintf("%.6f", seg)
}), ",")
if segments_str == "" {
// we can't leave that empty else ffmpeg errors out.
segments_str = "99999999"
} }
outpath := ts.handle.getOutPath() // We do not need the first value (start of the transcode)
// but we want the last one (the same as the -to argument)
// ffmpeg might create an extra segment for (starting from the -to aka end-limit and ending a few ms afters)
// this extra segment will contains duplicate value as
// if we specify one of those, ffmpeg creates a really small segment and
// we miss a segment's worth of data
outpath := ts.handle.getOutPath(encoder_id)
err := os.MkdirAll(filepath.Dir(outpath), 0o755) err := os.MkdirAll(filepath.Dir(outpath), 0o755)
if err != nil { if err != nil {
return err return err
@ -144,16 +155,20 @@ func (ts *Stream) run(start int32) error {
) )
} }
args = append(args, ts.handle.getTranscodeArgs(segments_str)...) args = append(args, ts.handle.getTranscodeArgs(segments_str)...)
args = append(args, []string{ args = append(args,
"-f", "segment", "-f", "segment",
"-segment_time_delta", "0.2", "-segment_time_delta", "0.2",
"-segment_format", "mpegts", "-segment_format", "mpegts",
"-segment_times", segments_str, "-segment_times", segments_str,
"-segment_start_number", fmt.Sprint(start),
"-segment_list_type", "flat", "-segment_list_type", "flat",
"-segment_list", "pipe:1", "-segment_list", "pipe:1",
outpath, )
}...) if start != 0 {
args = append(args,
"-segment_start_number", fmt.Sprint(start-1),
)
}
args = append(args, outpath)
cmd := exec.Command("ffmpeg", args...) cmd := exec.Command("ffmpeg", args...)
log.Printf("Running %s", strings.Join(cmd.Args, " ")) log.Printf("Running %s", strings.Join(cmd.Args, " "))
@ -182,15 +197,21 @@ func (ts *Stream) run(start int32) error {
var segment int32 var segment int32
_, _ = fmt.Sscanf(scanner.Text(), format, &segment) _, _ = fmt.Sscanf(scanner.Text(), format, &segment)
if segment < start {
log.Printf("Ignore pre-start segment (%d), start %d", segment, start)
continue
}
ts.lock.Lock() ts.lock.Lock()
ts.heads[encoder_id].segment = segment ts.heads[encoder_id].segment = segment
log.Printf("Segment got ready %d %d", segment, end)
if ts.isSegmentReady(segment) { if ts.isSegmentReady(segment) {
// the current segment is already marked at done so another process has already gone up to here. // the current segment is already marked at done so another process has already gone up to here.
cmd.Process.Signal(os.Interrupt) cmd.Process.Signal(os.Interrupt)
log.Printf("Killing ffmpeg because segment %d is already ready", segment) log.Printf("Killing ffmpeg because segment %d is already ready", segment)
should_stop = true should_stop = true
} else { } else {
close(ts.segments[segment]) ts.segments[segment].encoder = encoder_id
close(ts.segments[segment].channel)
if segment == end-1 { if segment == end-1 {
// file finished, ffmped will finish soon on it's own // file finished, ffmped will finish soon on it's own
should_stop = true should_stop = true
@ -292,17 +313,13 @@ func (ts *Stream) GetSegment(segment int32) (string, error) {
log.Printf("Waiting for segment %d since encoder head is %fs aways", segment, distance) log.Printf("Waiting for segment %d since encoder head is %fs aways", segment, distance)
} }
ts.lock.RLock()
ready_chan := ts.segments[segment]
ts.lock.RUnlock()
select { select {
case <-ready_chan: case <-ts.segments[segment].channel:
case <-time.After(60 * time.Second): case <-time.After(60 * time.Second):
return "", errors.New("could not retrive the selected segment (timeout)") return "", errors.New("could not retrive the selected segment (timeout)")
} }
} }
return fmt.Sprintf(ts.handle.getOutPath(), segment), nil return fmt.Sprintf(ts.handle.getOutPath(ts.segments[segment].encoder), segment), nil
} }
func (ts *Stream) getMinEncoderDistance(segment int32) float64 { func (ts *Stream) getMinEncoderDistance(segment int32) float64 {

View File

@ -18,8 +18,8 @@ func NewVideoStream(file *FileStream, quality Quality) *VideoStream {
return ret return ret
} }
func (vs *VideoStream) getOutPath() string { func (vs *VideoStream) getOutPath(encoder_id int) string {
return fmt.Sprintf("%s/segment-%s-%%06d.ts", vs.file.Out, vs.quality) return fmt.Sprintf("%s/segment-%s-%d-%%d.ts", vs.file.Out, vs.quality, encoder_id)
} }
func (vs *VideoStream) getTranscodeArgs(segments string) []string { func (vs *VideoStream) getTranscodeArgs(segments string) []string {