Add a kill command on streams

This commit is contained in:
Zoe Roux 2024-01-15 15:32:13 +01:00
parent 5f33172297
commit aef30fecaa
2 changed files with 31 additions and 8 deletions

View File

@ -2,7 +2,6 @@ package src
import ( import (
"bufio" "bufio"
"context"
"fmt" "fmt"
"log" "log"
"math" "math"
@ -29,9 +28,9 @@ type Stream struct {
// //
segments []chan (struct{}) segments []chan (struct{})
heads []int32 heads []int32
commands []*exec.Cmd
// the lock used for the segments array and the heads // the lock used for the segments array and the heads
lock sync.RWMutex lock sync.RWMutex
ctx context.Context
} }
func (ts *Stream) run(start int32) error { func (ts *Stream) run(start int32) error {
@ -47,6 +46,8 @@ func (ts *Stream) run(start int32) error {
} }
encoder_id := len(ts.heads) encoder_id := len(ts.heads)
ts.heads = append(ts.heads, start) ts.heads = append(ts.heads, start)
// we set nil while the command has not started, this is just to reserve the index
ts.commands = append(ts.commands, nil)
ts.lock.RUnlock() ts.lock.RUnlock()
log.Printf( log.Printf(
@ -84,11 +85,7 @@ func (ts *Stream) run(start int32) error {
ts.getOutPath(), ts.getOutPath(),
}...) }...)
cmd := exec.CommandContext( cmd := exec.Command("ffmpeg", args...)
ts.ctx,
"ffmpeg",
args...,
)
log.Printf("Running %s", strings.Join(cmd.Args, " ")) log.Printf("Running %s", strings.Join(cmd.Args, " "))
stdout, err := cmd.StdoutPipe() stdout, err := cmd.StdoutPipe()
@ -98,6 +95,14 @@ func (ts *Stream) run(start int32) error {
var stderr strings.Builder var stderr strings.Builder
cmd.Stderr = &stderr cmd.Stderr = &stderr
err = cmd.Start()
if err != nil {
return err
}
ts.lock.Lock()
ts.commands[encoder_id] = cmd
ts.lock.Unlock()
go func() { go func() {
scanner := bufio.NewScanner(stdout) scanner := bufio.NewScanner(stdout)
for scanner.Scan() { for scanner.Scan() {
@ -134,6 +139,7 @@ func (ts *Stream) run(start int32) error {
defer ts.lock.Unlock() defer ts.lock.Unlock()
// we can't delete the head directly because it would invalidate the others encoder_id // we can't delete the head directly because it would invalidate the others encoder_id
ts.heads[encoder_id] = -1 ts.heads[encoder_id] = -1
ts.commands[encoder_id] = nil
}() }()
return nil return nil
@ -191,3 +197,15 @@ func (ts *Stream) getMinEncoderDistance(time float64) float64 {
}) })
return slices.Min(distances) return slices.Min(distances)
} }
func (ts *Stream) Kill() {
ts.lock.Lock()
defer ts.lock.Unlock()
for _, cmd := range ts.commands {
if cmd == nil {
continue
}
cmd.Process.Signal(os.Interrupt)
}
}

View File

@ -1,6 +1,9 @@
package src package src
import "fmt" import (
"fmt"
"os/exec"
)
type VideoStream struct { type VideoStream struct {
Stream Stream
@ -13,6 +16,8 @@ func NewVideoStream(file *FileStream, quality Quality) (*VideoStream, error) {
file: file, file: file,
Clients: make([]string, 4), Clients: make([]string, 4),
segments: make([]chan struct{}, len(file.Keyframes)), segments: make([]chan struct{}, len(file.Keyframes)),
heads: make([]int32, 1),
commands: make([]*exec.Cmd, 1),
}, },
quality: quality, quality: quality,
} }