diff --git a/transcoder/src/exec/exec.go b/transcoder/src/exec/exec.go new file mode 100644 index 00000000..8e4ad610 --- /dev/null +++ b/transcoder/src/exec/exec.go @@ -0,0 +1,145 @@ +package exec + +import ( + "context" + "errors" + "fmt" + osexec "os/exec" + "path/filepath" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + traceotel "go.opentelemetry.io/otel/trace" +) + +var ( + ErrDot = osexec.ErrDot + ErrNotFound = osexec.ErrNotFound + ErrWaitDelay = osexec.ErrWaitDelay +) + +type Error = osexec.Error +type ExitError = osexec.ExitError + +var tracer = otel.Tracer("kyoo.transcoder.cli") + +type Cmd struct { + *osexec.Cmd + span traceotel.Span + spanCtx context.Context + spanned bool + spanDone bool +} + +func LookPath(file string) (string, error) { + return osexec.LookPath(file) +} + +func Command(name string, arg ...string) *Cmd { + return wrap(context.Background(), osexec.Command(name, arg...)) +} + +func CommandContext(ctx context.Context, name string, arg ...string) *Cmd { + if ctx == nil { + ctx = context.Background() + } + return wrap(ctx, osexec.CommandContext(ctx, name, arg...)) +} + +func wrap(ctx context.Context, cmd *osexec.Cmd) *Cmd { + if ctx == nil { + ctx = context.Background() + } + return &Cmd{Cmd: cmd, spanCtx: ctx} +} + +func (c *Cmd) Run() error { + c.startSpan() + err := c.Cmd.Run() + c.endSpan(err) + return err +} + +func (c *Cmd) Output() ([]byte, error) { + c.startSpan() + output, err := c.Cmd.Output() + c.endSpan(err) + return output, err +} + +func (c *Cmd) CombinedOutput() ([]byte, error) { + c.startSpan() + output, err := c.Cmd.CombinedOutput() + c.endSpan(err) + return output, err +} + +func (c *Cmd) Start() error { + c.startSpan() + err := c.Cmd.Start() + if err != nil { + c.endSpan(err) + } + return err +} + +func (c *Cmd) Wait() error { + err := c.Cmd.Wait() + c.endSpan(err) + return err +} + +func (c *Cmd) startSpan() { + if c == nil || c.spanned { + return + } + + ctx := c.spanCtx + if ctx == nil { + ctx = context.Background() + } + + attrs := []attribute.KeyValue{ + attribute.String("process.command", c.Path), + } + if len(c.Args) > 1 { + attrs = append(attrs, attribute.StringSlice("process.command_args", c.Args[1:])) + } + if c.Dir != "" { + attrs = append(attrs, attribute.String("process.working_directory", c.Dir)) + } + + _, span := tracer.Start( + ctx, + fmt.Sprintf("exec %s", filepath.Base(c.Path)), + traceotel.WithAttributes(attrs...), + ) + c.span = span + c.spanned = true + c.spanDone = false +} + +func (c *Cmd) endSpan(err error) { + if c == nil || !c.spanned || c.spanDone || c.span == nil { + return + } + + if err == nil { + c.span.SetAttributes(attribute.Int("process.exit.code", 0)) + c.span.SetStatus(codes.Ok, "") + c.span.End() + c.spanDone = true + return + } + + c.span.RecordError(err) + + if exitErr, ok := errors.AsType[*osexec.ExitError](err); ok { + c.span.SetAttributes(attribute.Int("process.exit.code", exitErr.ExitCode())) + } + + c.span.SetStatus(codes.Error, err.Error()) + c.span.End() + c.spanDone = true +} diff --git a/transcoder/src/extract.go b/transcoder/src/extract.go index 1d5634a3..a4dc358c 100644 --- a/transcoder/src/extract.go +++ b/transcoder/src/extract.go @@ -6,10 +6,10 @@ import ( "io" "log" "os" - "os/exec" "path/filepath" "strings" + "github.com/zoriya/kyoo/transcoder/src/exec" "github.com/zoriya/kyoo/transcoder/src/storage" "github.com/zoriya/kyoo/transcoder/src/utils" ) diff --git a/transcoder/src/fingerprints.go b/transcoder/src/fingerprints.go index 83f7cd6f..61693b31 100644 --- a/transcoder/src/fingerprints.go +++ b/transcoder/src/fingerprints.go @@ -5,9 +5,9 @@ import ( "encoding/binary" "errors" "fmt" - "os/exec" "github.com/jackc/pgx/v5" + "github.com/zoriya/kyoo/transcoder/src/exec" "github.com/zoriya/kyoo/transcoder/src/utils" ) diff --git a/transcoder/src/keyframes.go b/transcoder/src/keyframes.go index 94d4038b..1cf47714 100644 --- a/transcoder/src/keyframes.go +++ b/transcoder/src/keyframes.go @@ -6,12 +6,12 @@ import ( "errors" "fmt" "log" - "os/exec" "strconv" "strings" "sync" "github.com/jackc/pgx/v5/pgtype" + "github.com/zoriya/kyoo/transcoder/src/exec" "github.com/zoriya/kyoo/transcoder/src/utils" ) @@ -201,6 +201,8 @@ func getVideoKeyframes(path string, video_idx uint32, kf *Keyframe) error { if err != nil { return err } + // we don't care about the result but await it for tracess. + go cmd.Wait() scanner := bufio.NewScanner(stdout) @@ -322,6 +324,8 @@ func getAudioKeyframes(info *MediaInfo, audio_idx uint32, kf *Keyframe) error { if err != nil { return err } + // we don't care about the result but await it for tracess. + go cmd.Wait() scanner := bufio.NewScanner(stdout) var duration float64 diff --git a/transcoder/src/stream.go b/transcoder/src/stream.go index 69e016ec..cc7dab97 100644 --- a/transcoder/src/stream.go +++ b/transcoder/src/stream.go @@ -7,12 +7,13 @@ import ( "log" "math" "os" - "os/exec" "path/filepath" "slices" "strings" "sync" "time" + + "github.com/zoriya/kyoo/transcoder/src/exec" ) type Flags int32