diff --git a/transcoder/src/filestream.go b/transcoder/src/filestream.go index 3090a24e..c0b526f1 100644 --- a/transcoder/src/filestream.go +++ b/transcoder/src/filestream.go @@ -170,16 +170,16 @@ func (fs *FileStream) IsDead() bool { // if the encode is relatively new, don't mark it as dead even if nobody is listening. return false } - for _, s := range fs.streams { - if len(s.Clients) > 0 { - return false - } - } - for _, s := range fs.audios { - if len(s.Clients) > 0 { - return false - } - } + // for _, s := range fs.streams { + // if len(s.Clients) > 0 { + // return false + // } + // } + // for _, s := range fs.audios { + // if len(s.Clients) > 0 { + // return false + // } + // } return true } @@ -259,14 +259,14 @@ func (fs *FileStream) getVideoStream(quality Quality) *VideoStream { return fs.streams[quality] } -func (fs *FileStream) GetVideoIndex(quality Quality, client string) (string, error) { +func (fs *FileStream) GetVideoIndex(quality Quality) (string, error) { stream := fs.getVideoStream(quality) - return stream.GetIndex(client) + return stream.GetIndex() } -func (fs *FileStream) GetVideoSegment(quality Quality, segment int32, client string) (string, error) { +func (fs *FileStream) GetVideoSegment(quality Quality, segment int32) (string, error) { stream := fs.getVideoStream(quality) - return stream.GetSegment(segment, client) + return stream.GetSegment(segment) } func (fs *FileStream) getAudioStream(audio int32) *AudioStream { @@ -285,12 +285,12 @@ func (fs *FileStream) getAudioStream(audio int32) *AudioStream { return fs.audios[audio] } -func (fs *FileStream) GetAudioIndex(audio int32, client string) (string, error) { +func (fs *FileStream) GetAudioIndex(audio int32) (string, error) { stream := fs.getAudioStream(audio) - return stream.GetIndex(client) + return stream.GetIndex() } -func (fs *FileStream) GetAudioSegment(audio int32, segment int32, client string) (string, error) { +func (fs *FileStream) GetAudioSegment(audio int32, segment int32) (string, error) { stream := fs.getAudioStream(audio) - return stream.GetSegment(segment, client) + return stream.GetSegment(segment) } diff --git a/transcoder/src/stream.go b/transcoder/src/stream.go index 374d4ee6..8950bcd2 100644 --- a/transcoder/src/stream.go +++ b/transcoder/src/stream.go @@ -23,7 +23,6 @@ type StreamHandle interface { type Stream struct { handle StreamHandle file *FileStream - Clients []string // channel open if the segment is not ready. closed if ready. // one can check if segment 1 is open by doing: // @@ -42,7 +41,6 @@ func NewStream(file *FileStream, handle StreamHandle) Stream { ret := Stream{ handle: handle, file: file, - Clients: make([]string, 0), segments: make([]chan struct{}, len(file.Keyframes)), heads: make([]int32, 0), commands: make([]*exec.Cmd, 0), @@ -201,7 +199,7 @@ func (ts *Stream) run(start int32) error { return nil } -func (ts *Stream) GetIndex(client string) (string, error) { +func (ts *Stream) GetIndex() (string, error) { index := `#EXTM3U #EXT-X-VERSION:3 #EXT-X-PLAYLIST-TYPE:VOD @@ -218,7 +216,7 @@ func (ts *Stream) GetIndex(client string) (string, error) { return index, nil } -func (ts *Stream) GetSegment(segment int32, client string) (string, error) { +func (ts *Stream) GetSegment(segment int32) (string, error) { ts.lock.RLock() ready := ts.isSegmentReady(segment) // we want to calculate distance in the same lock else it can be funky diff --git a/transcoder/src/tracker.go b/transcoder/src/tracker.go new file mode 100644 index 00000000..160fd9f1 --- /dev/null +++ b/transcoder/src/tracker.go @@ -0,0 +1,94 @@ +package src + +type ClientInfo struct { + client string + path string + quality *Quality + audio int32 + head int32 +} + +type Tracker struct { + clients map[string]ClientInfo + transcoder *Transcoder +} + +func NewTracker(t *Transcoder) *Tracker { + ret := &Tracker{ + clients: make(map[string]ClientInfo), + transcoder: t, + } + go ret.start() + return ret +} + +func (t *Tracker) start() { + for { + info := <-t.transcoder.clientChan + old, ok := t.clients[info.client] + if ok && old.path == info.path { + // First fixup the info. Most routes ruturn partial infos + if info.quality == nil { + info.quality = old.quality + } + if info.audio == -1 { + info.audio = old.audio + } + if info.head == -1 { + info.head = old.head + } + + if old.audio != info.audio && old.audio != -1 { + t.KillAudioIfDead(old.path, old.audio) + } + if old.quality != info.quality && old.quality != nil { + t.KillQualityIfDead(old.path, *old.quality) + } + } else if ok { + t.KillStreamIfDead(old.path) + } + t.clients[info.client] = info + } +} + +func (t *Tracker) KillStreamIfDead(path string) { + for _, stream := range t.clients { + if stream.path == path { + return + } + } + t.transcoder.mutex.Lock() + defer t.transcoder.mutex.Unlock() + t.transcoder.streams[path].Destroy() + delete(t.transcoder.streams, path) +} + +func (t *Tracker) KillAudioIfDead(path string, audio int32) { + for _, stream := range t.clients { + if stream.path == path && stream.audio == audio { + return + } + } + t.transcoder.mutex.RLock() + stream := t.transcoder.streams[path] + t.transcoder.mutex.RUnlock() + + stream.alock.RLock() + defer stream.alock.RUnlock() + stream.audios[audio].Kill() +} + +func (t *Tracker) KillQualityIfDead(path string, quality Quality) { + for _, stream := range t.clients { + if stream.path == path && stream.quality != nil && *stream.quality == quality { + return + } + } + t.transcoder.mutex.RLock() + stream := t.transcoder.streams[path] + t.transcoder.mutex.RUnlock() + + stream.vlock.RLock() + defer stream.vlock.RUnlock() + stream.streams[quality].Kill() +} diff --git a/transcoder/src/transcoder.go b/transcoder/src/transcoder.go index 57d1899e..78d01052 100644 --- a/transcoder/src/transcoder.go +++ b/transcoder/src/transcoder.go @@ -12,8 +12,10 @@ type Transcoder struct { // All file streams currently running, index is file path streams map[string]*FileStream // Streams that are staring up - preparing map[string]chan *FileStream - mutex sync.RWMutex + preparing map[string]chan *FileStream + mutex sync.RWMutex + clientChan chan ClientInfo + tracker *Tracker } func NewTranscoder() (*Transcoder, error) { @@ -29,10 +31,13 @@ func NewTranscoder() (*Transcoder, error) { } } - return &Transcoder{ - streams: make(map[string]*FileStream), - preparing: make(map[string]chan *FileStream), - }, nil + ret := &Transcoder{ + streams: make(map[string]*FileStream), + preparing: make(map[string]chan *FileStream), + clientChan: make(chan ClientInfo, 10), + } + ret.tracker = NewTracker(ret) + return ret, nil } func (t *Transcoder) getFileStream(path string) (*FileStream, error) { @@ -50,7 +55,6 @@ func (t *Transcoder) getFileStream(path string) (*FileStream, error) { t.mutex.Lock() channel = make(chan *FileStream, 1) t.preparing[path] = channel - t.cleanUnused() t.mutex.Unlock() var err error @@ -75,23 +79,18 @@ func (t *Transcoder) getFileStream(path string) (*FileStream, error) { return stream, nil } -// This method assume the lock is already taken. -func (t *Transcoder) cleanUnused() { - for path, stream := range t.streams { - if !stream.IsDead() { - continue - } - log.Printf("Steam is dead (%s). Killing it.", path) - stream.Destroy() - delete(t.streams, path) - } -} - func (t *Transcoder) GetMaster(path string, client string) (string, error) { stream, err := t.getFileStream(path) if err != nil { return "", err } + t.clientChan <- ClientInfo{ + client: client, + path: path, + quality: nil, + audio: -1, + head: -1, + } return stream.GetMaster(), nil } @@ -100,7 +99,14 @@ func (t *Transcoder) GetVideoIndex(path string, quality Quality, client string) if err != nil { return "", err } - return stream.GetVideoIndex(quality, client) + t.clientChan <- ClientInfo{ + client: client, + path: path, + quality: &quality, + audio: -1, + head: -1, + } + return stream.GetVideoIndex(quality) } func (t *Transcoder) GetAudioIndex(path string, audio int32, client string) (string, error) { @@ -108,7 +114,13 @@ func (t *Transcoder) GetAudioIndex(path string, audio int32, client string) (str if err != nil { return "", err } - return stream.GetAudioIndex(audio, client) + t.clientChan <- ClientInfo{ + client: client, + path: path, + audio: audio, + head: -1, + } + return stream.GetAudioIndex(audio) } func (t *Transcoder) GetVideoSegment( @@ -121,7 +133,14 @@ func (t *Transcoder) GetVideoSegment( if err != nil { return "", err } - return stream.GetVideoSegment(quality, segment, client) + t.clientChan <- ClientInfo{ + client: client, + path: path, + quality: &quality, + audio: -1, + head: segment, + } + return stream.GetVideoSegment(quality, segment) } func (t *Transcoder) GetAudioSegment( @@ -134,5 +153,11 @@ func (t *Transcoder) GetAudioSegment( if err != nil { return "", err } - return stream.GetAudioSegment(audio, segment, client) + t.clientChan <- ClientInfo{ + client: client, + path: path, + audio: audio, + head: segment, + } + return stream.GetAudioSegment(audio, segment) }