From ca6b581c85f2a2ccfa489cf6f2c601d5b8f44d60 Mon Sep 17 00:00:00 2001 From: Zoe Roux Date: Wed, 7 Aug 2024 23:58:29 +0200 Subject: [PATCH] Fix master playlist & keyframes deadlock --- transcoder/src/filestream.go | 35 +++++++++++----------------- transcoder/src/stream.go | 45 +++++++++++++++++++----------------- 2 files changed, 37 insertions(+), 43 deletions(-) diff --git a/transcoder/src/filestream.go b/transcoder/src/filestream.go index 5e60e6bd..0813d111 100644 --- a/transcoder/src/filestream.go +++ b/transcoder/src/filestream.go @@ -88,6 +88,7 @@ func (fs *FileStream) GetMaster() string { master += "CHANNELS=\"2\"," master += fmt.Sprintf("URI=\"./audio/%d/index.m3u8\"\n", audio.Index) } + master += "\n" // codec is the prefix + the level, the level is not part of the codec we want to compare for the same_codec check bellow transmux_prefix := "avc1.6400" @@ -108,9 +109,10 @@ func (fs *FileStream) GetMaster() string { if def_video != nil { qualities := Filter(Qualities, func(quality Quality) bool { same_codec := def_video.MimeCodec != nil && strings.HasPrefix(*def_video.MimeCodec, transmux_prefix) - return quality.Height() < def_video.Quality().Height() || - (quality.Height() == def_video.Quality().Height() && !same_codec) + return quality.Height() < def_video.Height || + (quality.Height() == def_video.Height && !same_codec) }) + qualities = append(qualities, Original) for _, quality := range qualities { for _, video := range fs.Info.Videos { @@ -126,13 +128,14 @@ func (fs *FileStream) GetMaster() string { } else { master += fmt.Sprintf("NAME=\"Video %d\",", video.Index) } - if &video == def_video { - master += "DEFAULT=YES" + if video == *def_video { + master += "DEFAULT=YES\n" } else { master += fmt.Sprintf("URI=\"./%d/%s/index.m3u8\"\n", video.Index, quality) } } } + master += "\n" // original stream { @@ -152,7 +155,7 @@ func (fs *FileStream) GetMaster() string { aspectRatio := float32(def_video.Width) / float32(def_video.Height) for i, quality := range qualities { - if i == 0 { + if i == len(qualities)-1 { // skip the original stream that already got handled continue } @@ -164,7 +167,7 @@ func (fs *FileStream) GetMaster() string { master += fmt.Sprintf("CODECS=\"%s\",", strings.Join([]string{transmux_codec, audio_codec}, ",")) master += "AUDIO=\"audio\"," master += "CLOSED-CAPTIONS=NONE\n" - master += fmt.Sprintf("./%s/index.m3u8\n", quality) + master += fmt.Sprintf("./%d/%s/index.m3u8\n", def_video.Index, quality) } } @@ -172,17 +175,11 @@ func (fs *FileStream) GetMaster() string { } func (fs *FileStream) getVideoStream(idx uint32, quality Quality) (*VideoStream, error) { - var err error stream, _ := fs.videos.GetOrCreate(VideoKey{idx, quality}, func() *VideoStream { - var ret *VideoStream - ret, err = fs.transcoder.NewVideoStream(fs, idx, quality) + ret, _ := fs.transcoder.NewVideoStream(fs, idx, quality) return ret }) - if err != nil { - fs.videos.Remove(VideoKey{idx, quality}) - return nil, err - } - stream.ready.Wait() + stream.keyframes.info.ready.Wait() return stream, nil } @@ -203,17 +200,11 @@ func (fs *FileStream) GetVideoSegment(idx uint32, quality Quality, segment int32 } func (fs *FileStream) getAudioStream(audio uint32) (*AudioStream, error) { - var err error stream, _ := fs.audios.GetOrCreate(audio, func() *AudioStream { - var ret *AudioStream - ret, err = fs.transcoder.NewAudioStream(fs, audio) + ret, _ := fs.transcoder.NewAudioStream(fs, audio) return ret }) - if err != nil { - fs.audios.Remove(audio) - return nil, err - } - stream.ready.Wait() + stream.keyframes.info.ready.Wait() return stream, nil } diff --git a/transcoder/src/stream.go b/transcoder/src/stream.go index 8fe03755..c1de1a9e 100644 --- a/transcoder/src/stream.go +++ b/transcoder/src/stream.go @@ -31,7 +31,6 @@ type StreamHandle interface { type Stream struct { handle StreamHandle - ready sync.WaitGroup file *FileStream keyframes *Keyframe segments []Segment @@ -70,27 +69,31 @@ func NewStream(file *FileStream, keyframes *Keyframe, handle StreamHandle, ret * ret.keyframes = keyframes ret.heads = make([]Head, 0) - length, is_done := keyframes.Length() - ret.segments = make([]Segment, length, max(length, 2000)) - for seg := range ret.segments { - ret.segments[seg].channel = make(chan struct{}) - } + go func() { + keyframes.info.ready.Wait() - if !is_done { - keyframes.AddListener(func(keyframes []float64) { - ret.lock.Lock() - defer ret.lock.Unlock() - old_length := len(ret.segments) - if cap(ret.segments) > len(keyframes) { - ret.segments = ret.segments[:len(keyframes)] - } else { - ret.segments = append(ret.segments, make([]Segment, len(keyframes)-old_length)...) - } - for seg := old_length; seg < len(keyframes); seg++ { - ret.segments[seg].channel = make(chan struct{}) - } - }) - } + length, is_done := keyframes.Length() + ret.segments = make([]Segment, length, max(length, 2000)) + for seg := range ret.segments { + ret.segments[seg].channel = make(chan struct{}) + } + + if !is_done { + keyframes.AddListener(func(keyframes []float64) { + ret.lock.Lock() + defer ret.lock.Unlock() + old_length := len(ret.segments) + if cap(ret.segments) > len(keyframes) { + ret.segments = ret.segments[:len(keyframes)] + } else { + ret.segments = append(ret.segments, make([]Segment, len(keyframes)-old_length)...) + } + for seg := old_length; seg < len(keyframes); seg++ { + ret.segments[seg].channel = make(chan struct{}) + } + }) + } + }() } // Remember to lock before calling this.