Create a client tracker to delete old streams

This commit is contained in:
Zoe Roux 2024-01-17 14:36:43 +01:00
parent c738e5bda3
commit efe07e39c2
4 changed files with 162 additions and 45 deletions

View File

@ -170,16 +170,16 @@ func (fs *FileStream) IsDead() bool {
// if the encode is relatively new, don't mark it as dead even if nobody is listening.
return false
}
for _, s := range fs.streams {
if len(s.Clients) > 0 {
return false
}
}
for _, s := range fs.audios {
if len(s.Clients) > 0 {
return false
}
}
// for _, s := range fs.streams {
// if len(s.Clients) > 0 {
// return false
// }
// }
// for _, s := range fs.audios {
// if len(s.Clients) > 0 {
// return false
// }
// }
return true
}
@ -259,14 +259,14 @@ func (fs *FileStream) getVideoStream(quality Quality) *VideoStream {
return fs.streams[quality]
}
func (fs *FileStream) GetVideoIndex(quality Quality, client string) (string, error) {
func (fs *FileStream) GetVideoIndex(quality Quality) (string, error) {
stream := fs.getVideoStream(quality)
return stream.GetIndex(client)
return stream.GetIndex()
}
func (fs *FileStream) GetVideoSegment(quality Quality, segment int32, client string) (string, error) {
func (fs *FileStream) GetVideoSegment(quality Quality, segment int32) (string, error) {
stream := fs.getVideoStream(quality)
return stream.GetSegment(segment, client)
return stream.GetSegment(segment)
}
func (fs *FileStream) getAudioStream(audio int32) *AudioStream {
@ -285,12 +285,12 @@ func (fs *FileStream) getAudioStream(audio int32) *AudioStream {
return fs.audios[audio]
}
func (fs *FileStream) GetAudioIndex(audio int32, client string) (string, error) {
func (fs *FileStream) GetAudioIndex(audio int32) (string, error) {
stream := fs.getAudioStream(audio)
return stream.GetIndex(client)
return stream.GetIndex()
}
func (fs *FileStream) GetAudioSegment(audio int32, segment int32, client string) (string, error) {
func (fs *FileStream) GetAudioSegment(audio int32, segment int32) (string, error) {
stream := fs.getAudioStream(audio)
return stream.GetSegment(segment, client)
return stream.GetSegment(segment)
}

View File

@ -23,7 +23,6 @@ type StreamHandle interface {
type Stream struct {
handle StreamHandle
file *FileStream
Clients []string
// channel open if the segment is not ready. closed if ready.
// one can check if segment 1 is open by doing:
//
@ -42,7 +41,6 @@ func NewStream(file *FileStream, handle StreamHandle) Stream {
ret := Stream{
handle: handle,
file: file,
Clients: make([]string, 0),
segments: make([]chan struct{}, len(file.Keyframes)),
heads: make([]int32, 0),
commands: make([]*exec.Cmd, 0),
@ -201,7 +199,7 @@ func (ts *Stream) run(start int32) error {
return nil
}
func (ts *Stream) GetIndex(client string) (string, error) {
func (ts *Stream) GetIndex() (string, error) {
index := `#EXTM3U
#EXT-X-VERSION:3
#EXT-X-PLAYLIST-TYPE:VOD
@ -218,7 +216,7 @@ func (ts *Stream) GetIndex(client string) (string, error) {
return index, nil
}
func (ts *Stream) GetSegment(segment int32, client string) (string, error) {
func (ts *Stream) GetSegment(segment int32) (string, error) {
ts.lock.RLock()
ready := ts.isSegmentReady(segment)
// we want to calculate distance in the same lock else it can be funky

94
transcoder/src/tracker.go Normal file
View File

@ -0,0 +1,94 @@
package src
type ClientInfo struct {
client string
path string
quality *Quality
audio int32
head int32
}
type Tracker struct {
clients map[string]ClientInfo
transcoder *Transcoder
}
func NewTracker(t *Transcoder) *Tracker {
ret := &Tracker{
clients: make(map[string]ClientInfo),
transcoder: t,
}
go ret.start()
return ret
}
func (t *Tracker) start() {
for {
info := <-t.transcoder.clientChan
old, ok := t.clients[info.client]
if ok && old.path == info.path {
// First fixup the info. Most routes ruturn partial infos
if info.quality == nil {
info.quality = old.quality
}
if info.audio == -1 {
info.audio = old.audio
}
if info.head == -1 {
info.head = old.head
}
if old.audio != info.audio && old.audio != -1 {
t.KillAudioIfDead(old.path, old.audio)
}
if old.quality != info.quality && old.quality != nil {
t.KillQualityIfDead(old.path, *old.quality)
}
} else if ok {
t.KillStreamIfDead(old.path)
}
t.clients[info.client] = info
}
}
func (t *Tracker) KillStreamIfDead(path string) {
for _, stream := range t.clients {
if stream.path == path {
return
}
}
t.transcoder.mutex.Lock()
defer t.transcoder.mutex.Unlock()
t.transcoder.streams[path].Destroy()
delete(t.transcoder.streams, path)
}
func (t *Tracker) KillAudioIfDead(path string, audio int32) {
for _, stream := range t.clients {
if stream.path == path && stream.audio == audio {
return
}
}
t.transcoder.mutex.RLock()
stream := t.transcoder.streams[path]
t.transcoder.mutex.RUnlock()
stream.alock.RLock()
defer stream.alock.RUnlock()
stream.audios[audio].Kill()
}
func (t *Tracker) KillQualityIfDead(path string, quality Quality) {
for _, stream := range t.clients {
if stream.path == path && stream.quality != nil && *stream.quality == quality {
return
}
}
t.transcoder.mutex.RLock()
stream := t.transcoder.streams[path]
t.transcoder.mutex.RUnlock()
stream.vlock.RLock()
defer stream.vlock.RUnlock()
stream.streams[quality].Kill()
}

View File

@ -14,6 +14,8 @@ type Transcoder struct {
// Streams that are staring up
preparing map[string]chan *FileStream
mutex sync.RWMutex
clientChan chan ClientInfo
tracker *Tracker
}
func NewTranscoder() (*Transcoder, error) {
@ -29,10 +31,13 @@ func NewTranscoder() (*Transcoder, error) {
}
}
return &Transcoder{
ret := &Transcoder{
streams: make(map[string]*FileStream),
preparing: make(map[string]chan *FileStream),
}, nil
clientChan: make(chan ClientInfo, 10),
}
ret.tracker = NewTracker(ret)
return ret, nil
}
func (t *Transcoder) getFileStream(path string) (*FileStream, error) {
@ -50,7 +55,6 @@ func (t *Transcoder) getFileStream(path string) (*FileStream, error) {
t.mutex.Lock()
channel = make(chan *FileStream, 1)
t.preparing[path] = channel
t.cleanUnused()
t.mutex.Unlock()
var err error
@ -75,23 +79,18 @@ func (t *Transcoder) getFileStream(path string) (*FileStream, error) {
return stream, nil
}
// This method assume the lock is already taken.
func (t *Transcoder) cleanUnused() {
for path, stream := range t.streams {
if !stream.IsDead() {
continue
}
log.Printf("Steam is dead (%s). Killing it.", path)
stream.Destroy()
delete(t.streams, path)
}
}
func (t *Transcoder) GetMaster(path string, client string) (string, error) {
stream, err := t.getFileStream(path)
if err != nil {
return "", err
}
t.clientChan <- ClientInfo{
client: client,
path: path,
quality: nil,
audio: -1,
head: -1,
}
return stream.GetMaster(), nil
}
@ -100,7 +99,14 @@ func (t *Transcoder) GetVideoIndex(path string, quality Quality, client string)
if err != nil {
return "", err
}
return stream.GetVideoIndex(quality, client)
t.clientChan <- ClientInfo{
client: client,
path: path,
quality: &quality,
audio: -1,
head: -1,
}
return stream.GetVideoIndex(quality)
}
func (t *Transcoder) GetAudioIndex(path string, audio int32, client string) (string, error) {
@ -108,7 +114,13 @@ func (t *Transcoder) GetAudioIndex(path string, audio int32, client string) (str
if err != nil {
return "", err
}
return stream.GetAudioIndex(audio, client)
t.clientChan <- ClientInfo{
client: client,
path: path,
audio: audio,
head: -1,
}
return stream.GetAudioIndex(audio)
}
func (t *Transcoder) GetVideoSegment(
@ -121,7 +133,14 @@ func (t *Transcoder) GetVideoSegment(
if err != nil {
return "", err
}
return stream.GetVideoSegment(quality, segment, client)
t.clientChan <- ClientInfo{
client: client,
path: path,
quality: &quality,
audio: -1,
head: segment,
}
return stream.GetVideoSegment(quality, segment)
}
func (t *Transcoder) GetAudioSegment(
@ -134,5 +153,11 @@ func (t *Transcoder) GetAudioSegment(
if err != nil {
return "", err
}
return stream.GetAudioSegment(audio, segment, client)
t.clientChan <- ClientInfo{
client: client,
path: path,
audio: audio,
head: segment,
}
return stream.GetAudioSegment(audio, segment)
}