Rename to stream_detached

This commit is contained in:
Francis Lavoie 2026-04-21 08:38:38 -04:00
parent b0dc5d8bb8
commit 12139f2f99
No known key found for this signature in database
5 changed files with 48 additions and 46 deletions

View File

@ -40,13 +40,13 @@ func TestReverseProxyReloadStressUpgradedStreamsHeapProfiles(t *testing.T) {
//
// legacy no delay, close on reload immediately (old default)
// close_delay stream_close_delay, the old "keep-alive workaround"
// retain stream_retain_on_reload, the new explicit retain flag
// detached stream_detached, the new explicit detached flag
//
// Reloads are spread across time and interleaved with echo-checks so
// stream health is exercised at each reload boundary, not only at the end.
legacy := runReloadStress(t, tester, backend.addr, "legacy", false, 0)
closeDelay := runReloadStress(t, tester, backend.addr, "close_delay", false, stressCloseDelay(t))
retain := runReloadStress(t, tester, backend.addr, "retain", true, 0)
detached := runReloadStress(t, tester, backend.addr, "detached", true, 0)
if legacy.aliveAfterReloads != 0 {
t.Fatalf("legacy mode left %d upgraded streams alive after reloads", legacy.aliveAfterReloads)
@ -57,8 +57,8 @@ func TestReverseProxyReloadStressUpgradedStreamsHeapProfiles(t *testing.T) {
if closeDelay.aliveAfterReloads != 0 {
t.Fatalf("close_delay mode left %d upgraded streams alive after delay expiry", closeDelay.aliveAfterReloads)
}
if retain.aliveAfterReloads != retain.streamCount {
t.Fatalf("retain mode kept %d/%d upgraded streams alive after reloads", retain.aliveAfterReloads, retain.streamCount)
if detached.aliveAfterReloads != detached.streamCount {
t.Fatalf("detached mode kept %d/%d upgraded streams alive after reloads", detached.aliveAfterReloads, detached.streamCount)
}
t.Logf("legacy heap: before=%s mid=%s after=%s delta(before→after)=%s objects(before=%d after=%d) handler_frames(before=%d after=%d)",
@ -77,13 +77,13 @@ func TestReverseProxyReloadStressUpgradedStreamsHeapProfiles(t *testing.T) {
closeDelay.beforeReload.HeapObjects, closeDelay.afterReload.HeapObjects,
closeDelay.beforeReload.handlerFrames, closeDelay.afterReload.handlerFrames,
)
t.Logf("retain heap: before=%s mid=%s after=%s delta(before→after)=%s objects(before=%d after=%d) handler_frames(before=%d after=%d)",
formatBytes(retain.beforeReload.HeapInuse),
formatBytes(retain.midReload.HeapInuse),
formatBytes(retain.afterReload.HeapInuse),
formatBytesDiff(retain.beforeReload.HeapInuse, retain.afterReload.HeapInuse),
retain.beforeReload.HeapObjects, retain.afterReload.HeapObjects,
retain.beforeReload.handlerFrames, retain.afterReload.handlerFrames,
t.Logf("detached heap: before=%s mid=%s after=%s delta(before→after)=%s objects(before=%d after=%d) handler_frames(before=%d after=%d)",
formatBytes(detached.beforeReload.HeapInuse),
formatBytes(detached.midReload.HeapInuse),
formatBytes(detached.afterReload.HeapInuse),
formatBytesDiff(detached.beforeReload.HeapInuse, detached.afterReload.HeapInuse),
detached.beforeReload.HeapObjects, detached.afterReload.HeapObjects,
detached.beforeReload.handlerFrames, detached.afterReload.handlerFrames,
)
}
@ -107,7 +107,7 @@ type heapSnapshot struct {
// config reloads spread over time. An echo check is performed every 6 reloads so
// stream health is exercised at each reload boundary rather than only at the end.
// closeDelay mirrors the stream_close_delay config option; pass 0 to disable.
func runReloadStress(t *testing.T, tester *caddytest.Tester, backendAddr, mode string, retain bool, closeDelay time.Duration) stressRunResult {
func runReloadStress(t *testing.T, tester *caddytest.Tester, backendAddr, mode string, detach bool, closeDelay time.Duration) stressRunResult {
t.Helper()
const echoEvery = 6 // perform an echo check every N reloads
@ -115,7 +115,7 @@ func runReloadStress(t *testing.T, tester *caddytest.Tester, backendAddr, mode s
streamCount := envIntOrDefault(t, "CADDY_STRESS_STREAM_COUNT", defaultStressStreamCount)
reloadCount := envIntOrDefault(t, "CADDY_STRESS_RELOAD_COUNT", defaultStressReloadCount)
tester.InitServer(reloadStressConfig(backendAddr, retain, closeDelay, 0), "caddyfile")
tester.InitServer(reloadStressConfig(backendAddr, detach, closeDelay, 0), "caddyfile")
clients := make([]*upgradedStreamClient, 0, streamCount)
for i := 0; i < streamCount; i++ {
@ -134,7 +134,7 @@ func runReloadStress(t *testing.T, tester *caddytest.Tester, backendAddr, mode s
// pause briefly and measure stream health so the snapshot reflects real-world
// reload cadence rather than a tight loop.
for i := 1; i <= reloadCount; i++ {
loadCaddyfileConfig(t, reloadStressConfig(backendAddr, retain, closeDelay, i))
loadCaddyfileConfig(t, reloadStressConfig(backendAddr, detach, closeDelay, i))
// Small pause after each reload to let connection teardown propagate.
time.Sleep(50 * time.Millisecond)
@ -143,11 +143,11 @@ func runReloadStress(t *testing.T, tester *caddytest.Tester, backendAddr, mode s
alive := countAliveStreams(clients)
t.Logf("%s mode: %d/%d streams alive after reload %d", mode, alive, streamCount, i)
// In retain mode every stream must survive every reload (upstream unchanged).
if retain {
// In detached mode, every stream must survive every reload (upstream unchanged).
if detach {
for j, client := range clients {
if err := client.echo(fmt.Sprintf("%s-mid-%02d-%02d\n", mode, i, j)); err != nil {
t.Fatalf("retain mode stream %d died at reload %d: %v", j, i, err)
t.Fatalf("detached mode stream %d died at reload %d: %v", j, i, err)
}
}
}
@ -160,11 +160,11 @@ func runReloadStress(t *testing.T, tester *caddytest.Tester, backendAddr, mode s
// For legacy mode: the reloads close streams immediately; wait for that to complete.
// For close_delay mode: streams are still alive here; wait for the delay to fire.
// For retain mode: streams survive indefinitely; no wait needed.
// For detached mode: streams survive indefinitely; no wait needed.
var aliveBeforeDelayExpiry int
aliveAfterReloads := countAliveStreams(clients)
switch {
case retain:
case detach:
// nothing to wait for
case closeDelay > 0:
// streams should still be alive at this point (delay hasn't expired)
@ -251,10 +251,10 @@ func loadCaddyfileConfig(t *testing.T, rawConfig string) {
}
}
func reloadStressConfig(backendAddr string, retain bool, closeDelay time.Duration, revision int) string {
func reloadStressConfig(backendAddr string, detach bool, closeDelay time.Duration, revision int) string {
var directives string
if retain {
directives += "\n\t\tstream_retain_on_reload"
if detach {
directives += "\n\t\tstream_detached"
}
if closeDelay > 0 {
directives += fmt.Sprintf("\n\t\tstream_close_delay %s", closeDelay)

View File

@ -99,7 +99,7 @@ func parseCaddyfile(h httpcaddyfile.Helper) (caddyhttp.MiddlewareHandler, error)
// stream_buffer_size <size>
// stream_timeout <duration>
// stream_close_delay <duration>
// stream_retain_on_reload
// stream_detached
// stream_logs {
// level <debug|info|warn|error>
// logger_name <name|access>
@ -709,11 +709,11 @@ func (h *Handler) UnmarshalCaddyfile(d *caddyfile.Dispenser) error {
h.StreamCloseDelay = caddy.Duration(dur)
}
case "stream_retain_on_reload":
case "stream_detached":
if d.NextArg() {
return d.ArgErr()
}
h.StreamRetainOnReload = true
h.StreamDetached = true
case "stream_logs":
if d.NextArg() {

View File

@ -186,15 +186,17 @@ type Handler struct {
// by the previous config closing. Default: no delay.
StreamCloseDelay caddy.Duration `json:"stream_close_delay,omitempty"`
// If true, upgraded connections such as WebSockets are retained across
// config reloads when their upstream still exists in the new config.
// Connections using upstreams that are removed are closed during cleanup.
// By default this is false, preserving legacy behavior where upgraded
// connections are closed on reload (optionally delayed by stream_close_delay).
// Only http1.1 websocket connections are affected, websockets for h2/h3 are not affected.
// If true, bytes transferred for http1.1 in the access logs will be zero but those stats
// can be found in the stream logs for http1/2/3 regardless if this is enabled.
StreamRetainOnReload bool `json:"stream_retain_on_reload,omitempty"`
// If true, upgraded connections such as WebSockets are detached from
// the handler and retained across config reloads when their upstream
// still exists in the new config. Connections using upstreams that are
// removed are closed during cleanup. By default this is false, preserving
// legacy behavior where upgraded connections are closed on reload
// (optionally delayed by stream_close_delay).
// Only http1.1 websocket connections are affected, websockets for h2/h3
// are not affected. If true, bytes transferred for http1.1 in the access
// logs will be zero but those stats can be found in the stream logs for
// http1/2/3 regardless if this is enabled.
StreamDetached bool `json:"stream_detached,omitempty"`
// Controls logging behavior for upgraded stream lifecycle events.
// If omitted, defaults are used (level=DEBUG, logger_name="http.handlers.reverse_proxy.stream").
@ -299,7 +301,7 @@ func (h *Handler) Provision(ctx caddy.Context) error {
}
}
if h.StreamRetainOnReload {
if h.StreamDetached {
registerDetachedTunnelTrackers(h.tunnelTracker)
}
@ -535,7 +537,7 @@ func unregisterDetachedTunnelTrackers(ts *tunnelTracker) {
// Cleanup cleans up the resources made by h.
func (h *Handler) Cleanup() error {
// even if StreamRetainOnReload is true, extended connect websockets may still be running
// even if StreamDetached is true, extended connect websockets may still be running
err := h.tunnelTracker.cleanupAttachedConnections()
for _, upstream := range h.Upstreams {
// hosts.Delete returns deleted=true when the ref count reaches zero,

View File

@ -99,7 +99,7 @@ func (h *Handler) handleUpgradeResponse(logger *zap.Logger, rw http.ResponseWrit
bufferSize := h.StreamBufferSize
streamTimeout := time.Duration(h.StreamTimeout)
if h.StreamRetainOnReload {
if h.StreamDetached {
// the return value should be true as it's not hijacked yet,
// but some middleware may wrap response writers incorrectly
if !caddyhttp.DetachResponseWriterAfterHijack(rw, true) {
@ -112,7 +112,7 @@ func (h *Handler) handleUpgradeResponse(logger *zap.Logger, rw http.ResponseWrit
var (
conn io.ReadWriteCloser
brw *bufio.ReadWriter
detached = h.StreamRetainOnReload
detached = h.StreamDetached
)
// websocket over http2 or http3 if extended connect is enabled,
// assuming backend doesn't support this, the request will be

View File

@ -121,8 +121,8 @@ func TestHandlerCleanupLegacyModeClosesAllConnections(t *testing.T) {
ts.registerConnection(connB, nil, false, "b")
h := &Handler{
tunnelTracker: ts,
StreamRetainOnReload: false,
tunnelTracker: ts,
StreamDetached: false,
}
if err := h.Cleanup(); err != nil {
@ -139,8 +139,8 @@ func TestHandlerCleanupLegacyModeHonorsDelay(t *testing.T) {
ts.registerConnection(conn, nil, false, "a")
h := &Handler{
tunnelTracker: ts,
StreamRetainOnReload: false,
tunnelTracker: ts,
StreamDetached: false,
}
if err := h.Cleanup(); err != nil {
@ -157,7 +157,7 @@ func TestHandlerCleanupLegacyModeHonorsDelay(t *testing.T) {
}
}
func TestHandlerCleanupRetainModeClosesOnlyRemovedUpstreams(t *testing.T) {
func TestHandlerCleanupDetachedModeClosesOnlyRemovedUpstreams(t *testing.T) {
const upstreamA = "upstream-a"
const upstreamB = "upstream-b"
@ -180,8 +180,8 @@ func TestHandlerCleanupRetainModeClosesOnlyRemovedUpstreams(t *testing.T) {
ts.registerConnection(connB, nil, true, upstreamB)
h := &Handler{
tunnelTracker: ts,
StreamRetainOnReload: true,
tunnelTracker: ts,
StreamDetached: true,
Upstreams: UpstreamPool{
&Upstream{Dial: upstreamA},
&Upstream{Dial: upstreamB},
@ -193,7 +193,7 @@ func TestHandlerCleanupRetainModeClosesOnlyRemovedUpstreams(t *testing.T) {
}
if connA.isClosed() {
t.Fatal("connection for retained upstream should remain open")
t.Fatal("connection for detached upstream should remain open")
}
if !connB.isClosed() {
t.Fatal("connection for removed upstream should be closed")