diff --git a/caddytest/integration/stream_reload_stress_test.go b/caddytest/integration/stream_reload_stress_test.go index ff140f9a4..6ae6e9fa0 100644 --- a/caddytest/integration/stream_reload_stress_test.go +++ b/caddytest/integration/stream_reload_stress_test.go @@ -40,13 +40,13 @@ func TestReverseProxyReloadStressUpgradedStreamsHeapProfiles(t *testing.T) { // // legacy – no delay, close on reload immediately (old default) // close_delay – stream_close_delay, the old "keep-alive workaround" - // retain – stream_retain_on_reload, the new explicit retain flag + // detached – stream_detached, the new explicit detached flag // // Reloads are spread across time and interleaved with echo-checks so // stream health is exercised at each reload boundary, not only at the end. legacy := runReloadStress(t, tester, backend.addr, "legacy", false, 0) closeDelay := runReloadStress(t, tester, backend.addr, "close_delay", false, stressCloseDelay(t)) - retain := runReloadStress(t, tester, backend.addr, "retain", true, 0) + detached := runReloadStress(t, tester, backend.addr, "detached", true, 0) if legacy.aliveAfterReloads != 0 { t.Fatalf("legacy mode left %d upgraded streams alive after reloads", legacy.aliveAfterReloads) @@ -57,8 +57,8 @@ func TestReverseProxyReloadStressUpgradedStreamsHeapProfiles(t *testing.T) { if closeDelay.aliveAfterReloads != 0 { t.Fatalf("close_delay mode left %d upgraded streams alive after delay expiry", closeDelay.aliveAfterReloads) } - if retain.aliveAfterReloads != retain.streamCount { - t.Fatalf("retain mode kept %d/%d upgraded streams alive after reloads", retain.aliveAfterReloads, retain.streamCount) + if detached.aliveAfterReloads != detached.streamCount { + t.Fatalf("detached mode kept %d/%d upgraded streams alive after reloads", detached.aliveAfterReloads, detached.streamCount) } t.Logf("legacy heap: before=%s mid=%s after=%s delta(before→after)=%s objects(before=%d after=%d) handler_frames(before=%d after=%d)", @@ -77,13 +77,13 @@ func TestReverseProxyReloadStressUpgradedStreamsHeapProfiles(t *testing.T) { closeDelay.beforeReload.HeapObjects, closeDelay.afterReload.HeapObjects, closeDelay.beforeReload.handlerFrames, closeDelay.afterReload.handlerFrames, ) - t.Logf("retain heap: before=%s mid=%s after=%s delta(before→after)=%s objects(before=%d after=%d) handler_frames(before=%d after=%d)", - formatBytes(retain.beforeReload.HeapInuse), - formatBytes(retain.midReload.HeapInuse), - formatBytes(retain.afterReload.HeapInuse), - formatBytesDiff(retain.beforeReload.HeapInuse, retain.afterReload.HeapInuse), - retain.beforeReload.HeapObjects, retain.afterReload.HeapObjects, - retain.beforeReload.handlerFrames, retain.afterReload.handlerFrames, + t.Logf("detached heap: before=%s mid=%s after=%s delta(before→after)=%s objects(before=%d after=%d) handler_frames(before=%d after=%d)", + formatBytes(detached.beforeReload.HeapInuse), + formatBytes(detached.midReload.HeapInuse), + formatBytes(detached.afterReload.HeapInuse), + formatBytesDiff(detached.beforeReload.HeapInuse, detached.afterReload.HeapInuse), + detached.beforeReload.HeapObjects, detached.afterReload.HeapObjects, + detached.beforeReload.handlerFrames, detached.afterReload.handlerFrames, ) } @@ -107,7 +107,7 @@ type heapSnapshot struct { // config reloads spread over time. An echo check is performed every 6 reloads so // stream health is exercised at each reload boundary rather than only at the end. // closeDelay mirrors the stream_close_delay config option; pass 0 to disable. -func runReloadStress(t *testing.T, tester *caddytest.Tester, backendAddr, mode string, retain bool, closeDelay time.Duration) stressRunResult { +func runReloadStress(t *testing.T, tester *caddytest.Tester, backendAddr, mode string, detach bool, closeDelay time.Duration) stressRunResult { t.Helper() const echoEvery = 6 // perform an echo check every N reloads @@ -115,7 +115,7 @@ func runReloadStress(t *testing.T, tester *caddytest.Tester, backendAddr, mode s streamCount := envIntOrDefault(t, "CADDY_STRESS_STREAM_COUNT", defaultStressStreamCount) reloadCount := envIntOrDefault(t, "CADDY_STRESS_RELOAD_COUNT", defaultStressReloadCount) - tester.InitServer(reloadStressConfig(backendAddr, retain, closeDelay, 0), "caddyfile") + tester.InitServer(reloadStressConfig(backendAddr, detach, closeDelay, 0), "caddyfile") clients := make([]*upgradedStreamClient, 0, streamCount) for i := 0; i < streamCount; i++ { @@ -134,7 +134,7 @@ func runReloadStress(t *testing.T, tester *caddytest.Tester, backendAddr, mode s // pause briefly and measure stream health so the snapshot reflects real-world // reload cadence rather than a tight loop. for i := 1; i <= reloadCount; i++ { - loadCaddyfileConfig(t, reloadStressConfig(backendAddr, retain, closeDelay, i)) + loadCaddyfileConfig(t, reloadStressConfig(backendAddr, detach, closeDelay, i)) // Small pause after each reload to let connection teardown propagate. time.Sleep(50 * time.Millisecond) @@ -143,11 +143,11 @@ func runReloadStress(t *testing.T, tester *caddytest.Tester, backendAddr, mode s alive := countAliveStreams(clients) t.Logf("%s mode: %d/%d streams alive after reload %d", mode, alive, streamCount, i) - // In retain mode every stream must survive every reload (upstream unchanged). - if retain { + // In detached mode, every stream must survive every reload (upstream unchanged). + if detach { for j, client := range clients { if err := client.echo(fmt.Sprintf("%s-mid-%02d-%02d\n", mode, i, j)); err != nil { - t.Fatalf("retain mode stream %d died at reload %d: %v", j, i, err) + t.Fatalf("detached mode stream %d died at reload %d: %v", j, i, err) } } } @@ -160,11 +160,11 @@ func runReloadStress(t *testing.T, tester *caddytest.Tester, backendAddr, mode s // For legacy mode: the reloads close streams immediately; wait for that to complete. // For close_delay mode: streams are still alive here; wait for the delay to fire. - // For retain mode: streams survive indefinitely; no wait needed. + // For detached mode: streams survive indefinitely; no wait needed. var aliveBeforeDelayExpiry int aliveAfterReloads := countAliveStreams(clients) switch { - case retain: + case detach: // nothing to wait for case closeDelay > 0: // streams should still be alive at this point (delay hasn't expired) @@ -251,10 +251,10 @@ func loadCaddyfileConfig(t *testing.T, rawConfig string) { } } -func reloadStressConfig(backendAddr string, retain bool, closeDelay time.Duration, revision int) string { +func reloadStressConfig(backendAddr string, detach bool, closeDelay time.Duration, revision int) string { var directives string - if retain { - directives += "\n\t\tstream_retain_on_reload" + if detach { + directives += "\n\t\tstream_detached" } if closeDelay > 0 { directives += fmt.Sprintf("\n\t\tstream_close_delay %s", closeDelay) diff --git a/modules/caddyhttp/reverseproxy/caddyfile.go b/modules/caddyhttp/reverseproxy/caddyfile.go index 148bf4aa1..0d4d858d7 100644 --- a/modules/caddyhttp/reverseproxy/caddyfile.go +++ b/modules/caddyhttp/reverseproxy/caddyfile.go @@ -99,7 +99,7 @@ func parseCaddyfile(h httpcaddyfile.Helper) (caddyhttp.MiddlewareHandler, error) // stream_buffer_size // stream_timeout // stream_close_delay -// stream_retain_on_reload +// stream_detached // stream_logs { // level // logger_name @@ -709,11 +709,11 @@ func (h *Handler) UnmarshalCaddyfile(d *caddyfile.Dispenser) error { h.StreamCloseDelay = caddy.Duration(dur) } - case "stream_retain_on_reload": + case "stream_detached": if d.NextArg() { return d.ArgErr() } - h.StreamRetainOnReload = true + h.StreamDetached = true case "stream_logs": if d.NextArg() { diff --git a/modules/caddyhttp/reverseproxy/reverseproxy.go b/modules/caddyhttp/reverseproxy/reverseproxy.go index dcabe8d2c..38c0b228e 100644 --- a/modules/caddyhttp/reverseproxy/reverseproxy.go +++ b/modules/caddyhttp/reverseproxy/reverseproxy.go @@ -186,15 +186,17 @@ type Handler struct { // by the previous config closing. Default: no delay. StreamCloseDelay caddy.Duration `json:"stream_close_delay,omitempty"` - // If true, upgraded connections such as WebSockets are retained across - // config reloads when their upstream still exists in the new config. - // Connections using upstreams that are removed are closed during cleanup. - // By default this is false, preserving legacy behavior where upgraded - // connections are closed on reload (optionally delayed by stream_close_delay). - // Only http1.1 websocket connections are affected, websockets for h2/h3 are not affected. - // If true, bytes transferred for http1.1 in the access logs will be zero but those stats - // can be found in the stream logs for http1/2/3 regardless if this is enabled. - StreamRetainOnReload bool `json:"stream_retain_on_reload,omitempty"` + // If true, upgraded connections such as WebSockets are detached from + // the handler and retained across config reloads when their upstream + // still exists in the new config. Connections using upstreams that are + // removed are closed during cleanup. By default this is false, preserving + // legacy behavior where upgraded connections are closed on reload + // (optionally delayed by stream_close_delay). + // Only http1.1 websocket connections are affected, websockets for h2/h3 + // are not affected. If true, bytes transferred for http1.1 in the access + // logs will be zero but those stats can be found in the stream logs for + // http1/2/3 regardless if this is enabled. + StreamDetached bool `json:"stream_detached,omitempty"` // Controls logging behavior for upgraded stream lifecycle events. // If omitted, defaults are used (level=DEBUG, logger_name="http.handlers.reverse_proxy.stream"). @@ -299,7 +301,7 @@ func (h *Handler) Provision(ctx caddy.Context) error { } } - if h.StreamRetainOnReload { + if h.StreamDetached { registerDetachedTunnelTrackers(h.tunnelTracker) } @@ -535,7 +537,7 @@ func unregisterDetachedTunnelTrackers(ts *tunnelTracker) { // Cleanup cleans up the resources made by h. func (h *Handler) Cleanup() error { - // even if StreamRetainOnReload is true, extended connect websockets may still be running + // even if StreamDetached is true, extended connect websockets may still be running err := h.tunnelTracker.cleanupAttachedConnections() for _, upstream := range h.Upstreams { // hosts.Delete returns deleted=true when the ref count reaches zero, diff --git a/modules/caddyhttp/reverseproxy/streaming.go b/modules/caddyhttp/reverseproxy/streaming.go index 3f6b40cd9..7cb7ff7da 100644 --- a/modules/caddyhttp/reverseproxy/streaming.go +++ b/modules/caddyhttp/reverseproxy/streaming.go @@ -99,7 +99,7 @@ func (h *Handler) handleUpgradeResponse(logger *zap.Logger, rw http.ResponseWrit bufferSize := h.StreamBufferSize streamTimeout := time.Duration(h.StreamTimeout) - if h.StreamRetainOnReload { + if h.StreamDetached { // the return value should be true as it's not hijacked yet, // but some middleware may wrap response writers incorrectly if !caddyhttp.DetachResponseWriterAfterHijack(rw, true) { @@ -112,7 +112,7 @@ func (h *Handler) handleUpgradeResponse(logger *zap.Logger, rw http.ResponseWrit var ( conn io.ReadWriteCloser brw *bufio.ReadWriter - detached = h.StreamRetainOnReload + detached = h.StreamDetached ) // websocket over http2 or http3 if extended connect is enabled, // assuming backend doesn't support this, the request will be diff --git a/modules/caddyhttp/reverseproxy/streaming_test.go b/modules/caddyhttp/reverseproxy/streaming_test.go index 2de24a864..7dc5e476c 100644 --- a/modules/caddyhttp/reverseproxy/streaming_test.go +++ b/modules/caddyhttp/reverseproxy/streaming_test.go @@ -121,8 +121,8 @@ func TestHandlerCleanupLegacyModeClosesAllConnections(t *testing.T) { ts.registerConnection(connB, nil, false, "b") h := &Handler{ - tunnelTracker: ts, - StreamRetainOnReload: false, + tunnelTracker: ts, + StreamDetached: false, } if err := h.Cleanup(); err != nil { @@ -139,8 +139,8 @@ func TestHandlerCleanupLegacyModeHonorsDelay(t *testing.T) { ts.registerConnection(conn, nil, false, "a") h := &Handler{ - tunnelTracker: ts, - StreamRetainOnReload: false, + tunnelTracker: ts, + StreamDetached: false, } if err := h.Cleanup(); err != nil { @@ -157,7 +157,7 @@ func TestHandlerCleanupLegacyModeHonorsDelay(t *testing.T) { } } -func TestHandlerCleanupRetainModeClosesOnlyRemovedUpstreams(t *testing.T) { +func TestHandlerCleanupDetachedModeClosesOnlyRemovedUpstreams(t *testing.T) { const upstreamA = "upstream-a" const upstreamB = "upstream-b" @@ -180,8 +180,8 @@ func TestHandlerCleanupRetainModeClosesOnlyRemovedUpstreams(t *testing.T) { ts.registerConnection(connB, nil, true, upstreamB) h := &Handler{ - tunnelTracker: ts, - StreamRetainOnReload: true, + tunnelTracker: ts, + StreamDetached: true, Upstreams: UpstreamPool{ &Upstream{Dial: upstreamA}, &Upstream{Dial: upstreamB}, @@ -193,7 +193,7 @@ func TestHandlerCleanupRetainModeClosesOnlyRemovedUpstreams(t *testing.T) { } if connA.isClosed() { - t.Fatal("connection for retained upstream should remain open") + t.Fatal("connection for detached upstream should remain open") } if !connB.isClosed() { t.Fatal("connection for removed upstream should be closed")