From 32b1681573fac9da067e033565eaee318cd696d7 Mon Sep 17 00:00:00 2001 From: Zoe Roux Date: Mon, 19 Feb 2024 03:24:14 +0100 Subject: [PATCH] Rework cache --- transcoder/src/cmap.go | 9 ++++ transcoder/src/filestream.go | 58 ++++++++++------------ transcoder/src/tracker.go | 95 ++++++++++++++++++++++++------------ 3 files changed, 97 insertions(+), 65 deletions(-) diff --git a/transcoder/src/cmap.go b/transcoder/src/cmap.go index 949d5e3f..6352ccd6 100644 --- a/transcoder/src/cmap.go +++ b/transcoder/src/cmap.go @@ -61,3 +61,12 @@ func (m *CMap[K, V]) Remove(key K) { delete(m.data, key) } + +func (m *CMap[K, V]) GetAndRemove(key K) (V, bool) { + m.lock.Lock() + defer m.lock.Unlock() + + val, ok := m.data[key] + delete(m.data, key) + return val, ok +} diff --git a/transcoder/src/filestream.go b/transcoder/src/filestream.go index 132b7f58..62c4f3f4 100644 --- a/transcoder/src/filestream.go +++ b/transcoder/src/filestream.go @@ -19,18 +19,16 @@ type FileStream struct { Keyframes []float64 CanTransmux bool Info *MediaInfo - streams map[Quality]*VideoStream - vlock sync.Mutex - audios map[int32]*AudioStream - alock sync.Mutex + videos CMap[Quality, *VideoStream] + audios CMap[int32, *AudioStream] } func NewFileStream(path string, sha string, route string) *FileStream { ret := &FileStream{ Path: path, Out: fmt.Sprintf("%s/%s", Settings.Outpath, sha), - streams: make(map[Quality]*VideoStream), - audios: make(map[int32]*AudioStream), + videos: NewCMap[Quality, *VideoStream](), + audios: NewCMap[int32, *AudioStream](), } ret.ready.Add(1) @@ -82,7 +80,7 @@ func GetKeyframes(path string) ([]float64, bool, error) { scanner := bufio.NewScanner(stdout) - ret := make([]float64, 1, 300) + ret := make([]float64, 1, 1000) ret[0] = 0 last := 0. can_transmux := true @@ -135,24 +133,27 @@ func GetKeyframes(path string) ([]float64, bool, error) { return ret, can_transmux, nil } -func (fs *FileStream) Destroy() { - fs.vlock.Lock() - defer fs.vlock.Unlock() - fs.alock.Lock() - defer fs.alock.Unlock() +func (fs *FileStream) Kill() { + fs.videos.lock.Lock() + defer fs.videos.lock.Unlock() + fs.audios.lock.Lock() + defer fs.audios.lock.Unlock() - for _, s := range fs.streams { + for _, s := range fs.videos.data { s.Kill() } - for _, s := range fs.audios { + for _, s := range fs.audios.data { s.Kill() } +} + +func (fs *FileStream) Destroy() { + fs.Kill() _ = os.RemoveAll(fs.Out) } func (fs *FileStream) GetMaster() string { master := "#EXTM3U\n" - // TODO: also check if the codec is valid in a hls before putting transmux if fs.Info.Video != nil { var transmux_quality Quality for _, quality := range Qualities { @@ -161,6 +162,7 @@ func (fs *FileStream) GetMaster() string { break } } + // TODO: also check if the codec is valid in a hls before putting transmux if fs.CanTransmux { bitrate := float64(fs.Info.Video.Bitrate) master += "#EXT-X-STREAM-INF:" @@ -207,15 +209,10 @@ func (fs *FileStream) GetMaster() string { } func (fs *FileStream) getVideoStream(quality Quality) *VideoStream { - fs.vlock.Lock() - defer fs.vlock.Unlock() - stream, ok := fs.streams[quality] - - if ok { - return stream - } - fs.streams[quality] = NewVideoStream(fs, quality) - return fs.streams[quality] + stream, _ := fs.videos.GetOrCreate(quality, func() *VideoStream { + return NewVideoStream(fs, quality) + }) + return stream } func (fs *FileStream) GetVideoIndex(quality Quality) (string, error) { @@ -229,15 +226,10 @@ func (fs *FileStream) GetVideoSegment(quality Quality, segment int32) (string, e } func (fs *FileStream) getAudioStream(audio int32) *AudioStream { - fs.alock.Lock() - defer fs.alock.Unlock() - stream, ok := fs.audios[audio] - - if ok { - return stream - } - fs.audios[audio] = NewAudioStream(fs, audio) - return fs.audios[audio] + stream, _ := fs.audios.GetOrCreate(audio, func() *AudioStream { + return NewAudioStream(fs, audio) + }) + return stream } func (fs *FileStream) GetAudioIndex(audio int32) (string, error) { diff --git a/transcoder/src/tracker.go b/transcoder/src/tracker.go index a75e35b6..3554764b 100644 --- a/transcoder/src/tracker.go +++ b/transcoder/src/tracker.go @@ -14,15 +14,21 @@ type ClientInfo struct { } type Tracker struct { - clients map[string]ClientInfo - visitDate map[string]time.Time - transcoder *Transcoder + // key: client_id + clients map[string]ClientInfo + // key: client_id + visitDate map[string]time.Time + // key: path + lastUsage map[string]time.Time + transcoder *Transcoder + deletedStream chan string } func NewTracker(t *Transcoder) *Tracker { ret := &Tracker{ clients: make(map[string]ClientInfo), visitDate: make(map[string]time.Time), + lastUsage: make(map[string]time.Time), transcoder: t, } go ret.start() @@ -62,6 +68,7 @@ func (t *Tracker) start() { t.clients[info.client] = info t.visitDate[info.client] = time.Now() + t.lastUsage[info.path] = time.Now() // now that the new info is stored and fixed, kill old streams if ok && old.path == info.path { @@ -99,6 +106,8 @@ func (t *Tracker) start() { delete(t.clients, client) delete(t.visitDate, client) } + case path := <-t.deletedStream: + t.DestroyStreamIfOld(path) } } } @@ -110,13 +119,30 @@ func (t *Tracker) KillStreamIfDead(path string) bool { } } log.Printf("Nobody is watching %s. Killing it", path) - t.transcoder.mutex.Lock() - defer t.transcoder.mutex.Unlock() - t.transcoder.streams[path].Destroy() - delete(t.transcoder.streams, path) + + stream, ok := t.transcoder.streams.Get(path) + if !ok { + return false + } + stream.Kill() + go func() { + time.Sleep(4 * time.Hour) + t.deletedStream <- path + }() return true } +func (t *Tracker) DestroyStreamIfOld(path string) { + if time.Since(t.lastUsage[path]) < 4*time.Hour { + return + } + stream, ok := t.transcoder.streams.GetAndRemove(path) + if !ok { + return + } + stream.Destroy() +} + func (t *Tracker) KillAudioIfDead(path string, audio int32) bool { for _, stream := range t.clients { if stream.path == path && stream.audio == audio { @@ -124,13 +150,16 @@ func (t *Tracker) KillAudioIfDead(path string, audio int32) bool { } } log.Printf("Nobody is listening audio %d of %s. Killing it", audio, path) - t.transcoder.mutex.Lock() - stream := t.transcoder.streams[path] - t.transcoder.mutex.Unlock() - stream.alock.Lock() - defer stream.alock.Unlock() - stream.audios[audio].Kill() + stream, ok := t.transcoder.streams.Get(path) + if !ok { + return false + } + astream, aok := stream.audios.Get(audio) + if !aok { + return false + } + astream.Kill() return true } @@ -141,34 +170,36 @@ func (t *Tracker) KillQualityIfDead(path string, quality Quality) bool { } } log.Printf("Nobody is watching quality %s of %s. Killing it", quality, path) - t.transcoder.mutex.Lock() - stream := t.transcoder.streams[path] - t.transcoder.mutex.Unlock() - stream.vlock.Lock() - defer stream.vlock.Unlock() - stream.streams[quality].Kill() + stream, ok := t.transcoder.streams.Get(path) + if !ok { + return false + } + vstream, vok := stream.videos.Get(quality) + if !vok { + return false + } + vstream.Kill() return true } func (t *Tracker) KillOrphanedHeads(path string, quality *Quality, audio int32) { - t.transcoder.mutex.Lock() - stream := t.transcoder.streams[path] - t.transcoder.mutex.Unlock() + stream, ok := t.transcoder.streams.Get(path) + if !ok { + return + } if quality != nil { - stream.vlock.Lock() - vstream := stream.streams[*quality] - stream.vlock.Unlock() - - t.killOrphanedeheads(&vstream.Stream) + vstream, vok := stream.videos.Get(*quality) + if vok { + t.killOrphanedeheads(&vstream.Stream) + } } if audio != -1 { - stream.alock.Lock() - astream := stream.audios[audio] - stream.alock.Unlock() - - t.killOrphanedeheads(&astream.Stream) + astream, aok := stream.audios.Get(audio) + if aok { + t.killOrphanedeheads(&astream.Stream) + } } }