diff --git a/transcoder/src/stream.go b/transcoder/src/stream.go index 72da3ab2..8bb04e8d 100644 --- a/transcoder/src/stream.go +++ b/transcoder/src/stream.go @@ -2,7 +2,6 @@ package src import ( "bufio" - "context" "fmt" "log" "math" @@ -29,9 +28,9 @@ type Stream struct { // segments []chan (struct{}) heads []int32 + commands []*exec.Cmd // the lock used for the segments array and the heads lock sync.RWMutex - ctx context.Context } func (ts *Stream) run(start int32) error { @@ -47,6 +46,8 @@ func (ts *Stream) run(start int32) error { } encoder_id := len(ts.heads) ts.heads = append(ts.heads, start) + // we set nil while the command has not started, this is just to reserve the index + ts.commands = append(ts.commands, nil) ts.lock.RUnlock() log.Printf( @@ -84,11 +85,7 @@ func (ts *Stream) run(start int32) error { ts.getOutPath(), }...) - cmd := exec.CommandContext( - ts.ctx, - "ffmpeg", - args..., - ) + cmd := exec.Command("ffmpeg", args...) log.Printf("Running %s", strings.Join(cmd.Args, " ")) stdout, err := cmd.StdoutPipe() @@ -98,6 +95,14 @@ func (ts *Stream) run(start int32) error { var stderr strings.Builder cmd.Stderr = &stderr + err = cmd.Start() + if err != nil { + return err + } + ts.lock.Lock() + ts.commands[encoder_id] = cmd + ts.lock.Unlock() + go func() { scanner := bufio.NewScanner(stdout) for scanner.Scan() { @@ -134,6 +139,7 @@ func (ts *Stream) run(start int32) error { defer ts.lock.Unlock() // we can't delete the head directly because it would invalidate the others encoder_id ts.heads[encoder_id] = -1 + ts.commands[encoder_id] = nil }() return nil @@ -191,3 +197,15 @@ func (ts *Stream) getMinEncoderDistance(time float64) float64 { }) return slices.Min(distances) } + +func (ts *Stream) Kill() { + ts.lock.Lock() + defer ts.lock.Unlock() + + for _, cmd := range ts.commands { + if cmd == nil { + continue + } + cmd.Process.Signal(os.Interrupt) + } +} diff --git a/transcoder/src/videostream.go b/transcoder/src/videostream.go index 901b1092..0a660101 100644 --- a/transcoder/src/videostream.go +++ b/transcoder/src/videostream.go @@ -1,6 +1,9 @@ package src -import "fmt" +import ( + "fmt" + "os/exec" +) type VideoStream struct { Stream @@ -13,6 +16,8 @@ func NewVideoStream(file *FileStream, quality Quality) (*VideoStream, error) { file: file, Clients: make([]string, 4), segments: make([]chan struct{}, len(file.Keyframes)), + heads: make([]int32, 1), + commands: make([]*exec.Cmd, 1), }, quality: quality, }