From 5a1ace3e91f0ab9476de446b16f4c91ec9c6ca00 Mon Sep 17 00:00:00 2001 From: Francis Lavoie Date: Mon, 13 Apr 2026 05:44:03 -0400 Subject: [PATCH] Improved logging facilities --- modules/caddyhttp/reverseproxy/caddyfile.go | 41 +++++++++- .../caddyhttp/reverseproxy/reverseproxy.go | 80 ++++++++++++++++++- modules/caddyhttp/reverseproxy/streaming.go | 46 ++++++++--- .../caddyhttp/reverseproxy/streaming_test.go | 21 ++++- 4 files changed, 164 insertions(+), 24 deletions(-) diff --git a/modules/caddyhttp/reverseproxy/caddyfile.go b/modules/caddyhttp/reverseproxy/caddyfile.go index c75ecb55f..148bf4aa1 100644 --- a/modules/caddyhttp/reverseproxy/caddyfile.go +++ b/modules/caddyhttp/reverseproxy/caddyfile.go @@ -100,7 +100,11 @@ func parseCaddyfile(h httpcaddyfile.Helper) (caddyhttp.MiddlewareHandler, error) // stream_timeout // stream_close_delay // stream_retain_on_reload -// stream_log_skip_handshake +// stream_logs { +// level +// logger_name +// skip_handshake +// } // verbose_logs // // # request manipulation @@ -711,11 +715,42 @@ func (h *Handler) UnmarshalCaddyfile(d *caddyfile.Dispenser) error { } h.StreamRetainOnReload = true - case "stream_log_skip_handshake": + case "stream_logs": if d.NextArg() { return d.ArgErr() } - h.StreamLogSkipHandshake = true + if h.StreamLogs == nil { + h.StreamLogs = new(StreamLogs) + } + + nesting := d.Nesting() + for d.NextBlock(nesting) { + switch d.Val() { + case "level": + if !d.NextArg() { + return d.ArgErr() + } + h.StreamLogs.Level = d.Val() + if d.NextArg() { + return d.ArgErr() + } + case "logger_name": + if !d.NextArg() { + return d.ArgErr() + } + h.StreamLogs.LoggerName = d.Val() + if d.NextArg() { + return d.ArgErr() + } + case "skip_handshake": + if d.NextArg() { + return d.ArgErr() + } + h.StreamLogs.SkipHandshake = true + default: + return d.Errf("unrecognized stream_logs option: %s", d.Val()) + } + } case "trusted_proxies": for d.NextArg() { diff --git a/modules/caddyhttp/reverseproxy/reverseproxy.go b/modules/caddyhttp/reverseproxy/reverseproxy.go index df731ce3d..da1e15643 100644 --- a/modules/caddyhttp/reverseproxy/reverseproxy.go +++ b/modules/caddyhttp/reverseproxy/reverseproxy.go @@ -193,10 +193,9 @@ type Handler struct { // connections are closed on reload (optionally delayed by stream_close_delay). StreamRetainOnReload bool `json:"stream_retain_on_reload,omitempty"` - // If true, suppresses the access log entry normally emitted when an - // upgraded stream handshake completes and the request unwinds. By default - // the handshake is still logged as a normal request with status 101. - StreamLogSkipHandshake bool `json:"stream_log_skip_handshake,omitempty"` + // Controls logging behavior for upgraded stream lifecycle events. + // If omitted, defaults are used (level=DEBUG, logger_name="http.handlers.reverse_proxy.stream"). + StreamLogs *StreamLogs `json:"stream_logs,omitempty"` // If configured, rewrites the copy of the upstream request. // Allows changing the request method and URI (path and query). @@ -259,8 +258,34 @@ type Handler struct { ctx caddy.Context logger *zap.Logger events *caddyevents.App + + streamLogLevel zapcore.Level + streamLogLoggerName string } +// StreamLogs controls logging for upgraded stream lifecycle events. +type StreamLogs struct { + // The minimum level at which stream lifecycle events are logged. + // Supported values are debug, info, warn, and error. Default: debug. + Level string `json:"level,omitempty"` + + // Logger name for stream lifecycle logs. Default: "http.handlers.reverse_proxy.stream". + // Special value "access" uses the access logger namespace and, if set, + // respects the first value in access_logger_names/log_name for the request. + LoggerName string `json:"logger_name,omitempty"` + + // If true, suppresses the access log entry normally emitted when an + // upgraded stream handshake completes and the request unwinds. + SkipHandshake bool `json:"skip_handshake,omitempty"` +} + +const ( + defaultStreamLogLevel = zapcore.DebugLevel + defaultStreamLoggerName = "http.handlers.reverse_proxy.stream" + streamLoggerNameUseAccess = "access" + defaultAccessLoggerBase = "http.log.access" +) + // CaddyModule returns the Caddy module information. func (Handler) CaddyModule() caddy.ModuleInfo { return caddy.ModuleInfo{ @@ -279,6 +304,20 @@ func (h *Handler) Provision(ctx caddy.Context) error { h.ctx = ctx h.logger = ctx.Logger() h.tunnel = newTunnelState(h.logger, time.Duration(h.StreamCloseDelay)) + h.streamLogLevel = defaultStreamLogLevel + h.streamLogLoggerName = defaultStreamLoggerName + if h.StreamLogs != nil { + if h.StreamLogs.Level != "" { + lvl, err := zapcore.ParseLevel(strings.ToLower(strings.TrimSpace(h.StreamLogs.Level))) + if err != nil { + return fmt.Errorf("invalid stream_logs.level %q: %w", h.StreamLogs.Level, err) + } + h.streamLogLevel = lvl + } + if name := strings.TrimSpace(h.StreamLogs.LoggerName); name != "" { + h.streamLogLoggerName = name + } + } // warn about unsafe buffering config if h.RequestBuffers == -1 || h.ResponseBuffers == -1 { @@ -447,6 +486,39 @@ func (h *Handler) Provision(ctx caddy.Context) error { return nil } +func (h Handler) streamLogsSkipHandshake() bool { + return h.StreamLogs != nil && h.StreamLogs.SkipHandshake +} + +func (h Handler) streamLoggerForRequest(req *http.Request) *zap.Logger { + name := strings.TrimSpace(h.streamLogLoggerName) + if name == "" { + name = defaultStreamLoggerName + } + + if name == streamLoggerNameUseAccess { + logger := caddy.Log().Named(defaultAccessLoggerBase) + names := caddyhttp.GetVar(req.Context(), caddyhttp.AccessLoggerNameVarKey) + namesSlice, ok := names.([]any) + if !ok { + return logger + } + for _, v := range namesSlice { + name, ok := v.(string) + if !ok { + continue + } + if name == "" { + return logger + } + return logger.Named(name) + } + return logger + } + + return caddy.Log().Named(name) +} + // Cleanup cleans up the resources made by h. func (h *Handler) Cleanup() error { if !h.StreamRetainOnReload { diff --git a/modules/caddyhttp/reverseproxy/streaming.go b/modules/caddyhttp/reverseproxy/streaming.go index d86796e58..9aece01e8 100644 --- a/modules/caddyhttp/reverseproxy/streaming.go +++ b/modules/caddyhttp/reverseproxy/streaming.go @@ -208,26 +208,31 @@ func (h *Handler) handleUpgradeResponse(logger *zap.Logger, wg *sync.WaitGroup, } deleteFrontConn := tunnel.registerConnection(conn, gracefulClose(conn, false), upstreamAddr) deleteBackConn := tunnel.registerConnection(backConn, gracefulClose(backConn, true), upstreamAddr) - if h.StreamLogSkipHandshake { + if h.streamLogsSkipHandshake() { caddyhttp.SetVar(req.Context(), caddyhttp.LogSkipVar, true) } repl := req.Context().Value(caddy.ReplacerCtxKey).(*caddy.Replacer) repl.Set("http.reverse_proxy.upgraded", true) + streamUUID, _ := repl.GetString("http.request.uuid") + streamFields := makeStreamLogFields(streamUUID) + streamLogger := h.streamLoggerForRequest(req) + streamLevel := h.streamLogLevel finishMetrics := trackActiveStream(upstreamAddr) start := time.Now() if isH2 { - h.handleH2UpgradeTunnel(logger, wg, conn, backConn, deleteFrontConn, deleteBackConn, bufferSize, streamTimeout, start, finishMetrics) + h.handleH2UpgradeTunnel(streamLogger, streamLevel, wg, conn, backConn, deleteFrontConn, deleteBackConn, bufferSize, streamTimeout, start, finishMetrics, streamFields) } else { - h.handleDetachedUpgradeTunnel(logger, conn, backConn, deleteFrontConn, deleteBackConn, bufferSize, streamTimeout, start, finishMetrics) + h.handleDetachedUpgradeTunnel(streamLogger, streamLevel, conn, backConn, deleteFrontConn, deleteBackConn, bufferSize, streamTimeout, start, finishMetrics, streamFields) // Return immediately without touching wg. finalizeResponse's // wg.Wait() returns at once since wg was never incremented. } } func (h *Handler) handleH2UpgradeTunnel( - logger *zap.Logger, + streamLogger *zap.Logger, + streamLevel zapcore.Level, wg *sync.WaitGroup, conn io.ReadWriteCloser, backConn io.ReadWriteCloser, @@ -237,6 +242,7 @@ func (h *Handler) handleH2UpgradeTunnel( streamTimeout time.Duration, start time.Time, finishMetrics func(result string, duration time.Duration, toBackend, fromBackend int64), + streamFields []zap.Field, ) { // H2 extended connect: ServeHTTP must block because rw and req.Body are // only valid while the handler goroutine is running. Defers clean up @@ -275,12 +281,12 @@ func (h *Handler) handleH2UpgradeTunnel( select { case err := <-errc: result = classifyStreamResult(err) - if c := logger.Check(zapcore.DebugLevel, "streaming error"); c != nil { + if c := streamLogger.Check(streamLevel, "streaming error"); c != nil { c.Write(zap.Error(err)) } case t := <-timeoutc: result = "timeout" - if c := logger.Check(zapcore.DebugLevel, "stream timed out"); c != nil { + if c := streamLogger.Check(streamLevel, "stream timed out"); c != nil { c.Write(zap.Time("timeout", t)) } } @@ -292,17 +298,20 @@ func (h *Handler) handleH2UpgradeTunnel( wg.Wait() finishMetrics(result, time.Since(start), toBackend, fromBackend) - if c := logger.Check(zapcore.DebugLevel, "connection closed"); c != nil { - c.Write( + if c := streamLogger.Check(streamLevel, "connection closed"); c != nil { + fields := append([]zap.Field{}, streamFields...) + fields = append(fields, zap.Duration("duration", time.Since(start)), zap.Int64("bytes_to_backend", toBackend), zap.Int64("bytes_from_backend", fromBackend), ) + c.Write(fields...) } } func (h *Handler) handleDetachedUpgradeTunnel( - logger *zap.Logger, + streamLogger *zap.Logger, + streamLevel zapcore.Level, conn io.ReadWriteCloser, backConn io.ReadWriteCloser, deleteFrontConn func(), @@ -311,6 +320,7 @@ func (h *Handler) handleDetachedUpgradeTunnel( streamTimeout time.Duration, start time.Time, finishMetrics func(result string, duration time.Duration, toBackend, fromBackend int64), + streamFields []zap.Field, ) { // HTTP/1.1 hijacked connection: launch a detached goroutine so that // ServeHTTP can return immediately, allowing the Handler to be GC'd @@ -326,12 +336,14 @@ func (h *Handler) handleDetachedUpgradeTunnel( defer deleteFrontConn() defer func() { finishMetrics(result, time.Since(start), toBackend, fromBackend) - if c := logger.Check(zapcore.DebugLevel, "connection closed"); c != nil { - c.Write( + if c := streamLogger.Check(streamLevel, "connection closed"); c != nil { + fields := append([]zap.Field{}, streamFields...) + fields = append(fields, zap.Duration("duration", time.Since(start)), zap.Int64("bytes_to_backend", toBackend), zap.Int64("bytes_from_backend", fromBackend), ) + c.Write(fields...) } }() @@ -362,12 +374,12 @@ func (h *Handler) handleDetachedUpgradeTunnel( select { case err := <-errc: result = classifyStreamResult(err) - if c := logger.Check(zapcore.DebugLevel, "streaming error"); c != nil { + if c := streamLogger.Check(streamLevel, "streaming error"); c != nil { c.Write(zap.Error(err)) } case t := <-timeoutc: result = "timeout" - if c := logger.Check(zapcore.DebugLevel, "stream timed out"); c != nil { + if c := streamLogger.Check(streamLevel, "stream timed out"); c != nil { c.Write(zap.Time("timeout", t)) } } @@ -388,6 +400,14 @@ func classifyStreamResult(err error) string { return "error" } +func makeStreamLogFields(streamUUID string) []zap.Field { + fields := make([]zap.Field, 0, 1) + if streamUUID != "" { + fields = append(fields, zap.String("uuid", streamUUID)) + } + return fields +} + // flushInterval returns the p.FlushInterval value, conditionally // overriding its value for a specific request/response. func (h Handler) flushInterval(req *http.Request, res *http.Response) time.Duration { diff --git a/modules/caddyhttp/reverseproxy/streaming_test.go b/modules/caddyhttp/reverseproxy/streaming_test.go index d2441739a..e6a3ce3a1 100644 --- a/modules/caddyhttp/reverseproxy/streaming_test.go +++ b/modules/caddyhttp/reverseproxy/streaming_test.go @@ -199,10 +199,14 @@ func TestHandlerCleanupRetainModeClosesOnlyRemovedUpstreams(t *testing.T) { } } -func TestHandlerUnmarshalCaddyfileStreamLogSkipHandshake(t *testing.T) { +func TestHandlerUnmarshalCaddyfileStreamLogsBlock(t *testing.T) { d := caddyfile.NewTestDispenser(` reverse_proxy localhost:9000 { - stream_log_skip_handshake + stream_logs { + level info + logger_name access + skip_handshake + } } `) @@ -210,7 +214,16 @@ func TestHandlerUnmarshalCaddyfileStreamLogSkipHandshake(t *testing.T) { if err := h.UnmarshalCaddyfile(d); err != nil { t.Fatalf("UnmarshalCaddyfile() error = %v", err) } - if !h.StreamLogSkipHandshake { - t.Fatal("expected stream_log_skip_handshake to enable StreamLogSkipHandshake") + if h.StreamLogs == nil { + t.Fatal("expected stream_logs to be configured") + } + if h.StreamLogs.Level != "info" { + t.Fatalf("expected stream_logs.level=info, got %q", h.StreamLogs.Level) + } + if h.StreamLogs.LoggerName != "access" { + t.Fatalf("expected stream_logs.logger_name=access, got %q", h.StreamLogs.LoggerName) + } + if !h.StreamLogs.SkipHandshake { + t.Fatal("expected stream_logs.skip_handshake=true") } }