Fix a lot of bugs

This commit is contained in:
Zoe Roux 2024-01-15 23:08:37 +01:00
parent 9df5eb4758
commit 677cab6a57

View File

@ -2,14 +2,17 @@ package src
import ( import (
"bufio" "bufio"
"errors"
"fmt" "fmt"
"log" "log"
"math" "math"
"os" "os"
"os/exec" "os/exec"
"path/filepath"
"slices" "slices"
"strings" "strings"
"sync" "sync"
"time"
) )
type StreamHandle interface { type StreamHandle interface {
@ -54,8 +57,9 @@ func NewStream(file *FileStream, handle StreamHandle) Stream {
// Remember to lock before calling this. // Remember to lock before calling this.
func (ts *Stream) isSegmentReady(segment int32) bool { func (ts *Stream) isSegmentReady(segment int32) bool {
select { select {
case _, ready := <-ts.segments[segment]: case <-ts.segments[segment]:
return ready // if the channel returned, it means it was closed
return true
default: default:
return false return false
} }
@ -64,7 +68,7 @@ func (ts *Stream) isSegmentReady(segment int32) bool {
func (ts *Stream) run(start int32) error { func (ts *Stream) run(start int32) error {
// Start the transcode up to the 100th segment (or less) // Start the transcode up to the 100th segment (or less)
// Stop at the first finished segment // Stop at the first finished segment
end := min(start+100, int32(len(ts.file.Keyframes))) end := min(start+100, int32(len(ts.file.Keyframes))-1)
ts.lock.RLock() ts.lock.RLock()
for i := start; i < end; i++ { for i := start; i < end; i++ {
if ts.isSegmentReady(i) { if ts.isSegmentReady(i) {
@ -86,10 +90,11 @@ func (ts *Stream) run(start int32) error {
len(ts.file.Keyframes), len(ts.file.Keyframes),
) )
// We do not need the first value (start of the transcode) segments := make([]string, end-start)
segments := make([]string, end-start-1)
for i := range segments { for i := range segments {
segments[i] = fmt.Sprintf("%.6f", ts.file.Keyframes[int(start)+i+1]) // We do not need the first value (start of the transcode)
time := ts.file.Keyframes[int(start)+i+1] - ts.file.Keyframes[start]
segments[i] = fmt.Sprintf("%.6f", time)
} }
segments_str := strings.Join(segments, ",") segments_str := strings.Join(segments, ",")
@ -101,11 +106,11 @@ func (ts *Stream) run(start int32) error {
args := []string{ args := []string{
"-nostats", "-hide_banner", "-loglevel", "warning", "-nostats", "-hide_banner", "-loglevel", "warning",
"-copyts",
"-ss", fmt.Sprintf("%.6f", ts.file.Keyframes[start]), "-ss", fmt.Sprintf("%.6f", ts.file.Keyframes[start]),
"-to", fmt.Sprintf("%.6f", ts.file.Keyframes[end]),
"-i", ts.file.Path, "-i", ts.file.Path,
"-to", fmt.Sprintf("%.6f", ts.file.Keyframes[end]),
"-copyts",
} }
args = append(args, ts.handle.getTranscodeArgs(segments_str)...) args = append(args, ts.handle.getTranscodeArgs(segments_str)...)
args = append(args, []string{ args = append(args, []string{
@ -139,20 +144,36 @@ func (ts *Stream) run(start int32) error {
go func() { go func() {
scanner := bufio.NewScanner(stdout) scanner := bufio.NewScanner(stdout)
format := filepath.Base(outpath)
should_stop := false
for scanner.Scan() { for scanner.Scan() {
var segment int32 var segment int32
_, _ = fmt.Sscanf(scanner.Text(), "segment-%d.ts", &segment) _, _ = fmt.Sscanf(scanner.Text(), format, &segment)
ts.lock.Lock() ts.lock.Lock()
close(ts.segments[segment])
ts.heads[encoder_id] = segment ts.heads[encoder_id] = segment
ts.lock.Unlock() log.Printf("encode %d finished %d", encoder_id, segment)
if ts.isSegmentReady(segment) {
if int32(len(ts.segments)) == segment+1 { // the current segment is already marked at done so another process has already gone up to here.
// file finished, ffmped will finish soon on it's own
} else if ts.isSegmentReady(segment + 1) {
// ask ffmpeg to stop gracefully (nicer cmd.Process.Kill())
cmd.Process.Signal(os.Interrupt) cmd.Process.Signal(os.Interrupt)
should_stop = true
} else {
close(ts.segments[segment])
if int32(len(ts.segments)) == segment+1 {
// file finished, ffmped will finish soon on it's own
should_stop = true
} else if ts.isSegmentReady(segment + 1) {
cmd.Process.Signal(os.Interrupt)
should_stop = true
}
}
ts.lock.Unlock()
// we need this and not a return in the condition because we want to unlock
// the lock (and can't defer since this is a loop)
if should_stop {
println("close because requested", encoder_id)
return
} }
} }
@ -188,8 +209,8 @@ func (ts *Stream) GetIndex(client string) (string, error) {
#EXT-X-MEDIA-SEQUENCE:0 #EXT-X-MEDIA-SEQUENCE:0
` `
for segment := 1; segment < len(ts.file.Keyframes); segment++ { for segment := 0; segment < len(ts.file.Keyframes)-1; segment++ {
index += fmt.Sprintf("#EXTINF:%.6f\n", ts.file.Keyframes[segment]-ts.file.Keyframes[segment-1]) index += fmt.Sprintf("#EXTINF:%.6f\n", ts.file.Keyframes[segment+1]-ts.file.Keyframes[segment])
index += fmt.Sprintf("segment-%d.ts\n", segment) index += fmt.Sprintf("segment-%d.ts\n", segment)
} }
index += `#EXT-X-ENDLIST` index += `#EXT-X-ENDLIST`
@ -199,34 +220,43 @@ func (ts *Stream) GetIndex(client string) (string, error) {
func (ts *Stream) GetSegment(segment int32, client string) (string, error) { func (ts *Stream) GetSegment(segment int32, client string) (string, error) {
ts.lock.RLock() ts.lock.RLock()
ready := ts.isSegmentReady(segment) ready := ts.isSegmentReady(segment)
// we want to calculate distance in the same lock else it can be funky
distance := 0.
if !ready {
distance = ts.getMinEncoderDistance(segment)
}
ts.lock.RUnlock() ts.lock.RUnlock()
if !ready { if !ready {
// Only start a new encode if there is more than 10s between the current encoder and the segment. // Only start a new encode if there is more than 10s between the current encoder and the segment.
if distance := ts.getMinEncoderDistance(ts.file.Keyframes[segment]); distance > 10 { if distance > 10_000 {
log.Printf("Creating new head for %d since closest head is %fs aways", segment, distance/1000)
err := ts.run(segment) err := ts.run(segment)
if err != nil { if err != nil {
return "", err return "", err
} }
} else { } else {
log.Printf("Waiting for segment %d since encoder head is %fs aways", segment, distance) log.Printf("Waiting for segment %d since encoder head is %fs aways", segment, distance/1000)
} }
ts.lock.RLock() ts.lock.RLock()
ready_chan := ts.segments[segment] ready_chan := ts.segments[segment]
ts.lock.RUnlock() ts.lock.RUnlock()
<-ready_chan select {
case <-ready_chan:
case <-time.After(10 * time.Second):
return "", errors.New("could not retrive the selected segment (timeout)")
}
} }
return fmt.Sprintf(ts.getOutPath(), segment), nil return fmt.Sprintf(ts.handle.getOutPath(), segment), nil
} }
func (ts *Stream) getMinEncoderDistance(time float64) float64 { func (ts *Stream) getMinEncoderDistance(segment int32) float64 {
ts.lock.RLock() time := ts.file.Keyframes[segment]
defer ts.lock.RUnlock()
distances := Map(ts.heads, func(i int32, _ int) float64 { distances := Map(ts.heads, func(i int32, _ int) float64 {
// ignore killed heads or heads after the current time // ignore killed heads or heads after the current time
if i < 0 || ts.file.Keyframes[i] > time { if i < 0 || ts.file.Keyframes[i] < time {
return math.Inf(1) return math.Inf(1)
} }
return ts.file.Keyframes[i] - time return ts.file.Keyframes[i] - time