Use channels to wait for segments to become ready

This commit is contained in:
Zoe Roux 2024-01-15 14:30:54 +01:00
parent e9738c2bc1
commit 23feea4acc
2 changed files with 19 additions and 7 deletions

View File

@ -20,8 +20,12 @@ type Stream struct {
TranscodeStream TranscodeStream
file *FileStream file *FileStream
Clients []string Clients []string
// true if the segment at given index is completed/transcoded, false otherwise // channel open if the segment is not ready. closed if ready.
segments []bool // one can check if segment 1 is open by doing:
//
// _, ready := <- ts.segments[1]
//
segments []chan (struct{})
heads []int32 heads []int32
// the lock used for the segments array and the heads // the lock used for the segments array and the heads
lock sync.RWMutex lock sync.RWMutex
@ -34,7 +38,7 @@ func (ts *Stream) run(start int32) error {
end := min(start+100, int32(len(ts.file.Keyframes))) end := min(start+100, int32(len(ts.file.Keyframes)))
ts.lock.RLock() ts.lock.RLock()
for i := start; i < end; i++ { for i := start; i < end; i++ {
if ts.segments[i] { if _, ready := <-ts.segments[i]; ready {
end = i end = i
break break
} }
@ -99,7 +103,7 @@ func (ts *Stream) run(start int32) error {
_, _ = fmt.Sscanf(scanner.Text(), "segment-%d.ts", &segment) _, _ = fmt.Sscanf(scanner.Text(), "segment-%d.ts", &segment)
ts.lock.Lock() ts.lock.Lock()
ts.segments[segment] = true close(ts.segments[segment])
ts.heads[encoder_id] = segment ts.heads[encoder_id] = segment
ts.lock.Unlock() ts.lock.Unlock()
} }
@ -140,7 +144,7 @@ func (ts *Stream) GetIndex(client string) (string, error) {
func (ts *Stream) GetSegment(segment int32, client string) (string, error) { func (ts *Stream) GetSegment(segment int32, client string) (string, error) {
ts.lock.RLock() ts.lock.RLock()
ready := ts.segments[segment] _, ready := <-ts.segments[segment]
ts.lock.RUnlock() ts.lock.RUnlock()
if !ready { if !ready {
@ -151,7 +155,12 @@ func (ts *Stream) GetSegment(segment int32, client string) (string, error) {
return "", err return "", err
} }
} }
// TODO: wait for ready
ts.lock.RLock()
ready_chan := ts.segments[segment]
ts.lock.RUnlock()
<-ready_chan
} }
return fmt.Sprintf(ts.getOutPath(), segment), nil return fmt.Sprintf(ts.getOutPath(), segment), nil
} }

View File

@ -12,10 +12,13 @@ func NewVideoStream(file *FileStream, quality Quality) (*VideoStream, error) {
Stream: Stream{ Stream: Stream{
file: file, file: file,
Clients: make([]string, 4), Clients: make([]string, 4),
segments: make([]bool, len(file.Keyframes)), segments: make([]chan struct{}, len(file.Keyframes)),
}, },
quality: quality, quality: quality,
} }
for seg := range ret.segments {
ret.segments[seg] = make(chan struct{})
}
ret.run(0) ret.run(0)
return &ret, nil return &ret, nil
} }