Rework cache

This commit is contained in:
Zoe Roux 2024-02-19 03:24:14 +01:00
parent 5389e1b783
commit 32b1681573
3 changed files with 97 additions and 65 deletions

View File

@ -61,3 +61,12 @@ func (m *CMap[K, V]) Remove(key K) {
delete(m.data, key) delete(m.data, key)
} }
func (m *CMap[K, V]) GetAndRemove(key K) (V, bool) {
m.lock.Lock()
defer m.lock.Unlock()
val, ok := m.data[key]
delete(m.data, key)
return val, ok
}

View File

@ -19,18 +19,16 @@ type FileStream struct {
Keyframes []float64 Keyframes []float64
CanTransmux bool CanTransmux bool
Info *MediaInfo Info *MediaInfo
streams map[Quality]*VideoStream videos CMap[Quality, *VideoStream]
vlock sync.Mutex audios CMap[int32, *AudioStream]
audios map[int32]*AudioStream
alock sync.Mutex
} }
func NewFileStream(path string, sha string, route string) *FileStream { func NewFileStream(path string, sha string, route string) *FileStream {
ret := &FileStream{ ret := &FileStream{
Path: path, Path: path,
Out: fmt.Sprintf("%s/%s", Settings.Outpath, sha), Out: fmt.Sprintf("%s/%s", Settings.Outpath, sha),
streams: make(map[Quality]*VideoStream), videos: NewCMap[Quality, *VideoStream](),
audios: make(map[int32]*AudioStream), audios: NewCMap[int32, *AudioStream](),
} }
ret.ready.Add(1) ret.ready.Add(1)
@ -82,7 +80,7 @@ func GetKeyframes(path string) ([]float64, bool, error) {
scanner := bufio.NewScanner(stdout) scanner := bufio.NewScanner(stdout)
ret := make([]float64, 1, 300) ret := make([]float64, 1, 1000)
ret[0] = 0 ret[0] = 0
last := 0. last := 0.
can_transmux := true can_transmux := true
@ -135,24 +133,27 @@ func GetKeyframes(path string) ([]float64, bool, error) {
return ret, can_transmux, nil return ret, can_transmux, nil
} }
func (fs *FileStream) Destroy() { func (fs *FileStream) Kill() {
fs.vlock.Lock() fs.videos.lock.Lock()
defer fs.vlock.Unlock() defer fs.videos.lock.Unlock()
fs.alock.Lock() fs.audios.lock.Lock()
defer fs.alock.Unlock() defer fs.audios.lock.Unlock()
for _, s := range fs.streams { for _, s := range fs.videos.data {
s.Kill() s.Kill()
} }
for _, s := range fs.audios { for _, s := range fs.audios.data {
s.Kill() s.Kill()
} }
}
func (fs *FileStream) Destroy() {
fs.Kill()
_ = os.RemoveAll(fs.Out) _ = os.RemoveAll(fs.Out)
} }
func (fs *FileStream) GetMaster() string { func (fs *FileStream) GetMaster() string {
master := "#EXTM3U\n" master := "#EXTM3U\n"
// TODO: also check if the codec is valid in a hls before putting transmux
if fs.Info.Video != nil { if fs.Info.Video != nil {
var transmux_quality Quality var transmux_quality Quality
for _, quality := range Qualities { for _, quality := range Qualities {
@ -161,6 +162,7 @@ func (fs *FileStream) GetMaster() string {
break break
} }
} }
// TODO: also check if the codec is valid in a hls before putting transmux
if fs.CanTransmux { if fs.CanTransmux {
bitrate := float64(fs.Info.Video.Bitrate) bitrate := float64(fs.Info.Video.Bitrate)
master += "#EXT-X-STREAM-INF:" master += "#EXT-X-STREAM-INF:"
@ -207,15 +209,10 @@ func (fs *FileStream) GetMaster() string {
} }
func (fs *FileStream) getVideoStream(quality Quality) *VideoStream { func (fs *FileStream) getVideoStream(quality Quality) *VideoStream {
fs.vlock.Lock() stream, _ := fs.videos.GetOrCreate(quality, func() *VideoStream {
defer fs.vlock.Unlock() return NewVideoStream(fs, quality)
stream, ok := fs.streams[quality] })
return stream
if ok {
return stream
}
fs.streams[quality] = NewVideoStream(fs, quality)
return fs.streams[quality]
} }
func (fs *FileStream) GetVideoIndex(quality Quality) (string, error) { func (fs *FileStream) GetVideoIndex(quality Quality) (string, error) {
@ -229,15 +226,10 @@ func (fs *FileStream) GetVideoSegment(quality Quality, segment int32) (string, e
} }
func (fs *FileStream) getAudioStream(audio int32) *AudioStream { func (fs *FileStream) getAudioStream(audio int32) *AudioStream {
fs.alock.Lock() stream, _ := fs.audios.GetOrCreate(audio, func() *AudioStream {
defer fs.alock.Unlock() return NewAudioStream(fs, audio)
stream, ok := fs.audios[audio] })
return stream
if ok {
return stream
}
fs.audios[audio] = NewAudioStream(fs, audio)
return fs.audios[audio]
} }
func (fs *FileStream) GetAudioIndex(audio int32) (string, error) { func (fs *FileStream) GetAudioIndex(audio int32) (string, error) {

View File

@ -14,15 +14,21 @@ type ClientInfo struct {
} }
type Tracker struct { type Tracker struct {
clients map[string]ClientInfo // key: client_id
visitDate map[string]time.Time clients map[string]ClientInfo
transcoder *Transcoder // key: client_id
visitDate map[string]time.Time
// key: path
lastUsage map[string]time.Time
transcoder *Transcoder
deletedStream chan string
} }
func NewTracker(t *Transcoder) *Tracker { func NewTracker(t *Transcoder) *Tracker {
ret := &Tracker{ ret := &Tracker{
clients: make(map[string]ClientInfo), clients: make(map[string]ClientInfo),
visitDate: make(map[string]time.Time), visitDate: make(map[string]time.Time),
lastUsage: make(map[string]time.Time),
transcoder: t, transcoder: t,
} }
go ret.start() go ret.start()
@ -62,6 +68,7 @@ func (t *Tracker) start() {
t.clients[info.client] = info t.clients[info.client] = info
t.visitDate[info.client] = time.Now() t.visitDate[info.client] = time.Now()
t.lastUsage[info.path] = time.Now()
// now that the new info is stored and fixed, kill old streams // now that the new info is stored and fixed, kill old streams
if ok && old.path == info.path { if ok && old.path == info.path {
@ -99,6 +106,8 @@ func (t *Tracker) start() {
delete(t.clients, client) delete(t.clients, client)
delete(t.visitDate, client) delete(t.visitDate, client)
} }
case path := <-t.deletedStream:
t.DestroyStreamIfOld(path)
} }
} }
} }
@ -110,13 +119,30 @@ func (t *Tracker) KillStreamIfDead(path string) bool {
} }
} }
log.Printf("Nobody is watching %s. Killing it", path) log.Printf("Nobody is watching %s. Killing it", path)
t.transcoder.mutex.Lock()
defer t.transcoder.mutex.Unlock() stream, ok := t.transcoder.streams.Get(path)
t.transcoder.streams[path].Destroy() if !ok {
delete(t.transcoder.streams, path) return false
}
stream.Kill()
go func() {
time.Sleep(4 * time.Hour)
t.deletedStream <- path
}()
return true return true
} }
func (t *Tracker) DestroyStreamIfOld(path string) {
if time.Since(t.lastUsage[path]) < 4*time.Hour {
return
}
stream, ok := t.transcoder.streams.GetAndRemove(path)
if !ok {
return
}
stream.Destroy()
}
func (t *Tracker) KillAudioIfDead(path string, audio int32) bool { func (t *Tracker) KillAudioIfDead(path string, audio int32) bool {
for _, stream := range t.clients { for _, stream := range t.clients {
if stream.path == path && stream.audio == audio { if stream.path == path && stream.audio == audio {
@ -124,13 +150,16 @@ func (t *Tracker) KillAudioIfDead(path string, audio int32) bool {
} }
} }
log.Printf("Nobody is listening audio %d of %s. Killing it", audio, path) log.Printf("Nobody is listening audio %d of %s. Killing it", audio, path)
t.transcoder.mutex.Lock()
stream := t.transcoder.streams[path]
t.transcoder.mutex.Unlock()
stream.alock.Lock() stream, ok := t.transcoder.streams.Get(path)
defer stream.alock.Unlock() if !ok {
stream.audios[audio].Kill() return false
}
astream, aok := stream.audios.Get(audio)
if !aok {
return false
}
astream.Kill()
return true return true
} }
@ -141,34 +170,36 @@ func (t *Tracker) KillQualityIfDead(path string, quality Quality) bool {
} }
} }
log.Printf("Nobody is watching quality %s of %s. Killing it", quality, path) log.Printf("Nobody is watching quality %s of %s. Killing it", quality, path)
t.transcoder.mutex.Lock()
stream := t.transcoder.streams[path]
t.transcoder.mutex.Unlock()
stream.vlock.Lock() stream, ok := t.transcoder.streams.Get(path)
defer stream.vlock.Unlock() if !ok {
stream.streams[quality].Kill() return false
}
vstream, vok := stream.videos.Get(quality)
if !vok {
return false
}
vstream.Kill()
return true return true
} }
func (t *Tracker) KillOrphanedHeads(path string, quality *Quality, audio int32) { func (t *Tracker) KillOrphanedHeads(path string, quality *Quality, audio int32) {
t.transcoder.mutex.Lock() stream, ok := t.transcoder.streams.Get(path)
stream := t.transcoder.streams[path] if !ok {
t.transcoder.mutex.Unlock() return
}
if quality != nil { if quality != nil {
stream.vlock.Lock() vstream, vok := stream.videos.Get(*quality)
vstream := stream.streams[*quality] if vok {
stream.vlock.Unlock() t.killOrphanedeheads(&vstream.Stream)
}
t.killOrphanedeheads(&vstream.Stream)
} }
if audio != -1 { if audio != -1 {
stream.alock.Lock() astream, aok := stream.audios.Get(audio)
astream := stream.audios[audio] if aok {
stream.alock.Unlock() t.killOrphanedeheads(&astream.Stream)
}
t.killOrphanedeheads(&astream.Stream)
} }
} }