Kyoo/transcoder/otel.go
2026-04-16 00:31:05 +02:00

314 lines
8.1 KiB
Go

package main
import (
"context"
"errors"
"fmt"
"log/slog"
"os"
"strconv"
"strings"
echootel "github.com/labstack/echo-opentelemetry"
"github.com/labstack/echo/v5"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc"
"go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
logotel "go.opentelemetry.io/otel/log"
logotelglobal "go.opentelemetry.io/otel/log/global"
logotelnoop "go.opentelemetry.io/otel/log/noop"
metricotel "go.opentelemetry.io/otel/metric"
metricotelnoop "go.opentelemetry.io/otel/metric/noop"
"go.opentelemetry.io/otel/propagation"
logsdk "go.opentelemetry.io/otel/sdk/log"
metricsdk "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
tracesdk "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
traceotel "go.opentelemetry.io/otel/trace"
traceotelnoop "go.opentelemetry.io/otel/trace/noop"
logotelbridge "go.opentelemetry.io/contrib/bridges/otelslog"
attributeotel "go.opentelemetry.io/otel/attribute"
)
func newOtelBridgeHandler() slog.Handler {
bridgeHandler := logotelbridge.NewHandler(
"slog",
logotelbridge.WithLoggerProvider(logotelglobal.GetLoggerProvider()),
logotelbridge.WithAttributes(attributeotel.String("source", "slog")),
)
return &fullTextMessageHandler{next: bridgeHandler}
}
type fullTextMessageHandler struct {
next slog.Handler
attrs []slog.Attr
groups []string
}
func (h *fullTextMessageHandler) Enabled(ctx context.Context, level slog.Level) bool {
return h.next.Enabled(ctx, level)
}
func (h *fullTextMessageHandler) Handle(ctx context.Context, r slog.Record) error {
parts := make([]string, 0)
for _, attr := range h.attrs {
parts = appendAttrParts(parts, h.groups, attr)
}
r.Attrs(func(attr slog.Attr) bool {
parts = appendAttrParts(parts, h.groups, attr)
return true
})
message := r.Message
if len(parts) > 0 {
message = fmt.Sprintf("%s %s", message, strings.Join(parts, " "))
}
record := slog.NewRecord(r.Time, r.Level, message, r.PC)
r.Attrs(func(attr slog.Attr) bool {
record.AddAttrs(attr)
return true
})
return h.next.Handle(ctx, record)
}
func (h *fullTextMessageHandler) WithAttrs(attrs []slog.Attr) slog.Handler {
combined := make([]slog.Attr, 0, len(h.attrs)+len(attrs))
combined = append(combined, h.attrs...)
combined = append(combined, attrs...)
groups := make([]string, len(h.groups))
copy(groups, h.groups)
return &fullTextMessageHandler{
next: h.next.WithAttrs(attrs),
attrs: combined,
groups: groups,
}
}
func (h *fullTextMessageHandler) WithGroup(name string) slog.Handler {
attrs := make([]slog.Attr, len(h.attrs))
copy(attrs, h.attrs)
groups := make([]string, 0, len(h.groups)+1)
groups = append(groups, h.groups...)
groups = append(groups, name)
return &fullTextMessageHandler{
next: h.next.WithGroup(name),
attrs: attrs,
groups: groups,
}
}
func appendAttrParts(parts []string, groups []string, attr slog.Attr) []string {
attr.Value = attr.Value.Resolve()
if attr.Equal(slog.Attr{}) {
return parts
}
if attr.Value.Kind() == slog.KindGroup {
nextGroups := groups
if attr.Key != "" {
nextGroups = append(append([]string(nil), groups...), attr.Key)
}
for _, nestedAttr := range attr.Value.Group() {
parts = appendAttrParts(parts, nextGroups, nestedAttr)
}
return parts
}
keyParts := groups
if attr.Key != "" {
keyParts = append(append([]string(nil), groups...), attr.Key)
}
key := strings.Join(keyParts, ".")
if key == "" {
return parts
}
return append(parts, fmt.Sprintf("%s=%s", key, formatAttrValue(attr.Value)))
}
func formatAttrValue(value slog.Value) string {
switch value.Kind() {
case slog.KindString:
stringValue := value.String()
if strings.ContainsAny(stringValue, " \t\n\r\"=") {
return strconv.Quote(stringValue)
}
return stringValue
default:
return value.String()
}
}
func setupOtel(ctx context.Context) (func(context.Context) error, error) {
res, err := resource.New(
ctx,
resource.WithAttributes(semconv.ServiceNameKey.String("kyoo.transcoder")),
resource.WithFromEnv(),
resource.WithTelemetrySDK(),
resource.WithOS(),
resource.WithContainer(),
resource.WithHost(),
)
if err != nil {
return nil, err
}
slog.InfoContext(ctx, "Configuring OTEL")
otel.SetTextMapPropagator(
propagation.NewCompositeTextMapPropagator(
propagation.TraceContext{},
propagation.Baggage{},
),
)
var le logsdk.Exporter
var me metricsdk.Exporter
var te tracesdk.SpanExporter
switch {
case strings.TrimSpace(os.Getenv("OTEL_EXPORTER_OTLP_ENDPOINT")) == "":
slog.InfoContext(ctx, "Using OLTP type", "type", "noop")
le = nil
me = nil
te = nil
case strings.ToLower(strings.TrimSpace(os.Getenv("OTEL_EXPORTER_OTLP_PROTOCOL"))) == "grpc":
slog.InfoContext(ctx, "Using OLTP type", "type", "grpc")
le, err = otlploggrpc.New(ctx)
if err != nil {
slog.ErrorContext(ctx, "Failed setting up OLTP", "err", err)
return nil, err
}
me, err = otlpmetricgrpc.New(ctx)
if err != nil {
slog.ErrorContext(ctx, "Failed setting up OLTP", "err", err)
return nil, err
}
te, err = otlptracegrpc.New(ctx)
if err != nil {
slog.ErrorContext(ctx, "Failed setting up OLTP", "err", err)
return nil, err
}
default:
slog.InfoContext(ctx, "Using OLTP type", "type", "http")
le, err = otlploghttp.New(ctx)
if err != nil {
slog.ErrorContext(ctx, "Failed setting up OLTP", "err", err)
return nil, err
}
me, err = otlpmetrichttp.New(ctx)
if err != nil {
slog.ErrorContext(ctx, "Failed setting up OLTP", "err", err)
return nil, err
}
te, err = otlptracehttp.New(ctx)
if err != nil {
slog.ErrorContext(ctx, "Failed setting up OLTP", "err", err)
return nil, err
}
}
if err != nil {
return nil, err
}
// default to noop providers
var lp logotel.LoggerProvider = logotelnoop.NewLoggerProvider()
var mp metricotel.MeterProvider = metricotelnoop.NewMeterProvider()
var tp traceotel.TracerProvider = traceotelnoop.NewTracerProvider()
// use exporter if configured
if le != nil {
lp = logsdk.NewLoggerProvider(
logsdk.WithProcessor(logsdk.NewBatchProcessor(le)),
logsdk.WithResource(res),
)
}
if me != nil {
mp = metricsdk.NewMeterProvider(
metricsdk.WithReader(
metricsdk.NewPeriodicReader(me),
),
metricsdk.WithResource(res),
)
}
if te != nil {
tp = tracesdk.NewTracerProvider(
tracesdk.WithBatcher(te),
tracesdk.WithResource(res),
)
}
// set providers
logotelglobal.SetLoggerProvider(lp)
otel.SetMeterProvider(mp)
otel.SetTracerProvider(tp)
// configure shutting down
// noop providers do not have a Shudown method
log_shutdown := func(ctx context.Context) error {
if otelprovider, ok := lp.(*logsdk.LoggerProvider); ok && otelprovider != nil {
return otelprovider.Shutdown(ctx)
}
return nil
}
metric_shutdown := func(ctx context.Context) error {
if otelprovider, ok := mp.(*metricsdk.MeterProvider); ok && otelprovider != nil {
return otelprovider.Shutdown(ctx)
}
return nil
}
trace_shutdown := func(ctx context.Context) error {
if otelprovider, ok := tp.(*tracesdk.TracerProvider); ok && otelprovider != nil {
return otelprovider.Shutdown(ctx)
}
return nil
}
return func(ctx context.Context) error {
slog.InfoContext(ctx, "Shutting down OTEL")
// run shutdowns and collect errors
var errs []error
if err := trace_shutdown(ctx); err != nil {
errs = append(errs, err)
}
if err := metric_shutdown(ctx); err != nil {
errs = append(errs, err)
}
if err := log_shutdown(ctx); err != nil {
errs = append(errs, err)
}
if len(errs) == 0 {
return nil
}
return errors.Join(errs...)
}, nil
}
func instrument(e *echo.Echo) {
e.Use(echootel.NewMiddlewareWithConfig(echootel.Config{
ServerName: "kyoo.transcoder",
Skipper: func(c *echo.Context) bool {
return (c.Path() == "/video/health" ||
c.Path() == "/video/ready")
},
}))
}