diff --git a/transcoder/src/stream.go b/transcoder/src/stream.go index 8bb04e8d..7fc1cd2d 100644 --- a/transcoder/src/stream.go +++ b/transcoder/src/stream.go @@ -24,8 +24,10 @@ type Stream struct { // channel open if the segment is not ready. closed if ready. // one can check if segment 1 is open by doing: // - // _, ready := <- ts.segments[1] + // ts.isSegmentReady(1). // + // You can also wait for it to be ready (non-blocking if already ready) by doing: + // <-ts.segments[i] segments []chan (struct{}) heads []int32 commands []*exec.Cmd @@ -33,13 +35,38 @@ type Stream struct { lock sync.RWMutex } +func NewStream(file *FileStream) Stream { + ret := Stream{ + file: file, + Clients: make([]string, 0), + segments: make([]chan struct{}, len(file.Keyframes)), + heads: make([]int32, 0), + commands: make([]*exec.Cmd, 0), + } + for seg := range ret.segments { + ret.segments[seg] = make(chan struct{}) + } + // Copy default value before use is safe. Next warning can be safely ignored + return ret +} + +// Remember to lock before calling this. +func (ts *Stream) isSegmentReady(segment int32) bool { + select { + case _, ready := <-ts.segments[segment]: + return ready + default: + return false + } +} + func (ts *Stream) run(start int32) error { // Start the transcode up to the 100th segment (or less) // Stop at the first finished segment end := min(start+100, int32(len(ts.file.Keyframes))) ts.lock.RLock() for i := start; i < end; i++ { - if _, ready := <-ts.segments[i]; ready { + if ts.isSegmentReady(i) { end = i break } @@ -112,14 +139,14 @@ func (ts *Stream) run(start int32) error { ts.lock.Lock() close(ts.segments[segment]) ts.heads[encoder_id] = segment + ts.lock.Unlock() if int32(len(ts.segments)) == segment+1 { // file finished, ffmped will finish soon on it's own - } else if _, ready := <-ts.segments[segment+1]; ready { + } else if ts.isSegmentReady(segment + 1) { // ask ffmpeg to stop gracefully (nicer cmd.Process.Kill()) cmd.Process.Signal(os.Interrupt) } - ts.lock.Unlock() } if err := scanner.Err(); err != nil { @@ -164,16 +191,18 @@ func (ts *Stream) GetIndex(client string) (string, error) { func (ts *Stream) GetSegment(segment int32, client string) (string, error) { ts.lock.RLock() - _, ready := <-ts.segments[segment] + ready := ts.isSegmentReady(segment) ts.lock.RUnlock() if !ready { // Only start a new encode if there is more than 10s between the current encoder and the segment. - if ts.getMinEncoderDistance(ts.file.Keyframes[segment]) > 10 { + if distance := ts.getMinEncoderDistance(ts.file.Keyframes[segment]); distance > 10 { err := ts.run(segment) if err != nil { return "", err } + } else { + log.Printf("Waiting for segment %d since encoder head is %fs aways", segment, distance) } ts.lock.RLock() @@ -195,6 +224,9 @@ func (ts *Stream) getMinEncoderDistance(time float64) float64 { } return ts.file.Keyframes[i] - time }) + if len(distances) == 0 { + return math.Inf(1) + } return slices.Min(distances) } diff --git a/transcoder/src/videostream.go b/transcoder/src/videostream.go index 0a660101..7a7f322c 100644 --- a/transcoder/src/videostream.go +++ b/transcoder/src/videostream.go @@ -2,7 +2,7 @@ package src import ( "fmt" - "os/exec" + "log" ) type VideoStream struct { @@ -10,22 +10,12 @@ type VideoStream struct { quality Quality } -func NewVideoStream(file *FileStream, quality Quality) (*VideoStream, error) { - ret := VideoStream{ - Stream: Stream{ - file: file, - Clients: make([]string, 4), - segments: make([]chan struct{}, len(file.Keyframes)), - heads: make([]int32, 1), - commands: make([]*exec.Cmd, 1), - }, +func NewVideoStream(file *FileStream, quality Quality) *VideoStream { + log.Printf("Creating a new video stream for %s in quality %s", file.Path, quality) + return &VideoStream{ + Stream: NewStream(file), quality: quality, } - for seg := range ret.segments { - ret.segments[seg] = make(chan struct{}) - } - ret.run(0) - return &ret, nil } func (vs *VideoStream) getOutPath() string {