diff --git a/transcoder/src/audiostream.go b/transcoder/src/audiostream.go index 8ce0e822..8e393501 100644 --- a/transcoder/src/audiostream.go +++ b/transcoder/src/audiostream.go @@ -18,8 +18,8 @@ func NewAudioStream(file *FileStream, idx int32) *AudioStream { return ret } -func (as *AudioStream) getOutPath() string { - return fmt.Sprintf("%s/segment-a%d-%%06d.ts", as.file.Out, as.index) +func (as *AudioStream) getOutPath(encoder_id int) string { + return fmt.Sprintf("%s/segment-a%d-%d-%%d.ts", as.file.Out, as.index, encoder_id) } func (as *AudioStream) getTranscodeArgs(segments string) []string { diff --git a/transcoder/src/stream.go b/transcoder/src/stream.go index 35cba9a1..734bbcd9 100644 --- a/transcoder/src/stream.go +++ b/transcoder/src/stream.go @@ -17,12 +17,19 @@ import ( type StreamHandle interface { getTranscodeArgs(segments string) []string - getOutPath() string + getOutPath(encoder_id int) string } type Stream struct { - handle StreamHandle - file *FileStream + handle StreamHandle + file *FileStream + segments []Segment + heads []Head + // the lock used for the the heads + lock sync.RWMutex +} + +type Segment struct { // channel open if the segment is not ready. closed if ready. // one can check if segment 1 is open by doing: // @@ -30,10 +37,8 @@ type Stream struct { // // You can also wait for it to be ready (non-blocking if already ready) by doing: // <-ts.segments[i] - segments []chan (struct{}) - heads []Head - // the lock used for the segments array and the heads - lock sync.RWMutex + channel chan (struct{}) + encoder int } type Head struct { @@ -52,11 +57,11 @@ func NewStream(file *FileStream, handle StreamHandle) Stream { ret := Stream{ handle: handle, file: file, - segments: make([]chan struct{}, len(file.Keyframes)), + segments: make([]Segment, len(file.Keyframes)), heads: make([]Head, 0), } for seg := range ret.segments { - ret.segments[seg] = make(chan struct{}) + ret.segments[seg].channel = make(chan struct{}) } // Copy default value before use is safe. Next warning can be safely ignored return ret @@ -65,7 +70,7 @@ func NewStream(file *FileStream, handle StreamHandle) Stream { // Remember to lock before calling this. func (ts *Stream) isSegmentReady(segment int32) bool { select { - case <-ts.segments[segment]: + case <-ts.segments[segment].channel: // if the channel returned, it means it was closed return true default: @@ -85,7 +90,7 @@ func (ts *Stream) isSegmentTranscoding(segment int32) bool { func (ts *Stream) run(start int32) error { // Start the transcode up to the 100th segment (or less) // Stop at the first finished segment - end := min(start+100, int32(len(ts.file.Keyframes))-1) + end := min(start+3, int32(len(ts.file.Keyframes))) ts.lock.Lock() for i := start; i < end; i++ { if ts.isSegmentReady(i) { @@ -105,26 +110,32 @@ func (ts *Stream) run(start int32) error { len(ts.file.Keyframes), ) - var segments_str string - if end-start-1 > 0 { - // We do not need the first value (start of the transcode) - // nor the last one (end of the -to argument) - // if we specify one of those, ffmpeg creates a really small segment and - // we miss a segment's worth of data - segments := make([]string, end-start-1) - for i := range segments { - time := ts.file.Keyframes[int(start)+i+1] - ts.file.Keyframes[start] - segments[i] = fmt.Sprintf("%.6f", time) - } - segments_str = strings.Join(segments, ",") - } else { - // we set a dummy value when we want to have a single segment. - // this can't be left empty else ffmpeg errors out - // this can't be the length of the file else ffmpeg risk creating a second segment - segments_str = "9999999999" + start_padding := int32(0) + if start == 0 { + start_padding = 1 + } + end_padding := int32(1) + if end == int32(len(ts.file.Keyframes)) { + end_padding = 0 + } + segments := ts.file.Keyframes[start+start_padding : end+end_padding] + segments_str := strings.Join(Map(segments, func(seg float64, _ int) string { + seg = seg - ts.file.Keyframes[start] + return fmt.Sprintf("%.6f", seg) + }), ",") + if segments_str == "" { + // we can't leave that empty else ffmpeg errors out. + segments_str = "99999999" } - outpath := ts.handle.getOutPath() + // We do not need the first value (start of the transcode) + // but we want the last one (the same as the -to argument) + // ffmpeg might create an extra segment for (starting from the -to aka end-limit and ending a few ms afters) + // this extra segment will contains duplicate value as + // if we specify one of those, ffmpeg creates a really small segment and + // we miss a segment's worth of data + + outpath := ts.handle.getOutPath(encoder_id) err := os.MkdirAll(filepath.Dir(outpath), 0o755) if err != nil { return err @@ -144,16 +155,20 @@ func (ts *Stream) run(start int32) error { ) } args = append(args, ts.handle.getTranscodeArgs(segments_str)...) - args = append(args, []string{ + args = append(args, "-f", "segment", "-segment_time_delta", "0.2", "-segment_format", "mpegts", "-segment_times", segments_str, - "-segment_start_number", fmt.Sprint(start), "-segment_list_type", "flat", "-segment_list", "pipe:1", - outpath, - }...) + ) + if start != 0 { + args = append(args, + "-segment_start_number", fmt.Sprint(start-1), + ) + } + args = append(args, outpath) cmd := exec.Command("ffmpeg", args...) log.Printf("Running %s", strings.Join(cmd.Args, " ")) @@ -182,15 +197,21 @@ func (ts *Stream) run(start int32) error { var segment int32 _, _ = fmt.Sscanf(scanner.Text(), format, &segment) + if segment < start { + log.Printf("Ignore pre-start segment (%d), start %d", segment, start) + continue + } ts.lock.Lock() ts.heads[encoder_id].segment = segment + log.Printf("Segment got ready %d %d", segment, end) if ts.isSegmentReady(segment) { // the current segment is already marked at done so another process has already gone up to here. cmd.Process.Signal(os.Interrupt) log.Printf("Killing ffmpeg because segment %d is already ready", segment) should_stop = true } else { - close(ts.segments[segment]) + ts.segments[segment].encoder = encoder_id + close(ts.segments[segment].channel) if segment == end-1 { // file finished, ffmped will finish soon on it's own should_stop = true @@ -292,17 +313,13 @@ func (ts *Stream) GetSegment(segment int32) (string, error) { log.Printf("Waiting for segment %d since encoder head is %fs aways", segment, distance) } - ts.lock.RLock() - ready_chan := ts.segments[segment] - ts.lock.RUnlock() - select { - case <-ready_chan: + case <-ts.segments[segment].channel: case <-time.After(60 * time.Second): return "", errors.New("could not retrive the selected segment (timeout)") } } - return fmt.Sprintf(ts.handle.getOutPath(), segment), nil + return fmt.Sprintf(ts.handle.getOutPath(ts.segments[segment].encoder), segment), nil } func (ts *Stream) getMinEncoderDistance(segment int32) float64 { diff --git a/transcoder/src/videostream.go b/transcoder/src/videostream.go index ae73e562..bc5b3056 100644 --- a/transcoder/src/videostream.go +++ b/transcoder/src/videostream.go @@ -18,8 +18,8 @@ func NewVideoStream(file *FileStream, quality Quality) *VideoStream { return ret } -func (vs *VideoStream) getOutPath() string { - return fmt.Sprintf("%s/segment-%s-%%06d.ts", vs.file.Out, vs.quality) +func (vs *VideoStream) getOutPath(encoder_id int) string { + return fmt.Sprintf("%s/segment-%s-%d-%%d.ts", vs.file.Out, vs.quality, encoder_id) } func (vs *VideoStream) getTranscodeArgs(segments string) []string {