Track and kill orphaned heads

This commit is contained in:
Zoe Roux 2024-01-17 15:00:18 +01:00
parent efe07e39c2
commit ca6ec6a8ed
2 changed files with 68 additions and 7 deletions

View File

@ -21,8 +21,8 @@ type StreamHandle interface {
}
type Stream struct {
handle StreamHandle
file *FileStream
handle StreamHandle
file *FileStream
// channel open if the segment is not ready. closed if ready.
// one can check if segment 1 is open by doing:
//
@ -270,10 +270,17 @@ func (ts *Stream) Kill() {
ts.lock.Lock()
defer ts.lock.Unlock()
for _, cmd := range ts.commands {
if cmd == nil {
continue
}
cmd.Process.Signal(os.Interrupt)
for id := range ts.commands {
ts.KillHead(id)
}
}
// Stream assume to be locked
func (ts *Stream) KillHead(encoder_id int) {
if ts.commands[encoder_id] == nil {
return
}
ts.commands[encoder_id].Process.Signal(os.Interrupt)
ts.commands[encoder_id] = nil
ts.heads[encoder_id] = -1
}

View File

@ -1,5 +1,9 @@
package src
import (
"log"
)
type ClientInfo struct {
client string
path string
@ -22,6 +26,13 @@ func NewTracker(t *Transcoder) *Tracker {
return ret
}
func Abs(x int32) int32 {
if x < 0 {
return -x
}
return x
}
func (t *Tracker) start() {
for {
info := <-t.transcoder.clientChan
@ -44,6 +55,9 @@ func (t *Tracker) start() {
if old.quality != info.quality && old.quality != nil {
t.KillQualityIfDead(old.path, *old.quality)
}
if old.head != -1 && Abs(info.head-old.head) > 100 {
t.KillOrphanedHeads(old.path, old.quality, old.audio)
}
} else if ok {
t.KillStreamIfDead(old.path)
}
@ -92,3 +106,43 @@ func (t *Tracker) KillQualityIfDead(path string, quality Quality) {
defer stream.vlock.RUnlock()
stream.streams[quality].Kill()
}
func (t *Tracker) KillOrphanedHeads(path string, quality *Quality, audio int32) {
t.transcoder.mutex.RLock()
stream := t.transcoder.streams[path]
t.transcoder.mutex.RUnlock()
if quality != nil {
stream.vlock.RLock()
vstream := stream.streams[*quality]
stream.vlock.RUnlock()
t.killOrphanedeheads(&vstream.Stream)
}
if audio != -1 {
stream.alock.RLock()
astream := stream.audios[audio]
stream.alock.RUnlock()
t.killOrphanedeheads(&astream.Stream)
}
}
func (t *Tracker) killOrphanedeheads(stream *Stream) {
stream.lock.Lock()
defer stream.lock.Unlock()
for encoder_id, head := range stream.heads {
distance := int32(99999)
for _, info := range t.clients {
if info.head == -1 {
continue
}
distance = min(Abs(info.head-head), distance)
}
if distance > 100 {
log.Printf("Killing orphaned head %s %d", stream.file.Path, encoder_id)
stream.KillHead(encoder_id)
}
}
}