diff --git a/transcoder/src/stream.go b/transcoder/src/stream.go
index 899ef273..a90c2241 100644
--- a/transcoder/src/stream.go
+++ b/transcoder/src/stream.go
@@ -93,11 +93,19 @@ func (ts *Stream) run(start int32) error {
 	end := min(start+3, int32(len(ts.file.Keyframes)))
 	ts.lock.Lock()
 	for i := start; i < end; i++ {
-		if ts.isSegmentReady(i) {
+		if ts.isSegmentReady(i) || ts.isSegmentTranscoding(i) {
 			end = i
 			break
 		}
 	}
+	if start == end {
+		// This can happen if the start segment finished between the check that
+		// decided to call run() and the actual call.
+		// Since most checks are done under an RLock() instead of a Lock(), two
+		// goroutines can try to make the same segment ready at the same time.
+		ts.lock.Unlock()
+		return nil
+	}
 	encoder_id := len(ts.heads)
 	ts.heads = append(ts.heads, Head{segment: start, end: end, command: nil})
 	ts.lock.Unlock()
@@ -211,6 +219,7 @@ func (ts *Stream) run(start int32) error {
 			}
 			ts.lock.Lock()
 			ts.heads[encoder_id].segment = segment
+			log.Printf("Segment %d got ready (%d)", segment, encoder_id)
 			if ts.isSegmentReady(segment) {
 				// the current segment is already marked at done so another process has already gone up to here.
 				cmd.Process.Signal(os.Interrupt)
@@ -244,11 +253,11 @@ func (ts *Stream) run(start int32) error {
 	go func() {
 		err := cmd.Wait()
 		if exiterr, ok := err.(*exec.ExitError); ok && exiterr.ExitCode() == 255 {
-			log.Println("ffmpeg was killed by us")
+			log.Printf("ffmpeg %d was killed by us", encoder_id)
 		} else if err != nil {
-			log.Println("ffmpeg occured an error", err, stderr.String())
+			log.Printf("ffmpeg %d encountered an error: %s: %s", encoder_id, err, stderr.String())
 		} else {
-			log.Println("ffmpeg finished successfully")
+			log.Printf("ffmpeg %d finished successfully", encoder_id)
 		}
 
 		ts.lock.Lock()
@@ -292,19 +301,6 @@ func (ts *Stream) GetSegment(segment int32) (string, error) {
 				break
 			}
 		}
-	} else if !ts.isSegmentTranscoding(segment) {
-		// if the current segment is ready, check next segments are ready and if not start a transcode for them
-		for i := segment + 1; i <= min(segment+10, int32(len(ts.segments)-1)); i++ {
-			if ts.isSegmentTranscoding(i) {
-				// no need to create a new transcoder if there is already one running for this segment
-				break
-			}
-			if !ts.isSegmentReady(i) {
-				log.Printf("Creating new head for future segment (%d)", i)
-				go ts.run(i)
-				break
-			}
-		}
 	}
 	ts.lock.RUnlock()
 
@@ -326,14 +322,33 @@ func (ts *Stream) GetSegment(segment int32) (string, error) {
 			return "", errors.New("could not retrive the selected segment (timeout)")
 		}
 	}
+	ts.prerareNextSegements(segment)
 	return fmt.Sprintf(ts.handle.getOutPath(ts.segments[segment].encoder), segment), nil
 }
 
+func (ts *Stream) prerareNextSegements(segment int32) {
+	ts.lock.RLock()
+	defer ts.lock.RUnlock()
+	for i := segment + 1; i <= min(segment+10, int32(len(ts.segments)-1)); i++ {
+		if ts.isSegmentReady(i) {
+			continue
+		}
+		// Only start an encode for segments that are not planned (getMinEncoderDistance returns Inf for them)
+		// or whose closest head is more than 60s away (assuming roughly 5s per segment of lookahead).
+		if ts.getMinEncoderDistance(i) < 60+(5*float64(i-segment)) {
+			continue
+		}
+		log.Printf("Creating new head for future segment (%d)", i)
+		go ts.run(i)
+		return
+	}
+}
+
 func (ts *Stream) getMinEncoderDistance(segment int32) float64 {
 	time := ts.file.Keyframes[segment]
 	distances := Map(ts.heads, func(head Head, _ int) float64 {
 		// ignore killed heads or heads after the current time
-		if head.segment < 0 || ts.file.Keyframes[head.segment] > time {
+		if head.segment < 0 || ts.file.Keyframes[head.segment] > time || segment >= head.end {
 			return math.Inf(1)
 		}
 		return time - ts.file.Keyframes[head.segment]
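
Note on the lookahead heuristic: after serving a segment, prerareNextSegements scans up to the next 10 segments and starts a new transcode head for the first one that is neither ready nor close enough to an existing head, where "close enough" means the nearest head behind it is within 60s plus roughly 5s per segment of lookahead. The snippet below is a minimal standalone sketch of that distance check under those assumptions; the helper names (minEncoderDistance, shouldSchedule) and the example data are illustrative and not part of stream.go.

package main

import (
	"fmt"
	"math"
)

// minEncoderDistance mirrors the idea of getMinEncoderDistance in the diff:
// the smallest time gap between the target segment's keyframe and any head
// that sits at or before it and whose planned range (end) still covers it.
// Killed heads (segment < 0) and heads that cannot reach the target count as
// infinitely far away. The slices are illustrative stand-ins for Stream fields.
func minEncoderDistance(keyframes []float64, headSegments, headEnds []int32, segment int32) float64 {
	target := keyframes[segment]
	best := math.Inf(1)
	for i := range headSegments {
		seg := headSegments[i]
		if seg < 0 || keyframes[seg] > target || segment >= headEnds[i] {
			continue // killed head, head already past the target, or head that stops before it
		}
		if d := target - keyframes[seg]; d < best {
			best = d
		}
	}
	return best
}

// shouldSchedule mirrors the threshold used in prerareNextSegements: a new
// head is only worth starting when every existing head is at least 60s away,
// plus about 5s per segment of lookahead from the segment that was just served.
func shouldSchedule(dist float64, served, target int32) bool {
	return dist >= 60+5*float64(target-served)
}

func main() {
	// Hypothetical keyframe timestamps, ~5s apart.
	keyframes := []float64{0, 5, 10, 15, 20, 25, 30}
	// One head currently at segment 1, planned to encode up to (not including) segment 4.
	headSegments := []int32{1}
	headEnds := []int32{4}

	for i := int32(2); i <= 5; i++ {
		d := minEncoderDistance(keyframes, headSegments, headEnds, i)
		fmt.Printf("segment %d: distance=%v, start new head: %v\n", i, d, shouldSchedule(d, 1, i))
	}
}

With this data, segments 2 and 3 are already covered by the running head (small finite distance), while segments 4 and 5 fall outside its planned range, so their distance is Inf and a new head would be started for the first of them.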