diff --git a/transcoder/src/stream.go b/transcoder/src/stream.go index ab660ac8..bae9651c 100644 --- a/transcoder/src/stream.go +++ b/transcoder/src/stream.go @@ -2,14 +2,17 @@ package src import ( "bufio" + "errors" "fmt" "log" "math" "os" "os/exec" + "path/filepath" "slices" "strings" "sync" + "time" ) type StreamHandle interface { @@ -54,8 +57,9 @@ func NewStream(file *FileStream, handle StreamHandle) Stream { // Remember to lock before calling this. func (ts *Stream) isSegmentReady(segment int32) bool { select { - case _, ready := <-ts.segments[segment]: - return ready + case <-ts.segments[segment]: + // if the channel returned, it means it was closed + return true default: return false } @@ -64,7 +68,7 @@ func (ts *Stream) isSegmentReady(segment int32) bool { func (ts *Stream) run(start int32) error { // Start the transcode up to the 100th segment (or less) // Stop at the first finished segment - end := min(start+100, int32(len(ts.file.Keyframes))) + end := min(start+100, int32(len(ts.file.Keyframes))-1) ts.lock.RLock() for i := start; i < end; i++ { if ts.isSegmentReady(i) { @@ -86,10 +90,11 @@ func (ts *Stream) run(start int32) error { len(ts.file.Keyframes), ) - // We do not need the first value (start of the transcode) - segments := make([]string, end-start-1) + segments := make([]string, end-start) for i := range segments { - segments[i] = fmt.Sprintf("%.6f", ts.file.Keyframes[int(start)+i+1]) + // We do not need the first value (start of the transcode) + time := ts.file.Keyframes[int(start)+i+1] - ts.file.Keyframes[start] + segments[i] = fmt.Sprintf("%.6f", time) } segments_str := strings.Join(segments, ",") @@ -101,11 +106,11 @@ func (ts *Stream) run(start int32) error { args := []string{ "-nostats", "-hide_banner", "-loglevel", "warning", - "-copyts", "-ss", fmt.Sprintf("%.6f", ts.file.Keyframes[start]), - "-to", fmt.Sprintf("%.6f", ts.file.Keyframes[end]), "-i", ts.file.Path, + "-to", fmt.Sprintf("%.6f", ts.file.Keyframes[end]), + "-copyts", } args = append(args, ts.handle.getTranscodeArgs(segments_str)...) args = append(args, []string{ @@ -139,20 +144,36 @@ func (ts *Stream) run(start int32) error { go func() { scanner := bufio.NewScanner(stdout) + format := filepath.Base(outpath) + should_stop := false + for scanner.Scan() { var segment int32 - _, _ = fmt.Sscanf(scanner.Text(), "segment-%d.ts", &segment) + _, _ = fmt.Sscanf(scanner.Text(), format, &segment) ts.lock.Lock() - close(ts.segments[segment]) ts.heads[encoder_id] = segment - ts.lock.Unlock() - - if int32(len(ts.segments)) == segment+1 { - // file finished, ffmped will finish soon on it's own - } else if ts.isSegmentReady(segment + 1) { - // ask ffmpeg to stop gracefully (nicer cmd.Process.Kill()) + log.Printf("encode %d finished %d", encoder_id, segment) + if ts.isSegmentReady(segment) { + // the current segment is already marked at done so another process has already gone up to here. cmd.Process.Signal(os.Interrupt) + should_stop = true + } else { + close(ts.segments[segment]) + if int32(len(ts.segments)) == segment+1 { + // file finished, ffmped will finish soon on it's own + should_stop = true + } else if ts.isSegmentReady(segment + 1) { + cmd.Process.Signal(os.Interrupt) + should_stop = true + } + } + ts.lock.Unlock() + // we need this and not a return in the condition because we want to unlock + // the lock (and can't defer since this is a loop) + if should_stop { + println("close because requested", encoder_id) + return } } @@ -188,8 +209,8 @@ func (ts *Stream) GetIndex(client string) (string, error) { #EXT-X-MEDIA-SEQUENCE:0 ` - for segment := 1; segment < len(ts.file.Keyframes); segment++ { - index += fmt.Sprintf("#EXTINF:%.6f\n", ts.file.Keyframes[segment]-ts.file.Keyframes[segment-1]) + for segment := 0; segment < len(ts.file.Keyframes)-1; segment++ { + index += fmt.Sprintf("#EXTINF:%.6f\n", ts.file.Keyframes[segment+1]-ts.file.Keyframes[segment]) index += fmt.Sprintf("segment-%d.ts\n", segment) } index += `#EXT-X-ENDLIST` @@ -199,34 +220,43 @@ func (ts *Stream) GetIndex(client string) (string, error) { func (ts *Stream) GetSegment(segment int32, client string) (string, error) { ts.lock.RLock() ready := ts.isSegmentReady(segment) + // we want to calculate distance in the same lock else it can be funky + distance := 0. + if !ready { + distance = ts.getMinEncoderDistance(segment) + } ts.lock.RUnlock() if !ready { // Only start a new encode if there is more than 10s between the current encoder and the segment. - if distance := ts.getMinEncoderDistance(ts.file.Keyframes[segment]); distance > 10 { + if distance > 10_000 { + log.Printf("Creating new head for %d since closest head is %fs aways", segment, distance/1000) err := ts.run(segment) if err != nil { return "", err } } else { - log.Printf("Waiting for segment %d since encoder head is %fs aways", segment, distance) + log.Printf("Waiting for segment %d since encoder head is %fs aways", segment, distance/1000) } ts.lock.RLock() ready_chan := ts.segments[segment] ts.lock.RUnlock() - <-ready_chan + select { + case <-ready_chan: + case <-time.After(10 * time.Second): + return "", errors.New("could not retrive the selected segment (timeout)") + } } - return fmt.Sprintf(ts.getOutPath(), segment), nil + return fmt.Sprintf(ts.handle.getOutPath(), segment), nil } -func (ts *Stream) getMinEncoderDistance(time float64) float64 { - ts.lock.RLock() - defer ts.lock.RUnlock() +func (ts *Stream) getMinEncoderDistance(segment int32) float64 { + time := ts.file.Keyframes[segment] distances := Map(ts.heads, func(i int32, _ int) float64 { // ignore killed heads or heads after the current time - if i < 0 || ts.file.Keyframes[i] > time { + if i < 0 || ts.file.Keyframes[i] < time { return math.Inf(1) } return ts.file.Keyframes[i] - time