From aed1af59763d54520a9c72f1fd0222d43904ebfd Mon Sep 17 00:00:00 2001 From: Daniil Sivak Date: Tue, 21 Apr 2026 21:59:31 +0300 Subject: [PATCH] reverseproxy: add `lb_retry_match` condition on response status (#7569) --- ...se_proxy_retry_match_oneline.caddyfiletest | 58 +++ ...e_proxy_retry_match_response.caddyfiletest | 147 ++++++ caddytest/integration/reverseproxy_test.go | 231 +++++++++ modules/caddyhttp/matchers.go | 8 + modules/caddyhttp/reverseproxy/caddyfile.go | 2 +- .../caddyhttp/reverseproxy/retries_test.go | 475 ++++++++++++++++++ .../caddyhttp/reverseproxy/reverseproxy.go | 116 ++++- replacer.go | 12 + 8 files changed, 1029 insertions(+), 20 deletions(-) create mode 100644 caddytest/integration/caddyfile_adapt/reverse_proxy_retry_match_oneline.caddyfiletest create mode 100644 caddytest/integration/caddyfile_adapt/reverse_proxy_retry_match_response.caddyfiletest diff --git a/caddytest/integration/caddyfile_adapt/reverse_proxy_retry_match_oneline.caddyfiletest b/caddytest/integration/caddyfile_adapt/reverse_proxy_retry_match_oneline.caddyfiletest new file mode 100644 index 000000000..8faed5220 --- /dev/null +++ b/caddytest/integration/caddyfile_adapt/reverse_proxy_retry_match_oneline.caddyfiletest @@ -0,0 +1,58 @@ +:8884 + +reverse_proxy 127.0.0.1:65535 { + lb_retries 3 + lb_retry_match expression `{rp.status_code} in [502, 503]` + lb_retry_match expression `{rp.is_transport_error} || {rp.status_code} == 502` + lb_retry_match expression `method('POST') && {rp.status_code} == 503` + lb_retry_match `{rp.status_code} == 504` + lb_retry_match `{rp.is_transport_error} && method('PUT')` +} +---------- +{ + "apps": { + "http": { + "servers": { + "srv0": { + "listen": [ + ":8884" + ], + "routes": [ + { + "handle": [ + { + "handler": "reverse_proxy", + "load_balancing": { + "retries": 3, + "retry_match": [ + { + "expression": "{http.reverse_proxy.status_code} in [502, 503]" + }, + { + "expression": "{http.reverse_proxy.is_transport_error} || {http.reverse_proxy.status_code} == 502" + }, + { + "expression": "method('POST') \u0026\u0026 {http.reverse_proxy.status_code} == 503" + }, + { + "expression": "{http.reverse_proxy.status_code} == 504" + }, + { + "expression": "{http.reverse_proxy.is_transport_error} \u0026\u0026 method('PUT')" + } + ] + }, + "upstreams": [ + { + "dial": "127.0.0.1:65535" + } + ] + } + ] + } + ] + } + } + } + } +} diff --git a/caddytest/integration/caddyfile_adapt/reverse_proxy_retry_match_response.caddyfiletest b/caddytest/integration/caddyfile_adapt/reverse_proxy_retry_match_response.caddyfiletest new file mode 100644 index 000000000..d5a1a9a40 --- /dev/null +++ b/caddytest/integration/caddyfile_adapt/reverse_proxy_retry_match_response.caddyfiletest @@ -0,0 +1,147 @@ +:8884 + +reverse_proxy 127.0.0.1:65535 { + lb_retries 5 + + # request matchers (backward-compatible, non-expression) + lb_retry_match { + method POST PUT + } + lb_retry_match { + path /foo* + } + lb_retry_match { + header X-Idempotency-Key * + } + + # response status code via expression + lb_retry_match { + expression `{rp.status_code} in [502, 503, 504]` + } + + # response header via expression + lb_retry_match { + expression `{rp.header.X-Retry} == "true"` + } + + # CEL request functions combined with response placeholders + lb_retry_match { + expression `method('POST') && {rp.status_code} >= 500` + } + lb_retry_match { + expression `path('/api*') && {rp.status_code} in [502, 503]` + } + lb_retry_match { + expression `host('example.com') && {rp.status_code} == 503` + } + lb_retry_match { + expression `query({'retry': 'true'}) && {rp.status_code} >= 500` + } + lb_retry_match { + expression `header({'X-Idempotency-Key': '*'}) && {rp.status_code} in [502, 503]` + } + lb_retry_match { + expression `protocol('https') && {rp.status_code} == 502` + } + lb_retry_match { + expression `path_regexp('^/api/v[0-9]+/') && {rp.status_code} >= 500` + } + lb_retry_match { + expression `header_regexp('Content-Type', '^application/json') && {rp.status_code} == 502` + } + + # transport error handling via placeholder + lb_retry_match { + expression `{rp.is_transport_error} || {rp.status_code} in [502, 503]` + } + lb_retry_match { + expression `{rp.is_transport_error} && method('POST')` + } +} +---------- +{ + "apps": { + "http": { + "servers": { + "srv0": { + "listen": [ + ":8884" + ], + "routes": [ + { + "handle": [ + { + "handler": "reverse_proxy", + "load_balancing": { + "retries": 5, + "retry_match": [ + { + "method": [ + "POST", + "PUT" + ] + }, + { + "path": [ + "/foo*" + ] + }, + { + "header": { + "X-Idempotency-Key": [ + "*" + ] + } + }, + { + "expression": "{http.reverse_proxy.status_code} in [502, 503, 504]" + }, + { + "expression": "{http.reverse_proxy.header.X-Retry} == \"true\"" + }, + { + "expression": "method('POST') \u0026\u0026 {http.reverse_proxy.status_code} \u003e= 500" + }, + { + "expression": "path('/api*') \u0026\u0026 {http.reverse_proxy.status_code} in [502, 503]" + }, + { + "expression": "host('example.com') \u0026\u0026 {http.reverse_proxy.status_code} == 503" + }, + { + "expression": "query({'retry': 'true'}) \u0026\u0026 {http.reverse_proxy.status_code} \u003e= 500" + }, + { + "expression": "header({'X-Idempotency-Key': '*'}) \u0026\u0026 {http.reverse_proxy.status_code} in [502, 503]" + }, + { + "expression": "protocol('https') \u0026\u0026 {http.reverse_proxy.status_code} == 502" + }, + { + "expression": "path_regexp('^/api/v[0-9]+/') \u0026\u0026 {http.reverse_proxy.status_code} \u003e= 500" + }, + { + "expression": "header_regexp('Content-Type', '^application/json') \u0026\u0026 {http.reverse_proxy.status_code} == 502" + }, + { + "expression": "{http.reverse_proxy.is_transport_error} || {http.reverse_proxy.status_code} in [502, 503]" + }, + { + "expression": "{http.reverse_proxy.is_transport_error} \u0026\u0026 method('POST')" + } + ] + }, + "upstreams": [ + { + "dial": "127.0.0.1:65535" + } + ] + } + ] + } + ] + } + } + } + } +} diff --git a/caddytest/integration/reverseproxy_test.go b/caddytest/integration/reverseproxy_test.go index 6e0b3dcff..cbccfd74f 100644 --- a/caddytest/integration/reverseproxy_test.go +++ b/caddytest/integration/reverseproxy_test.go @@ -7,6 +7,7 @@ import ( "os" "runtime" "strings" + "sync/atomic" "testing" "github.com/caddyserver/caddy/v2/caddytest" @@ -562,3 +563,233 @@ func TestReverseProxyHealthCheckUnixSocketWithoutPort(t *testing.T) { tester.AssertGetResponse("http://localhost:9080/", 200, "Hello, World!") } + +// TestReverseProxyRetryMatchStatusCode verifies that lb_retry_match with a +// CEL expression matching on {rp.status_code} causes the request to be +// retried on the next upstream when the first upstream returns a matching +// status code +func TestReverseProxyRetryMatchStatusCode(t *testing.T) { + // Bad upstream: returns 502 + badSrv := &http.Server{ + Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusBadGateway) + }), + } + badLn, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("failed to listen: %v", err) + } + go badSrv.Serve(badLn) + t.Cleanup(func() { badSrv.Close(); badLn.Close() }) + + // Good upstream: returns 200 + goodSrv := &http.Server{ + Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Write([]byte("ok")) + }), + } + goodLn, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("failed to listen: %v", err) + } + go goodSrv.Serve(goodLn) + t.Cleanup(func() { goodSrv.Close(); goodLn.Close() }) + + tester := caddytest.NewTester(t) + tester.InitServer(fmt.Sprintf(` + { + skip_install_trust + admin localhost:2999 + http_port 9080 + https_port 9443 + grace_period 1ns + } + http://localhost:9080 { + reverse_proxy %s %s { + lb_policy round_robin + lb_retries 1 + lb_retry_match { + expression `+"`{rp.status_code} in [502, 503]`"+` + } + } + } + `, goodLn.Addr().String(), badLn.Addr().String()), "caddyfile") + + tester.AssertGetResponse("http://localhost:9080/", 200, "ok") +} + +// TestReverseProxyRetryMatchHeader verifies that lb_retry_match with a CEL +// expression matching on {rp.header.*} causes the request to be retried when +// the upstream sets a matching response header +func TestReverseProxyRetryMatchHeader(t *testing.T) { + var badHits atomic.Int32 + + // Bad upstream: returns 200 but signals retry via header + badSrv := &http.Server{ + Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + badHits.Add(1) + w.Header().Set("X-Upstream-Retry", "true") + w.Write([]byte("bad")) + }), + } + badLn, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("failed to listen: %v", err) + } + go badSrv.Serve(badLn) + t.Cleanup(func() { badSrv.Close(); badLn.Close() }) + + // Good upstream: returns 200 without retry header + goodSrv := &http.Server{ + Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Write([]byte("good")) + }), + } + goodLn, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("failed to listen: %v", err) + } + go goodSrv.Serve(goodLn) + t.Cleanup(func() { goodSrv.Close(); goodLn.Close() }) + + tester := caddytest.NewTester(t) + tester.InitServer(fmt.Sprintf(` + { + skip_install_trust + admin localhost:2999 + http_port 9080 + https_port 9443 + grace_period 1ns + } + http://localhost:9080 { + reverse_proxy %s %s { + lb_policy round_robin + lb_retries 1 + lb_retry_match { + expression `+"`{rp.header.X-Upstream-Retry} == \"true\"`"+` + } + } + } + `, goodLn.Addr().String(), badLn.Addr().String()), "caddyfile") + + tester.AssertGetResponse("http://localhost:9080/", 200, "good") + + if badHits.Load() != 1 { + t.Errorf("bad upstream hits: got %d, want 1", badHits.Load()) + } +} + +// TestReverseProxyRetryMatchCombined verifies that a CEL expression combining +// request path matching with response status code matching works correctly - +// only retrying when both conditions are met +func TestReverseProxyRetryMatchCombined(t *testing.T) { + // Upstream: returns 502 for all requests + srv := &http.Server{ + Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusBadGateway) + }), + } + ln, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("failed to listen: %v", err) + } + go srv.Serve(ln) + t.Cleanup(func() { srv.Close(); ln.Close() }) + + // Good upstream + goodSrv := &http.Server{ + Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Write([]byte("ok")) + }), + } + goodLn, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("failed to listen: %v", err) + } + go goodSrv.Serve(goodLn) + t.Cleanup(func() { goodSrv.Close(); goodLn.Close() }) + + tester := caddytest.NewTester(t) + tester.InitServer(fmt.Sprintf(` + { + skip_install_trust + admin localhost:2999 + http_port 9080 + https_port 9443 + grace_period 1ns + } + http://localhost:9080 { + reverse_proxy %s %s { + lb_policy round_robin + lb_retries 1 + lb_retry_match { + expression `+"`path('/retry*') && {rp.status_code} in [502, 503]`"+` + } + } + } + `, goodLn.Addr().String(), ln.Addr().String()), "caddyfile") + + // /retry path matches the expression - should retry to good upstream + tester.AssertGetResponse("http://localhost:9080/retry", 200, "ok") + + // /other path does NOT match - should return the 502 + req, _ := http.NewRequest(http.MethodGet, "http://localhost:9080/other", nil) + tester.AssertResponse(req, 502, "") +} + +// TestReverseProxyRetryMatchIsTransportError verifies that the +// {rp.is_transport_error} == true CEL function correctly identifies transport errors +// and allows retrying them alongside response-based matching +func TestReverseProxyRetryMatchIsTransportError(t *testing.T) { + // Good upstream: returns 200 + goodSrv := &http.Server{ + Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Write([]byte("ok")) + }), + } + goodLn, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("failed to listen: %v", err) + } + go goodSrv.Serve(goodLn) + t.Cleanup(func() { goodSrv.Close(); goodLn.Close() }) + + // Broken upstream: accepts connections but closes immediately + brokenLn, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("failed to listen: %v", err) + } + t.Cleanup(func() { brokenLn.Close() }) + go func() { + for { + conn, err := brokenLn.Accept() + if err != nil { + return + } + conn.Close() + } + }() + + tester := caddytest.NewTester(t) + tester.InitServer(fmt.Sprintf(` + { + skip_install_trust + admin localhost:2999 + http_port 9080 + https_port 9443 + grace_period 1ns + } + http://localhost:9080 { + reverse_proxy %s %s { + lb_policy round_robin + lb_retries 1 + lb_retry_match { + expression `+"`{rp.is_transport_error} || {rp.status_code} in [502, 503]`"+` + } + } + } + `, goodLn.Addr().String(), brokenLn.Addr().String()), "caddyfile") + + // Transport error on broken upstream should be retried to good upstream + tester.AssertGetResponse("http://localhost:9080/", 200, "ok") +} diff --git a/modules/caddyhttp/matchers.go b/modules/caddyhttp/matchers.go index 27e5c5ae6..f179b9c11 100644 --- a/modules/caddyhttp/matchers.go +++ b/modules/caddyhttp/matchers.go @@ -1562,6 +1562,14 @@ func ParseCaddyfileNestedMatcherSet(d *caddyfile.Dispenser) (caddy.ModuleMap, er // instances of the matcher in this set tokensByMatcherName := make(map[string][]caddyfile.Token) for nesting := d.Nesting(); d.NextArg() || d.NextBlock(nesting); { + // if the token is quoted (backtick), treat it as a shorthand + // for an expression matcher, same as @named matcher parsing + if d.Token().Quoted() { + expressionToken := d.Token().Clone() + expressionToken.Text = "expression" + tokensByMatcherName["expression"] = append(tokensByMatcherName["expression"], expressionToken, d.Token()) + continue + } matcherName := d.Val() tokensByMatcherName[matcherName] = append(tokensByMatcherName[matcherName], d.NextSegment()...) } diff --git a/modules/caddyhttp/reverseproxy/caddyfile.go b/modules/caddyhttp/reverseproxy/caddyfile.go index a370a2873..8716babe3 100644 --- a/modules/caddyhttp/reverseproxy/caddyfile.go +++ b/modules/caddyhttp/reverseproxy/caddyfile.go @@ -67,7 +67,7 @@ func parseCaddyfile(h httpcaddyfile.Helper) (caddyhttp.MiddlewareHandler, error) // lb_retries // lb_try_duration // lb_try_interval -// lb_retry_match +// lb_retry_match // // # active health checking // health_uri diff --git a/modules/caddyhttp/reverseproxy/retries_test.go b/modules/caddyhttp/reverseproxy/retries_test.go index 056223d4c..b0f78bac0 100644 --- a/modules/caddyhttp/reverseproxy/retries_test.go +++ b/modules/caddyhttp/reverseproxy/retries_test.go @@ -1,6 +1,7 @@ package reverseproxy import ( + "context" "errors" "io" "net" @@ -8,11 +9,13 @@ import ( "net/http/httptest" "strings" "sync" + "sync/atomic" "testing" "go.uber.org/zap" "github.com/caddyserver/caddy/v2" + "github.com/caddyserver/caddy/v2/caddyconfig/caddyfile" "github.com/caddyserver/caddy/v2/modules/caddyhttp" ) @@ -255,3 +258,475 @@ func TestDialErrorBodyRetry(t *testing.T) { }) } } + +// newExpressionMatcher provisions a MatchExpression for use in tests +func newExpressionMatcher(t *testing.T, expr string) *caddyhttp.MatchExpression { + t.Helper() + ctx, cancel := caddy.NewContext(caddy.Context{Context: context.Background()}) + t.Cleanup(cancel) + m := &caddyhttp.MatchExpression{Expr: expr} + if err := m.Provision(ctx); err != nil { + t.Fatalf("failed to provision expression %q: %v", expr, err) + } + return m +} + +// minimalHandlerWithRetryMatch is like minimalHandler but also configures +// RetryMatch so that response-based retry can be tested +func minimalHandlerWithRetryMatch(retries int, retryMatch caddyhttp.MatcherSets, upstreams ...*Upstream) *Handler { + h := minimalHandler(retries, upstreams...) + h.LoadBalancing.RetryMatch = retryMatch + return h +} + +// TestResponseRetryStatusCode verifies that when an upstream returns a status +// code matching a retry_match expression, the request is retried on the next +// upstream +func TestResponseRetryStatusCode(t *testing.T) { + // Bad upstream: returns 502 + badServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusBadGateway) + })) + t.Cleanup(badServer.Close) + + // Good upstream: returns 200 + goodServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + w.Write([]byte("ok")) + })) + t.Cleanup(goodServer.Close) + + retryMatch := caddyhttp.MatcherSets{ + caddyhttp.MatcherSet{ + newExpressionMatcher(t, "{http.reverse_proxy.status_code} in [502, 503]"), + }, + } + + // RoundRobin picks index 1 first, then 0 + upstreams := []*Upstream{ + {Host: new(Host), Dial: goodServer.Listener.Addr().String()}, + {Host: new(Host), Dial: badServer.Listener.Addr().String()}, + } + + h := minimalHandlerWithRetryMatch(1, retryMatch, upstreams...) + + req := httptest.NewRequest(http.MethodGet, "http://example.com/", nil) + req = prepareTestRequest(req) + rec := httptest.NewRecorder() + + err := h.ServeHTTP(rec, req, caddyhttp.HandlerFunc(func(w http.ResponseWriter, r *http.Request) error { + return nil + })) + + gotStatus := rec.Code + if err != nil { + if herr, ok := err.(caddyhttp.HandlerError); ok { + gotStatus = herr.StatusCode + } + } + + if gotStatus != http.StatusOK { + t.Errorf("status: got %d, want %d (err=%v)", gotStatus, http.StatusOK, err) + } +} + +// TestResponseRetryHeader verifies that response header matching triggers +// retries via a CEL expression checking {rp.header.*} +func TestResponseRetryHeader(t *testing.T) { + // Bad upstream: returns 200 but with X-Upstream-Retry header + badServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("X-Upstream-Retry", "true") + w.WriteHeader(http.StatusOK) + w.Write([]byte("bad")) + })) + t.Cleanup(badServer.Close) + + // Good upstream: returns 200 without retry header + goodServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + w.Write([]byte("good")) + })) + t.Cleanup(goodServer.Close) + + retryMatch := caddyhttp.MatcherSets{ + caddyhttp.MatcherSet{ + newExpressionMatcher(t, `{http.reverse_proxy.header.X-Upstream-Retry} == "true"`), + }, + } + + // RoundRobin picks index 1 first, then 0 + upstreams := []*Upstream{ + {Host: new(Host), Dial: goodServer.Listener.Addr().String()}, + {Host: new(Host), Dial: badServer.Listener.Addr().String()}, + } + + h := minimalHandlerWithRetryMatch(1, retryMatch, upstreams...) + + req := httptest.NewRequest(http.MethodGet, "http://example.com/", nil) + req = prepareTestRequest(req) + rec := httptest.NewRecorder() + + err := h.ServeHTTP(rec, req, caddyhttp.HandlerFunc(func(w http.ResponseWriter, r *http.Request) error { + return nil + })) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if rec.Code != http.StatusOK { + t.Errorf("status: got %d, want %d", rec.Code, http.StatusOK) + } + if rec.Body.String() != "good" { + t.Errorf("body: got %q, want %q (retried to wrong upstream)", rec.Body.String(), "good") + } +} + +// TestResponseRetryNoMatchNoRetry verifies that when no retry_match entries +// match the response, the original response is returned without retrying +func TestResponseRetryNoMatchNoRetry(t *testing.T) { + var hits atomic.Int32 + + // Server that returns 500 - but retry_match only matches 502/503 + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + hits.Add(1) + w.WriteHeader(http.StatusInternalServerError) + })) + t.Cleanup(server.Close) + + retryMatch := caddyhttp.MatcherSets{ + caddyhttp.MatcherSet{ + newExpressionMatcher(t, "{http.reverse_proxy.status_code} in [502, 503]"), + }, + } + + upstreams := []*Upstream{ + {Host: new(Host), Dial: server.Listener.Addr().String()}, + {Host: new(Host), Dial: server.Listener.Addr().String()}, + } + + h := minimalHandlerWithRetryMatch(2, retryMatch, upstreams...) + + req := httptest.NewRequest(http.MethodGet, "http://example.com/", nil) + req = prepareTestRequest(req) + rec := httptest.NewRecorder() + + _ = h.ServeHTTP(rec, req, caddyhttp.HandlerFunc(func(w http.ResponseWriter, r *http.Request) error { + return nil + })) + + // Only one hit - no retry since 500 doesn't match [502, 503] + if hits.Load() != 1 { + t.Errorf("upstream hits: got %d, want 1 (should not have retried)", hits.Load()) + } +} + +// TestResponseRetryExhaustedPreservesStatusCode verifies that when retries +// are exhausted, the actual upstream status code (e.g. 503) is reported +// to the client, not a generic 502 +func TestResponseRetryExhaustedPreservesStatusCode(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusServiceUnavailable) // 503 + })) + t.Cleanup(server.Close) + + retryMatch := caddyhttp.MatcherSets{ + caddyhttp.MatcherSet{ + newExpressionMatcher(t, "{http.reverse_proxy.status_code} == 503"), + }, + } + + upstreams := []*Upstream{ + {Host: new(Host), Dial: server.Listener.Addr().String()}, + {Host: new(Host), Dial: server.Listener.Addr().String()}, + } + + h := minimalHandlerWithRetryMatch(1, retryMatch, upstreams...) + + req := httptest.NewRequest(http.MethodGet, "http://example.com/", nil) + req = prepareTestRequest(req) + rec := httptest.NewRecorder() + + err := h.ServeHTTP(rec, req, caddyhttp.HandlerFunc(func(w http.ResponseWriter, r *http.Request) error { + return nil + })) + + gotStatus := rec.Code + if err != nil { + if herr, ok := err.(caddyhttp.HandlerError); ok { + gotStatus = herr.StatusCode + } + } + + // Must return 503 (actual upstream status), not 502 (generic proxy error) + if gotStatus != http.StatusServiceUnavailable { + t.Errorf("status: got %d, want %d (status code not preserved)", gotStatus, http.StatusServiceUnavailable) + } +} + +// TestResponseRetryHeaderCleanup verifies that stale response header +// placeholders from a previous upstream attempt are cleaned up before the +// next retry evaluation. Without cleanup, a header like X-Retry: true from +// upstream A would leak into the retry match for upstream B even if B does +// not set that header +func TestResponseRetryHeaderCleanup(t *testing.T) { + // First upstream: returns 200 with X-Retry header (triggers retry) + firstServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("X-Retry", "true") + w.WriteHeader(http.StatusOK) + w.Write([]byte("first")) + })) + t.Cleanup(firstServer.Close) + + // Second upstream: returns 200 WITHOUT X-Retry header (should NOT retry) + secondServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + w.Write([]byte("second")) + })) + t.Cleanup(secondServer.Close) + + retryMatch := caddyhttp.MatcherSets{ + caddyhttp.MatcherSet{ + newExpressionMatcher(t, `{http.reverse_proxy.header.X-Retry} == "true"`), + }, + } + + // RoundRobin picks index 1 first, then 0 + upstreams := []*Upstream{ + {Host: new(Host), Dial: secondServer.Listener.Addr().String()}, + {Host: new(Host), Dial: firstServer.Listener.Addr().String()}, + } + + h := minimalHandlerWithRetryMatch(2, retryMatch, upstreams...) + + req := httptest.NewRequest(http.MethodGet, "http://example.com/", nil) + req = prepareTestRequest(req) + rec := httptest.NewRecorder() + + err := h.ServeHTTP(rec, req, caddyhttp.HandlerFunc(func(w http.ResponseWriter, r *http.Request) error { + return nil + })) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + // Should get "second" - the first upstream's X-Retry header must not + // leak into the second upstream's retry evaluation + if rec.Body.String() != "second" { + t.Errorf("body: got %q, want %q (stale header leaked between retries)", rec.Body.String(), "second") + } +} + +// TestRequestOnlyMatcherDoesNotRetryResponses verifies that a pure request +// matcher like method PUT in lb_retry_match does NOT trigger response-based +// retries. Only expression matchers (which can reference response data) +// should trigger response retries +func TestRequestOnlyMatcherDoesNotRetryResponses(t *testing.T) { + var hits atomic.Int32 + + // Server returns 200 OK for all requests + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + hits.Add(1) + w.WriteHeader(http.StatusOK) + w.Write([]byte("ok")) + })) + t.Cleanup(server.Close) + + // method PUT matcher - should NOT trigger response retries + retryMatch := caddyhttp.MatcherSets{ + caddyhttp.MatcherSet{ + caddyhttp.MatchMethod{"PUT"}, + }, + } + + upstreams := []*Upstream{ + {Host: new(Host), Dial: server.Listener.Addr().String()}, + {Host: new(Host), Dial: server.Listener.Addr().String()}, + } + + h := minimalHandlerWithRetryMatch(2, retryMatch, upstreams...) + + req := httptest.NewRequest(http.MethodPut, "http://example.com/", nil) + req = prepareTestRequest(req) + rec := httptest.NewRecorder() + + err := h.ServeHTTP(rec, req, caddyhttp.HandlerFunc(func(w http.ResponseWriter, r *http.Request) error { + return nil + })) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + // Should hit only once - no retry for 200 OK even though method matches + if hits.Load() != 1 { + t.Errorf("upstream hits: got %d, want 1 (should not retry successful responses)", hits.Load()) + } + if rec.Code != http.StatusOK { + t.Errorf("status: got %d, want %d", rec.Code, http.StatusOK) + } +} + +// brokenUpstreamAddr returns the address of a TCP listener that accepts +// connections but immediately closes them, causing a transport error (not +// a dial error). This simulates an upstream that is reachable but broken +func brokenUpstreamAddr(t *testing.T) string { + t.Helper() + ln, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("failed to listen: %v", err) + } + t.Cleanup(func() { ln.Close() }) + go func() { + for { + conn, err := ln.Accept() + if err != nil { + return + } + conn.Close() + } + }() + return ln.Addr().String() +} + +// TestTransportErrorPlaceholder verifies that the is_transport_error +// placeholder is set to true during transport error evaluation in tryAgain() +// and that expression matchers using {rp.is_transport_error} can match it +func TestTransportErrorPlaceholder(t *testing.T) { + broken := brokenUpstreamAddr(t) + + goodServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + w.Write([]byte("ok")) + })) + t.Cleanup(goodServer.Close) + + retryMatch := caddyhttp.MatcherSets{ + caddyhttp.MatcherSet{ + newExpressionMatcher(t, "{http.reverse_proxy.is_transport_error} == true"), + }, + } + + // RoundRobin picks index 1 first (broken), then 0 (good) + upstreams := []*Upstream{ + {Host: new(Host), Dial: goodServer.Listener.Addr().String()}, + {Host: new(Host), Dial: broken}, + } + + h := minimalHandlerWithRetryMatch(1, retryMatch, upstreams...) + + req := httptest.NewRequest(http.MethodPost, "http://example.com/", nil) + req = prepareTestRequest(req) + rec := httptest.NewRecorder() + + err := h.ServeHTTP(rec, req, caddyhttp.HandlerFunc(func(w http.ResponseWriter, r *http.Request) error { + return nil + })) + + gotStatus := rec.Code + if err != nil { + if herr, ok := err.(caddyhttp.HandlerError); ok { + gotStatus = herr.StatusCode + } + } + + // POST transport error should be retried because is_transport_error matched + if gotStatus != http.StatusOK { + t.Errorf("status: got %d, want %d (transport error should have been retried)", gotStatus, http.StatusOK) + } +} + +// TestTransportErrorPlaceholderNotSetForResponses verifies that the +// is_transport_error placeholder is NOT set when evaluating response +// matchers, so {rp.is_transport_error} is false for response retries +func TestTransportErrorPlaceholderNotSetForResponses(t *testing.T) { + var hits atomic.Int32 + + // Server returns 502 - but the matcher only checks is_transport_error + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + hits.Add(1) + w.WriteHeader(http.StatusBadGateway) + })) + t.Cleanup(server.Close) + + // Only matches transport errors, not response errors + retryMatch := caddyhttp.MatcherSets{ + caddyhttp.MatcherSet{ + newExpressionMatcher(t, "{http.reverse_proxy.is_transport_error} == true"), + }, + } + + upstreams := []*Upstream{ + {Host: new(Host), Dial: server.Listener.Addr().String()}, + {Host: new(Host), Dial: server.Listener.Addr().String()}, + } + + h := minimalHandlerWithRetryMatch(2, retryMatch, upstreams...) + + req := httptest.NewRequest(http.MethodGet, "http://example.com/", nil) + req = prepareTestRequest(req) + rec := httptest.NewRecorder() + + _ = h.ServeHTTP(rec, req, caddyhttp.HandlerFunc(func(w http.ResponseWriter, r *http.Request) error { + return nil + })) + + // Should hit only once - is_transport_error is false during response + // evaluation so the 502 is NOT retried + if hits.Load() != 1 { + t.Errorf("upstream hits: got %d, want 1 (is_transport_error should be false for responses)", hits.Load()) + } +} + +// TestRetryMatchAllowsExpressionMixedWithOtherMatchers verifies that +// lb_retry_match accepts a block mixing expression with other matchers +func TestRetryMatchAllowsExpressionMixedWithOtherMatchers(t *testing.T) { + tests := []struct { + name string + input string + }{ + { + name: "expression alone", + input: `reverse_proxy localhost:9080 { + lb_retry_match { + expression ` + "`{rp.status_code} in [502, 503]`" + ` + } + }`, + }, + { + name: "method alone", + input: `reverse_proxy localhost:9080 { + lb_retry_match { + method PUT + } + }`, + }, + { + name: "expression mixed with method", + input: `reverse_proxy localhost:9080 { + lb_retry_match { + method POST + expression ` + "`{rp.status_code} in [502, 503]`" + ` + } + }`, + }, + { + name: "expression mixed with path", + input: `reverse_proxy localhost:9080 { + lb_retry_match { + path /api* + expression ` + "`{rp.status_code} == 502`" + ` + } + }`, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + h := &Handler{} + d := caddyfile.NewTestDispenser(tc.input) + err := h.UnmarshalCaddyfile(d) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + }) + } +} diff --git a/modules/caddyhttp/reverseproxy/reverseproxy.go b/modules/caddyhttp/reverseproxy/reverseproxy.go index 3b9b56a05..52d2b1ab3 100644 --- a/modules/caddyhttp/reverseproxy/reverseproxy.go +++ b/modules/caddyhttp/reverseproxy/reverseproxy.go @@ -670,8 +670,12 @@ func (h *Handler) proxyLoopIteration(r *http.Request, origReq *http.Request, w h return true, succ.error } - // remember this failure (if enabled) - h.countFailure(upstream) + // remember this failure (if enabled); response-based retries + // are not counted as failures since the upstream did respond + // successfully - only the response content triggered a retry + if _, isRetryableResponse := proxyErr.(retryableResponseError); !isRetryableResponse { + h.countFailure(upstream) + } // if we've tried long enough, break if !h.LoadBalancing.tryAgain(h.ctx, start, retries, proxyErr, r, h.logger) { @@ -1055,6 +1059,45 @@ func (h *Handler) reverseProxy(rw http.ResponseWriter, req *http.Request, origRe res.Body, _ = h.bufferedBody(res.Body, h.ResponseBuffers) } + // set response placeholders so they can be used in retry match + // expressions and handle_response routes; clear stale header + // placeholders from a previous attempt first so they don't + // leak into the next retry evaluation + repl.DeleteByPrefix("http.reverse_proxy.header.") + for field, value := range res.Header { + repl.Set("http.reverse_proxy.header."+field, strings.Join(value, ",")) + } + repl.Set("http.reverse_proxy.status_code", res.StatusCode) + repl.Set("http.reverse_proxy.status_text", res.Status) + + // check if the response matches a retry match entry; if so, + // close the body and return a retryable error so the request + // is retried with the next upstream. Only evaluate matcher sets + // that contain at least one expression matcher, since those are + // the ones that can reference response data ({rp.status_code}, + // {rp.header.*}). Pure request-only matchers (method, path, etc.) + // are skipped to avoid retrying every response that matches a + // request condition + if h.LoadBalancing != nil && len(h.LoadBalancing.RetryMatch) > 0 { + for _, matcherSet := range h.LoadBalancing.RetryMatch { + if !matcherSetHasExpressionMatcher(matcherSet) { + continue + } + match, err := matcherSet.MatchWithError(req) + if err != nil { + h.logger.Error("error matching request for retry", zap.Error(err)) + break + } + if match { + res.Body.Close() + return retryableResponseError{ + error: fmt.Errorf("upstream response matched retry_match (status %d)", res.StatusCode), + statusCode: res.StatusCode, + } + } + } + } + // see if any response handler is configured for this response from the backend for i, rh := range h.HandleResponse { if rh.Match != nil && !rh.Match.Match(res.StatusCode, res.Header) { @@ -1074,14 +1117,6 @@ func (h *Handler) reverseProxy(rw http.ResponseWriter, req *http.Request, origRe break } - // set up the replacer so that parts of the original response can be - // used for routing decisions - for field, value := range res.Header { - repl.Set("http.reverse_proxy.header."+field, strings.Join(value, ",")) - } - repl.Set("http.reverse_proxy.status_code", res.StatusCode) - repl.Set("http.reverse_proxy.status_text", res.Status) - if c := logger.Check(zapcore.DebugLevel, "handling response"); c != nil { c.Write(zap.Int("handler", i)) } @@ -1266,18 +1301,29 @@ func (lb LoadBalancing) tryAgain(ctx caddy.Context, start time.Time, retries int // specifically a dialer error, we need to be careful if proxyErr != nil { _, isDialError := proxyErr.(DialError) + _, isRetryableResponse := proxyErr.(retryableResponseError) herr, isHandlerError := proxyErr.(caddyhttp.HandlerError) // if the error occurred after a connection was established, // we have to assume the upstream received the request, and // retries need to be carefully decided, because some requests - // are not idempotent - if !isDialError && (!isHandlerError || !errors.Is(herr, errNoUpstream)) { + // are not idempotent; retryableResponseError is excluded here + // because its retry decision was already made in reverseProxy() + // when the response matchers were evaluated + if !isDialError && !isRetryableResponse && (!isHandlerError || !errors.Is(herr, errNoUpstream)) { if lb.RetryMatch == nil && req.Method != "GET" { // by default, don't retry requests if they aren't GET return false } + // set transport error flag so CEL expressions can use + // {rp.is_transport_error} to decide whether to retry + repl, _ := req.Context().Value(caddy.ReplacerCtxKey).(*caddy.Replacer) + if repl != nil { + repl.Set("http.reverse_proxy.is_transport_error", true) + defer repl.Delete("http.reverse_proxy.is_transport_error") + } + match, err := lb.RetryMatch.AnyMatchWithError(req) if err != nil { logger.Error("error matching request for retry", zap.Error(err)) @@ -1507,6 +1553,12 @@ func removeConnectionHeaders(h http.Header) { // statusError returns an error value that has a status code. func statusError(err error) error { + // if a response-based retry was exhausted, use the actual upstream + // status code instead of a generic 502 + if rre, ok := err.(retryableResponseError); ok { + return caddyhttp.Error(rre.statusCode, err) + } + // errors proxying usually mean there is a problem with the upstream(s) statusCode := http.StatusBadGateway @@ -1558,13 +1610,15 @@ type LoadBalancing struct { // to spin if all backends are down and latency is very low. TryInterval caddy.Duration `json:"try_interval,omitempty"` - // A list of matcher sets that restricts with which requests retries are - // allowed. A request must match any of the given matcher sets in order - // to be retried if the connection to the upstream succeeded but the - // subsequent round-trip failed. If the connection to the upstream failed, - // a retry is always allowed. If unspecified, only GET requests will be - // allowed to be retried. Note that a retry is done with the next available - // host according to the load balancing policy. + // A list of matcher sets that controls retry behavior. Matcher sets + // without expression matchers (e.g. method, path) restrict which + // requests are retried on transport errors - if unspecified, only + // GET requests will be retried. Matcher sets with CEL expression + // matchers are evaluated against upstream responses and can + // reference {rp.status_code}, {rp.header.*}, and + // {rp.is_transport_error}. Dial errors are always retried + // regardless of this setting. Retries use the next available + // upstream per the load balancing policy RetryMatchRaw caddyhttp.RawMatcherSets `json:"retry_match,omitempty" caddy:"namespace=http.matchers"` SelectionPolicy Selector `json:"-"` @@ -1662,10 +1716,34 @@ type RequestHeaderOpsTransport interface { RequestHeaderOps() *headers.HeaderOps } +// matcherSetHasExpressionMatcher reports whether a matcher set contains +// at least one expression matcher. Expression matchers can reference +// response data via placeholders like {rp.status_code}. Matcher sets +// without expression matchers only test request properties and should +// not be evaluated for response-based retry decisions +func matcherSetHasExpressionMatcher(matcherSet caddyhttp.MatcherSet) bool { + for _, m := range matcherSet { + if _, ok := m.(*caddyhttp.MatchExpression); ok { + return true + } + } + return false +} + // roundtripSucceededError is an error type that is returned if the // roundtrip succeeded, but an error occurred after-the-fact. type roundtripSucceededError struct{ error } +// retryableResponseError is returned when the upstream response matched +// a retry_match entry, indicating the request should be retried with the +// next upstream. It preserves the original status code so that if retries +// are exhausted, the actual upstream status is reported instead of a +// generic 502 +type retryableResponseError struct { + error + statusCode int +} + // bodyReadCloser is a reader that, upon closing, will return // its buffer to the pool and close the underlying body reader. type bodyReadCloser struct { diff --git a/replacer.go b/replacer.go index 1a2aa5771..2ab02b602 100644 --- a/replacer.go +++ b/replacer.go @@ -121,6 +121,18 @@ func (r *Replacer) Delete(variable string) { r.mapMutex.Unlock() } +// DeleteByPrefix removes all static variables with +// keys starting with the given prefix +func (r *Replacer) DeleteByPrefix(prefix string) { + r.mapMutex.Lock() + for key := range r.static { + if strings.HasPrefix(key, prefix) { + delete(r.static, key) + } + } + r.mapMutex.Unlock() +} + // fromStatic provides values from r.static. func (r *Replacer) fromStatic(key string) (any, bool) { r.mapMutex.RLock()