Rename to tunnelTracker, reflow some comments

This commit is contained in:
Francis Lavoie 2026-04-21 08:12:57 -04:00
parent 2fba548bae
commit 4ef0e8a9a5
No known key found for this signature in database
4 changed files with 102 additions and 53 deletions

View File

@ -270,9 +270,11 @@ func (rr *responseRecorder) setReadSize(size *int) {
func (rr *responseRecorder) Hijack() (net.Conn, *bufio.ReadWriter, error) {
if !rr.wroteHeader {
// hijacking without writing status code first works as long as subsequent writes follows http1.1
// wire format, but it will show up with a status code of 0 in the access log and bytes written
// will include response headers. Response headers won't be present in the log if not set on the response writer.
// hijacking without writing the status code first works as long as
// subsequent writes follow the http1.1 wire format, but it will
// show up with a status code of 0 in the access log, and bytes
// written will include response headers. Response headers won't
// be present in the log if not set on the response writer.
caddy.Log().Warn("hijacking without writing status code first")
}
//nolint:bodyclose
@ -339,9 +341,11 @@ func (hc *hijackedConn) ReadFrom(r io.Reader) (int64, error) {
return n, err
}
// DetachResponseWriterAfterHijack detaches w or one of its wrapped response
// writers when it's hijacked. Returns true if not already hijacked.
// When detached, bytes read or written stats will not be recorded for the hijacked connection, and it's safe to use the connection after http middleware returns.
// DetachResponseWriterAfterHijack detaches w or one of its wrapped
// response writers when it's hijacked. Returns true if not already
// hijacked. When detached, bytes read or written stats will not be
// recorded for the hijacked connection, and it's safe to use the
// connection after http middleware returns.
func DetachResponseWriterAfterHijack(w http.ResponseWriter, detached bool) bool {
for w != nil {
if detacher, ok := w.(interface{ DetachAfterHijack(bool) bool }); ok {

View File

@ -256,7 +256,7 @@ type Handler struct {
// Tracks hijacked/upgraded connections (WebSocket etc.) so they can be
// closed when their upstream is removed from the config.
tunnel *tunnelState
tunnelTracker *tunnelTracker
ctx caddy.Context
logger *zap.Logger
@ -283,7 +283,7 @@ func (h *Handler) Provision(ctx caddy.Context) error {
h.events = eventAppIface.(*caddyevents.App)
h.ctx = ctx
h.logger = ctx.Logger()
h.tunnel = newTunnelState(h.logger, time.Duration(h.StreamCloseDelay))
h.tunnelTracker = newTunnelTracker(h.logger, time.Duration(h.StreamCloseDelay))
h.streamLogLevel = defaultStreamLogLevel
h.streamLogLoggerName = defaultStreamLoggerName
if h.StreamLogs != nil {
@ -300,7 +300,7 @@ func (h *Handler) Provision(ctx caddy.Context) error {
}
if h.StreamRetainOnReload {
registerDetachedTunnelStates(h.tunnel)
registerDetachedTunnelTrackers(h.tunnelTracker)
}
// warn about unsafe buffering config
@ -504,22 +504,22 @@ func (h Handler) streamLoggerForRequest(req *http.Request) *zap.Logger {
}
var (
detachedTunnelStates = make(map[*tunnelState]struct{})
detachedTunnelStatesMu sync.Mutex
detachedTunnelTrackers = make(map[*tunnelTracker]struct{})
detachedTunnelTrackersMu sync.Mutex
)
func registerDetachedTunnelStates(ts *tunnelState) {
detachedTunnelStatesMu.Lock()
defer detachedTunnelStatesMu.Unlock()
detachedTunnelStates[ts] = struct{}{}
func registerDetachedTunnelTrackers(ts *tunnelTracker) {
detachedTunnelTrackersMu.Lock()
defer detachedTunnelTrackersMu.Unlock()
detachedTunnelTrackers[ts] = struct{}{}
}
func notifyDetachedTunnelStatesOfUpstreamRemoval(upstream string, self *tunnelState) error {
detachedTunnelStatesMu.Lock()
defer detachedTunnelStatesMu.Unlock()
func notifyDetachedTunnelTrackersOfUpstreamRemoval(upstream string, self *tunnelTracker) error {
detachedTunnelTrackersMu.Lock()
defer detachedTunnelTrackersMu.Unlock()
var err error
for tunnel := range detachedTunnelStates {
for tunnel := range detachedTunnelTrackers {
if closeErr := tunnel.closeConnectionsForUpstream(upstream); closeErr != nil && tunnel == self && err == nil {
err = closeErr
}
@ -527,16 +527,16 @@ func notifyDetachedTunnelStatesOfUpstreamRemoval(upstream string, self *tunnelSt
return err
}
func unregisterDetachedTunnelStates(ts *tunnelState) {
detachedTunnelStatesMu.Lock()
defer detachedTunnelStatesMu.Unlock()
delete(detachedTunnelStates, ts)
func unregisterDetachedTunnelTrackers(ts *tunnelTracker) {
detachedTunnelTrackersMu.Lock()
defer detachedTunnelTrackersMu.Unlock()
delete(detachedTunnelTrackers, ts)
}
// Cleanup cleans up the resources made by h.
func (h *Handler) Cleanup() error {
// even if StreamRetainOnReload is true, extended connect websockets may still be running
err := h.tunnel.cleanupAttachedConnections()
err := h.tunnelTracker.cleanupAttachedConnections()
for _, upstream := range h.Upstreams {
// hosts.Delete returns deleted=true when the ref count reaches zero,
// meaning no other active config references this upstream. In that
@ -544,7 +544,7 @@ func (h *Handler) Cleanup() error {
// to their natural end since the upstream is still in use.
deleted, _ := hosts.Delete(upstream.String())
if deleted {
if closeErr := notifyDetachedTunnelStatesOfUpstreamRemoval(upstream.String(), h.tunnel); closeErr != nil && err == nil {
if closeErr := notifyDetachedTunnelTrackersOfUpstreamRemoval(upstream.String(), h.tunnelTracker); closeErr != nil && err == nil {
err = closeErr
}
}

View File

@ -95,12 +95,13 @@ func (h *Handler) handleUpgradeResponse(logger *zap.Logger, rw http.ResponseWrit
// Capture all h fields needed by the tunnel now, so that the Handler (h)
// is not referenced after this function returns (for HTTP/1.1 hijacked
// connections the tunnel runs in a detached goroutine).
tunnel := h.tunnel
tunnel := h.tunnelTracker
bufferSize := h.StreamBufferSize
streamTimeout := time.Duration(h.StreamTimeout)
if h.StreamRetainOnReload {
// the return value should be true as it's not hijacked yet, but some middleware may wrap response writers incorrectly
// the return value should be true as it's not hijacked yet,
// but some middleware may wrap response writers incorrectly
if !caddyhttp.DetachResponseWriterAfterHijack(rw, true) {
if c := logger.Check(zap.DebugLevel, "detaching connection failed"); c != nil {
c.Write(zap.String("tip", "check if your response writers have an Unwrap method or if already hijacked"))
@ -113,10 +114,14 @@ func (h *Handler) handleUpgradeResponse(logger *zap.Logger, rw http.ResponseWrit
brw *bufio.ReadWriter
detached = h.StreamRetainOnReload
)
// websocket over http2 or http3 if extended connect is enabled, assuming backend doesn't support this, the request will be modified to http1.1 upgrade
// TODO: once we can reliably detect backend support this, it can be removed for those backends
// websocket over http2 or http3 when extended connect is enabled;
// assuming the backend doesn't support this, the request will be
// modified to an http1.1 upgrade
// TODO: once we can reliably detect that a backend supports this,
// it can be removed for those backends
if body, ok := caddyhttp.GetVar(req.Context(), "extended_connect_websocket_body").(io.ReadCloser); ok {
// websocket over extended connect can't be detached. rw and req.Body are only valid while the handler goroutine is running
// websocket over extended connect can't be detached. rw and req.Body
// are only valid while the handler goroutine is running
detached = false
req.Body = body
rw.Header().Del("Upgrade")
@ -213,15 +218,51 @@ func (h *Handler) handleUpgradeResponse(logger *zap.Logger, rw http.ResponseWrit
start := time.Now()
if !detached {
handleUpgradeTunnel(streamLogger, streamLevel, conn, backConn, deleteFrontConn, deleteBackConn, bufferSize, streamTimeout, start, finishMetrics, streamFields)
handleUpgradeTunnel(
streamLogger,
streamLevel,
conn,
backConn,
deleteFrontConn,
deleteBackConn,
bufferSize,
streamTimeout,
start,
finishMetrics,
streamFields,
)
} else {
// start a new goroutine
go handleUpgradeTunnel(streamLogger, streamLevel, conn, backConn, deleteFrontConn, deleteBackConn, bufferSize, streamTimeout, start, finishMetrics, streamFields)
go handleUpgradeTunnel(
streamLogger,
streamLevel,
conn,
backConn,
deleteFrontConn,
deleteBackConn,
bufferSize,
streamTimeout,
start,
finishMetrics,
streamFields,
)
}
}
// handleUpgradeTunnel returns when transfer is done.
func handleUpgradeTunnel(streamLogger *zap.Logger, streamLevel zapcore.Level, conn io.ReadWriteCloser, backConn io.ReadWriteCloser, deleteFrontConn func(), deleteBackConn func(), bufferSize int, streamTimeout time.Duration, start time.Time, finishMetrics func(result string, duration time.Duration, toBackend int64, fromBackend int64), streamFields []zap.Field) {
func handleUpgradeTunnel(
streamLogger *zap.Logger,
streamLevel zapcore.Level,
conn io.ReadWriteCloser,
backConn io.ReadWriteCloser,
deleteFrontConn func(),
deleteBackConn func(),
bufferSize int,
streamTimeout time.Duration,
start time.Time,
finishMetrics func(result string, duration time.Duration, toBackend int64, fromBackend int64),
streamFields []zap.Field,
) {
defer deleteBackConn()
defer deleteFrontConn()
var (
@ -232,7 +273,8 @@ func handleUpgradeTunnel(streamLogger *zap.Logger, streamLevel zapcore.Level, co
)
// when a stream timeout is encountered, no error will be read from errc
// a buffer size of 2 will allow both the read and write goroutines to send the error and exit
// a buffer size of 2 will allow both the read and write goroutines to
// send the error and exit
// see: https://github.com/caddyserver/caddy/issues/7418
errc := make(chan error, 2)
spc := switchProtocolCopier{
@ -286,7 +328,10 @@ func handleUpgradeTunnel(streamLogger *zap.Logger, streamLevel zapcore.Level, co
}
func classifyStreamResult(err error) string {
if err == nil || errors.Is(err, io.EOF) || errors.Is(err, net.ErrClosed) || errors.Is(err, context.Canceled) {
if err == nil ||
errors.Is(err, io.EOF) ||
errors.Is(err, net.ErrClosed) ||
errors.Is(err, context.Canceled) {
return "closed"
}
return "error"
@ -446,8 +491,8 @@ type openConnection struct {
upstream string
}
// tunnelState tracks hijacked/upgraded connections for selective cleanup.
type tunnelState struct {
// tunnelTracker tracks hijacked/upgraded connections for selective cleanup.
type tunnelTracker struct {
connections map[io.ReadWriteCloser]openConnection
closeTimer *time.Timer
closeDelay time.Duration
@ -456,8 +501,8 @@ type tunnelState struct {
logger *zap.Logger
}
func newTunnelState(logger *zap.Logger, closeDelay time.Duration) *tunnelState {
return &tunnelState{
func newTunnelTracker(logger *zap.Logger, closeDelay time.Duration) *tunnelTracker {
return &tunnelTracker{
connections: make(map[io.ReadWriteCloser]openConnection),
closeDelay: closeDelay,
logger: logger,
@ -466,7 +511,7 @@ func newTunnelState(logger *zap.Logger, closeDelay time.Duration) *tunnelState {
// registerConnection stores conn in the tracking map. The caller must invoke
// the returned del func when the connection is done.
func (ts *tunnelState) registerConnection(conn io.ReadWriteCloser, gracefulClose func() error, detached bool, upstream string) (del func()) {
func (ts *tunnelTracker) registerConnection(conn io.ReadWriteCloser, gracefulClose func() error, detached bool, upstream string) (del func()) {
ts.mu.Lock()
ts.connections[conn] = openConnection{conn, gracefulClose, detached, upstream}
ts.mu.Unlock()
@ -474,7 +519,7 @@ func (ts *tunnelState) registerConnection(conn io.ReadWriteCloser, gracefulClose
ts.mu.Lock()
delete(ts.connections, conn)
if len(ts.connections) == 0 && ts.stopped {
unregisterDetachedTunnelStates(ts)
unregisterDetachedTunnelTrackers(ts)
if ts.closeTimer != nil {
if ts.closeTimer.Stop() {
ts.logger.Debug("stopped streaming connections close timer - all connections are already closed")
@ -487,7 +532,7 @@ func (ts *tunnelState) registerConnection(conn io.ReadWriteCloser, gracefulClose
}
// closeAttachedConnections closes all tracked attached connections.
func (ts *tunnelState) closeAttachedConnections() error {
func (ts *tunnelTracker) closeAttachedConnections() error {
var err error
ts.mu.Lock()
defer ts.mu.Unlock()
@ -509,9 +554,9 @@ func (ts *tunnelState) closeAttachedConnections() error {
return err
}
// cleanupAttachedConnections closes upgraded attached connections. Depending on closeDelay it
// does that either immediately or after a timer.
func (ts *tunnelState) cleanupAttachedConnections() error {
// cleanupAttachedConnections closes upgraded attached connections.
// Depending on closeDelay it does that either immediately or after a timer.
func (ts *tunnelTracker) cleanupAttachedConnections() error {
if ts.closeDelay == 0 {
return ts.closeAttachedConnections()
}
@ -652,7 +697,7 @@ func isWebsocket(r *http.Request) bool {
// closeConnectionsForUpstream closes all tracked connections that were
// established to the given upstream address.
func (ts *tunnelState) closeConnectionsForUpstream(addr string) error {
func (ts *tunnelTracker) closeConnectionsForUpstream(addr string) error {
var err error
ts.mu.Lock()
defer ts.mu.Unlock()

View File

@ -114,14 +114,14 @@ func (c *trackingReadWriteCloser) isClosed() bool {
}
func TestHandlerCleanupLegacyModeClosesAllConnections(t *testing.T) {
ts := newTunnelState(caddy.Log(), 0)
ts := newTunnelTracker(caddy.Log(), 0)
connA := newTrackingReadWriteCloser()
connB := newTrackingReadWriteCloser()
ts.registerConnection(connA, nil, false, "a")
ts.registerConnection(connB, nil, false, "b")
h := &Handler{
tunnel: ts,
tunnelTracker: ts,
StreamRetainOnReload: false,
}
@ -134,12 +134,12 @@ func TestHandlerCleanupLegacyModeClosesAllConnections(t *testing.T) {
}
func TestHandlerCleanupLegacyModeHonorsDelay(t *testing.T) {
ts := newTunnelState(caddy.Log(), 40*time.Millisecond)
ts := newTunnelTracker(caddy.Log(), 40*time.Millisecond)
conn := newTrackingReadWriteCloser()
ts.registerConnection(conn, nil, false, "a")
h := &Handler{
tunnel: ts,
tunnelTracker: ts,
StreamRetainOnReload: false,
}
@ -172,15 +172,15 @@ func TestHandlerCleanupRetainModeClosesOnlyRemovedUpstreams(t *testing.T) {
_, _ = hosts.Delete(upstreamB)
})
ts := newTunnelState(caddy.Log(), 0)
registerDetachedTunnelStates(ts)
ts := newTunnelTracker(caddy.Log(), 0)
registerDetachedTunnelTrackers(ts)
connA := newTrackingReadWriteCloser()
connB := newTrackingReadWriteCloser()
ts.registerConnection(connA, nil, true, upstreamA)
ts.registerConnection(connB, nil, true, upstreamB)
h := &Handler{
tunnel: ts,
tunnelTracker: ts,
StreamRetainOnReload: true,
Upstreams: UpstreamPool{
&Upstream{Dial: upstreamA},