Improved logging facilities

This commit is contained in:
Francis Lavoie 2026-04-13 05:44:03 -04:00
parent de8e430015
commit 5a1ace3e91
No known key found for this signature in database
4 changed files with 164 additions and 24 deletions

View File

@ -100,7 +100,11 @@ func parseCaddyfile(h httpcaddyfile.Helper) (caddyhttp.MiddlewareHandler, error)
// stream_timeout <duration>
// stream_close_delay <duration>
// stream_retain_on_reload
// stream_log_skip_handshake
// stream_logs {
// level <debug|info|warn|error>
// logger_name <name|access>
// skip_handshake
// }
// verbose_logs
//
// # request manipulation
@ -711,11 +715,42 @@ func (h *Handler) UnmarshalCaddyfile(d *caddyfile.Dispenser) error {
}
h.StreamRetainOnReload = true
case "stream_log_skip_handshake":
case "stream_logs":
if d.NextArg() {
return d.ArgErr()
}
h.StreamLogSkipHandshake = true
if h.StreamLogs == nil {
h.StreamLogs = new(StreamLogs)
}
nesting := d.Nesting()
for d.NextBlock(nesting) {
switch d.Val() {
case "level":
if !d.NextArg() {
return d.ArgErr()
}
h.StreamLogs.Level = d.Val()
if d.NextArg() {
return d.ArgErr()
}
case "logger_name":
if !d.NextArg() {
return d.ArgErr()
}
h.StreamLogs.LoggerName = d.Val()
if d.NextArg() {
return d.ArgErr()
}
case "skip_handshake":
if d.NextArg() {
return d.ArgErr()
}
h.StreamLogs.SkipHandshake = true
default:
return d.Errf("unrecognized stream_logs option: %s", d.Val())
}
}
case "trusted_proxies":
for d.NextArg() {

View File

@ -193,10 +193,9 @@ type Handler struct {
// connections are closed on reload (optionally delayed by stream_close_delay).
StreamRetainOnReload bool `json:"stream_retain_on_reload,omitempty"`
// If true, suppresses the access log entry normally emitted when an
// upgraded stream handshake completes and the request unwinds. By default
// the handshake is still logged as a normal request with status 101.
StreamLogSkipHandshake bool `json:"stream_log_skip_handshake,omitempty"`
// Controls logging behavior for upgraded stream lifecycle events.
// If omitted, defaults are used (level=DEBUG, logger_name="http.handlers.reverse_proxy.stream").
StreamLogs *StreamLogs `json:"stream_logs,omitempty"`
// If configured, rewrites the copy of the upstream request.
// Allows changing the request method and URI (path and query).
@ -259,8 +258,34 @@ type Handler struct {
ctx caddy.Context
logger *zap.Logger
events *caddyevents.App
streamLogLevel zapcore.Level
streamLogLoggerName string
}
// StreamLogs controls logging for upgraded stream lifecycle events.
type StreamLogs struct {
// The minimum level at which stream lifecycle events are logged.
// Supported values are debug, info, warn, and error. Default: debug.
Level string `json:"level,omitempty"`
// Logger name for stream lifecycle logs. Default: "http.handlers.reverse_proxy.stream".
// Special value "access" uses the access logger namespace and, if set,
// respects the first value in access_logger_names/log_name for the request.
LoggerName string `json:"logger_name,omitempty"`
// If true, suppresses the access log entry normally emitted when an
// upgraded stream handshake completes and the request unwinds.
SkipHandshake bool `json:"skip_handshake,omitempty"`
}
const (
defaultStreamLogLevel = zapcore.DebugLevel
defaultStreamLoggerName = "http.handlers.reverse_proxy.stream"
streamLoggerNameUseAccess = "access"
defaultAccessLoggerBase = "http.log.access"
)
// CaddyModule returns the Caddy module information.
func (Handler) CaddyModule() caddy.ModuleInfo {
return caddy.ModuleInfo{
@ -279,6 +304,20 @@ func (h *Handler) Provision(ctx caddy.Context) error {
h.ctx = ctx
h.logger = ctx.Logger()
h.tunnel = newTunnelState(h.logger, time.Duration(h.StreamCloseDelay))
h.streamLogLevel = defaultStreamLogLevel
h.streamLogLoggerName = defaultStreamLoggerName
if h.StreamLogs != nil {
if h.StreamLogs.Level != "" {
lvl, err := zapcore.ParseLevel(strings.ToLower(strings.TrimSpace(h.StreamLogs.Level)))
if err != nil {
return fmt.Errorf("invalid stream_logs.level %q: %w", h.StreamLogs.Level, err)
}
h.streamLogLevel = lvl
}
if name := strings.TrimSpace(h.StreamLogs.LoggerName); name != "" {
h.streamLogLoggerName = name
}
}
// warn about unsafe buffering config
if h.RequestBuffers == -1 || h.ResponseBuffers == -1 {
@ -447,6 +486,39 @@ func (h *Handler) Provision(ctx caddy.Context) error {
return nil
}
func (h Handler) streamLogsSkipHandshake() bool {
return h.StreamLogs != nil && h.StreamLogs.SkipHandshake
}
func (h Handler) streamLoggerForRequest(req *http.Request) *zap.Logger {
name := strings.TrimSpace(h.streamLogLoggerName)
if name == "" {
name = defaultStreamLoggerName
}
if name == streamLoggerNameUseAccess {
logger := caddy.Log().Named(defaultAccessLoggerBase)
names := caddyhttp.GetVar(req.Context(), caddyhttp.AccessLoggerNameVarKey)
namesSlice, ok := names.([]any)
if !ok {
return logger
}
for _, v := range namesSlice {
name, ok := v.(string)
if !ok {
continue
}
if name == "" {
return logger
}
return logger.Named(name)
}
return logger
}
return caddy.Log().Named(name)
}
// Cleanup cleans up the resources made by h.
func (h *Handler) Cleanup() error {
if !h.StreamRetainOnReload {

View File

@ -208,26 +208,31 @@ func (h *Handler) handleUpgradeResponse(logger *zap.Logger, wg *sync.WaitGroup,
}
deleteFrontConn := tunnel.registerConnection(conn, gracefulClose(conn, false), upstreamAddr)
deleteBackConn := tunnel.registerConnection(backConn, gracefulClose(backConn, true), upstreamAddr)
if h.StreamLogSkipHandshake {
if h.streamLogsSkipHandshake() {
caddyhttp.SetVar(req.Context(), caddyhttp.LogSkipVar, true)
}
repl := req.Context().Value(caddy.ReplacerCtxKey).(*caddy.Replacer)
repl.Set("http.reverse_proxy.upgraded", true)
streamUUID, _ := repl.GetString("http.request.uuid")
streamFields := makeStreamLogFields(streamUUID)
streamLogger := h.streamLoggerForRequest(req)
streamLevel := h.streamLogLevel
finishMetrics := trackActiveStream(upstreamAddr)
start := time.Now()
if isH2 {
h.handleH2UpgradeTunnel(logger, wg, conn, backConn, deleteFrontConn, deleteBackConn, bufferSize, streamTimeout, start, finishMetrics)
h.handleH2UpgradeTunnel(streamLogger, streamLevel, wg, conn, backConn, deleteFrontConn, deleteBackConn, bufferSize, streamTimeout, start, finishMetrics, streamFields)
} else {
h.handleDetachedUpgradeTunnel(logger, conn, backConn, deleteFrontConn, deleteBackConn, bufferSize, streamTimeout, start, finishMetrics)
h.handleDetachedUpgradeTunnel(streamLogger, streamLevel, conn, backConn, deleteFrontConn, deleteBackConn, bufferSize, streamTimeout, start, finishMetrics, streamFields)
// Return immediately without touching wg. finalizeResponse's
// wg.Wait() returns at once since wg was never incremented.
}
}
func (h *Handler) handleH2UpgradeTunnel(
logger *zap.Logger,
streamLogger *zap.Logger,
streamLevel zapcore.Level,
wg *sync.WaitGroup,
conn io.ReadWriteCloser,
backConn io.ReadWriteCloser,
@ -237,6 +242,7 @@ func (h *Handler) handleH2UpgradeTunnel(
streamTimeout time.Duration,
start time.Time,
finishMetrics func(result string, duration time.Duration, toBackend, fromBackend int64),
streamFields []zap.Field,
) {
// H2 extended connect: ServeHTTP must block because rw and req.Body are
// only valid while the handler goroutine is running. Defers clean up
@ -275,12 +281,12 @@ func (h *Handler) handleH2UpgradeTunnel(
select {
case err := <-errc:
result = classifyStreamResult(err)
if c := logger.Check(zapcore.DebugLevel, "streaming error"); c != nil {
if c := streamLogger.Check(streamLevel, "streaming error"); c != nil {
c.Write(zap.Error(err))
}
case t := <-timeoutc:
result = "timeout"
if c := logger.Check(zapcore.DebugLevel, "stream timed out"); c != nil {
if c := streamLogger.Check(streamLevel, "stream timed out"); c != nil {
c.Write(zap.Time("timeout", t))
}
}
@ -292,17 +298,20 @@ func (h *Handler) handleH2UpgradeTunnel(
wg.Wait()
finishMetrics(result, time.Since(start), toBackend, fromBackend)
if c := logger.Check(zapcore.DebugLevel, "connection closed"); c != nil {
c.Write(
if c := streamLogger.Check(streamLevel, "connection closed"); c != nil {
fields := append([]zap.Field{}, streamFields...)
fields = append(fields,
zap.Duration("duration", time.Since(start)),
zap.Int64("bytes_to_backend", toBackend),
zap.Int64("bytes_from_backend", fromBackend),
)
c.Write(fields...)
}
}
func (h *Handler) handleDetachedUpgradeTunnel(
logger *zap.Logger,
streamLogger *zap.Logger,
streamLevel zapcore.Level,
conn io.ReadWriteCloser,
backConn io.ReadWriteCloser,
deleteFrontConn func(),
@ -311,6 +320,7 @@ func (h *Handler) handleDetachedUpgradeTunnel(
streamTimeout time.Duration,
start time.Time,
finishMetrics func(result string, duration time.Duration, toBackend, fromBackend int64),
streamFields []zap.Field,
) {
// HTTP/1.1 hijacked connection: launch a detached goroutine so that
// ServeHTTP can return immediately, allowing the Handler to be GC'd
@ -326,12 +336,14 @@ func (h *Handler) handleDetachedUpgradeTunnel(
defer deleteFrontConn()
defer func() {
finishMetrics(result, time.Since(start), toBackend, fromBackend)
if c := logger.Check(zapcore.DebugLevel, "connection closed"); c != nil {
c.Write(
if c := streamLogger.Check(streamLevel, "connection closed"); c != nil {
fields := append([]zap.Field{}, streamFields...)
fields = append(fields,
zap.Duration("duration", time.Since(start)),
zap.Int64("bytes_to_backend", toBackend),
zap.Int64("bytes_from_backend", fromBackend),
)
c.Write(fields...)
}
}()
@ -362,12 +374,12 @@ func (h *Handler) handleDetachedUpgradeTunnel(
select {
case err := <-errc:
result = classifyStreamResult(err)
if c := logger.Check(zapcore.DebugLevel, "streaming error"); c != nil {
if c := streamLogger.Check(streamLevel, "streaming error"); c != nil {
c.Write(zap.Error(err))
}
case t := <-timeoutc:
result = "timeout"
if c := logger.Check(zapcore.DebugLevel, "stream timed out"); c != nil {
if c := streamLogger.Check(streamLevel, "stream timed out"); c != nil {
c.Write(zap.Time("timeout", t))
}
}
@ -388,6 +400,14 @@ func classifyStreamResult(err error) string {
return "error"
}
func makeStreamLogFields(streamUUID string) []zap.Field {
fields := make([]zap.Field, 0, 1)
if streamUUID != "" {
fields = append(fields, zap.String("uuid", streamUUID))
}
return fields
}
// flushInterval returns the p.FlushInterval value, conditionally
// overriding its value for a specific request/response.
func (h Handler) flushInterval(req *http.Request, res *http.Response) time.Duration {

View File

@ -199,10 +199,14 @@ func TestHandlerCleanupRetainModeClosesOnlyRemovedUpstreams(t *testing.T) {
}
}
func TestHandlerUnmarshalCaddyfileStreamLogSkipHandshake(t *testing.T) {
func TestHandlerUnmarshalCaddyfileStreamLogsBlock(t *testing.T) {
d := caddyfile.NewTestDispenser(`
reverse_proxy localhost:9000 {
stream_log_skip_handshake
stream_logs {
level info
logger_name access
skip_handshake
}
}
`)
@ -210,7 +214,16 @@ func TestHandlerUnmarshalCaddyfileStreamLogSkipHandshake(t *testing.T) {
if err := h.UnmarshalCaddyfile(d); err != nil {
t.Fatalf("UnmarshalCaddyfile() error = %v", err)
}
if !h.StreamLogSkipHandshake {
t.Fatal("expected stream_log_skip_handshake to enable StreamLogSkipHandshake")
if h.StreamLogs == nil {
t.Fatal("expected stream_logs to be configured")
}
if h.StreamLogs.Level != "info" {
t.Fatalf("expected stream_logs.level=info, got %q", h.StreamLogs.Level)
}
if h.StreamLogs.LoggerName != "access" {
t.Fatalf("expected stream_logs.logger_name=access, got %q", h.StreamLogs.LoggerName)
}
if !h.StreamLogs.SkipHandshake {
t.Fatal("expected stream_logs.skip_handshake=true")
}
}