diff --git a/transcoder/go.mod b/transcoder/go.mod index 23412e76..881f4caa 100644 --- a/transcoder/go.mod +++ b/transcoder/go.mod @@ -15,19 +15,21 @@ require ( github.com/swaggo/echo-swagger v1.4.1 github.com/swaggo/swag v1.16.6 gitlab.com/opennota/screengen v1.0.2 + go.opentelemetry.io/contrib/bridges/otelslog v0.13.0 go.opentelemetry.io/contrib/instrumentation/github.com/labstack/echo/otelecho v0.63.0 go.opentelemetry.io/otel v1.38.0 go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc v0.14.0 go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp v0.14.0 go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.38.0 go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.38.0 - go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.38.0 go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.38.0 go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.38.0 go.opentelemetry.io/otel/log v0.14.0 + go.opentelemetry.io/otel/metric v1.38.0 go.opentelemetry.io/otel/sdk v1.38.0 go.opentelemetry.io/otel/sdk/log v0.14.0 go.opentelemetry.io/otel/sdk/metric v1.38.0 + go.opentelemetry.io/otel/trace v1.38.0 golang.org/x/sync v0.18.0 gopkg.in/vansante/go-ffprobe.v2 v2.2.1 ) @@ -59,8 +61,7 @@ require ( github.com/jackc/puddle/v2 v2.2.2 // indirect github.com/swaggo/files/v2 v2.0.2 // indirect go.opentelemetry.io/auto/sdk v1.2.1 // indirect - go.opentelemetry.io/otel/metric v1.38.0 // indirect - go.opentelemetry.io/otel/trace v1.38.0 // indirect + go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.38.0 // indirect go.opentelemetry.io/proto/otlp v1.9.0 // indirect go.yaml.in/yaml/v3 v3.0.4 // indirect golang.org/x/mod v0.30.0 // indirect diff --git a/transcoder/go.sum b/transcoder/go.sum index 88611a14..d85a649c 100644 --- a/transcoder/go.sum +++ b/transcoder/go.sum @@ -214,6 +214,8 @@ gitlab.com/opennota/screengen v1.0.2 h1:GxYTJdAPEzmg5v5CV4dgn45JVW+EcXXAvCxhE7w6 gitlab.com/opennota/screengen v1.0.2/go.mod h1:4kED4yriw2zslwYmXFCa5qCvEKwleBA7l5OE+d94NTU= go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64= go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y= +go.opentelemetry.io/contrib/bridges/otelslog v0.13.0 h1:bwnLpizECbPr1RrQ27waeY2SPIPeccCx/xLuoYADZ9s= +go.opentelemetry.io/contrib/bridges/otelslog v0.13.0/go.mod h1:3nWlOiiqA9UtUnrcNk82mYasNxD8ehOspL0gOfEo6Y4= go.opentelemetry.io/contrib/instrumentation/github.com/labstack/echo/otelecho v0.63.0 h1:6YeICKmGrvgJ5th4+OMNpcuoB6q/Xs8gt0YCO7MUv1k= go.opentelemetry.io/contrib/instrumentation/github.com/labstack/echo/otelecho v0.63.0/go.mod h1:ZEA7j2B35siNV0T00aapacNzjz4tvOlNoHp0ncCfwNQ= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.61.0 h1:F7Jx+6hwnZ41NSFTO5q4LYDtJRXBf2PD0rNBkeB/lus= diff --git a/transcoder/main.go b/transcoder/main.go index c1926cdc..25e0c4b9 100644 --- a/transcoder/main.go +++ b/transcoder/main.go @@ -4,8 +4,11 @@ import ( "context" "errors" "fmt" + "log/slog" "net/http" "slices" + "sort" + "strings" _ "github.com/zoriya/kyoo/transcoder/docs" @@ -88,17 +91,71 @@ func RequireCorePlayPermission(next echo.HandlerFunc) echo.HandlerFunc { // @in header // @name Authorization func main() { - e := echo.New() - e.Use(middleware.Logger()) - e.GET("/video/swagger/*", echoSwagger.WrapHandler) - e.HTTPErrorHandler = ErrorHandler + ctx := context.Background() - cleanup, err := setupOtel(e) + logger, _, err := SetupLogger(ctx) if err != nil { - e.Logger.Fatal("Failed to setup otel: ", err) + slog.Error("logger init", "err", err) + } + + cleanup, err := setupOtel(ctx) + if err != nil { + slog.Error("Failed to setup otel: ", "err", err) return } - defer cleanup() + defer cleanup(ctx) + + e := echo.New() + e.HideBanner = true + e.Logger.SetOutput(logger) + instrument(e) + + ignorepath := []string{ + "/health", + "/ready", + } + slog.Info("Skipping request logging for these paths", "paths", func() string { sort.Strings(ignorepath); return strings.Join(ignorepath, ",") }()) + + // using example from https://echo.labstack.com/docs/middleware/logger#examples + // full configs https://github.com/labstack/echo/blob/master/middleware/request_logger.go + e.Use(middleware.RequestLoggerWithConfig(middleware.RequestLoggerConfig{ + // declare a small set of paths to ignore + Skipper: func(c echo.Context) bool { + p := c.Request().URL.Path + return slices.Contains(ignorepath, p) + }, + LogStatus: true, + LogURI: true, + LogError: true, + LogHost: true, + LogMethod: true, + LogUserAgent: true, + HandleError: true, // forwards error to the global error handler, so it can decide appropriate status code + LogValuesFunc: func(c echo.Context, v middleware.RequestLoggerValues) error { + if v.Error == nil { + logger.LogAttrs(ctx, slog.LevelInfo, "web_request", + slog.String("method", v.Method), + slog.Int("status", v.Status), + slog.String("host", v.Host), + slog.String("uri", v.URI), + slog.String("agent", v.UserAgent), + ) + } else { + logger.LogAttrs(ctx, slog.LevelError, "web_request_error", + slog.String("method", v.Method), + slog.Int("status", v.Status), + slog.String("host", v.Host), + slog.String("uri", v.URI), + slog.String("agent", v.UserAgent), + slog.String("err", v.Error.Error()), + ) + } + return nil + }, + })) + + e.GET("/video/swagger/*", echoSwagger.WrapHandler) + e.HTTPErrorHandler = ErrorHandler metadata, err := src.NewMetadataService() if err != nil { diff --git a/transcoder/otel.go b/transcoder/otel.go index e2f669d0..98cf84a5 100644 --- a/transcoder/otel.go +++ b/transcoder/otel.go @@ -2,7 +2,10 @@ package main import ( "context" + "errors" + "log/slog" "os" + "strings" "github.com/labstack/echo/v4" "go.opentelemetry.io/contrib/instrumentation/github.com/labstack/echo/otelecho" @@ -11,24 +14,23 @@ import ( "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp" - "go.opentelemetry.io/otel/exporters/otlp/otlptrace" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp" - "go.opentelemetry.io/otel/log/global" - "go.opentelemetry.io/otel/sdk/log" - "go.opentelemetry.io/otel/sdk/metric" + logotel "go.opentelemetry.io/otel/log" + logotelglobal "go.opentelemetry.io/otel/log/global" + logotelnoop "go.opentelemetry.io/otel/log/noop" + metricotel "go.opentelemetry.io/otel/metric" + metricotelnoop "go.opentelemetry.io/otel/metric/noop" + logsdk "go.opentelemetry.io/otel/sdk/log" + metricsdk "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/resource" - "go.opentelemetry.io/otel/sdk/trace" + tracesdk "go.opentelemetry.io/otel/sdk/trace" semconv "go.opentelemetry.io/otel/semconv/v1.26.0" + traceotel "go.opentelemetry.io/otel/trace" + traceotelnoop "go.opentelemetry.io/otel/trace/noop" ) -func setupOtel(e *echo.Echo) (func(), error) { - ctx := context.Background() - proto := os.Getenv("OTEL_EXPORTER_OTLP_PROTOCOL") - if proto == "" { - proto = "http/protobuf" - } - +func setupOtel(ctx context.Context) (func(context.Context) error, error) { res, err := resource.New( ctx, resource.WithAttributes(semconv.ServiceNameKey.String("kyoo.transcoder")), @@ -43,60 +45,137 @@ func setupOtel(e *echo.Echo) (func(), error) { return nil, err } - var le log.Exporter - if proto == "http/protobuf" { - le, err = otlploghttp.New(ctx) - } else { + slog.Info("Configuring OTEL") + + var le logsdk.Exporter + var me metricsdk.Exporter + var te tracesdk.SpanExporter + switch { + case strings.TrimSpace(os.Getenv("OTEL_EXPORTER_OTLP_ENDPOINT")) == "": + slog.Info("Using OLTP type", "type", "noop") + le = nil + me = nil + te = nil + case strings.ToLower(strings.TrimSpace(os.Getenv("OTEL_EXPORTER_OTLP_PROTOCOL"))) == "grpc": + slog.Info("Using OLTP type", "type", "grpc") le, err = otlploggrpc.New(ctx) + if err != nil { + slog.Error("Failed setting up OLTP: ", err) + return nil, err + } + me, err = otlpmetricgrpc.New(ctx) + if err != nil { + slog.Error("Failed setting up OLTP: ", err) + return nil, err + } + te, err = otlptracegrpc.New(ctx) + if err != nil { + slog.Error("Failed setting up OLTP: ", err) + return nil, err + } + default: + slog.Info("Using OLTP type", "type", "http") + le, err = otlploghttp.New(ctx) + if err != nil { + slog.Error("Failed setting up OLTP: ", err) + return nil, err + } + me, err = otlpmetrichttp.New(ctx) + if err != nil { + slog.Error("Failed setting up OLTP: ", err) + return nil, err + } + te, err = otlptracehttp.New(ctx) + if err != nil { + slog.Error("Failed setting up OLTP: ", err) + return nil, err + } } if err != nil { return nil, err } - lp := log.NewLoggerProvider( - log.WithProcessor(log.NewBatchProcessor(le)), - log.WithResource(res), - ) - global.SetLoggerProvider(lp) - var me metric.Exporter - if proto == "http/protobuf" { - me, err = otlpmetrichttp.New(ctx) - } else { - me, err = otlpmetricgrpc.New(ctx) + // default to noop providers + var lp logotel.LoggerProvider = logotelnoop.NewLoggerProvider() + var mp metricotel.MeterProvider = metricotelnoop.NewMeterProvider() + var tp traceotel.TracerProvider = traceotelnoop.NewTracerProvider() + + // use exporter if configured + if le != nil { + lp = logsdk.NewLoggerProvider( + logsdk.WithProcessor(logsdk.NewBatchProcessor(le)), + logsdk.WithResource(res), + ) } - if err != nil { - return func() {}, err + + if me != nil { + mp = metricsdk.NewMeterProvider( + metricsdk.WithReader( + metricsdk.NewPeriodicReader(me), + ), + metricsdk.WithResource(res), + ) } - mp := metric.NewMeterProvider( - metric.WithReader( - metric.NewPeriodicReader(me), - ), - metric.WithResource(res), - ) + + if te != nil { + tp = tracesdk.NewTracerProvider( + tracesdk.WithBatcher(te), + tracesdk.WithResource(res), + ) + } + + // set providers + logotelglobal.SetLoggerProvider(lp) otel.SetMeterProvider(mp) - - var te *otlptrace.Exporter - if proto == "http/protobuf" { - te, err = otlptracehttp.New(ctx) - } else { - te, err = otlptracegrpc.New(ctx) - } - if err != nil { - return func() {}, err - } - tp := trace.NewTracerProvider( - trace.WithBatcher(te), - trace.WithResource(res), - ) otel.SetTracerProvider(tp) + // configure shutting down + // noop providers do not have a Shudown method + log_shutdown := func(ctx context.Context) error { + if otelprovider, ok := lp.(*logsdk.LoggerProvider); ok && otelprovider != nil { + return otelprovider.Shutdown(ctx) + } + return nil + } + + metric_shutdown := func(ctx context.Context) error { + if otelprovider, ok := mp.(*metricsdk.MeterProvider); ok && otelprovider != nil { + return otelprovider.Shutdown(ctx) + } + return nil + } + + trace_shutdown := func(ctx context.Context) error { + if otelprovider, ok := tp.(*tracesdk.TracerProvider); ok && otelprovider != nil { + return otelprovider.Shutdown(ctx) + } + return nil + } + + return func(ctx context.Context) error { + slog.Info("Shutting down OTEL") + + // run shutdowns and collect errors + var errs []error + if err := trace_shutdown(ctx); err != nil { + errs = append(errs, err) + } + if err := metric_shutdown(ctx); err != nil { + errs = append(errs, err) + } + if err := log_shutdown(ctx); err != nil { + errs = append(errs, err) + } + + if len(errs) == 0 { + return nil + } + return errors.Join(errs...) + }, nil +} + +func instrument(e *echo.Echo) { e.Use(otelecho.Middleware("kyoo.transcoder", otelecho.WithSkipper(func(c echo.Context) bool { return c.Path() == "/video/health" || c.Path() == "/video/ready" }))) - - return func() { - lp.Shutdown(ctx) - mp.Shutdown(ctx) - tp.Shutdown(ctx) - }, nil } diff --git a/transcoder/slog.go b/transcoder/slog.go new file mode 100644 index 00000000..7e57b33e --- /dev/null +++ b/transcoder/slog.go @@ -0,0 +1,111 @@ +package main + +import ( + "context" + "log/slog" + "os" + "strings" + + logotelbridge "go.opentelemetry.io/contrib/bridges/otelslog" + logotelglobal "go.opentelemetry.io/otel/log/global" +) + +type SlogAdapter struct { + *slog.Logger +} + +// add Write so SlogAdapter satisfies io.Writer (Echo's logger output) +func (a *SlogAdapter) Write(p []byte) (int, error) { + msg := strings.TrimSpace(string(p)) + // Echo middleware writes request lines at INFO; use Info here. + a.Info(msg) + return len(p), nil +} + +type tee struct { + a, b slog.Handler + minA slog.Level + minB slog.Level +} + +// a = stdout +// b = otel +// minA = minimum level for stdout +// minB = minimum level for otel (from OTEL_LOG_LEVEL) +func NewTee(a, b slog.Handler, minA, minB slog.Level) slog.Handler { + return &tee{a: a, b: b, minA: minA, minB: minB} +} + +func (t *tee) Enabled(ctx context.Context, level slog.Level) bool { + if (t.minA == 0 || level >= t.minA) && t.a.Enabled(ctx, level) { + return true + } + if (t.minB == 0 || level >= t.minB) && t.b.Enabled(ctx, level) { + return true + } + return false +} + +func (t *tee) Handle(ctx context.Context, r slog.Record) error { + if t.minA == 0 || r.Level >= t.minA { + if err := t.a.Handle(ctx, r); err != nil { + return err + } + } + if t.minB == 0 || r.Level >= t.minB { + return t.b.Handle(ctx, r) + } + return nil +} + +func (t *tee) WithAttrs(attrs []slog.Attr) slog.Handler { + return NewTee(t.a.WithAttrs(attrs), t.b.WithAttrs(attrs), t.minA, t.minB) +} + +func (t *tee) WithGroup(name string) slog.Handler { + return NewTee(t.a.WithGroup(name), t.b.WithGroup(name), t.minA, t.minB) +} + +func SetupLogger(ctx context.Context) (*SlogAdapter, func(context.Context) error, error) { + stdout := slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{ + ReplaceAttr: func(groups []string, a slog.Attr) slog.Attr { + // drop the default time attribute so text output has no timestamp + if a.Key == "time" { + return slog.Attr{} + } + return a + }, + }) + otelHandler := logotelbridge.NewHandler("slog", logotelbridge.WithLoggerProvider(logotelglobal.GetLoggerProvider())) + + minStdout := parseLogLevel(os.Getenv("STDOUT_LOG_LEVEL")) + minOtel := parseLogLevel(os.Getenv("OTEL_LOG_LEVEL")) + + handler := NewTee(stdout, otelHandler, minStdout, minOtel) + + logger := slog.New(handler) + adapter := &SlogAdapter{logger} + shutdown := func(ctx context.Context) error { return nil } + + slog.SetDefault(adapter.Logger) + return adapter, shutdown, nil +} + +func parseLogLevel(v string) slog.Level { + v = strings.ToUpper(strings.TrimSpace(v)) + if v == "" { + return slog.LevelInfo + } + m := map[string]slog.Level{ + "TRACE": slog.LevelDebug, + "DEBUG": slog.LevelDebug, + "INFO": slog.LevelInfo, + "WARN": slog.LevelWarn, + "WARNING": slog.LevelWarn, + "ERROR": slog.LevelError, + } + if lv, ok := m[v]; ok { + return lv + } + return slog.LevelInfo +} diff --git a/transcoder/src/metadata.go b/transcoder/src/metadata.go index 2ff6653a..9e7f739e 100644 --- a/transcoder/src/metadata.go +++ b/transcoder/src/metadata.go @@ -8,6 +8,7 @@ import ( "os" "os/user" "strings" + "log/slog" "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/service/s3" @@ -116,7 +117,7 @@ func (s *MetadataService) setupDb() (*pgxpool.Pool, error) { return nil, err } - fmt.Println("Migrating database") + slog.Info("Database migration state", "state", "starting") dbi := stdlib.OpenDBFromPool(db) defer dbi.Close() @@ -132,7 +133,7 @@ func (s *MetadataService) setupDb() (*pgxpool.Pool, error) { return nil, err } m.Up() - fmt.Println("Migrating finished") + slog.Info("Database migration state", "state", "completed") return db, nil }