Add encoder heads

This commit is contained in:
Zoe Roux 2024-01-15 12:53:43 +01:00
parent e5954712e2
commit e9738c2bc1
8 changed files with 62 additions and 19 deletions

View File

@ -1,4 +1,4 @@
FROM golang:1.20-alpine
FROM golang:1.21-alpine
RUN apk add --no-cache build-base ffmpeg libmediainfo-dev
WORKDIR /app
COPY go.mod go.sum ./

View File

@ -1,4 +1,4 @@
FROM golang:1.20-alpine
FROM golang:1.21-alpine
RUN apk add --no-cache build-base ffmpeg libmediainfo-dev
RUN go install github.com/bokwoon95/wgo@latest
WORKDIR /app

View File

@ -1,6 +1,6 @@
module github.com/zoriya/kyoo/transcoder
go 1.20
go 1.21
require github.com/labstack/echo/v4 v4.11.4 // direct

View File

@ -189,3 +189,8 @@ func (fs *FileStream) GetVideoIndex(quality Quality, client string) (string, err
stream := fs.streams[quality]
return stream.GetIndex(client)
}
func (fs *FileStream) GetVideoSegment(quality Quality, segment int32, client string) (string, error) {
stream := fs.streams[quality]
return stream.GetSegment(segment, client)
}

View File

@ -132,11 +132,12 @@ func Or[T comparable](vals ...T) T {
return zero
}
func Map[T any](ts []T, f func(T, int) T) []T {
func Map[T, U any](ts []T, f func(T, int) U) []U {
us := make([]U, len(ts))
for i := range ts {
ts[i] = f(ts[i], i)
us[i] = f(ts[i], i)
}
return ts
return us
}
func OrNull(str string) *string {

View File

@ -6,17 +6,11 @@ import (
"fmt"
"log"
"os/exec"
"slices"
"strings"
"sync"
)
func Min(a int32, b int32) int32 {
if a < b {
return a
}
return b
}
type TranscodeStream interface {
getTranscodeArgs(segments string) []string
getOutPath() string
@ -28,13 +22,27 @@ type Stream struct {
Clients []string
// true if the segment at given index is completed/transcoded, false otherwise
segments []bool
// the lock used for the segments array
heads []int32
// the lock used for the segments array and the heads
lock sync.RWMutex
ctx context.Context
// TODO: add ffmpeg process
}
func (ts *Stream) run(start int32, end int32) error {
func (ts *Stream) run(start int32) error {
// Start the transcode up to the 100th segment (or less)
// Stop at the first finished segment
end := min(start+100, int32(len(ts.file.Keyframes)))
ts.lock.RLock()
for i := start; i < end; i++ {
if ts.segments[i] {
end = i
break
}
}
encoder_id := len(ts.heads)
ts.heads = append(ts.heads, start)
ts.lock.RUnlock()
log.Printf(
"Starting transcode for %s (from %d to %d out of %d segments)",
ts.file.Path,
@ -92,6 +100,7 @@ func (ts *Stream) run(start int32, end int32) error {
ts.lock.Lock()
ts.segments[segment] = true
ts.heads[encoder_id] = segment
ts.lock.Unlock()
}
@ -128,3 +137,28 @@ func (ts *Stream) GetIndex(client string) (string, error) {
index += `#EXT-X-ENDLIST`
return index, nil
}
func (ts *Stream) GetSegment(segment int32, client string) (string, error) {
ts.lock.RLock()
ready := ts.segments[segment]
ts.lock.RUnlock()
if !ready {
// Only start a new encode if there is more than 10s between the current encoder and the segment.
if ts.getMinEncoderDistance(ts.file.Keyframes[segment]) > 10 {
err := ts.run(segment)
if err != nil {
return "", err
}
}
// TODO: wait for ready
}
return fmt.Sprintf(ts.getOutPath(), segment), nil
}
func (ts *Stream) getMinEncoderDistance(time float64) float64 {
ts.lock.RLock()
defer ts.lock.RUnlock()
distances := Map(ts.heads, func(i int32, _ int) float64 { return max(0, ts.file.Keyframes[i]-time) })
return slices.Min(distances)
}

View File

@ -97,7 +97,11 @@ func (t *Transcoder) GetVideoSegment(
segment int32,
client string,
) (string, error) {
return "", nil
stream, err := t.getFileStream(path)
if err != nil {
return "", err
}
return stream.GetVideoSegment(quality, segment, client)
}
func (t *Transcoder) GetAudioSegment(

View File

@ -16,8 +16,7 @@ func NewVideoStream(file *FileStream, quality Quality) (*VideoStream, error) {
},
quality: quality,
}
// Start the transcode up to the 100th segment (or less)
ret.run(0, Min(100, int32(len(file.Keyframes))))
ret.run(0)
return &ret, nil
}