mirror of
https://github.com/zoriya/Kyoo.git
synced 2025-07-09 03:04:20 -04:00
Move last used handling inside the client tracker
This commit is contained in:
parent
ca6ec6a8ed
commit
7b13733c9e
@ -23,9 +23,6 @@ type FileStream struct {
|
|||||||
vlock sync.RWMutex
|
vlock sync.RWMutex
|
||||||
audios map[int32]*AudioStream
|
audios map[int32]*AudioStream
|
||||||
alock sync.RWMutex
|
alock sync.RWMutex
|
||||||
lastUsed time.Time
|
|
||||||
usageChan chan time.Time
|
|
||||||
luLock sync.RWMutex
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func GetOutPath() string {
|
func GetOutPath() string {
|
||||||
@ -58,7 +55,7 @@ func NewFileStream(path string) (*FileStream, error) {
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
ret := &FileStream{
|
return &FileStream{
|
||||||
Path: path,
|
Path: path,
|
||||||
Out: fmt.Sprintf("%s/%s", GetOutPath(), info.info.Sha),
|
Out: fmt.Sprintf("%s/%s", GetOutPath(), info.info.Sha),
|
||||||
Keyframes: keyframes,
|
Keyframes: keyframes,
|
||||||
@ -66,22 +63,7 @@ func NewFileStream(path string) (*FileStream, error) {
|
|||||||
Info: info.info,
|
Info: info.info,
|
||||||
streams: make(map[Quality]*VideoStream),
|
streams: make(map[Quality]*VideoStream),
|
||||||
audios: make(map[int32]*AudioStream),
|
audios: make(map[int32]*AudioStream),
|
||||||
usageChan: make(chan time.Time, 5),
|
}, nil
|
||||||
}
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
for {
|
|
||||||
lu, ok := <-ret.usageChan
|
|
||||||
if !ok {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
ret.luLock.Lock()
|
|
||||||
ret.lastUsed = lu
|
|
||||||
ret.luLock.Unlock()
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
return ret, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func GetKeyframes(path string) ([]float64, bool, error) {
|
func GetKeyframes(path string) ([]float64, bool, error) {
|
||||||
@ -159,30 +141,6 @@ func GetKeyframes(path string) ([]float64, bool, error) {
|
|||||||
return ret, can_transmux, nil
|
return ret, can_transmux, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (fs *FileStream) IsDead() bool {
|
|
||||||
fs.luLock.Lock()
|
|
||||||
timeSince := time.Since(fs.lastUsed)
|
|
||||||
fs.luLock.Unlock()
|
|
||||||
if timeSince >= 4*time.Hour {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
if timeSince < 5*time.Minute {
|
|
||||||
// if the encode is relatively new, don't mark it as dead even if nobody is listening.
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
// for _, s := range fs.streams {
|
|
||||||
// if len(s.Clients) > 0 {
|
|
||||||
// return false
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// for _, s := range fs.audios {
|
|
||||||
// if len(s.Clients) > 0 {
|
|
||||||
// return false
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
func (fs *FileStream) Destroy() {
|
func (fs *FileStream) Destroy() {
|
||||||
fs.vlock.Lock()
|
fs.vlock.Lock()
|
||||||
defer fs.vlock.Lock()
|
defer fs.vlock.Lock()
|
||||||
@ -199,7 +157,6 @@ func (fs *FileStream) Destroy() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (fs *FileStream) GetMaster() string {
|
func (fs *FileStream) GetMaster() string {
|
||||||
fs.usageChan <- time.Now()
|
|
||||||
master := "#EXTM3U\n"
|
master := "#EXTM3U\n"
|
||||||
// TODO: also check if the codec is valid in a hls before putting transmux
|
// TODO: also check if the codec is valid in a hls before putting transmux
|
||||||
if fs.CanTransmux {
|
if fs.CanTransmux {
|
||||||
@ -244,7 +201,6 @@ func (fs *FileStream) GetMaster() string {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (fs *FileStream) getVideoStream(quality Quality) *VideoStream {
|
func (fs *FileStream) getVideoStream(quality Quality) *VideoStream {
|
||||||
fs.usageChan <- time.Now()
|
|
||||||
fs.vlock.RLock()
|
fs.vlock.RLock()
|
||||||
stream, ok := fs.streams[quality]
|
stream, ok := fs.streams[quality]
|
||||||
fs.vlock.RUnlock()
|
fs.vlock.RUnlock()
|
||||||
@ -270,7 +226,6 @@ func (fs *FileStream) GetVideoSegment(quality Quality, segment int32) (string, e
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (fs *FileStream) getAudioStream(audio int32) *AudioStream {
|
func (fs *FileStream) getAudioStream(audio int32) *AudioStream {
|
||||||
fs.usageChan <- time.Now()
|
|
||||||
fs.alock.RLock()
|
fs.alock.RLock()
|
||||||
stream, ok := fs.audios[audio]
|
stream, ok := fs.audios[audio]
|
||||||
fs.alock.RUnlock()
|
fs.alock.RUnlock()
|
||||||
|
@ -2,6 +2,7 @@ package src
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"log"
|
"log"
|
||||||
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
type ClientInfo struct {
|
type ClientInfo struct {
|
||||||
@ -14,12 +15,14 @@ type ClientInfo struct {
|
|||||||
|
|
||||||
type Tracker struct {
|
type Tracker struct {
|
||||||
clients map[string]ClientInfo
|
clients map[string]ClientInfo
|
||||||
|
visitDate map[string]time.Time
|
||||||
transcoder *Transcoder
|
transcoder *Transcoder
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewTracker(t *Transcoder) *Tracker {
|
func NewTracker(t *Transcoder) *Tracker {
|
||||||
ret := &Tracker{
|
ret := &Tracker{
|
||||||
clients: make(map[string]ClientInfo),
|
clients: make(map[string]ClientInfo),
|
||||||
|
visitDate: make(map[string]time.Time),
|
||||||
transcoder: t,
|
transcoder: t,
|
||||||
}
|
}
|
||||||
go ret.start()
|
go ret.start()
|
||||||
@ -34,53 +37,86 @@ func Abs(x int32) int32 {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (t *Tracker) start() {
|
func (t *Tracker) start() {
|
||||||
|
inactive_time := 1 * time.Hour
|
||||||
|
timer := time.After(inactive_time)
|
||||||
for {
|
for {
|
||||||
info := <-t.transcoder.clientChan
|
select {
|
||||||
old, ok := t.clients[info.client]
|
case info, ok := <-t.transcoder.clientChan:
|
||||||
if ok && old.path == info.path {
|
if !ok {
|
||||||
// First fixup the info. Most routes ruturn partial infos
|
return
|
||||||
if info.quality == nil {
|
|
||||||
info.quality = old.quality
|
|
||||||
}
|
|
||||||
if info.audio == -1 {
|
|
||||||
info.audio = old.audio
|
|
||||||
}
|
|
||||||
if info.head == -1 {
|
|
||||||
info.head = old.head
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if old.audio != info.audio && old.audio != -1 {
|
old, ok := t.clients[info.client]
|
||||||
t.KillAudioIfDead(old.path, old.audio)
|
if ok && old.path == info.path {
|
||||||
|
// First fixup the info. Most routes ruturn partial infos
|
||||||
|
if info.quality == nil {
|
||||||
|
info.quality = old.quality
|
||||||
|
}
|
||||||
|
if info.audio == -1 {
|
||||||
|
info.audio = old.audio
|
||||||
|
}
|
||||||
|
if info.head == -1 {
|
||||||
|
info.head = old.head
|
||||||
|
}
|
||||||
|
|
||||||
|
if old.audio != info.audio && old.audio != -1 {
|
||||||
|
t.KillAudioIfDead(old.path, old.audio)
|
||||||
|
}
|
||||||
|
if old.quality != info.quality && old.quality != nil {
|
||||||
|
t.KillQualityIfDead(old.path, *old.quality)
|
||||||
|
}
|
||||||
|
if old.head != -1 && Abs(info.head-old.head) > 100 {
|
||||||
|
t.KillOrphanedHeads(old.path, old.quality, old.audio)
|
||||||
|
}
|
||||||
|
} else if ok {
|
||||||
|
t.KillStreamIfDead(old.path)
|
||||||
}
|
}
|
||||||
if old.quality != info.quality && old.quality != nil {
|
|
||||||
t.KillQualityIfDead(old.path, *old.quality)
|
t.clients[info.client] = info
|
||||||
|
t.visitDate[info.client] = time.Now()
|
||||||
|
|
||||||
|
case <-timer:
|
||||||
|
timer = time.After(inactive_time)
|
||||||
|
// Purge old clients
|
||||||
|
for client, date := range t.visitDate {
|
||||||
|
if time.Since(date) < inactive_time {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
info := t.clients[client]
|
||||||
|
|
||||||
|
if !t.KillStreamIfDead(info.path) {
|
||||||
|
audio_cleanup := info.audio != -1 && t.KillAudioIfDead(info.path, info.audio)
|
||||||
|
video_cleanup := info.quality != nil && t.KillQualityIfDead(info.path, *info.quality)
|
||||||
|
if !audio_cleanup || !video_cleanup {
|
||||||
|
t.KillOrphanedHeads(info.path, info.quality, info.audio)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
delete(t.clients, client)
|
||||||
|
delete(t.visitDate, client)
|
||||||
}
|
}
|
||||||
if old.head != -1 && Abs(info.head-old.head) > 100 {
|
|
||||||
t.KillOrphanedHeads(old.path, old.quality, old.audio)
|
|
||||||
}
|
|
||||||
} else if ok {
|
|
||||||
t.KillStreamIfDead(old.path)
|
|
||||||
}
|
}
|
||||||
t.clients[info.client] = info
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *Tracker) KillStreamIfDead(path string) {
|
func (t *Tracker) KillStreamIfDead(path string) bool {
|
||||||
for _, stream := range t.clients {
|
for _, stream := range t.clients {
|
||||||
if stream.path == path {
|
if stream.path == path {
|
||||||
return
|
return false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
t.transcoder.mutex.Lock()
|
t.transcoder.mutex.Lock()
|
||||||
defer t.transcoder.mutex.Unlock()
|
defer t.transcoder.mutex.Unlock()
|
||||||
t.transcoder.streams[path].Destroy()
|
t.transcoder.streams[path].Destroy()
|
||||||
delete(t.transcoder.streams, path)
|
delete(t.transcoder.streams, path)
|
||||||
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *Tracker) KillAudioIfDead(path string, audio int32) {
|
func (t *Tracker) KillAudioIfDead(path string, audio int32) bool {
|
||||||
for _, stream := range t.clients {
|
for _, stream := range t.clients {
|
||||||
if stream.path == path && stream.audio == audio {
|
if stream.path == path && stream.audio == audio {
|
||||||
return
|
return false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
t.transcoder.mutex.RLock()
|
t.transcoder.mutex.RLock()
|
||||||
@ -90,12 +126,13 @@ func (t *Tracker) KillAudioIfDead(path string, audio int32) {
|
|||||||
stream.alock.RLock()
|
stream.alock.RLock()
|
||||||
defer stream.alock.RUnlock()
|
defer stream.alock.RUnlock()
|
||||||
stream.audios[audio].Kill()
|
stream.audios[audio].Kill()
|
||||||
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *Tracker) KillQualityIfDead(path string, quality Quality) {
|
func (t *Tracker) KillQualityIfDead(path string, quality Quality) bool {
|
||||||
for _, stream := range t.clients {
|
for _, stream := range t.clients {
|
||||||
if stream.path == path && stream.quality != nil && *stream.quality == quality {
|
if stream.path == path && stream.quality != nil && *stream.quality == quality {
|
||||||
return
|
return false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
t.transcoder.mutex.RLock()
|
t.transcoder.mutex.RLock()
|
||||||
@ -105,6 +142,7 @@ func (t *Tracker) KillQualityIfDead(path string, quality Quality) {
|
|||||||
stream.vlock.RLock()
|
stream.vlock.RLock()
|
||||||
defer stream.vlock.RUnlock()
|
defer stream.vlock.RUnlock()
|
||||||
stream.streams[quality].Kill()
|
stream.streams[quality].Kill()
|
||||||
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *Tracker) KillOrphanedHeads(path string, quality *Quality, audio int32) {
|
func (t *Tracker) KillOrphanedHeads(path string, quality *Quality, audio int32) {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user