From 2a491ded000adf452d6eaf9f920a8c4c3cc42fee Mon Sep 17 00:00:00 2001 From: Zoe Roux Date: Fri, 19 Jan 2024 21:37:52 +0100 Subject: [PATCH] Fix rw mutexes handling --- transcoder/src/filestream.go | 18 ++++++------------ transcoder/src/tracker.go | 28 ++++++++++++++-------------- transcoder/src/transcoder.go | 15 +++++++-------- 3 files changed, 27 insertions(+), 34 deletions(-) diff --git a/transcoder/src/filestream.go b/transcoder/src/filestream.go index 7c9d78cf..65084c53 100644 --- a/transcoder/src/filestream.go +++ b/transcoder/src/filestream.go @@ -20,9 +20,9 @@ type FileStream struct { CanTransmux bool Info *MediaInfo streams map[Quality]*VideoStream - vlock sync.RWMutex + vlock sync.Mutex audios map[int32]*AudioStream - alock sync.RWMutex + alock sync.Mutex } func GetOutPath() string { @@ -201,16 +201,13 @@ func (fs *FileStream) GetMaster() string { } func (fs *FileStream) getVideoStream(quality Quality) *VideoStream { - fs.vlock.RLock() + fs.vlock.Lock() + defer fs.vlock.Unlock() stream, ok := fs.streams[quality] - fs.vlock.RUnlock() if ok { return stream } - - fs.vlock.Lock() - defer fs.vlock.Unlock() fs.streams[quality] = NewVideoStream(fs, quality) return fs.streams[quality] } @@ -226,16 +223,13 @@ func (fs *FileStream) GetVideoSegment(quality Quality, segment int32) (string, e } func (fs *FileStream) getAudioStream(audio int32) *AudioStream { - fs.alock.RLock() + fs.alock.Lock() + defer fs.alock.Unlock() stream, ok := fs.audios[audio] - fs.alock.RUnlock() if ok { return stream } - - fs.alock.Lock() - defer fs.alock.Unlock() fs.audios[audio] = NewAudioStream(fs, audio) return fs.audios[audio] } diff --git a/transcoder/src/tracker.go b/transcoder/src/tracker.go index c1d97110..933f1573 100644 --- a/transcoder/src/tracker.go +++ b/transcoder/src/tracker.go @@ -124,12 +124,12 @@ func (t *Tracker) KillAudioIfDead(path string, audio int32) bool { } } log.Printf("Nobody is listening audio %d of %s. Killing it", audio, path) - t.transcoder.mutex.RLock() + t.transcoder.mutex.Lock() stream := t.transcoder.streams[path] - t.transcoder.mutex.RUnlock() + t.transcoder.mutex.Unlock() - stream.alock.RLock() - defer stream.alock.RUnlock() + stream.alock.Lock() + defer stream.alock.Unlock() stream.audios[audio].Kill() return true } @@ -141,32 +141,32 @@ func (t *Tracker) KillQualityIfDead(path string, quality Quality) bool { } } log.Printf("Nobody is watching quality %s of %s. Killing it", quality, path) - t.transcoder.mutex.RLock() + t.transcoder.mutex.Lock() stream := t.transcoder.streams[path] - t.transcoder.mutex.RUnlock() + t.transcoder.mutex.Unlock() - stream.vlock.RLock() - defer stream.vlock.RUnlock() + stream.vlock.Lock() + defer stream.vlock.Unlock() stream.streams[quality].Kill() return true } func (t *Tracker) KillOrphanedHeads(path string, quality *Quality, audio int32) { - t.transcoder.mutex.RLock() + t.transcoder.mutex.Lock() stream := t.transcoder.streams[path] - t.transcoder.mutex.RUnlock() + t.transcoder.mutex.Unlock() if quality != nil { - stream.vlock.RLock() + stream.vlock.Lock() vstream := stream.streams[*quality] - stream.vlock.RUnlock() + stream.vlock.Unlock() t.killOrphanedeheads(&vstream.Stream) } if audio != -1 { - stream.alock.RLock() + stream.alock.Lock() astream := stream.audios[audio] - stream.alock.RUnlock() + stream.alock.Unlock() t.killOrphanedeheads(&astream.Stream) } diff --git a/transcoder/src/transcoder.go b/transcoder/src/transcoder.go index 78d01052..28a153c9 100644 --- a/transcoder/src/transcoder.go +++ b/transcoder/src/transcoder.go @@ -13,7 +13,7 @@ type Transcoder struct { streams map[string]*FileStream // Streams that are staring up preparing map[string]chan *FileStream - mutex sync.RWMutex + mutex sync.Mutex clientChan chan ClientInfo tracker *Tracker } @@ -41,10 +41,14 @@ func NewTranscoder() (*Transcoder, error) { } func (t *Transcoder) getFileStream(path string) (*FileStream, error) { - t.mutex.RLock() + t.mutex.Lock() stream, ok := t.streams[path] channel, preparing := t.preparing[path] - t.mutex.RUnlock() + if !preparing && !ok { + channel = make(chan *FileStream, 1) + t.preparing[path] = channel + } + t.mutex.Unlock() if preparing { stream = <-channel @@ -52,11 +56,6 @@ func (t *Transcoder) getFileStream(path string) (*FileStream, error) { return nil, errors.New("could not transcode file. Try again later") } } else if !ok { - t.mutex.Lock() - channel = make(chan *FileStream, 1) - t.preparing[path] = channel - t.mutex.Unlock() - var err error stream, err = NewFileStream(path) log.Printf("Stream created for %s", path)