diff --git a/transcoder/src/filestream.go b/transcoder/src/filestream.go index 4f9ff3d1..468185c9 100644 --- a/transcoder/src/filestream.go +++ b/transcoder/src/filestream.go @@ -22,6 +22,9 @@ type FileStream struct { vlock sync.RWMutex audios map[int32]*AudioStream alock sync.RWMutex + lastUsed time.Time + usageChan chan time.Time + luLock sync.RWMutex } func GetOutPath() string { @@ -54,7 +57,7 @@ func NewFileStream(path string) (*FileStream, error) { return nil, err } - return &FileStream{ + ret := &FileStream{ Path: path, Out: fmt.Sprintf("%s/%s", GetOutPath(), info.info.Sha), Keyframes: keyframes, @@ -62,7 +65,22 @@ func NewFileStream(path string) (*FileStream, error) { Info: info.info, streams: make(map[Quality]*VideoStream), audios: make(map[int32]*AudioStream), - }, nil + usageChan: make(chan time.Time, 5), + } + + go func() { + for { + lu, ok := <-ret.usageChan + if !ok { + return + } + ret.luLock.Lock() + ret.lastUsed = lu + ret.luLock.Unlock() + } + }() + + return ret, nil } func GetKeyframes(path string) ([]float64, bool, error) { @@ -132,20 +150,46 @@ func GetKeyframes(path string) ([]float64, bool, error) { } func (fs *FileStream) IsDead() bool { + fs.luLock.Lock() + timeSince := time.Since(fs.lastUsed) + fs.luLock.Unlock() + if timeSince >= 4*time.Hour { + return true + } + if timeSince < 5*time.Minute { + // if the encode is relatively new, don't mark it as dead even if nobody is listening. + return false + } for _, s := range fs.streams { if len(s.Clients) > 0 { return false } } - // TODO: Also check how long this stream has been unused. We dont want to kill streams created 2min ago + for _, s := range fs.audios { + if len(s.Clients) > 0 { + return false + } + } return true } func (fs *FileStream) Destroy() { - // TODO: kill child process and delete data + fs.vlock.Lock() + defer fs.vlock.Lock() + fs.alock.Lock() + defer fs.alock.Lock() + + for _, s := range fs.streams { + s.Kill() + } + for _, s := range fs.audios { + s.Kill() + } + _ = os.RemoveAll(fs.Out) } func (fs *FileStream) GetMaster() string { + fs.usageChan <- time.Now() master := "#EXTM3U\n" // TODO: also check if the codec is valid in a hls before putting transmux if fs.CanTransmux { @@ -190,6 +234,7 @@ func (fs *FileStream) GetMaster() string { } func (fs *FileStream) getVideoStream(quality Quality) *VideoStream { + fs.usageChan <- time.Now() fs.vlock.RLock() stream, ok := fs.streams[quality] fs.vlock.RUnlock() @@ -215,6 +260,7 @@ func (fs *FileStream) GetVideoSegment(quality Quality, segment int32, client str } func (fs *FileStream) getAudioStream(audio int32) *AudioStream { + fs.usageChan <- time.Now() fs.alock.RLock() stream, ok := fs.audios[audio] fs.alock.RUnlock() diff --git a/transcoder/src/transcoder.go b/transcoder/src/transcoder.go index 03d08134..21e9d04a 100644 --- a/transcoder/src/transcoder.go +++ b/transcoder/src/transcoder.go @@ -67,6 +67,7 @@ func (t *Transcoder) cleanUnused() { if !stream.IsDead() { continue } + log.Printf("Steam is dead (%s). Killing it.", path) stream.Destroy() delete(t.streams, path) }