Fix ready check for segments blocking

This commit is contained in:
Zoe Roux 2024-01-15 19:15:37 +01:00
parent 6bb23d7acd
commit f04d1dcf1e
2 changed files with 43 additions and 21 deletions

View File

@ -24,8 +24,10 @@ type Stream struct {
// channel open if the segment is not ready. closed if ready.
// one can check if segment 1 is open by doing:
//
// _, ready := <- ts.segments[1]
// ts.isSegmentReady(1).
//
// You can also wait for it to be ready (non-blocking if already ready) by doing:
// <-ts.segments[i]
segments []chan (struct{})
heads []int32
commands []*exec.Cmd
@ -33,13 +35,38 @@ type Stream struct {
lock sync.RWMutex
}
func NewStream(file *FileStream) Stream {
ret := Stream{
file: file,
Clients: make([]string, 0),
segments: make([]chan struct{}, len(file.Keyframes)),
heads: make([]int32, 0),
commands: make([]*exec.Cmd, 0),
}
for seg := range ret.segments {
ret.segments[seg] = make(chan struct{})
}
// Copy default value before use is safe. Next warning can be safely ignored
return ret
}
// Remember to lock before calling this.
func (ts *Stream) isSegmentReady(segment int32) bool {
select {
case _, ready := <-ts.segments[segment]:
return ready
default:
return false
}
}
func (ts *Stream) run(start int32) error {
// Start the transcode up to the 100th segment (or less)
// Stop at the first finished segment
end := min(start+100, int32(len(ts.file.Keyframes)))
ts.lock.RLock()
for i := start; i < end; i++ {
if _, ready := <-ts.segments[i]; ready {
if ts.isSegmentReady(i) {
end = i
break
}
@ -112,14 +139,14 @@ func (ts *Stream) run(start int32) error {
ts.lock.Lock()
close(ts.segments[segment])
ts.heads[encoder_id] = segment
ts.lock.Unlock()
if int32(len(ts.segments)) == segment+1 {
// file finished, ffmped will finish soon on it's own
} else if _, ready := <-ts.segments[segment+1]; ready {
} else if ts.isSegmentReady(segment + 1) {
// ask ffmpeg to stop gracefully (nicer cmd.Process.Kill())
cmd.Process.Signal(os.Interrupt)
}
ts.lock.Unlock()
}
if err := scanner.Err(); err != nil {
@ -164,16 +191,18 @@ func (ts *Stream) GetIndex(client string) (string, error) {
func (ts *Stream) GetSegment(segment int32, client string) (string, error) {
ts.lock.RLock()
_, ready := <-ts.segments[segment]
ready := ts.isSegmentReady(segment)
ts.lock.RUnlock()
if !ready {
// Only start a new encode if there is more than 10s between the current encoder and the segment.
if ts.getMinEncoderDistance(ts.file.Keyframes[segment]) > 10 {
if distance := ts.getMinEncoderDistance(ts.file.Keyframes[segment]); distance > 10 {
err := ts.run(segment)
if err != nil {
return "", err
}
} else {
log.Printf("Waiting for segment %d since encoder head is %fs aways", segment, distance)
}
ts.lock.RLock()
@ -195,6 +224,9 @@ func (ts *Stream) getMinEncoderDistance(time float64) float64 {
}
return ts.file.Keyframes[i] - time
})
if len(distances) == 0 {
return math.Inf(1)
}
return slices.Min(distances)
}

View File

@ -2,7 +2,7 @@ package src
import (
"fmt"
"os/exec"
"log"
)
type VideoStream struct {
@ -10,22 +10,12 @@ type VideoStream struct {
quality Quality
}
func NewVideoStream(file *FileStream, quality Quality) (*VideoStream, error) {
ret := VideoStream{
Stream: Stream{
file: file,
Clients: make([]string, 4),
segments: make([]chan struct{}, len(file.Keyframes)),
heads: make([]int32, 1),
commands: make([]*exec.Cmd, 1),
},
func NewVideoStream(file *FileStream, quality Quality) *VideoStream {
log.Printf("Creating a new video stream for %s in quality %s", file.Path, quality)
return &VideoStream{
Stream: NewStream(file),
quality: quality,
}
for seg := range ret.segments {
ret.segments[seg] = make(chan struct{})
}
ret.run(0)
return &ret, nil
}
func (vs *VideoStream) getOutPath() string {