Rework keyframes retrieval system

This commit is contained in:
Zoe Roux 2024-07-31 22:23:31 +02:00
parent cc45bb4656
commit e3fdf0af45
8 changed files with 132 additions and 72 deletions

View File

@ -260,7 +260,7 @@ func (h *Handler) GetThumbnailsVtt(c echo.Context) error {
type Handler struct { type Handler struct {
transcoder *src.Transcoder transcoder *src.Transcoder
metadata src.MetadataService metadata *src.MetadataService
} }
func main() { func main() {
@ -268,13 +268,19 @@ func main() {
e.Use(middleware.Logger()) e.Use(middleware.Logger())
e.HTTPErrorHandler = ErrorHandler e.HTTPErrorHandler = ErrorHandler
transcoder, err := src.NewTranscoder() metadata, err := src.NewMetadataService()
if err != nil {
e.Logger.Fatal(err)
return
}
transcoder, err := src.NewTranscoder(metadata)
if err != nil { if err != nil {
e.Logger.Fatal(err) e.Logger.Fatal(err)
return return
} }
h := Handler{ h := Handler{
transcoder: transcoder, transcoder: transcoder,
metadata: metadata,
} }
e.GET("/:path/direct", DirectStream) e.GET("/:path/direct", DirectStream)

View File

@ -10,12 +10,18 @@ type AudioStream struct {
index int32 index int32
} }
func NewAudioStream(file *FileStream, idx int32) *AudioStream { func (t *Transcoder) NewAudioStream(file *FileStream, idx int32) (*AudioStream, error) {
log.Printf("Creating a audio stream %d for %s", idx, file.Path) log.Printf("Creating a audio stream %d for %s", idx, file.Info.Path)
keyframes, err := t.metadataService.GetKeyframes(file.Info, false, idx)
if err != nil {
return nil, err
}
ret := new(AudioStream) ret := new(AudioStream)
ret.index = idx ret.index = idx
NewStream(file, ret, &ret.Stream) NewStream(file, keyframes, ret, &ret.Stream)
return ret return ret, nil
} }
func (as *AudioStream) getOutPath(encoder_id int) string { func (as *AudioStream) getOutPath(encoder_id int) string {

View File

@ -10,40 +10,38 @@ import (
) )
type FileStream struct { type FileStream struct {
ready sync.WaitGroup transcoder *Transcoder
err error ready sync.WaitGroup
Path string err error
Out string Out string
Keyframes *Keyframe Info *MediaInfo
Info *MediaInfo videos CMap[VideoKey, *VideoStream]
videos CMap[Quality, *VideoStream] audios CMap[int32, *AudioStream]
audios CMap[int32, *AudioStream]
} }
func NewFileStream(path string, sha string) *FileStream { type VideoKey struct {
idx int32
quality Quality
}
func (t *Transcoder) newFileStream(path string, sha string) *FileStream {
ret := &FileStream{ ret := &FileStream{
Path: path, transcoder: t,
Out: fmt.Sprintf("%s/%s", Settings.Outpath, sha), Out: fmt.Sprintf("%s/%s", Settings.Outpath, sha),
videos: NewCMap[Quality, *VideoStream](), videos: NewCMap[VideoKey, *VideoStream](),
audios: NewCMap[int32, *AudioStream](), audios: NewCMap[int32, *AudioStream](),
} }
ret.ready.Add(1) ret.ready.Add(1)
go func() { go func() {
defer ret.ready.Done() defer ret.ready.Done()
info, err := GetInfo(path, sha) info, err := t.metadataService.GetMetadata(path, sha)
ret.Info = info ret.Info = info
if err != nil { if err != nil {
ret.err = err ret.err = err
} }
}() }()
ret.ready.Add(1)
go func() {
defer ret.ready.Done()
ret.Keyframes = GetKeyframes(sha, path)
}()
return ret return ret
} }
@ -62,7 +60,7 @@ func (fs *FileStream) Kill() {
} }
func (fs *FileStream) Destroy() { func (fs *FileStream) Destroy() {
log.Printf("Removing all transcode cache files for %s", fs.Path) log.Printf("Removing all transcode cache files for %s", fs.Info.Path)
fs.Kill() fs.Kill()
_ = os.RemoveAll(fs.Out) _ = os.RemoveAll(fs.Out)
} }
@ -135,36 +133,64 @@ func (fs *FileStream) GetMaster() string {
return master return master
} }
func (fs *FileStream) getVideoStream(quality Quality) *VideoStream { func (fs *FileStream) getVideoStream(idx int32, quality Quality) (*VideoStream, error) {
stream, _ := fs.videos.GetOrCreate(quality, func() *VideoStream { var err error
return NewVideoStream(fs, quality) stream, _ := fs.videos.GetOrCreate(VideoKey{idx, quality}, func() *VideoStream {
var ret *VideoStream
ret, err = fs.transcoder.NewVideoStream(fs, idx, quality)
return ret
}) })
return stream if err != nil {
fs.videos.Remove(VideoKey{idx, quality})
return nil, err
}
stream.ready.Wait()
return stream, nil
} }
func (fs *FileStream) GetVideoIndex(quality Quality) (string, error) { func (fs *FileStream) GetVideoIndex(idx int32, quality Quality) (string, error) {
stream := fs.getVideoStream(quality) stream, err := fs.getVideoStream(idx, quality)
if err != nil {
return "", err
}
return stream.GetIndex() return stream.GetIndex()
} }
func (fs *FileStream) GetVideoSegment(quality Quality, segment int32) (string, error) { func (fs *FileStream) GetVideoSegment(idx int32, quality Quality, segment int32) (string, error) {
stream := fs.getVideoStream(quality) stream, err := fs.getVideoStream(idx, quality)
if err != nil {
return "", err
}
return stream.GetSegment(segment) return stream.GetSegment(segment)
} }
func (fs *FileStream) getAudioStream(audio int32) *AudioStream { func (fs *FileStream) getAudioStream(audio int32) (*AudioStream, error) {
var err error
stream, _ := fs.audios.GetOrCreate(audio, func() *AudioStream { stream, _ := fs.audios.GetOrCreate(audio, func() *AudioStream {
return NewAudioStream(fs, audio) var ret *AudioStream
ret, err = fs.transcoder.NewAudioStream(fs, audio)
return ret
}) })
return stream if err != nil {
fs.audios.Remove(audio)
return nil, err
}
stream.ready.Wait()
return stream, nil
} }
func (fs *FileStream) GetAudioIndex(audio int32) (string, error) { func (fs *FileStream) GetAudioIndex(audio int32) (string, error) {
stream := fs.getAudioStream(audio) stream, err := fs.getAudioStream(audio)
if err != nil {
return "", nil
}
return stream.GetIndex() return stream.GetIndex()
} }
func (fs *FileStream) GetAudioSegment(audio int32, segment int32) (string, error) { func (fs *FileStream) GetAudioSegment(audio int32, segment int32) (string, error) {
stream := fs.getAudioStream(audio) stream, err := fs.getAudioStream(audio)
if err != nil {
return "", nil
}
return stream.GetSegment(segment) return stream.GetSegment(segment)
} }

View File

@ -20,6 +20,7 @@ type Keyframe struct {
info *KeyframeInfo info *KeyframeInfo
} }
type KeyframeInfo struct { type KeyframeInfo struct {
ready sync.WaitGroup
mutex sync.RWMutex mutex sync.RWMutex
listeners []func(keyframes []float64) listeners []func(keyframes []float64)
} }
@ -82,10 +83,10 @@ func (kf *Keyframe) Scan(src interface{}) error {
type KeyframeKey struct { type KeyframeKey struct {
Sha string Sha string
IsVideo bool IsVideo bool
Index int Index int32
} }
func (s *MetadataService) GetKeyframe(info *MediaInfo, isVideo bool, idx int) (*Keyframe, error) { func (s *MetadataService) GetKeyframes(info *MediaInfo, isVideo bool, idx int32) (*Keyframe, error) {
get_running, set := s.keyframeLock.Start(KeyframeKey{ get_running, set := s.keyframeLock.Start(KeyframeKey{
Sha: info.Sha, Sha: info.Sha,
IsVideo: isVideo, IsVideo: isVideo,
@ -99,18 +100,17 @@ func (s *MetadataService) GetKeyframe(info *MediaInfo, isVideo bool, idx int) (*
IsDone: false, IsDone: false,
info: &KeyframeInfo{}, info: &KeyframeInfo{},
} }
kf.info.ready.Add(1)
var ready sync.WaitGroup
var err error var err error
ready.Add(1)
go func() { go func() {
var table string var table string
if isVideo { if isVideo {
table = "videos" table = "videos"
err = getVideoKeyframes(info.Path, idx, kf, &ready) err = getVideoKeyframes(info.Path, idx, kf)
} else { } else {
table = "audios" table = "audios"
err = getAudioKeyframes(info, idx, kf, &ready) err = getAudioKeyframes(info, idx, kf)
} }
if err != nil { if err != nil {
@ -134,14 +134,13 @@ func (s *MetadataService) GetKeyframe(info *MediaInfo, isVideo bool, idx int) (*
log.Printf("Couldn't store keyframes on database: %v", err) log.Printf("Couldn't store keyframes on database: %v", err)
} }
}() }()
ready.Wait()
return set(kf, err) return set(kf, err)
} }
// Retrive video's keyframes and store them inside the kf var. // Retrive video's keyframes and store them inside the kf var.
// Returns when all key frames are retrived (or an error occurs) // Returns when all key frames are retrived (or an error occurs)
// ready.Done() is called when more than 100 are retrived (or extraction is done) // info.ready.Done() is called when more than 100 are retrived (or extraction is done)
func getVideoKeyframes(path string, video_idx int, kf *Keyframe, ready *sync.WaitGroup) error { func getVideoKeyframes(path string, video_idx int32, kf *Keyframe) error {
defer printExecTime("ffprobe keyframe analysis for %s video n%d", path, video_idx)() defer printExecTime("ffprobe keyframe analysis for %s video n%d", path, video_idx)()
// run ffprobe to return all IFrames, IFrames are points where we can split the video in segments. // run ffprobe to return all IFrames, IFrames are points where we can split the video in segments.
// We ask ffprobe to return the time of each frame and it's flags // We ask ffprobe to return the time of each frame and it's flags
@ -209,7 +208,7 @@ func getVideoKeyframes(path string, video_idx int, kf *Keyframe, ready *sync.Wai
if len(ret) == max { if len(ret) == max {
kf.add(ret) kf.add(ret)
if done == 0 { if done == 0 {
ready.Done() kf.info.ready.Done()
} else if done >= 500 { } else if done >= 500 {
max = 500 max = 500
} }
@ -220,13 +219,14 @@ func getVideoKeyframes(path string, video_idx int, kf *Keyframe, ready *sync.Wai
} }
kf.add(ret) kf.add(ret)
if done == 0 { if done == 0 {
ready.Done() kf.info.ready.Done()
} }
kf.IsDone = true kf.IsDone = true
return nil return nil
} }
func getAudioKeyframes(info *MediaInfo, audio_idx int, kf *Keyframe, ready *sync.WaitGroup) error { // we can pretty much cut audio at any point so no need to get specific frames, just cut every 4s
func getAudioKeyframes(info *MediaInfo, audio_idx int32, kf *Keyframe) error {
dummyKeyframeDuration := float64(4) dummyKeyframeDuration := float64(4)
segmentCount := int((float64(info.Duration) / dummyKeyframeDuration) + 1) segmentCount := int((float64(info.Duration) / dummyKeyframeDuration) + 1)
kf.Keyframes = make([]float64, segmentCount) kf.Keyframes = make([]float64, segmentCount)
@ -234,7 +234,7 @@ func getAudioKeyframes(info *MediaInfo, audio_idx int, kf *Keyframe, ready *sync
kf.Keyframes[segmentIndex] = float64(segmentIndex) * dummyKeyframeDuration kf.Keyframes[segmentIndex] = float64(segmentIndex) * dummyKeyframeDuration
} }
ready.Done() kf.info.ready.Done()
kf.IsDone = true kf.IsDone = true
return nil return nil
} }

View File

@ -12,6 +12,10 @@ type MetadataService struct {
keyframeLock *RunLock[KeyframeKey, *Keyframe] keyframeLock *RunLock[KeyframeKey, *Keyframe]
} }
func NewMetadataService() (*MetadataService, error) {
return &MetadataService{}, nil
}
func (s MetadataService) GetMetadata(path string, sha string) (*MediaInfo, error) { func (s MetadataService) GetMetadata(path string, sha string) (*MediaInfo, error) {
ret, err := s.getMetadata(path, sha) ret, err := s.getMetadata(path, sha)
if err != nil { if err != nil {

View File

@ -30,10 +30,12 @@ type StreamHandle interface {
} }
type Stream struct { type Stream struct {
handle StreamHandle handle StreamHandle
file *FileStream ready sync.WaitGroup
segments []Segment file *FileStream
heads []Head keyframes *Keyframe
segments []Segment
heads []Head
// the lock used for the the heads // the lock used for the the heads
lock sync.RWMutex lock sync.RWMutex
} }
@ -62,19 +64,20 @@ var DeletedHead = Head{
command: nil, command: nil,
} }
func NewStream(file *FileStream, handle StreamHandle, ret *Stream) { func NewStream(file *FileStream, keyframes *Keyframe, handle StreamHandle, ret *Stream) {
ret.handle = handle ret.handle = handle
ret.file = file ret.file = file
ret.keyframes = keyframes
ret.heads = make([]Head, 0) ret.heads = make([]Head, 0)
length, is_done := file.Keyframes.Length() length, is_done := keyframes.Length()
ret.segments = make([]Segment, length, max(length, 2000)) ret.segments = make([]Segment, length, max(length, 2000))
for seg := range ret.segments { for seg := range ret.segments {
ret.segments[seg].channel = make(chan struct{}) ret.segments[seg].channel = make(chan struct{})
} }
if !is_done { if !is_done {
file.Keyframes.AddListener(func(keyframes []float64) { keyframes.AddListener(func(keyframes []float64) {
ret.lock.Lock() ret.lock.Lock()
defer ret.lock.Unlock() defer ret.lock.Unlock()
old_length := len(ret.segments) old_length := len(ret.segments)

View File

@ -7,12 +7,13 @@ import (
type Transcoder struct { type Transcoder struct {
// All file streams currently running, index is file path // All file streams currently running, index is file path
streams CMap[string, *FileStream] streams CMap[string, *FileStream]
clientChan chan ClientInfo clientChan chan ClientInfo
tracker *Tracker tracker *Tracker
metadataService *MetadataService
} }
func NewTranscoder() (*Transcoder, error) { func NewTranscoder(metadata *MetadataService) (*Transcoder, error) {
out := Settings.Outpath out := Settings.Outpath
dir, err := os.ReadDir(out) dir, err := os.ReadDir(out)
if err != nil { if err != nil {
@ -26,8 +27,9 @@ func NewTranscoder() (*Transcoder, error) {
} }
ret := &Transcoder{ ret := &Transcoder{
streams: NewCMap[string, *FileStream](), streams: NewCMap[string, *FileStream](),
clientChan: make(chan ClientInfo, 10), clientChan: make(chan ClientInfo, 10),
metadataService: metadata,
} }
ret.tracker = NewTracker(ret) ret.tracker = NewTracker(ret)
return ret, nil return ret, nil
@ -35,7 +37,7 @@ func NewTranscoder() (*Transcoder, error) {
func (t *Transcoder) getFileStream(path string, sha string) (*FileStream, error) { func (t *Transcoder) getFileStream(path string, sha string) (*FileStream, error) {
ret, _ := t.streams.GetOrCreate(sha, func() *FileStream { ret, _ := t.streams.GetOrCreate(sha, func() *FileStream {
return NewFileStream(path, sha) return t.newFileStream(path, sha)
}) })
ret.ready.Wait() ret.ready.Wait()
if ret.err != nil { if ret.err != nil {

View File

@ -7,15 +7,28 @@ import (
type VideoStream struct { type VideoStream struct {
Stream Stream
idx int32
quality Quality quality Quality
} }
func NewVideoStream(file *FileStream, quality Quality) *VideoStream { func (t *Transcoder) NewVideoStream(file *FileStream, idx int32, quality Quality) (*VideoStream, error) {
log.Printf("Creating a new video stream for %s in quality %s", file.Path, quality) log.Printf(
"Creating a new video stream for %s (n %d) in quality %s",
file.Info.Path,
idx,
quality,
)
keyframes, err := t.metadataService.GetKeyframes(file.Info, true, idx)
if err != nil {
return nil, err
}
ret := new(VideoStream) ret := new(VideoStream)
ret.idx = idx
ret.quality = quality ret.quality = quality
NewStream(file, ret, &ret.Stream) NewStream(file, keyframes, ret, &ret.Stream)
return ret return ret, nil
} }
func (vs *VideoStream) getFlags() Flags { func (vs *VideoStream) getFlags() Flags {
@ -41,7 +54,7 @@ func closestMultiple(n int32, x int32) int32 {
func (vs *VideoStream) getTranscodeArgs(segments string) []string { func (vs *VideoStream) getTranscodeArgs(segments string) []string {
args := []string{ args := []string{
"-map", "0:V:0", "-map", fmt.Sprint("0:V:%d", vs.idx),
} }
if vs.quality == Original { if vs.quality == Original {