From e3fdf0af45a0e04b6da585f697b6ebfebf324b92 Mon Sep 17 00:00:00 2001 From: Zoe Roux Date: Wed, 31 Jul 2024 22:23:31 +0200 Subject: [PATCH] Rework keyframes retrieval system --- transcoder/main.go | 10 +++- transcoder/src/audiostream.go | 14 ++++-- transcoder/src/filestream.go | 94 ++++++++++++++++++++++------------- transcoder/src/keyframes.go | 26 +++++----- transcoder/src/metadata.go | 4 ++ transcoder/src/stream.go | 17 ++++--- transcoder/src/transcoder.go | 16 +++--- transcoder/src/videostream.go | 23 +++++++-- 8 files changed, 132 insertions(+), 72 deletions(-) diff --git a/transcoder/main.go b/transcoder/main.go index 562bdfa6..2bba4ded 100644 --- a/transcoder/main.go +++ b/transcoder/main.go @@ -260,7 +260,7 @@ func (h *Handler) GetThumbnailsVtt(c echo.Context) error { type Handler struct { transcoder *src.Transcoder - metadata src.MetadataService + metadata *src.MetadataService } func main() { @@ -268,13 +268,19 @@ func main() { e.Use(middleware.Logger()) e.HTTPErrorHandler = ErrorHandler - transcoder, err := src.NewTranscoder() + metadata, err := src.NewMetadataService() + if err != nil { + e.Logger.Fatal(err) + return + } + transcoder, err := src.NewTranscoder(metadata) if err != nil { e.Logger.Fatal(err) return } h := Handler{ transcoder: transcoder, + metadata: metadata, } e.GET("/:path/direct", DirectStream) diff --git a/transcoder/src/audiostream.go b/transcoder/src/audiostream.go index f3cdc9ea..8a1dee20 100644 --- a/transcoder/src/audiostream.go +++ b/transcoder/src/audiostream.go @@ -10,12 +10,18 @@ type AudioStream struct { index int32 } -func NewAudioStream(file *FileStream, idx int32) *AudioStream { - log.Printf("Creating a audio stream %d for %s", idx, file.Path) +func (t *Transcoder) NewAudioStream(file *FileStream, idx int32) (*AudioStream, error) { + log.Printf("Creating a audio stream %d for %s", idx, file.Info.Path) + + keyframes, err := t.metadataService.GetKeyframes(file.Info, false, idx) + if err != nil { + return nil, err + } + ret := new(AudioStream) ret.index = idx - NewStream(file, ret, &ret.Stream) - return ret + NewStream(file, keyframes, ret, &ret.Stream) + return ret, nil } func (as *AudioStream) getOutPath(encoder_id int) string { diff --git a/transcoder/src/filestream.go b/transcoder/src/filestream.go index 4c4660b9..41ddc193 100644 --- a/transcoder/src/filestream.go +++ b/transcoder/src/filestream.go @@ -10,40 +10,38 @@ import ( ) type FileStream struct { - ready sync.WaitGroup - err error - Path string - Out string - Keyframes *Keyframe - Info *MediaInfo - videos CMap[Quality, *VideoStream] - audios CMap[int32, *AudioStream] + transcoder *Transcoder + ready sync.WaitGroup + err error + Out string + Info *MediaInfo + videos CMap[VideoKey, *VideoStream] + audios CMap[int32, *AudioStream] } -func NewFileStream(path string, sha string) *FileStream { +type VideoKey struct { + idx int32 + quality Quality +} + +func (t *Transcoder) newFileStream(path string, sha string) *FileStream { ret := &FileStream{ - Path: path, - Out: fmt.Sprintf("%s/%s", Settings.Outpath, sha), - videos: NewCMap[Quality, *VideoStream](), - audios: NewCMap[int32, *AudioStream](), + transcoder: t, + Out: fmt.Sprintf("%s/%s", Settings.Outpath, sha), + videos: NewCMap[VideoKey, *VideoStream](), + audios: NewCMap[int32, *AudioStream](), } ret.ready.Add(1) go func() { defer ret.ready.Done() - info, err := GetInfo(path, sha) + info, err := t.metadataService.GetMetadata(path, sha) ret.Info = info if err != nil { ret.err = err } }() - ret.ready.Add(1) - go func() { - defer ret.ready.Done() - ret.Keyframes = GetKeyframes(sha, path) - }() - return ret } @@ -62,7 +60,7 @@ func (fs *FileStream) Kill() { } func (fs *FileStream) Destroy() { - log.Printf("Removing all transcode cache files for %s", fs.Path) + log.Printf("Removing all transcode cache files for %s", fs.Info.Path) fs.Kill() _ = os.RemoveAll(fs.Out) } @@ -135,36 +133,64 @@ func (fs *FileStream) GetMaster() string { return master } -func (fs *FileStream) getVideoStream(quality Quality) *VideoStream { - stream, _ := fs.videos.GetOrCreate(quality, func() *VideoStream { - return NewVideoStream(fs, quality) +func (fs *FileStream) getVideoStream(idx int32, quality Quality) (*VideoStream, error) { + var err error + stream, _ := fs.videos.GetOrCreate(VideoKey{idx, quality}, func() *VideoStream { + var ret *VideoStream + ret, err = fs.transcoder.NewVideoStream(fs, idx, quality) + return ret }) - return stream + if err != nil { + fs.videos.Remove(VideoKey{idx, quality}) + return nil, err + } + stream.ready.Wait() + return stream, nil } -func (fs *FileStream) GetVideoIndex(quality Quality) (string, error) { - stream := fs.getVideoStream(quality) +func (fs *FileStream) GetVideoIndex(idx int32, quality Quality) (string, error) { + stream, err := fs.getVideoStream(idx, quality) + if err != nil { + return "", err + } return stream.GetIndex() } -func (fs *FileStream) GetVideoSegment(quality Quality, segment int32) (string, error) { - stream := fs.getVideoStream(quality) +func (fs *FileStream) GetVideoSegment(idx int32, quality Quality, segment int32) (string, error) { + stream, err := fs.getVideoStream(idx, quality) + if err != nil { + return "", err + } return stream.GetSegment(segment) } -func (fs *FileStream) getAudioStream(audio int32) *AudioStream { +func (fs *FileStream) getAudioStream(audio int32) (*AudioStream, error) { + var err error stream, _ := fs.audios.GetOrCreate(audio, func() *AudioStream { - return NewAudioStream(fs, audio) + var ret *AudioStream + ret, err = fs.transcoder.NewAudioStream(fs, audio) + return ret }) - return stream + if err != nil { + fs.audios.Remove(audio) + return nil, err + } + stream.ready.Wait() + return stream, nil } func (fs *FileStream) GetAudioIndex(audio int32) (string, error) { - stream := fs.getAudioStream(audio) + stream, err := fs.getAudioStream(audio) + if err != nil { + return "", nil + } return stream.GetIndex() } func (fs *FileStream) GetAudioSegment(audio int32, segment int32) (string, error) { - stream := fs.getAudioStream(audio) + stream, err := fs.getAudioStream(audio) + if err != nil { + return "", nil + } return stream.GetSegment(segment) } diff --git a/transcoder/src/keyframes.go b/transcoder/src/keyframes.go index b81f0ec4..64db7222 100644 --- a/transcoder/src/keyframes.go +++ b/transcoder/src/keyframes.go @@ -20,6 +20,7 @@ type Keyframe struct { info *KeyframeInfo } type KeyframeInfo struct { + ready sync.WaitGroup mutex sync.RWMutex listeners []func(keyframes []float64) } @@ -82,10 +83,10 @@ func (kf *Keyframe) Scan(src interface{}) error { type KeyframeKey struct { Sha string IsVideo bool - Index int + Index int32 } -func (s *MetadataService) GetKeyframe(info *MediaInfo, isVideo bool, idx int) (*Keyframe, error) { +func (s *MetadataService) GetKeyframes(info *MediaInfo, isVideo bool, idx int32) (*Keyframe, error) { get_running, set := s.keyframeLock.Start(KeyframeKey{ Sha: info.Sha, IsVideo: isVideo, @@ -99,18 +100,17 @@ func (s *MetadataService) GetKeyframe(info *MediaInfo, isVideo bool, idx int) (* IsDone: false, info: &KeyframeInfo{}, } + kf.info.ready.Add(1) - var ready sync.WaitGroup var err error - ready.Add(1) go func() { var table string if isVideo { table = "videos" - err = getVideoKeyframes(info.Path, idx, kf, &ready) + err = getVideoKeyframes(info.Path, idx, kf) } else { table = "audios" - err = getAudioKeyframes(info, idx, kf, &ready) + err = getAudioKeyframes(info, idx, kf) } if err != nil { @@ -134,14 +134,13 @@ func (s *MetadataService) GetKeyframe(info *MediaInfo, isVideo bool, idx int) (* log.Printf("Couldn't store keyframes on database: %v", err) } }() - ready.Wait() return set(kf, err) } // Retrive video's keyframes and store them inside the kf var. // Returns when all key frames are retrived (or an error occurs) -// ready.Done() is called when more than 100 are retrived (or extraction is done) -func getVideoKeyframes(path string, video_idx int, kf *Keyframe, ready *sync.WaitGroup) error { +// info.ready.Done() is called when more than 100 are retrived (or extraction is done) +func getVideoKeyframes(path string, video_idx int32, kf *Keyframe) error { defer printExecTime("ffprobe keyframe analysis for %s video n%d", path, video_idx)() // run ffprobe to return all IFrames, IFrames are points where we can split the video in segments. // We ask ffprobe to return the time of each frame and it's flags @@ -209,7 +208,7 @@ func getVideoKeyframes(path string, video_idx int, kf *Keyframe, ready *sync.Wai if len(ret) == max { kf.add(ret) if done == 0 { - ready.Done() + kf.info.ready.Done() } else if done >= 500 { max = 500 } @@ -220,13 +219,14 @@ func getVideoKeyframes(path string, video_idx int, kf *Keyframe, ready *sync.Wai } kf.add(ret) if done == 0 { - ready.Done() + kf.info.ready.Done() } kf.IsDone = true return nil } -func getAudioKeyframes(info *MediaInfo, audio_idx int, kf *Keyframe, ready *sync.WaitGroup) error { +// we can pretty much cut audio at any point so no need to get specific frames, just cut every 4s +func getAudioKeyframes(info *MediaInfo, audio_idx int32, kf *Keyframe) error { dummyKeyframeDuration := float64(4) segmentCount := int((float64(info.Duration) / dummyKeyframeDuration) + 1) kf.Keyframes = make([]float64, segmentCount) @@ -234,7 +234,7 @@ func getAudioKeyframes(info *MediaInfo, audio_idx int, kf *Keyframe, ready *sync kf.Keyframes[segmentIndex] = float64(segmentIndex) * dummyKeyframeDuration } - ready.Done() + kf.info.ready.Done() kf.IsDone = true return nil } diff --git a/transcoder/src/metadata.go b/transcoder/src/metadata.go index 32d5bd45..50505f60 100644 --- a/transcoder/src/metadata.go +++ b/transcoder/src/metadata.go @@ -12,6 +12,10 @@ type MetadataService struct { keyframeLock *RunLock[KeyframeKey, *Keyframe] } +func NewMetadataService() (*MetadataService, error) { + return &MetadataService{}, nil +} + func (s MetadataService) GetMetadata(path string, sha string) (*MediaInfo, error) { ret, err := s.getMetadata(path, sha) if err != nil { diff --git a/transcoder/src/stream.go b/transcoder/src/stream.go index c2f600cc..184ba05f 100644 --- a/transcoder/src/stream.go +++ b/transcoder/src/stream.go @@ -30,10 +30,12 @@ type StreamHandle interface { } type Stream struct { - handle StreamHandle - file *FileStream - segments []Segment - heads []Head + handle StreamHandle + ready sync.WaitGroup + file *FileStream + keyframes *Keyframe + segments []Segment + heads []Head // the lock used for the the heads lock sync.RWMutex } @@ -62,19 +64,20 @@ var DeletedHead = Head{ command: nil, } -func NewStream(file *FileStream, handle StreamHandle, ret *Stream) { +func NewStream(file *FileStream, keyframes *Keyframe, handle StreamHandle, ret *Stream) { ret.handle = handle ret.file = file + ret.keyframes = keyframes ret.heads = make([]Head, 0) - length, is_done := file.Keyframes.Length() + length, is_done := keyframes.Length() ret.segments = make([]Segment, length, max(length, 2000)) for seg := range ret.segments { ret.segments[seg].channel = make(chan struct{}) } if !is_done { - file.Keyframes.AddListener(func(keyframes []float64) { + keyframes.AddListener(func(keyframes []float64) { ret.lock.Lock() defer ret.lock.Unlock() old_length := len(ret.segments) diff --git a/transcoder/src/transcoder.go b/transcoder/src/transcoder.go index 983e31a9..bea18526 100644 --- a/transcoder/src/transcoder.go +++ b/transcoder/src/transcoder.go @@ -7,12 +7,13 @@ import ( type Transcoder struct { // All file streams currently running, index is file path - streams CMap[string, *FileStream] - clientChan chan ClientInfo - tracker *Tracker + streams CMap[string, *FileStream] + clientChan chan ClientInfo + tracker *Tracker + metadataService *MetadataService } -func NewTranscoder() (*Transcoder, error) { +func NewTranscoder(metadata *MetadataService) (*Transcoder, error) { out := Settings.Outpath dir, err := os.ReadDir(out) if err != nil { @@ -26,8 +27,9 @@ func NewTranscoder() (*Transcoder, error) { } ret := &Transcoder{ - streams: NewCMap[string, *FileStream](), - clientChan: make(chan ClientInfo, 10), + streams: NewCMap[string, *FileStream](), + clientChan: make(chan ClientInfo, 10), + metadataService: metadata, } ret.tracker = NewTracker(ret) return ret, nil @@ -35,7 +37,7 @@ func NewTranscoder() (*Transcoder, error) { func (t *Transcoder) getFileStream(path string, sha string) (*FileStream, error) { ret, _ := t.streams.GetOrCreate(sha, func() *FileStream { - return NewFileStream(path, sha) + return t.newFileStream(path, sha) }) ret.ready.Wait() if ret.err != nil { diff --git a/transcoder/src/videostream.go b/transcoder/src/videostream.go index 6c152284..02264467 100644 --- a/transcoder/src/videostream.go +++ b/transcoder/src/videostream.go @@ -7,15 +7,28 @@ import ( type VideoStream struct { Stream + idx int32 quality Quality } -func NewVideoStream(file *FileStream, quality Quality) *VideoStream { - log.Printf("Creating a new video stream for %s in quality %s", file.Path, quality) +func (t *Transcoder) NewVideoStream(file *FileStream, idx int32, quality Quality) (*VideoStream, error) { + log.Printf( + "Creating a new video stream for %s (n %d) in quality %s", + file.Info.Path, + idx, + quality, + ) + + keyframes, err := t.metadataService.GetKeyframes(file.Info, true, idx) + if err != nil { + return nil, err + } + ret := new(VideoStream) + ret.idx = idx ret.quality = quality - NewStream(file, ret, &ret.Stream) - return ret + NewStream(file, keyframes, ret, &ret.Stream) + return ret, nil } func (vs *VideoStream) getFlags() Flags { @@ -41,7 +54,7 @@ func closestMultiple(n int32, x int32) int32 { func (vs *VideoStream) getTranscodeArgs(segments string) []string { args := []string{ - "-map", "0:V:0", + "-map", fmt.Sprint("0:V:%d", vs.idx), } if vs.quality == Original {