diff --git a/transcoder/src/filestream.go b/transcoder/src/filestream.go index c0b526f1..aaf501c1 100644 --- a/transcoder/src/filestream.go +++ b/transcoder/src/filestream.go @@ -23,9 +23,6 @@ type FileStream struct { vlock sync.RWMutex audios map[int32]*AudioStream alock sync.RWMutex - lastUsed time.Time - usageChan chan time.Time - luLock sync.RWMutex } func GetOutPath() string { @@ -58,7 +55,7 @@ func NewFileStream(path string) (*FileStream, error) { return nil, err } - ret := &FileStream{ + return &FileStream{ Path: path, Out: fmt.Sprintf("%s/%s", GetOutPath(), info.info.Sha), Keyframes: keyframes, @@ -66,22 +63,7 @@ func NewFileStream(path string) (*FileStream, error) { Info: info.info, streams: make(map[Quality]*VideoStream), audios: make(map[int32]*AudioStream), - usageChan: make(chan time.Time, 5), - } - - go func() { - for { - lu, ok := <-ret.usageChan - if !ok { - return - } - ret.luLock.Lock() - ret.lastUsed = lu - ret.luLock.Unlock() - } - }() - - return ret, nil + }, nil } func GetKeyframes(path string) ([]float64, bool, error) { @@ -159,30 +141,6 @@ func GetKeyframes(path string) ([]float64, bool, error) { return ret, can_transmux, nil } -func (fs *FileStream) IsDead() bool { - fs.luLock.Lock() - timeSince := time.Since(fs.lastUsed) - fs.luLock.Unlock() - if timeSince >= 4*time.Hour { - return true - } - if timeSince < 5*time.Minute { - // if the encode is relatively new, don't mark it as dead even if nobody is listening. - return false - } - // for _, s := range fs.streams { - // if len(s.Clients) > 0 { - // return false - // } - // } - // for _, s := range fs.audios { - // if len(s.Clients) > 0 { - // return false - // } - // } - return true -} - func (fs *FileStream) Destroy() { fs.vlock.Lock() defer fs.vlock.Lock() @@ -199,7 +157,6 @@ func (fs *FileStream) Destroy() { } func (fs *FileStream) GetMaster() string { - fs.usageChan <- time.Now() master := "#EXTM3U\n" // TODO: also check if the codec is valid in a hls before putting transmux if fs.CanTransmux { @@ -244,7 +201,6 @@ func (fs *FileStream) GetMaster() string { } func (fs *FileStream) getVideoStream(quality Quality) *VideoStream { - fs.usageChan <- time.Now() fs.vlock.RLock() stream, ok := fs.streams[quality] fs.vlock.RUnlock() @@ -270,7 +226,6 @@ func (fs *FileStream) GetVideoSegment(quality Quality, segment int32) (string, e } func (fs *FileStream) getAudioStream(audio int32) *AudioStream { - fs.usageChan <- time.Now() fs.alock.RLock() stream, ok := fs.audios[audio] fs.alock.RUnlock() diff --git a/transcoder/src/tracker.go b/transcoder/src/tracker.go index a9958797..46bb7aa3 100644 --- a/transcoder/src/tracker.go +++ b/transcoder/src/tracker.go @@ -2,6 +2,7 @@ package src import ( "log" + "time" ) type ClientInfo struct { @@ -14,12 +15,14 @@ type ClientInfo struct { type Tracker struct { clients map[string]ClientInfo + visitDate map[string]time.Time transcoder *Transcoder } func NewTracker(t *Transcoder) *Tracker { ret := &Tracker{ clients: make(map[string]ClientInfo), + visitDate: make(map[string]time.Time), transcoder: t, } go ret.start() @@ -34,53 +37,86 @@ func Abs(x int32) int32 { } func (t *Tracker) start() { + inactive_time := 1 * time.Hour + timer := time.After(inactive_time) for { - info := <-t.transcoder.clientChan - old, ok := t.clients[info.client] - if ok && old.path == info.path { - // First fixup the info. Most routes ruturn partial infos - if info.quality == nil { - info.quality = old.quality - } - if info.audio == -1 { - info.audio = old.audio - } - if info.head == -1 { - info.head = old.head + select { + case info, ok := <-t.transcoder.clientChan: + if !ok { + return } - if old.audio != info.audio && old.audio != -1 { - t.KillAudioIfDead(old.path, old.audio) + old, ok := t.clients[info.client] + if ok && old.path == info.path { + // First fixup the info. Most routes ruturn partial infos + if info.quality == nil { + info.quality = old.quality + } + if info.audio == -1 { + info.audio = old.audio + } + if info.head == -1 { + info.head = old.head + } + + if old.audio != info.audio && old.audio != -1 { + t.KillAudioIfDead(old.path, old.audio) + } + if old.quality != info.quality && old.quality != nil { + t.KillQualityIfDead(old.path, *old.quality) + } + if old.head != -1 && Abs(info.head-old.head) > 100 { + t.KillOrphanedHeads(old.path, old.quality, old.audio) + } + } else if ok { + t.KillStreamIfDead(old.path) } - if old.quality != info.quality && old.quality != nil { - t.KillQualityIfDead(old.path, *old.quality) + + t.clients[info.client] = info + t.visitDate[info.client] = time.Now() + + case <-timer: + timer = time.After(inactive_time) + // Purge old clients + for client, date := range t.visitDate { + if time.Since(date) < inactive_time { + continue + } + + info := t.clients[client] + + if !t.KillStreamIfDead(info.path) { + audio_cleanup := info.audio != -1 && t.KillAudioIfDead(info.path, info.audio) + video_cleanup := info.quality != nil && t.KillQualityIfDead(info.path, *info.quality) + if !audio_cleanup || !video_cleanup { + t.KillOrphanedHeads(info.path, info.quality, info.audio) + } + } + + delete(t.clients, client) + delete(t.visitDate, client) } - if old.head != -1 && Abs(info.head-old.head) > 100 { - t.KillOrphanedHeads(old.path, old.quality, old.audio) - } - } else if ok { - t.KillStreamIfDead(old.path) } - t.clients[info.client] = info } } -func (t *Tracker) KillStreamIfDead(path string) { +func (t *Tracker) KillStreamIfDead(path string) bool { for _, stream := range t.clients { if stream.path == path { - return + return false } } t.transcoder.mutex.Lock() defer t.transcoder.mutex.Unlock() t.transcoder.streams[path].Destroy() delete(t.transcoder.streams, path) + return true } -func (t *Tracker) KillAudioIfDead(path string, audio int32) { +func (t *Tracker) KillAudioIfDead(path string, audio int32) bool { for _, stream := range t.clients { if stream.path == path && stream.audio == audio { - return + return false } } t.transcoder.mutex.RLock() @@ -90,12 +126,13 @@ func (t *Tracker) KillAudioIfDead(path string, audio int32) { stream.alock.RLock() defer stream.alock.RUnlock() stream.audios[audio].Kill() + return true } -func (t *Tracker) KillQualityIfDead(path string, quality Quality) { +func (t *Tracker) KillQualityIfDead(path string, quality Quality) bool { for _, stream := range t.clients { if stream.path == path && stream.quality != nil && *stream.quality == quality { - return + return false } } t.transcoder.mutex.RLock() @@ -105,6 +142,7 @@ func (t *Tracker) KillQualityIfDead(path string, quality Quality) { stream.vlock.RLock() defer stream.vlock.RUnlock() stream.streams[quality].Kill() + return true } func (t *Tracker) KillOrphanedHeads(path string, quality *Quality, audio int32) {