diff --git a/transcoder/src/stream.go b/transcoder/src/stream.go index e1d11e8f..6af6543c 100644 --- a/transcoder/src/stream.go +++ b/transcoder/src/stream.go @@ -20,8 +20,12 @@ type Stream struct { TranscodeStream file *FileStream Clients []string - // true if the segment at given index is completed/transcoded, false otherwise - segments []bool + // channel open if the segment is not ready. closed if ready. + // one can check if segment 1 is open by doing: + // + // _, ready := <- ts.segments[1] + // + segments []chan (struct{}) heads []int32 // the lock used for the segments array and the heads lock sync.RWMutex @@ -34,7 +38,7 @@ func (ts *Stream) run(start int32) error { end := min(start+100, int32(len(ts.file.Keyframes))) ts.lock.RLock() for i := start; i < end; i++ { - if ts.segments[i] { + if _, ready := <-ts.segments[i]; ready { end = i break } @@ -99,7 +103,7 @@ func (ts *Stream) run(start int32) error { _, _ = fmt.Sscanf(scanner.Text(), "segment-%d.ts", &segment) ts.lock.Lock() - ts.segments[segment] = true + close(ts.segments[segment]) ts.heads[encoder_id] = segment ts.lock.Unlock() } @@ -140,7 +144,7 @@ func (ts *Stream) GetIndex(client string) (string, error) { func (ts *Stream) GetSegment(segment int32, client string) (string, error) { ts.lock.RLock() - ready := ts.segments[segment] + _, ready := <-ts.segments[segment] ts.lock.RUnlock() if !ready { @@ -151,7 +155,12 @@ func (ts *Stream) GetSegment(segment int32, client string) (string, error) { return "", err } } - // TODO: wait for ready + + ts.lock.RLock() + ready_chan := ts.segments[segment] + ts.lock.RUnlock() + + <-ready_chan } return fmt.Sprintf(ts.getOutPath(), segment), nil } diff --git a/transcoder/src/videostream.go b/transcoder/src/videostream.go index de448f21..901b1092 100644 --- a/transcoder/src/videostream.go +++ b/transcoder/src/videostream.go @@ -12,10 +12,13 @@ func NewVideoStream(file *FileStream, quality Quality) (*VideoStream, error) { Stream: Stream{ file: file, Clients: make([]string, 4), - segments: make([]bool, len(file.Keyframes)), + segments: make([]chan struct{}, len(file.Keyframes)), }, quality: quality, } + for seg := range ret.segments { + ret.segments[seg] = make(chan struct{}) + } ret.run(0) return &ret, nil }