diff --git a/transcoder/Dockerfile b/transcoder/Dockerfile index 04cf6688..00eda7ea 100644 --- a/transcoder/Dockerfile +++ b/transcoder/Dockerfile @@ -1,4 +1,4 @@ -FROM golang:1.20-alpine +FROM golang:1.21-alpine RUN apk add --no-cache build-base ffmpeg libmediainfo-dev WORKDIR /app COPY go.mod go.sum ./ diff --git a/transcoder/Dockerfile.dev b/transcoder/Dockerfile.dev index 019cded1..1d85a65f 100644 --- a/transcoder/Dockerfile.dev +++ b/transcoder/Dockerfile.dev @@ -1,4 +1,4 @@ -FROM golang:1.20-alpine +FROM golang:1.21-alpine RUN apk add --no-cache build-base ffmpeg libmediainfo-dev RUN go install github.com/bokwoon95/wgo@latest WORKDIR /app diff --git a/transcoder/go.mod b/transcoder/go.mod index 5f684925..db2cc6e1 100644 --- a/transcoder/go.mod +++ b/transcoder/go.mod @@ -1,6 +1,6 @@ module github.com/zoriya/kyoo/transcoder -go 1.20 +go 1.21 require github.com/labstack/echo/v4 v4.11.4 // direct diff --git a/transcoder/src/filestream.go b/transcoder/src/filestream.go index cda4282d..1c88d106 100644 --- a/transcoder/src/filestream.go +++ b/transcoder/src/filestream.go @@ -189,3 +189,8 @@ func (fs *FileStream) GetVideoIndex(quality Quality, client string) (string, err stream := fs.streams[quality] return stream.GetIndex(client) } + +func (fs *FileStream) GetVideoSegment(quality Quality, segment int32, client string) (string, error) { + stream := fs.streams[quality] + return stream.GetSegment(segment, client) +} diff --git a/transcoder/src/info.go b/transcoder/src/info.go index 1103915b..7afb0591 100644 --- a/transcoder/src/info.go +++ b/transcoder/src/info.go @@ -132,11 +132,12 @@ func Or[T comparable](vals ...T) T { return zero } -func Map[T any](ts []T, f func(T, int) T) []T { +func Map[T, U any](ts []T, f func(T, int) U) []U { + us := make([]U, len(ts)) for i := range ts { - ts[i] = f(ts[i], i) + us[i] = f(ts[i], i) } - return ts + return us } func OrNull(str string) *string { diff --git a/transcoder/src/stream.go b/transcoder/src/stream.go index d09a3cc0..e1d11e8f 100644 --- a/transcoder/src/stream.go +++ b/transcoder/src/stream.go @@ -6,17 +6,11 @@ import ( "fmt" "log" "os/exec" + "slices" "strings" "sync" ) -func Min(a int32, b int32) int32 { - if a < b { - return a - } - return b -} - type TranscodeStream interface { getTranscodeArgs(segments string) []string getOutPath() string @@ -28,13 +22,27 @@ type Stream struct { Clients []string // true if the segment at given index is completed/transcoded, false otherwise segments []bool - // the lock used for the segments array + heads []int32 + // the lock used for the segments array and the heads lock sync.RWMutex ctx context.Context - // TODO: add ffmpeg process } -func (ts *Stream) run(start int32, end int32) error { +func (ts *Stream) run(start int32) error { + // Start the transcode up to the 100th segment (or less) + // Stop at the first finished segment + end := min(start+100, int32(len(ts.file.Keyframes))) + ts.lock.RLock() + for i := start; i < end; i++ { + if ts.segments[i] { + end = i + break + } + } + encoder_id := len(ts.heads) + ts.heads = append(ts.heads, start) + ts.lock.RUnlock() + log.Printf( "Starting transcode for %s (from %d to %d out of %d segments)", ts.file.Path, @@ -92,6 +100,7 @@ func (ts *Stream) run(start int32, end int32) error { ts.lock.Lock() ts.segments[segment] = true + ts.heads[encoder_id] = segment ts.lock.Unlock() } @@ -128,3 +137,28 @@ func (ts *Stream) GetIndex(client string) (string, error) { index += `#EXT-X-ENDLIST` return index, nil } + +func (ts *Stream) GetSegment(segment int32, client string) (string, error) { + ts.lock.RLock() + ready := ts.segments[segment] + ts.lock.RUnlock() + + if !ready { + // Only start a new encode if there is more than 10s between the current encoder and the segment. + if ts.getMinEncoderDistance(ts.file.Keyframes[segment]) > 10 { + err := ts.run(segment) + if err != nil { + return "", err + } + } + // TODO: wait for ready + } + return fmt.Sprintf(ts.getOutPath(), segment), nil +} + +func (ts *Stream) getMinEncoderDistance(time float64) float64 { + ts.lock.RLock() + defer ts.lock.RUnlock() + distances := Map(ts.heads, func(i int32, _ int) float64 { return max(0, ts.file.Keyframes[i]-time) }) + return slices.Min(distances) +} diff --git a/transcoder/src/transcoder.go b/transcoder/src/transcoder.go index 681ce4e3..09b9b957 100644 --- a/transcoder/src/transcoder.go +++ b/transcoder/src/transcoder.go @@ -97,7 +97,11 @@ func (t *Transcoder) GetVideoSegment( segment int32, client string, ) (string, error) { - return "", nil + stream, err := t.getFileStream(path) + if err != nil { + return "", err + } + return stream.GetVideoSegment(quality, segment, client) } func (t *Transcoder) GetAudioSegment( diff --git a/transcoder/src/videostream.go b/transcoder/src/videostream.go index 63cfb9c6..de448f21 100644 --- a/transcoder/src/videostream.go +++ b/transcoder/src/videostream.go @@ -16,8 +16,7 @@ func NewVideoStream(file *FileStream, quality Quality) (*VideoStream, error) { }, quality: quality, } - // Start the transcode up to the 100th segment (or less) - ret.run(0, Min(100, int32(len(file.Keyframes)))) + ret.run(0) return &ret, nil }