diff --git a/transcoder/src/stream.go b/transcoder/src/stream.go index 8950bcd2..5b8e5df1 100644 --- a/transcoder/src/stream.go +++ b/transcoder/src/stream.go @@ -21,8 +21,8 @@ type StreamHandle interface { } type Stream struct { - handle StreamHandle - file *FileStream + handle StreamHandle + file *FileStream // channel open if the segment is not ready. closed if ready. // one can check if segment 1 is open by doing: // @@ -270,10 +270,17 @@ func (ts *Stream) Kill() { ts.lock.Lock() defer ts.lock.Unlock() - for _, cmd := range ts.commands { - if cmd == nil { - continue - } - cmd.Process.Signal(os.Interrupt) + for id := range ts.commands { + ts.KillHead(id) } } + +// Stream assume to be locked +func (ts *Stream) KillHead(encoder_id int) { + if ts.commands[encoder_id] == nil { + return + } + ts.commands[encoder_id].Process.Signal(os.Interrupt) + ts.commands[encoder_id] = nil + ts.heads[encoder_id] = -1 +} diff --git a/transcoder/src/tracker.go b/transcoder/src/tracker.go index 160fd9f1..a9958797 100644 --- a/transcoder/src/tracker.go +++ b/transcoder/src/tracker.go @@ -1,5 +1,9 @@ package src +import ( + "log" +) + type ClientInfo struct { client string path string @@ -22,6 +26,13 @@ func NewTracker(t *Transcoder) *Tracker { return ret } +func Abs(x int32) int32 { + if x < 0 { + return -x + } + return x +} + func (t *Tracker) start() { for { info := <-t.transcoder.clientChan @@ -44,6 +55,9 @@ func (t *Tracker) start() { if old.quality != info.quality && old.quality != nil { t.KillQualityIfDead(old.path, *old.quality) } + if old.head != -1 && Abs(info.head-old.head) > 100 { + t.KillOrphanedHeads(old.path, old.quality, old.audio) + } } else if ok { t.KillStreamIfDead(old.path) } @@ -92,3 +106,43 @@ func (t *Tracker) KillQualityIfDead(path string, quality Quality) { defer stream.vlock.RUnlock() stream.streams[quality].Kill() } + +func (t *Tracker) KillOrphanedHeads(path string, quality *Quality, audio int32) { + t.transcoder.mutex.RLock() + stream := t.transcoder.streams[path] + t.transcoder.mutex.RUnlock() + + if quality != nil { + stream.vlock.RLock() + vstream := stream.streams[*quality] + stream.vlock.RUnlock() + + t.killOrphanedeheads(&vstream.Stream) + } + if audio != -1 { + stream.alock.RLock() + astream := stream.audios[audio] + stream.alock.RUnlock() + + t.killOrphanedeheads(&astream.Stream) + } +} + +func (t *Tracker) killOrphanedeheads(stream *Stream) { + stream.lock.Lock() + defer stream.lock.Unlock() + + for encoder_id, head := range stream.heads { + distance := int32(99999) + for _, info := range t.clients { + if info.head == -1 { + continue + } + distance = min(Abs(info.head-head), distance) + } + if distance > 100 { + log.Printf("Killing orphaned head %s %d", stream.file.Path, encoder_id) + stream.KillHead(encoder_id) + } + } +}