diff --git a/modules/caddyhttp/responsewriter.go b/modules/caddyhttp/responsewriter.go index c593c6162..d710160bd 100644 --- a/modules/caddyhttp/responsewriter.go +++ b/modules/caddyhttp/responsewriter.go @@ -270,9 +270,11 @@ func (rr *responseRecorder) setReadSize(size *int) { func (rr *responseRecorder) Hijack() (net.Conn, *bufio.ReadWriter, error) { if !rr.wroteHeader { - // hijacking without writing status code first works as long as subsequent writes follows http1.1 - // wire format, but it will show up with a status code of 0 in the access log and bytes written - // will include response headers. Response headers won't be present in the log if not set on the response writer. + // hijacking without writing status code first works as long as + // subsequent writes follows http1.1 wire format, but it will + // show up with a status code of 0 in the access log and bytes + // written will include response headers. Response headers won't + // be present in the log if not set on the response writer. caddy.Log().Warn("hijacking without writing status code first") } //nolint:bodyclose @@ -339,9 +341,11 @@ func (hc *hijackedConn) ReadFrom(r io.Reader) (int64, error) { return n, err } -// DetachResponseWriterAfterHijack detaches w or one of its wrapped response -// writers when it's hijacked. Returns true if not already hijacked. -// When detached, bytes read or written stats will not be recorded for the hijacked connection, and it's safe to use the connection after http middleware returns. +// DetachResponseWriterAfterHijack detaches w or one of its wrapped +// response writers when it's hijacked. Returns true if not already +// hijacked. When detached, bytes read or written stats will not be +// recorded for the hijacked connection, and it's safe to use the +// connection after http middleware returns. func DetachResponseWriterAfterHijack(w http.ResponseWriter, detached bool) bool { for w != nil { if detacher, ok := w.(interface{ DetachAfterHijack(bool) bool }); ok { diff --git a/modules/caddyhttp/reverseproxy/reverseproxy.go b/modules/caddyhttp/reverseproxy/reverseproxy.go index 06aa2b101..dcabe8d2c 100644 --- a/modules/caddyhttp/reverseproxy/reverseproxy.go +++ b/modules/caddyhttp/reverseproxy/reverseproxy.go @@ -256,7 +256,7 @@ type Handler struct { // Tracks hijacked/upgraded connections (WebSocket etc.) so they can be // closed when their upstream is removed from the config. - tunnel *tunnelState + tunnelTracker *tunnelTracker ctx caddy.Context logger *zap.Logger @@ -283,7 +283,7 @@ func (h *Handler) Provision(ctx caddy.Context) error { h.events = eventAppIface.(*caddyevents.App) h.ctx = ctx h.logger = ctx.Logger() - h.tunnel = newTunnelState(h.logger, time.Duration(h.StreamCloseDelay)) + h.tunnelTracker = newTunnelTracker(h.logger, time.Duration(h.StreamCloseDelay)) h.streamLogLevel = defaultStreamLogLevel h.streamLogLoggerName = defaultStreamLoggerName if h.StreamLogs != nil { @@ -300,7 +300,7 @@ func (h *Handler) Provision(ctx caddy.Context) error { } if h.StreamRetainOnReload { - registerDetachedTunnelStates(h.tunnel) + registerDetachedTunnelTrackers(h.tunnelTracker) } // warn about unsafe buffering config @@ -504,22 +504,22 @@ func (h Handler) streamLoggerForRequest(req *http.Request) *zap.Logger { } var ( - detachedTunnelStates = make(map[*tunnelState]struct{}) - detachedTunnelStatesMu sync.Mutex + detachedTunnelTrackers = make(map[*tunnelTracker]struct{}) + detachedTunnelTrackersMu sync.Mutex ) -func registerDetachedTunnelStates(ts *tunnelState) { - detachedTunnelStatesMu.Lock() - defer detachedTunnelStatesMu.Unlock() - detachedTunnelStates[ts] = struct{}{} +func registerDetachedTunnelTrackers(ts *tunnelTracker) { + detachedTunnelTrackersMu.Lock() + defer detachedTunnelTrackersMu.Unlock() + detachedTunnelTrackers[ts] = struct{}{} } -func notifyDetachedTunnelStatesOfUpstreamRemoval(upstream string, self *tunnelState) error { - detachedTunnelStatesMu.Lock() - defer detachedTunnelStatesMu.Unlock() +func notifyDetachedTunnelTrackersOfUpstreamRemoval(upstream string, self *tunnelTracker) error { + detachedTunnelTrackersMu.Lock() + defer detachedTunnelTrackersMu.Unlock() var err error - for tunnel := range detachedTunnelStates { + for tunnel := range detachedTunnelTrackers { if closeErr := tunnel.closeConnectionsForUpstream(upstream); closeErr != nil && tunnel == self && err == nil { err = closeErr } @@ -527,16 +527,16 @@ func notifyDetachedTunnelStatesOfUpstreamRemoval(upstream string, self *tunnelSt return err } -func unregisterDetachedTunnelStates(ts *tunnelState) { - detachedTunnelStatesMu.Lock() - defer detachedTunnelStatesMu.Unlock() - delete(detachedTunnelStates, ts) +func unregisterDetachedTunnelTrackers(ts *tunnelTracker) { + detachedTunnelTrackersMu.Lock() + defer detachedTunnelTrackersMu.Unlock() + delete(detachedTunnelTrackers, ts) } // Cleanup cleans up the resources made by h. func (h *Handler) Cleanup() error { // even if StreamRetainOnReload is true, extended connect websockets may still be running - err := h.tunnel.cleanupAttachedConnections() + err := h.tunnelTracker.cleanupAttachedConnections() for _, upstream := range h.Upstreams { // hosts.Delete returns deleted=true when the ref count reaches zero, // meaning no other active config references this upstream. In that @@ -544,7 +544,7 @@ func (h *Handler) Cleanup() error { // to their natural end since the upstream is still in use. deleted, _ := hosts.Delete(upstream.String()) if deleted { - if closeErr := notifyDetachedTunnelStatesOfUpstreamRemoval(upstream.String(), h.tunnel); closeErr != nil && err == nil { + if closeErr := notifyDetachedTunnelTrackersOfUpstreamRemoval(upstream.String(), h.tunnelTracker); closeErr != nil && err == nil { err = closeErr } } diff --git a/modules/caddyhttp/reverseproxy/streaming.go b/modules/caddyhttp/reverseproxy/streaming.go index 753e1e6cb..c93a57e47 100644 --- a/modules/caddyhttp/reverseproxy/streaming.go +++ b/modules/caddyhttp/reverseproxy/streaming.go @@ -95,12 +95,13 @@ func (h *Handler) handleUpgradeResponse(logger *zap.Logger, rw http.ResponseWrit // Capture all h fields needed by the tunnel now, so that the Handler (h) // is not referenced after this function returns (for HTTP/1.1 hijacked // connections the tunnel runs in a detached goroutine). - tunnel := h.tunnel + tunnel := h.tunnelTracker bufferSize := h.StreamBufferSize streamTimeout := time.Duration(h.StreamTimeout) if h.StreamRetainOnReload { - // the return value should be true as it's not hijacked yet, but some middleware may wrap response writers incorrectly + // the return value should be true as it's not hijacked yet, + // but some middleware may wrap response writers incorrectly if !caddyhttp.DetachResponseWriterAfterHijack(rw, true) { if c := logger.Check(zap.DebugLevel, "detaching connection failed"); c != nil { c.Write(zap.String("tip", "check if your response writers have an Unwrap method or if already hijacked")) @@ -113,10 +114,14 @@ func (h *Handler) handleUpgradeResponse(logger *zap.Logger, rw http.ResponseWrit brw *bufio.ReadWriter detached = h.StreamRetainOnReload ) - // websocket over http2 or http3 if extended connect is enabled, assuming backend doesn't support this, the request will be modified to http1.1 upgrade - // TODO: once we can reliably detect backend support this, it can be removed for those backends + // websocket over http2 or http3 if extended connect is enabled, + // assuming backend doesn't support this, the request will be + // modified to http1.1 upgrade + // TODO: once we can reliably detect backend support this, it can + // be removed for those backends if body, ok := caddyhttp.GetVar(req.Context(), "extended_connect_websocket_body").(io.ReadCloser); ok { - // websocket over extended connect can't be detached. rw and req.Body are only valid while the handler goroutine is running + // websocket over extended connect can't be detached. rw and req.Body + // are only valid while the handler goroutine is running detached = false req.Body = body rw.Header().Del("Upgrade") @@ -213,15 +218,51 @@ func (h *Handler) handleUpgradeResponse(logger *zap.Logger, rw http.ResponseWrit start := time.Now() if !detached { - handleUpgradeTunnel(streamLogger, streamLevel, conn, backConn, deleteFrontConn, deleteBackConn, bufferSize, streamTimeout, start, finishMetrics, streamFields) + handleUpgradeTunnel( + streamLogger, + streamLevel, + conn, + backConn, + deleteFrontConn, + deleteBackConn, + bufferSize, + streamTimeout, + start, + finishMetrics, + streamFields, + ) } else { // start a new goroutine - go handleUpgradeTunnel(streamLogger, streamLevel, conn, backConn, deleteFrontConn, deleteBackConn, bufferSize, streamTimeout, start, finishMetrics, streamFields) + go handleUpgradeTunnel( + streamLogger, + streamLevel, + conn, + backConn, + deleteFrontConn, + deleteBackConn, + bufferSize, + streamTimeout, + start, + finishMetrics, + streamFields, + ) } } // handleUpgradeTunnel returns when transfer is done. -func handleUpgradeTunnel(streamLogger *zap.Logger, streamLevel zapcore.Level, conn io.ReadWriteCloser, backConn io.ReadWriteCloser, deleteFrontConn func(), deleteBackConn func(), bufferSize int, streamTimeout time.Duration, start time.Time, finishMetrics func(result string, duration time.Duration, toBackend int64, fromBackend int64), streamFields []zap.Field) { +func handleUpgradeTunnel( + streamLogger *zap.Logger, + streamLevel zapcore.Level, + conn io.ReadWriteCloser, + backConn io.ReadWriteCloser, + deleteFrontConn func(), + deleteBackConn func(), + bufferSize int, + streamTimeout time.Duration, + start time.Time, + finishMetrics func(result string, duration time.Duration, toBackend int64, fromBackend int64), + streamFields []zap.Field, +) { defer deleteBackConn() defer deleteFrontConn() var ( @@ -232,7 +273,8 @@ func handleUpgradeTunnel(streamLogger *zap.Logger, streamLevel zapcore.Level, co ) // when a stream timeout is encountered, no error will be read from errc - // a buffer size of 2 will allow both the read and write goroutines to send the error and exit + // a buffer size of 2 will allow both the read and write goroutines to + // send the error and exit // see: https://github.com/caddyserver/caddy/issues/7418 errc := make(chan error, 2) spc := switchProtocolCopier{ @@ -286,7 +328,10 @@ func handleUpgradeTunnel(streamLogger *zap.Logger, streamLevel zapcore.Level, co } func classifyStreamResult(err error) string { - if err == nil || errors.Is(err, io.EOF) || errors.Is(err, net.ErrClosed) || errors.Is(err, context.Canceled) { + if err == nil || + errors.Is(err, io.EOF) || + errors.Is(err, net.ErrClosed) || + errors.Is(err, context.Canceled) { return "closed" } return "error" @@ -446,8 +491,8 @@ type openConnection struct { upstream string } -// tunnelState tracks hijacked/upgraded connections for selective cleanup. -type tunnelState struct { +// tunnelTracker tracks hijacked/upgraded connections for selective cleanup. +type tunnelTracker struct { connections map[io.ReadWriteCloser]openConnection closeTimer *time.Timer closeDelay time.Duration @@ -456,8 +501,8 @@ type tunnelState struct { logger *zap.Logger } -func newTunnelState(logger *zap.Logger, closeDelay time.Duration) *tunnelState { - return &tunnelState{ +func newTunnelTracker(logger *zap.Logger, closeDelay time.Duration) *tunnelTracker { + return &tunnelTracker{ connections: make(map[io.ReadWriteCloser]openConnection), closeDelay: closeDelay, logger: logger, @@ -466,7 +511,7 @@ func newTunnelState(logger *zap.Logger, closeDelay time.Duration) *tunnelState { // registerConnection stores conn in the tracking map. The caller must invoke // the returned del func when the connection is done. -func (ts *tunnelState) registerConnection(conn io.ReadWriteCloser, gracefulClose func() error, detached bool, upstream string) (del func()) { +func (ts *tunnelTracker) registerConnection(conn io.ReadWriteCloser, gracefulClose func() error, detached bool, upstream string) (del func()) { ts.mu.Lock() ts.connections[conn] = openConnection{conn, gracefulClose, detached, upstream} ts.mu.Unlock() @@ -474,7 +519,7 @@ func (ts *tunnelState) registerConnection(conn io.ReadWriteCloser, gracefulClose ts.mu.Lock() delete(ts.connections, conn) if len(ts.connections) == 0 && ts.stopped { - unregisterDetachedTunnelStates(ts) + unregisterDetachedTunnelTrackers(ts) if ts.closeTimer != nil { if ts.closeTimer.Stop() { ts.logger.Debug("stopped streaming connections close timer - all connections are already closed") @@ -487,7 +532,7 @@ func (ts *tunnelState) registerConnection(conn io.ReadWriteCloser, gracefulClose } // closeAttachedConnections closes all tracked attached connections. -func (ts *tunnelState) closeAttachedConnections() error { +func (ts *tunnelTracker) closeAttachedConnections() error { var err error ts.mu.Lock() defer ts.mu.Unlock() @@ -509,9 +554,9 @@ func (ts *tunnelState) closeAttachedConnections() error { return err } -// cleanupAttachedConnections closes upgraded attached connections. Depending on closeDelay it -// does that either immediately or after a timer. -func (ts *tunnelState) cleanupAttachedConnections() error { +// cleanupAttachedConnections closes upgraded attached connections. +// Depending on closeDelay it does that either immediately or after a timer. +func (ts *tunnelTracker) cleanupAttachedConnections() error { if ts.closeDelay == 0 { return ts.closeAttachedConnections() } @@ -652,7 +697,7 @@ func isWebsocket(r *http.Request) bool { // closeConnectionsForUpstream closes all tracked connections that were // established to the given upstream address. -func (ts *tunnelState) closeConnectionsForUpstream(addr string) error { +func (ts *tunnelTracker) closeConnectionsForUpstream(addr string) error { var err error ts.mu.Lock() defer ts.mu.Unlock() diff --git a/modules/caddyhttp/reverseproxy/streaming_test.go b/modules/caddyhttp/reverseproxy/streaming_test.go index 18acba3f4..2de24a864 100644 --- a/modules/caddyhttp/reverseproxy/streaming_test.go +++ b/modules/caddyhttp/reverseproxy/streaming_test.go @@ -114,14 +114,14 @@ func (c *trackingReadWriteCloser) isClosed() bool { } func TestHandlerCleanupLegacyModeClosesAllConnections(t *testing.T) { - ts := newTunnelState(caddy.Log(), 0) + ts := newTunnelTracker(caddy.Log(), 0) connA := newTrackingReadWriteCloser() connB := newTrackingReadWriteCloser() ts.registerConnection(connA, nil, false, "a") ts.registerConnection(connB, nil, false, "b") h := &Handler{ - tunnel: ts, + tunnelTracker: ts, StreamRetainOnReload: false, } @@ -134,12 +134,12 @@ func TestHandlerCleanupLegacyModeClosesAllConnections(t *testing.T) { } func TestHandlerCleanupLegacyModeHonorsDelay(t *testing.T) { - ts := newTunnelState(caddy.Log(), 40*time.Millisecond) + ts := newTunnelTracker(caddy.Log(), 40*time.Millisecond) conn := newTrackingReadWriteCloser() ts.registerConnection(conn, nil, false, "a") h := &Handler{ - tunnel: ts, + tunnelTracker: ts, StreamRetainOnReload: false, } @@ -172,15 +172,15 @@ func TestHandlerCleanupRetainModeClosesOnlyRemovedUpstreams(t *testing.T) { _, _ = hosts.Delete(upstreamB) }) - ts := newTunnelState(caddy.Log(), 0) - registerDetachedTunnelStates(ts) + ts := newTunnelTracker(caddy.Log(), 0) + registerDetachedTunnelTrackers(ts) connA := newTrackingReadWriteCloser() connB := newTrackingReadWriteCloser() ts.registerConnection(connA, nil, true, upstreamA) ts.registerConnection(connB, nil, true, upstreamB) h := &Handler{ - tunnel: ts, + tunnelTracker: ts, StreamRetainOnReload: true, Upstreams: UpstreamPool{ &Upstream{Dial: upstreamA},