Use pointers to store streams

This commit is contained in:
Zoe Roux 2024-01-14 03:45:18 +01:00
parent 049965cdc9
commit 95b1caeb26
2 changed files with 26 additions and 29 deletions

View File

@ -4,6 +4,7 @@ import (
"fmt" "fmt"
"log" "log"
"math" "math"
"os"
"os/exec" "os/exec"
"strconv" "strconv"
"strings" "strings"
@ -12,10 +13,19 @@ import (
type FileStream struct { type FileStream struct {
Path string Path string
Out string
Keyframes []float64 Keyframes []float64
CanTransmux bool CanTransmux bool
Info *MediaInfo Info *MediaInfo
streams map[Quality]TranscodeStream streams map[Quality]*TranscodeStream
}
func GetOutPath() string {
out := os.Getenv("CACHE_ROOT")
if out == "" {
return "/cache"
}
return out
} }
func NewFileStream(path string) (*FileStream, error) { func NewFileStream(path string) (*FileStream, error) {
@ -42,16 +52,17 @@ func NewFileStream(path string) (*FileStream, error) {
return &FileStream{ return &FileStream{
Path: path, Path: path,
Out: fmt.Sprintf("%s/%s", GetOutPath(), info.info.Sha),
Keyframes: keyframes, Keyframes: keyframes,
CanTransmux: can_transmux, CanTransmux: can_transmux,
Info: info.info, Info: info.info,
streams: make(map[Quality]TranscodeStream), streams: make(map[Quality]*TranscodeStream),
}, nil }, nil
} }
func GetKeyframes(path string) ([]float64, bool, error) { func GetKeyframes(path string) ([]float64, bool, error) {
// run ffprobe to return all IFrames, IFrames are points where we can split the video in segments.
log.Printf("Starting ffprobe for keyframes analysis for %s", path) log.Printf("Starting ffprobe for keyframes analysis for %s", path)
// run ffprobe to return all IFrames, IFrames are points where we can split the video in segments.
start := time.Now() start := time.Now()
out, err := exec.Command( out, err := exec.Command(
"ffprobe", "ffprobe",
@ -173,19 +184,7 @@ func (fs *FileStream) GetMaster() string {
return master return master
} }
func (fs *FileStream) GetIndex(quality Quality, client string) (string, error) { func (fs *FileStream) GetVideoIndex(quality Quality, client string) (string, error) {
index := `#EXTM3U stream := fs.streams[quality]
#EXT-X-VERSION:3 return stream.GetIndex(client)
#EXT-X-PLAYLIST-TYPE:VOD
#EXT-X-ALLOW-CACHE:YES
#EXT-X-TARGETDURATION:4
#EXT-X-MEDIA-SEQUENCE:0
`
for segment := 1; segment < len(fs.Keyframes); segment++ {
index += fmt.Sprintf("#EXTINF:%.6f\n", fs.Keyframes[segment]-fs.Keyframes[segment-1])
index += fmt.Sprintf("segment-%d.ts\n", segment)
}
index += `#EXT-X-ENDLIST`
return index, nil
} }

View File

@ -8,7 +8,7 @@ import (
type Transcoder struct { type Transcoder struct {
// All file streams currently running, index is file path // All file streams currently running, index is file path
streams map[string]FileStream streams map[string]*FileStream
// Streams that are staring up // Streams that are staring up
preparing map[string]chan *FileStream preparing map[string]chan *FileStream
mutex sync.RWMutex mutex sync.RWMutex
@ -16,7 +16,7 @@ type Transcoder struct {
func NewTranscoder() *Transcoder { func NewTranscoder() *Transcoder {
return &Transcoder{ return &Transcoder{
streams: make(map[string]FileStream), streams: make(map[string]*FileStream),
preparing: make(map[string]chan *FileStream), preparing: make(map[string]chan *FileStream),
} }
} }
@ -28,11 +28,10 @@ func (t *Transcoder) getFileStream(path string) (*FileStream, error) {
t.mutex.RUnlock() t.mutex.RUnlock()
if preparing { if preparing {
pstream := <-channel stream = <-channel
if pstream == nil { if stream == nil {
return nil, errors.New("could not transcode file. Try again later") return nil, errors.New("could not transcode file. Try again later")
} }
stream = *pstream
} else if !ok { } else if !ok {
t.mutex.Lock() t.mutex.Lock()
channel = make(chan *FileStream, 1) channel = make(chan *FileStream, 1)
@ -40,7 +39,7 @@ func (t *Transcoder) getFileStream(path string) (*FileStream, error) {
t.cleanUnused() t.cleanUnused()
t.mutex.Unlock() t.mutex.Unlock()
newstream, err := NewFileStream(path) stream, err := NewFileStream(path)
log.Printf("Stream created for %s", path) log.Printf("Stream created for %s", path)
if err != nil { if err != nil {
t.mutex.Lock() t.mutex.Lock()
@ -52,14 +51,13 @@ func (t *Transcoder) getFileStream(path string) (*FileStream, error) {
} }
t.mutex.Lock() t.mutex.Lock()
t.streams[path] = *newstream t.streams[path] = stream
stream = t.streams[path]
delete(t.preparing, path) delete(t.preparing, path)
t.mutex.Unlock() t.mutex.Unlock()
channel <- &stream channel <- stream
} }
return &stream, nil return stream, nil
} }
// This method assume the lock is already taken. // This method assume the lock is already taken.
@ -86,7 +84,7 @@ func (t *Transcoder) GetVideoIndex(path string, quality Quality, client string)
if err != nil { if err != nil {
return "", err return "", err
} }
return stream.GetIndex(quality, client) return stream.GetVideoIndex(quality, client)
} }
func (t *Transcoder) GetAudioIndex(path string, audio string, client string) (string, error) { func (t *Transcoder) GetAudioIndex(path string, audio string, client string) (string, error) {