From 4c7e335ef4584908e0f8732e74c6396f5a8b40d1 Mon Sep 17 00:00:00 2001 From: Zoe Roux Date: Mon, 26 Feb 2024 21:58:55 +0100 Subject: [PATCH] Lazy load keyframes --- transcoder/src/keyframes.go | 38 ++++++++++++++++++++++++++++++++++--- transcoder/src/stream.go | 18 +++++++++++++++--- 2 files changed, 50 insertions(+), 6 deletions(-) diff --git a/transcoder/src/keyframes.go b/transcoder/src/keyframes.go index ce4718d6..15a05f17 100644 --- a/transcoder/src/keyframes.go +++ b/transcoder/src/keyframes.go @@ -17,6 +17,7 @@ type Keyframe struct { IsDone bool mutex sync.RWMutex ready sync.WaitGroup + listeners []func(keyframes []float64) } func (kf *Keyframe) Get(idx int32) float64 { @@ -43,6 +44,21 @@ func (kf *Keyframe) Length() (int32, bool) { return int32(len(kf.Keyframes)), kf.IsDone } +func (kf *Keyframe) add(values []float64) { + kf.mutex.Lock() + defer kf.mutex.Unlock() + kf.Keyframes = append(kf.Keyframes, values...) + for _, listener := range kf.listeners { + listener(kf.Keyframes) + } +} + +func (kf *Keyframe) AddListener(callback func(keyframes []float64)) { + kf.mutex.RLock() + defer kf.mutex.RUnlock() + kf.listeners = append(kf.listeners, callback) +} + var keyframes = NewCMap[string, *Keyframe]() func GetKeyframes(sha string, path string) *Keyframe { @@ -96,6 +112,8 @@ func getKeyframes(path string, kf *Keyframe) error { scanner := bufio.NewScanner(stdout) ret := make([]float64, 0, 1000) + max := 100 + done := 0 for scanner.Scan() { frame := scanner.Text() if frame == "" { @@ -120,7 +138,7 @@ func getKeyframes(path string, kf *Keyframe) error { // the segment time and decide to cut at a random keyframe. Having every keyframe // handled as a segment prevents that. - if len(ret) == 0 { + if done == 0 && len(ret) == 0 { // sometimes, videos can start at a timing greater than 0:00. We need to take that into account // and only list keyframes that come after the start of the video (without that, our segments count // mismatch and we can have the same segment twice on the stream). @@ -131,9 +149,23 @@ func getKeyframes(path string, kf *Keyframe) error { continue } ret = append(ret, fpts) + + if len(ret) == max { + kf.add(ret) + if done == 0 { + kf.ready.Done() + } else if done >= 500 { + max = 500 + } + done += max + // clear the array without reallocing it + ret = ret[:0] + } + } + kf.add(ret) + if done == 0 { + kf.ready.Done() } - kf.Keyframes = ret kf.IsDone = true - kf.ready.Done() return nil } diff --git a/transcoder/src/stream.go b/transcoder/src/stream.go index 90726010..c6bf23bd 100644 --- a/transcoder/src/stream.go +++ b/transcoder/src/stream.go @@ -68,13 +68,25 @@ func NewStream(file *FileStream, handle StreamHandle, ret *Stream) { ret.heads = make([]Head, 0) length, is_done := file.Keyframes.Length() - ret.segments = make([]Segment, length) + ret.segments = make([]Segment, length, 2000) for seg := range ret.segments { ret.segments[seg].channel = make(chan struct{}) } if !is_done { - // TODO: create new ret.segments for every new keyframes that get created in the file. + file.Keyframes.AddListener(func(keyframes []float64) { + ret.lock.Lock() + defer ret.lock.Unlock() + old_length := len(ret.segments) + if cap(ret.segments) > len(keyframes) { + ret.segments = ret.segments[:len(keyframes)] + } else { + ret.segments = append(ret.segments, make([]Segment, len(keyframes)-old_length)...) + } + for seg := old_length; seg < len(keyframes); seg++ { + ret.segments[seg].channel = make(chan struct{}) + } + }) } } @@ -111,7 +123,7 @@ func (ts *Stream) run(start int32) error { // if keyframes analysys is not finished, always have a 1-segment padding // for the extra segment needed for precise split (look comment before -to flag) if !is_done { - end-- + end -= 2 } // Stop at the first finished segment ts.lock.Lock()