diff --git a/transcoder/src/stream.go b/transcoder/src/stream.go index cb577441..dec0c47f 100644 --- a/transcoder/src/stream.go +++ b/transcoder/src/stream.go @@ -31,19 +31,29 @@ type Stream struct { // You can also wait for it to be ready (non-blocking if already ready) by doing: // <-ts.segments[i] segments []chan (struct{}) - heads []int32 - commands []*exec.Cmd + heads []Head // the lock used for the segments array and the heads lock sync.RWMutex } +type Head struct { + segment int32 + end int32 + command *exec.Cmd +} + +var DeletedHead = Head{ + segment: -1, + end: -1, + command: nil, +} + func NewStream(file *FileStream, handle StreamHandle) Stream { ret := Stream{ handle: handle, file: file, segments: make([]chan struct{}, len(file.Keyframes)), - heads: make([]int32, 0), - commands: make([]*exec.Cmd, 0), + heads: make([]Head, 0), } for seg := range ret.segments { ret.segments[seg] = make(chan struct{}) @@ -63,6 +73,15 @@ func (ts *Stream) isSegmentReady(segment int32) bool { } } +func (ts *Stream) isSegmentTranscoding(segment int32) bool { + for _, head := range ts.heads { + if head.segment == segment { + return true + } + } + return false +} + func (ts *Stream) run(start int32) error { // Start the transcode up to the 100th segment (or less) // Stop at the first finished segment @@ -75,9 +94,7 @@ func (ts *Stream) run(start int32) error { } } encoder_id := len(ts.heads) - ts.heads = append(ts.heads, start) - // we set nil while the command has not started, this is just to reserve the index - ts.commands = append(ts.commands, nil) + ts.heads = append(ts.heads, Head{segment: start, end: end, command: nil}) ts.lock.Unlock() log.Printf( @@ -147,7 +164,7 @@ func (ts *Stream) run(start int32) error { return err } ts.lock.Lock() - ts.commands[encoder_id] = cmd + ts.heads[encoder_id].command = cmd ts.lock.Unlock() go func() { @@ -160,7 +177,7 @@ func (ts *Stream) run(start int32) error { _, _ = fmt.Sscanf(scanner.Text(), format, &segment) ts.lock.Lock() - ts.heads[encoder_id] = segment + ts.heads[encoder_id].segment = segment if ts.isSegmentReady(segment) { // the current segment is already marked at done so another process has already gone up to here. cmd.Process.Signal(os.Interrupt) @@ -203,8 +220,7 @@ func (ts *Stream) run(start int32) error { ts.lock.Lock() defer ts.lock.Unlock() // we can't delete the head directly because it would invalidate the others encoder_id - ts.heads[encoder_id] = -1 - ts.commands[encoder_id] = nil + ts.heads[encoder_id] = DeletedHead }() return nil @@ -233,12 +249,19 @@ func (ts *Stream) GetSegment(segment int32) (string, error) { ready := ts.isSegmentReady(segment) // we want to calculate distance in the same lock else it can be funky distance := 0. + is_scheduled := false if !ready { distance = ts.getMinEncoderDistance(segment) - } else if !slices.Contains(ts.heads, segment) { + for _, head := range ts.heads { + if head.segment <= segment && segment < head.end { + is_scheduled = true + break + } + } + } else if !ts.isSegmentTranscoding(segment) { // if the current segment is ready, check next segments are ready and if not start a transcode for them for i := segment + 1; i <= min(segment+10, int32(len(ts.segments)-1)); i++ { - if slices.Contains(ts.heads, i) { + if ts.isSegmentTranscoding(i) { // no need to create a new transcoder if there is already one running for this segment break } @@ -253,7 +276,7 @@ func (ts *Stream) GetSegment(segment int32) (string, error) { if !ready { // Only start a new encode if there is too big a distance between the current encoder and the segment. - if distance > 60 { + if distance > 60 || !is_scheduled { log.Printf("Creating new head for %d since closest head is %fs aways", segment, distance) err := ts.run(segment) if err != nil { @@ -278,12 +301,12 @@ func (ts *Stream) GetSegment(segment int32) (string, error) { func (ts *Stream) getMinEncoderDistance(segment int32) float64 { time := ts.file.Keyframes[segment] - distances := Map(ts.heads, func(i int32, _ int) float64 { + distances := Map(ts.heads, func(head Head, _ int) float64 { // ignore killed heads or heads after the current time - if i < 0 || ts.file.Keyframes[i] > time { + if head.segment < 0 || ts.file.Keyframes[head.segment] > time { return math.Inf(1) } - return time - ts.file.Keyframes[i] + return time - ts.file.Keyframes[head.segment] }) if len(distances) == 0 { return math.Inf(1) @@ -295,17 +318,16 @@ func (ts *Stream) Kill() { ts.lock.Lock() defer ts.lock.Unlock() - for id := range ts.commands { + for id := range ts.heads { ts.KillHead(id) } } // Stream assume to be locked func (ts *Stream) KillHead(encoder_id int) { - if ts.commands[encoder_id] == nil { + if ts.heads[encoder_id] == DeletedHead { return } - ts.commands[encoder_id].Process.Signal(os.Interrupt) - ts.commands[encoder_id] = nil - ts.heads[encoder_id] = -1 + ts.heads[encoder_id].command.Process.Signal(os.Interrupt) + ts.heads[encoder_id] = DeletedHead } diff --git a/transcoder/src/tracker.go b/transcoder/src/tracker.go index 933f1573..f5127b5b 100644 --- a/transcoder/src/tracker.go +++ b/transcoder/src/tracker.go @@ -182,9 +182,9 @@ func (t *Tracker) killOrphanedeheads(stream *Stream) { if info.head == -1 { continue } - distance = min(Abs(info.head-head), distance) + distance = min(Abs(info.head-head.segment), distance) } - if distance > 100 { + if distance > 20 { log.Printf("Killing orphaned head %s %d", stream.file.Path, encoder_id) stream.KillHead(encoder_id) }