mirror of
https://github.com/caddyserver/caddy.git
synced 2026-04-23 17:39:33 -04:00
reverseproxy: add lb_retry_match condition on response status (#7569)
Some checks failed
Tests / test (./cmd/caddy/caddy, ~1.26.0, ubuntu-latest, 0, 1.26, linux) (push) Failing after 15m2s
Tests / test (s390x on IBM Z) (push) Has been skipped
Tests / goreleaser-check (push) Has been skipped
Cross-Build / build (~1.26.0, 1.26, aix) (push) Failing after 11m36s
Cross-Build / build (~1.26.0, 1.26, darwin) (push) Successful in 1m32s
Cross-Build / build (~1.26.0, 1.26, dragonfly) (push) Failing after 2m3s
Cross-Build / build (~1.26.0, 1.26, freebsd) (push) Failing after 2m2s
Cross-Build / build (~1.26.0, 1.26, illumos) (push) Failing after 11m12s
Cross-Build / build (~1.26.0, 1.26, linux) (push) Successful in 6m45s
Cross-Build / build (~1.26.0, 1.26, netbsd) (push) Failing after 2m3s
Cross-Build / build (~1.26.0, 1.26, openbsd) (push) Successful in 1m57s
Cross-Build / build (~1.26.0, 1.26, solaris) (push) Failing after 56s
Cross-Build / build (~1.26.0, 1.26, windows) (push) Failing after 51s
Lint / lint (ubuntu-latest, linux) (push) Failing after 1m24s
Lint / govulncheck (push) Failing after 1m51s
Lint / dependency-review (push) Failing after 3m55s
OpenSSF Scorecard supply-chain security / Scorecard analysis (push) Failing after 20s
Tests / test (./cmd/caddy/caddy, ~1.26.0, macos-14, 0, 1.26, mac) (push) Has been cancelled
Tests / test (./cmd/caddy/caddy.exe, ~1.26.0, windows-latest, True, 1.26, windows) (push) Has been cancelled
Lint / lint (macos-14, mac) (push) Has been cancelled
Lint / lint (windows-latest, windows) (push) Has been cancelled
Some checks failed
Tests / test (./cmd/caddy/caddy, ~1.26.0, ubuntu-latest, 0, 1.26, linux) (push) Failing after 15m2s
Tests / test (s390x on IBM Z) (push) Has been skipped
Tests / goreleaser-check (push) Has been skipped
Cross-Build / build (~1.26.0, 1.26, aix) (push) Failing after 11m36s
Cross-Build / build (~1.26.0, 1.26, darwin) (push) Successful in 1m32s
Cross-Build / build (~1.26.0, 1.26, dragonfly) (push) Failing after 2m3s
Cross-Build / build (~1.26.0, 1.26, freebsd) (push) Failing after 2m2s
Cross-Build / build (~1.26.0, 1.26, illumos) (push) Failing after 11m12s
Cross-Build / build (~1.26.0, 1.26, linux) (push) Successful in 6m45s
Cross-Build / build (~1.26.0, 1.26, netbsd) (push) Failing after 2m3s
Cross-Build / build (~1.26.0, 1.26, openbsd) (push) Successful in 1m57s
Cross-Build / build (~1.26.0, 1.26, solaris) (push) Failing after 56s
Cross-Build / build (~1.26.0, 1.26, windows) (push) Failing after 51s
Lint / lint (ubuntu-latest, linux) (push) Failing after 1m24s
Lint / govulncheck (push) Failing after 1m51s
Lint / dependency-review (push) Failing after 3m55s
OpenSSF Scorecard supply-chain security / Scorecard analysis (push) Failing after 20s
Tests / test (./cmd/caddy/caddy, ~1.26.0, macos-14, 0, 1.26, mac) (push) Has been cancelled
Tests / test (./cmd/caddy/caddy.exe, ~1.26.0, windows-latest, True, 1.26, windows) (push) Has been cancelled
Lint / lint (macos-14, mac) (push) Has been cancelled
Lint / lint (windows-latest, windows) (push) Has been cancelled
This commit is contained in:
parent
4430756d5c
commit
aed1af5976
@ -0,0 +1,58 @@
|
||||
:8884
|
||||
|
||||
reverse_proxy 127.0.0.1:65535 {
|
||||
lb_retries 3
|
||||
lb_retry_match expression `{rp.status_code} in [502, 503]`
|
||||
lb_retry_match expression `{rp.is_transport_error} || {rp.status_code} == 502`
|
||||
lb_retry_match expression `method('POST') && {rp.status_code} == 503`
|
||||
lb_retry_match `{rp.status_code} == 504`
|
||||
lb_retry_match `{rp.is_transport_error} && method('PUT')`
|
||||
}
|
||||
----------
|
||||
{
|
||||
"apps": {
|
||||
"http": {
|
||||
"servers": {
|
||||
"srv0": {
|
||||
"listen": [
|
||||
":8884"
|
||||
],
|
||||
"routes": [
|
||||
{
|
||||
"handle": [
|
||||
{
|
||||
"handler": "reverse_proxy",
|
||||
"load_balancing": {
|
||||
"retries": 3,
|
||||
"retry_match": [
|
||||
{
|
||||
"expression": "{http.reverse_proxy.status_code} in [502, 503]"
|
||||
},
|
||||
{
|
||||
"expression": "{http.reverse_proxy.is_transport_error} || {http.reverse_proxy.status_code} == 502"
|
||||
},
|
||||
{
|
||||
"expression": "method('POST') \u0026\u0026 {http.reverse_proxy.status_code} == 503"
|
||||
},
|
||||
{
|
||||
"expression": "{http.reverse_proxy.status_code} == 504"
|
||||
},
|
||||
{
|
||||
"expression": "{http.reverse_proxy.is_transport_error} \u0026\u0026 method('PUT')"
|
||||
}
|
||||
]
|
||||
},
|
||||
"upstreams": [
|
||||
{
|
||||
"dial": "127.0.0.1:65535"
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,147 @@
|
||||
:8884
|
||||
|
||||
reverse_proxy 127.0.0.1:65535 {
|
||||
lb_retries 5
|
||||
|
||||
# request matchers (backward-compatible, non-expression)
|
||||
lb_retry_match {
|
||||
method POST PUT
|
||||
}
|
||||
lb_retry_match {
|
||||
path /foo*
|
||||
}
|
||||
lb_retry_match {
|
||||
header X-Idempotency-Key *
|
||||
}
|
||||
|
||||
# response status code via expression
|
||||
lb_retry_match {
|
||||
expression `{rp.status_code} in [502, 503, 504]`
|
||||
}
|
||||
|
||||
# response header via expression
|
||||
lb_retry_match {
|
||||
expression `{rp.header.X-Retry} == "true"`
|
||||
}
|
||||
|
||||
# CEL request functions combined with response placeholders
|
||||
lb_retry_match {
|
||||
expression `method('POST') && {rp.status_code} >= 500`
|
||||
}
|
||||
lb_retry_match {
|
||||
expression `path('/api*') && {rp.status_code} in [502, 503]`
|
||||
}
|
||||
lb_retry_match {
|
||||
expression `host('example.com') && {rp.status_code} == 503`
|
||||
}
|
||||
lb_retry_match {
|
||||
expression `query({'retry': 'true'}) && {rp.status_code} >= 500`
|
||||
}
|
||||
lb_retry_match {
|
||||
expression `header({'X-Idempotency-Key': '*'}) && {rp.status_code} in [502, 503]`
|
||||
}
|
||||
lb_retry_match {
|
||||
expression `protocol('https') && {rp.status_code} == 502`
|
||||
}
|
||||
lb_retry_match {
|
||||
expression `path_regexp('^/api/v[0-9]+/') && {rp.status_code} >= 500`
|
||||
}
|
||||
lb_retry_match {
|
||||
expression `header_regexp('Content-Type', '^application/json') && {rp.status_code} == 502`
|
||||
}
|
||||
|
||||
# transport error handling via placeholder
|
||||
lb_retry_match {
|
||||
expression `{rp.is_transport_error} || {rp.status_code} in [502, 503]`
|
||||
}
|
||||
lb_retry_match {
|
||||
expression `{rp.is_transport_error} && method('POST')`
|
||||
}
|
||||
}
|
||||
----------
|
||||
{
|
||||
"apps": {
|
||||
"http": {
|
||||
"servers": {
|
||||
"srv0": {
|
||||
"listen": [
|
||||
":8884"
|
||||
],
|
||||
"routes": [
|
||||
{
|
||||
"handle": [
|
||||
{
|
||||
"handler": "reverse_proxy",
|
||||
"load_balancing": {
|
||||
"retries": 5,
|
||||
"retry_match": [
|
||||
{
|
||||
"method": [
|
||||
"POST",
|
||||
"PUT"
|
||||
]
|
||||
},
|
||||
{
|
||||
"path": [
|
||||
"/foo*"
|
||||
]
|
||||
},
|
||||
{
|
||||
"header": {
|
||||
"X-Idempotency-Key": [
|
||||
"*"
|
||||
]
|
||||
}
|
||||
},
|
||||
{
|
||||
"expression": "{http.reverse_proxy.status_code} in [502, 503, 504]"
|
||||
},
|
||||
{
|
||||
"expression": "{http.reverse_proxy.header.X-Retry} == \"true\""
|
||||
},
|
||||
{
|
||||
"expression": "method('POST') \u0026\u0026 {http.reverse_proxy.status_code} \u003e= 500"
|
||||
},
|
||||
{
|
||||
"expression": "path('/api*') \u0026\u0026 {http.reverse_proxy.status_code} in [502, 503]"
|
||||
},
|
||||
{
|
||||
"expression": "host('example.com') \u0026\u0026 {http.reverse_proxy.status_code} == 503"
|
||||
},
|
||||
{
|
||||
"expression": "query({'retry': 'true'}) \u0026\u0026 {http.reverse_proxy.status_code} \u003e= 500"
|
||||
},
|
||||
{
|
||||
"expression": "header({'X-Idempotency-Key': '*'}) \u0026\u0026 {http.reverse_proxy.status_code} in [502, 503]"
|
||||
},
|
||||
{
|
||||
"expression": "protocol('https') \u0026\u0026 {http.reverse_proxy.status_code} == 502"
|
||||
},
|
||||
{
|
||||
"expression": "path_regexp('^/api/v[0-9]+/') \u0026\u0026 {http.reverse_proxy.status_code} \u003e= 500"
|
||||
},
|
||||
{
|
||||
"expression": "header_regexp('Content-Type', '^application/json') \u0026\u0026 {http.reverse_proxy.status_code} == 502"
|
||||
},
|
||||
{
|
||||
"expression": "{http.reverse_proxy.is_transport_error} || {http.reverse_proxy.status_code} in [502, 503]"
|
||||
},
|
||||
{
|
||||
"expression": "{http.reverse_proxy.is_transport_error} \u0026\u0026 method('POST')"
|
||||
}
|
||||
]
|
||||
},
|
||||
"upstreams": [
|
||||
{
|
||||
"dial": "127.0.0.1:65535"
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -7,6 +7,7 @@ import (
|
||||
"os"
|
||||
"runtime"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
|
||||
"github.com/caddyserver/caddy/v2/caddytest"
|
||||
@ -562,3 +563,233 @@ func TestReverseProxyHealthCheckUnixSocketWithoutPort(t *testing.T) {
|
||||
|
||||
tester.AssertGetResponse("http://localhost:9080/", 200, "Hello, World!")
|
||||
}
|
||||
|
||||
// TestReverseProxyRetryMatchStatusCode verifies that lb_retry_match with a
|
||||
// CEL expression matching on {rp.status_code} causes the request to be
|
||||
// retried on the next upstream when the first upstream returns a matching
|
||||
// status code
|
||||
func TestReverseProxyRetryMatchStatusCode(t *testing.T) {
|
||||
// Bad upstream: returns 502
|
||||
badSrv := &http.Server{
|
||||
Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(http.StatusBadGateway)
|
||||
}),
|
||||
}
|
||||
badLn, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
if err != nil {
|
||||
t.Fatalf("failed to listen: %v", err)
|
||||
}
|
||||
go badSrv.Serve(badLn)
|
||||
t.Cleanup(func() { badSrv.Close(); badLn.Close() })
|
||||
|
||||
// Good upstream: returns 200
|
||||
goodSrv := &http.Server{
|
||||
Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.Write([]byte("ok"))
|
||||
}),
|
||||
}
|
||||
goodLn, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
if err != nil {
|
||||
t.Fatalf("failed to listen: %v", err)
|
||||
}
|
||||
go goodSrv.Serve(goodLn)
|
||||
t.Cleanup(func() { goodSrv.Close(); goodLn.Close() })
|
||||
|
||||
tester := caddytest.NewTester(t)
|
||||
tester.InitServer(fmt.Sprintf(`
|
||||
{
|
||||
skip_install_trust
|
||||
admin localhost:2999
|
||||
http_port 9080
|
||||
https_port 9443
|
||||
grace_period 1ns
|
||||
}
|
||||
http://localhost:9080 {
|
||||
reverse_proxy %s %s {
|
||||
lb_policy round_robin
|
||||
lb_retries 1
|
||||
lb_retry_match {
|
||||
expression `+"`{rp.status_code} in [502, 503]`"+`
|
||||
}
|
||||
}
|
||||
}
|
||||
`, goodLn.Addr().String(), badLn.Addr().String()), "caddyfile")
|
||||
|
||||
tester.AssertGetResponse("http://localhost:9080/", 200, "ok")
|
||||
}
|
||||
|
||||
// TestReverseProxyRetryMatchHeader verifies that lb_retry_match with a CEL
|
||||
// expression matching on {rp.header.*} causes the request to be retried when
|
||||
// the upstream sets a matching response header
|
||||
func TestReverseProxyRetryMatchHeader(t *testing.T) {
|
||||
var badHits atomic.Int32
|
||||
|
||||
// Bad upstream: returns 200 but signals retry via header
|
||||
badSrv := &http.Server{
|
||||
Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
badHits.Add(1)
|
||||
w.Header().Set("X-Upstream-Retry", "true")
|
||||
w.Write([]byte("bad"))
|
||||
}),
|
||||
}
|
||||
badLn, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
if err != nil {
|
||||
t.Fatalf("failed to listen: %v", err)
|
||||
}
|
||||
go badSrv.Serve(badLn)
|
||||
t.Cleanup(func() { badSrv.Close(); badLn.Close() })
|
||||
|
||||
// Good upstream: returns 200 without retry header
|
||||
goodSrv := &http.Server{
|
||||
Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.Write([]byte("good"))
|
||||
}),
|
||||
}
|
||||
goodLn, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
if err != nil {
|
||||
t.Fatalf("failed to listen: %v", err)
|
||||
}
|
||||
go goodSrv.Serve(goodLn)
|
||||
t.Cleanup(func() { goodSrv.Close(); goodLn.Close() })
|
||||
|
||||
tester := caddytest.NewTester(t)
|
||||
tester.InitServer(fmt.Sprintf(`
|
||||
{
|
||||
skip_install_trust
|
||||
admin localhost:2999
|
||||
http_port 9080
|
||||
https_port 9443
|
||||
grace_period 1ns
|
||||
}
|
||||
http://localhost:9080 {
|
||||
reverse_proxy %s %s {
|
||||
lb_policy round_robin
|
||||
lb_retries 1
|
||||
lb_retry_match {
|
||||
expression `+"`{rp.header.X-Upstream-Retry} == \"true\"`"+`
|
||||
}
|
||||
}
|
||||
}
|
||||
`, goodLn.Addr().String(), badLn.Addr().String()), "caddyfile")
|
||||
|
||||
tester.AssertGetResponse("http://localhost:9080/", 200, "good")
|
||||
|
||||
if badHits.Load() != 1 {
|
||||
t.Errorf("bad upstream hits: got %d, want 1", badHits.Load())
|
||||
}
|
||||
}
|
||||
|
||||
// TestReverseProxyRetryMatchCombined verifies that a CEL expression combining
|
||||
// request path matching with response status code matching works correctly -
|
||||
// only retrying when both conditions are met
|
||||
func TestReverseProxyRetryMatchCombined(t *testing.T) {
|
||||
// Upstream: returns 502 for all requests
|
||||
srv := &http.Server{
|
||||
Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(http.StatusBadGateway)
|
||||
}),
|
||||
}
|
||||
ln, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
if err != nil {
|
||||
t.Fatalf("failed to listen: %v", err)
|
||||
}
|
||||
go srv.Serve(ln)
|
||||
t.Cleanup(func() { srv.Close(); ln.Close() })
|
||||
|
||||
// Good upstream
|
||||
goodSrv := &http.Server{
|
||||
Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.Write([]byte("ok"))
|
||||
}),
|
||||
}
|
||||
goodLn, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
if err != nil {
|
||||
t.Fatalf("failed to listen: %v", err)
|
||||
}
|
||||
go goodSrv.Serve(goodLn)
|
||||
t.Cleanup(func() { goodSrv.Close(); goodLn.Close() })
|
||||
|
||||
tester := caddytest.NewTester(t)
|
||||
tester.InitServer(fmt.Sprintf(`
|
||||
{
|
||||
skip_install_trust
|
||||
admin localhost:2999
|
||||
http_port 9080
|
||||
https_port 9443
|
||||
grace_period 1ns
|
||||
}
|
||||
http://localhost:9080 {
|
||||
reverse_proxy %s %s {
|
||||
lb_policy round_robin
|
||||
lb_retries 1
|
||||
lb_retry_match {
|
||||
expression `+"`path('/retry*') && {rp.status_code} in [502, 503]`"+`
|
||||
}
|
||||
}
|
||||
}
|
||||
`, goodLn.Addr().String(), ln.Addr().String()), "caddyfile")
|
||||
|
||||
// /retry path matches the expression - should retry to good upstream
|
||||
tester.AssertGetResponse("http://localhost:9080/retry", 200, "ok")
|
||||
|
||||
// /other path does NOT match - should return the 502
|
||||
req, _ := http.NewRequest(http.MethodGet, "http://localhost:9080/other", nil)
|
||||
tester.AssertResponse(req, 502, "")
|
||||
}
|
||||
|
||||
// TestReverseProxyRetryMatchIsTransportError verifies that the
|
||||
// {rp.is_transport_error} == true CEL function correctly identifies transport errors
|
||||
// and allows retrying them alongside response-based matching
|
||||
func TestReverseProxyRetryMatchIsTransportError(t *testing.T) {
|
||||
// Good upstream: returns 200
|
||||
goodSrv := &http.Server{
|
||||
Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.Write([]byte("ok"))
|
||||
}),
|
||||
}
|
||||
goodLn, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
if err != nil {
|
||||
t.Fatalf("failed to listen: %v", err)
|
||||
}
|
||||
go goodSrv.Serve(goodLn)
|
||||
t.Cleanup(func() { goodSrv.Close(); goodLn.Close() })
|
||||
|
||||
// Broken upstream: accepts connections but closes immediately
|
||||
brokenLn, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
if err != nil {
|
||||
t.Fatalf("failed to listen: %v", err)
|
||||
}
|
||||
t.Cleanup(func() { brokenLn.Close() })
|
||||
go func() {
|
||||
for {
|
||||
conn, err := brokenLn.Accept()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
conn.Close()
|
||||
}
|
||||
}()
|
||||
|
||||
tester := caddytest.NewTester(t)
|
||||
tester.InitServer(fmt.Sprintf(`
|
||||
{
|
||||
skip_install_trust
|
||||
admin localhost:2999
|
||||
http_port 9080
|
||||
https_port 9443
|
||||
grace_period 1ns
|
||||
}
|
||||
http://localhost:9080 {
|
||||
reverse_proxy %s %s {
|
||||
lb_policy round_robin
|
||||
lb_retries 1
|
||||
lb_retry_match {
|
||||
expression `+"`{rp.is_transport_error} || {rp.status_code} in [502, 503]`"+`
|
||||
}
|
||||
}
|
||||
}
|
||||
`, goodLn.Addr().String(), brokenLn.Addr().String()), "caddyfile")
|
||||
|
||||
// Transport error on broken upstream should be retried to good upstream
|
||||
tester.AssertGetResponse("http://localhost:9080/", 200, "ok")
|
||||
}
|
||||
|
||||
@ -1562,6 +1562,14 @@ func ParseCaddyfileNestedMatcherSet(d *caddyfile.Dispenser) (caddy.ModuleMap, er
|
||||
// instances of the matcher in this set
|
||||
tokensByMatcherName := make(map[string][]caddyfile.Token)
|
||||
for nesting := d.Nesting(); d.NextArg() || d.NextBlock(nesting); {
|
||||
// if the token is quoted (backtick), treat it as a shorthand
|
||||
// for an expression matcher, same as @named matcher parsing
|
||||
if d.Token().Quoted() {
|
||||
expressionToken := d.Token().Clone()
|
||||
expressionToken.Text = "expression"
|
||||
tokensByMatcherName["expression"] = append(tokensByMatcherName["expression"], expressionToken, d.Token())
|
||||
continue
|
||||
}
|
||||
matcherName := d.Val()
|
||||
tokensByMatcherName[matcherName] = append(tokensByMatcherName[matcherName], d.NextSegment()...)
|
||||
}
|
||||
|
||||
@ -67,7 +67,7 @@ func parseCaddyfile(h httpcaddyfile.Helper) (caddyhttp.MiddlewareHandler, error)
|
||||
// lb_retries <retries>
|
||||
// lb_try_duration <duration>
|
||||
// lb_try_interval <interval>
|
||||
// lb_retry_match <request-matcher>
|
||||
// lb_retry_match <matcher>
|
||||
//
|
||||
// # active health checking
|
||||
// health_uri <uri>
|
||||
|
||||
@ -1,6 +1,7 @@
|
||||
package reverseproxy
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"io"
|
||||
"net"
|
||||
@ -8,11 +9,13 @@ import (
|
||||
"net/http/httptest"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/caddyserver/caddy/v2"
|
||||
"github.com/caddyserver/caddy/v2/caddyconfig/caddyfile"
|
||||
"github.com/caddyserver/caddy/v2/modules/caddyhttp"
|
||||
)
|
||||
|
||||
@ -255,3 +258,475 @@ func TestDialErrorBodyRetry(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// newExpressionMatcher provisions a MatchExpression for use in tests
|
||||
func newExpressionMatcher(t *testing.T, expr string) *caddyhttp.MatchExpression {
|
||||
t.Helper()
|
||||
ctx, cancel := caddy.NewContext(caddy.Context{Context: context.Background()})
|
||||
t.Cleanup(cancel)
|
||||
m := &caddyhttp.MatchExpression{Expr: expr}
|
||||
if err := m.Provision(ctx); err != nil {
|
||||
t.Fatalf("failed to provision expression %q: %v", expr, err)
|
||||
}
|
||||
return m
|
||||
}
|
||||
|
||||
// minimalHandlerWithRetryMatch is like minimalHandler but also configures
|
||||
// RetryMatch so that response-based retry can be tested
|
||||
func minimalHandlerWithRetryMatch(retries int, retryMatch caddyhttp.MatcherSets, upstreams ...*Upstream) *Handler {
|
||||
h := minimalHandler(retries, upstreams...)
|
||||
h.LoadBalancing.RetryMatch = retryMatch
|
||||
return h
|
||||
}
|
||||
|
||||
// TestResponseRetryStatusCode verifies that when an upstream returns a status
|
||||
// code matching a retry_match expression, the request is retried on the next
|
||||
// upstream
|
||||
func TestResponseRetryStatusCode(t *testing.T) {
|
||||
// Bad upstream: returns 502
|
||||
badServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(http.StatusBadGateway)
|
||||
}))
|
||||
t.Cleanup(badServer.Close)
|
||||
|
||||
// Good upstream: returns 200
|
||||
goodServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(http.StatusOK)
|
||||
w.Write([]byte("ok"))
|
||||
}))
|
||||
t.Cleanup(goodServer.Close)
|
||||
|
||||
retryMatch := caddyhttp.MatcherSets{
|
||||
caddyhttp.MatcherSet{
|
||||
newExpressionMatcher(t, "{http.reverse_proxy.status_code} in [502, 503]"),
|
||||
},
|
||||
}
|
||||
|
||||
// RoundRobin picks index 1 first, then 0
|
||||
upstreams := []*Upstream{
|
||||
{Host: new(Host), Dial: goodServer.Listener.Addr().String()},
|
||||
{Host: new(Host), Dial: badServer.Listener.Addr().String()},
|
||||
}
|
||||
|
||||
h := minimalHandlerWithRetryMatch(1, retryMatch, upstreams...)
|
||||
|
||||
req := httptest.NewRequest(http.MethodGet, "http://example.com/", nil)
|
||||
req = prepareTestRequest(req)
|
||||
rec := httptest.NewRecorder()
|
||||
|
||||
err := h.ServeHTTP(rec, req, caddyhttp.HandlerFunc(func(w http.ResponseWriter, r *http.Request) error {
|
||||
return nil
|
||||
}))
|
||||
|
||||
gotStatus := rec.Code
|
||||
if err != nil {
|
||||
if herr, ok := err.(caddyhttp.HandlerError); ok {
|
||||
gotStatus = herr.StatusCode
|
||||
}
|
||||
}
|
||||
|
||||
if gotStatus != http.StatusOK {
|
||||
t.Errorf("status: got %d, want %d (err=%v)", gotStatus, http.StatusOK, err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestResponseRetryHeader verifies that response header matching triggers
|
||||
// retries via a CEL expression checking {rp.header.*}
|
||||
func TestResponseRetryHeader(t *testing.T) {
|
||||
// Bad upstream: returns 200 but with X-Upstream-Retry header
|
||||
badServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("X-Upstream-Retry", "true")
|
||||
w.WriteHeader(http.StatusOK)
|
||||
w.Write([]byte("bad"))
|
||||
}))
|
||||
t.Cleanup(badServer.Close)
|
||||
|
||||
// Good upstream: returns 200 without retry header
|
||||
goodServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(http.StatusOK)
|
||||
w.Write([]byte("good"))
|
||||
}))
|
||||
t.Cleanup(goodServer.Close)
|
||||
|
||||
retryMatch := caddyhttp.MatcherSets{
|
||||
caddyhttp.MatcherSet{
|
||||
newExpressionMatcher(t, `{http.reverse_proxy.header.X-Upstream-Retry} == "true"`),
|
||||
},
|
||||
}
|
||||
|
||||
// RoundRobin picks index 1 first, then 0
|
||||
upstreams := []*Upstream{
|
||||
{Host: new(Host), Dial: goodServer.Listener.Addr().String()},
|
||||
{Host: new(Host), Dial: badServer.Listener.Addr().String()},
|
||||
}
|
||||
|
||||
h := minimalHandlerWithRetryMatch(1, retryMatch, upstreams...)
|
||||
|
||||
req := httptest.NewRequest(http.MethodGet, "http://example.com/", nil)
|
||||
req = prepareTestRequest(req)
|
||||
rec := httptest.NewRecorder()
|
||||
|
||||
err := h.ServeHTTP(rec, req, caddyhttp.HandlerFunc(func(w http.ResponseWriter, r *http.Request) error {
|
||||
return nil
|
||||
}))
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
if rec.Code != http.StatusOK {
|
||||
t.Errorf("status: got %d, want %d", rec.Code, http.StatusOK)
|
||||
}
|
||||
if rec.Body.String() != "good" {
|
||||
t.Errorf("body: got %q, want %q (retried to wrong upstream)", rec.Body.String(), "good")
|
||||
}
|
||||
}
|
||||
|
||||
// TestResponseRetryNoMatchNoRetry verifies that when no retry_match entries
|
||||
// match the response, the original response is returned without retrying
|
||||
func TestResponseRetryNoMatchNoRetry(t *testing.T) {
|
||||
var hits atomic.Int32
|
||||
|
||||
// Server that returns 500 - but retry_match only matches 502/503
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
hits.Add(1)
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
}))
|
||||
t.Cleanup(server.Close)
|
||||
|
||||
retryMatch := caddyhttp.MatcherSets{
|
||||
caddyhttp.MatcherSet{
|
||||
newExpressionMatcher(t, "{http.reverse_proxy.status_code} in [502, 503]"),
|
||||
},
|
||||
}
|
||||
|
||||
upstreams := []*Upstream{
|
||||
{Host: new(Host), Dial: server.Listener.Addr().String()},
|
||||
{Host: new(Host), Dial: server.Listener.Addr().String()},
|
||||
}
|
||||
|
||||
h := minimalHandlerWithRetryMatch(2, retryMatch, upstreams...)
|
||||
|
||||
req := httptest.NewRequest(http.MethodGet, "http://example.com/", nil)
|
||||
req = prepareTestRequest(req)
|
||||
rec := httptest.NewRecorder()
|
||||
|
||||
_ = h.ServeHTTP(rec, req, caddyhttp.HandlerFunc(func(w http.ResponseWriter, r *http.Request) error {
|
||||
return nil
|
||||
}))
|
||||
|
||||
// Only one hit - no retry since 500 doesn't match [502, 503]
|
||||
if hits.Load() != 1 {
|
||||
t.Errorf("upstream hits: got %d, want 1 (should not have retried)", hits.Load())
|
||||
}
|
||||
}
|
||||
|
||||
// TestResponseRetryExhaustedPreservesStatusCode verifies that when retries
|
||||
// are exhausted, the actual upstream status code (e.g. 503) is reported
|
||||
// to the client, not a generic 502
|
||||
func TestResponseRetryExhaustedPreservesStatusCode(t *testing.T) {
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(http.StatusServiceUnavailable) // 503
|
||||
}))
|
||||
t.Cleanup(server.Close)
|
||||
|
||||
retryMatch := caddyhttp.MatcherSets{
|
||||
caddyhttp.MatcherSet{
|
||||
newExpressionMatcher(t, "{http.reverse_proxy.status_code} == 503"),
|
||||
},
|
||||
}
|
||||
|
||||
upstreams := []*Upstream{
|
||||
{Host: new(Host), Dial: server.Listener.Addr().String()},
|
||||
{Host: new(Host), Dial: server.Listener.Addr().String()},
|
||||
}
|
||||
|
||||
h := minimalHandlerWithRetryMatch(1, retryMatch, upstreams...)
|
||||
|
||||
req := httptest.NewRequest(http.MethodGet, "http://example.com/", nil)
|
||||
req = prepareTestRequest(req)
|
||||
rec := httptest.NewRecorder()
|
||||
|
||||
err := h.ServeHTTP(rec, req, caddyhttp.HandlerFunc(func(w http.ResponseWriter, r *http.Request) error {
|
||||
return nil
|
||||
}))
|
||||
|
||||
gotStatus := rec.Code
|
||||
if err != nil {
|
||||
if herr, ok := err.(caddyhttp.HandlerError); ok {
|
||||
gotStatus = herr.StatusCode
|
||||
}
|
||||
}
|
||||
|
||||
// Must return 503 (actual upstream status), not 502 (generic proxy error)
|
||||
if gotStatus != http.StatusServiceUnavailable {
|
||||
t.Errorf("status: got %d, want %d (status code not preserved)", gotStatus, http.StatusServiceUnavailable)
|
||||
}
|
||||
}
|
||||
|
||||
// TestResponseRetryHeaderCleanup verifies that stale response header
|
||||
// placeholders from a previous upstream attempt are cleaned up before the
|
||||
// next retry evaluation. Without cleanup, a header like X-Retry: true from
|
||||
// upstream A would leak into the retry match for upstream B even if B does
|
||||
// not set that header
|
||||
func TestResponseRetryHeaderCleanup(t *testing.T) {
|
||||
// First upstream: returns 200 with X-Retry header (triggers retry)
|
||||
firstServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("X-Retry", "true")
|
||||
w.WriteHeader(http.StatusOK)
|
||||
w.Write([]byte("first"))
|
||||
}))
|
||||
t.Cleanup(firstServer.Close)
|
||||
|
||||
// Second upstream: returns 200 WITHOUT X-Retry header (should NOT retry)
|
||||
secondServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(http.StatusOK)
|
||||
w.Write([]byte("second"))
|
||||
}))
|
||||
t.Cleanup(secondServer.Close)
|
||||
|
||||
retryMatch := caddyhttp.MatcherSets{
|
||||
caddyhttp.MatcherSet{
|
||||
newExpressionMatcher(t, `{http.reverse_proxy.header.X-Retry} == "true"`),
|
||||
},
|
||||
}
|
||||
|
||||
// RoundRobin picks index 1 first, then 0
|
||||
upstreams := []*Upstream{
|
||||
{Host: new(Host), Dial: secondServer.Listener.Addr().String()},
|
||||
{Host: new(Host), Dial: firstServer.Listener.Addr().String()},
|
||||
}
|
||||
|
||||
h := minimalHandlerWithRetryMatch(2, retryMatch, upstreams...)
|
||||
|
||||
req := httptest.NewRequest(http.MethodGet, "http://example.com/", nil)
|
||||
req = prepareTestRequest(req)
|
||||
rec := httptest.NewRecorder()
|
||||
|
||||
err := h.ServeHTTP(rec, req, caddyhttp.HandlerFunc(func(w http.ResponseWriter, r *http.Request) error {
|
||||
return nil
|
||||
}))
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
// Should get "second" - the first upstream's X-Retry header must not
|
||||
// leak into the second upstream's retry evaluation
|
||||
if rec.Body.String() != "second" {
|
||||
t.Errorf("body: got %q, want %q (stale header leaked between retries)", rec.Body.String(), "second")
|
||||
}
|
||||
}
|
||||
|
||||
// TestRequestOnlyMatcherDoesNotRetryResponses verifies that a pure request
|
||||
// matcher like method PUT in lb_retry_match does NOT trigger response-based
|
||||
// retries. Only expression matchers (which can reference response data)
|
||||
// should trigger response retries
|
||||
func TestRequestOnlyMatcherDoesNotRetryResponses(t *testing.T) {
|
||||
var hits atomic.Int32
|
||||
|
||||
// Server returns 200 OK for all requests
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
hits.Add(1)
|
||||
w.WriteHeader(http.StatusOK)
|
||||
w.Write([]byte("ok"))
|
||||
}))
|
||||
t.Cleanup(server.Close)
|
||||
|
||||
// method PUT matcher - should NOT trigger response retries
|
||||
retryMatch := caddyhttp.MatcherSets{
|
||||
caddyhttp.MatcherSet{
|
||||
caddyhttp.MatchMethod{"PUT"},
|
||||
},
|
||||
}
|
||||
|
||||
upstreams := []*Upstream{
|
||||
{Host: new(Host), Dial: server.Listener.Addr().String()},
|
||||
{Host: new(Host), Dial: server.Listener.Addr().String()},
|
||||
}
|
||||
|
||||
h := minimalHandlerWithRetryMatch(2, retryMatch, upstreams...)
|
||||
|
||||
req := httptest.NewRequest(http.MethodPut, "http://example.com/", nil)
|
||||
req = prepareTestRequest(req)
|
||||
rec := httptest.NewRecorder()
|
||||
|
||||
err := h.ServeHTTP(rec, req, caddyhttp.HandlerFunc(func(w http.ResponseWriter, r *http.Request) error {
|
||||
return nil
|
||||
}))
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
// Should hit only once - no retry for 200 OK even though method matches
|
||||
if hits.Load() != 1 {
|
||||
t.Errorf("upstream hits: got %d, want 1 (should not retry successful responses)", hits.Load())
|
||||
}
|
||||
if rec.Code != http.StatusOK {
|
||||
t.Errorf("status: got %d, want %d", rec.Code, http.StatusOK)
|
||||
}
|
||||
}
|
||||
|
||||
// brokenUpstreamAddr returns the address of a TCP listener that accepts
|
||||
// connections but immediately closes them, causing a transport error (not
|
||||
// a dial error). This simulates an upstream that is reachable but broken
|
||||
func brokenUpstreamAddr(t *testing.T) string {
|
||||
t.Helper()
|
||||
ln, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
if err != nil {
|
||||
t.Fatalf("failed to listen: %v", err)
|
||||
}
|
||||
t.Cleanup(func() { ln.Close() })
|
||||
go func() {
|
||||
for {
|
||||
conn, err := ln.Accept()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
conn.Close()
|
||||
}
|
||||
}()
|
||||
return ln.Addr().String()
|
||||
}
|
||||
|
||||
// TestTransportErrorPlaceholder verifies that the is_transport_error
|
||||
// placeholder is set to true during transport error evaluation in tryAgain()
|
||||
// and that expression matchers using {rp.is_transport_error} can match it
|
||||
func TestTransportErrorPlaceholder(t *testing.T) {
|
||||
broken := brokenUpstreamAddr(t)
|
||||
|
||||
goodServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(http.StatusOK)
|
||||
w.Write([]byte("ok"))
|
||||
}))
|
||||
t.Cleanup(goodServer.Close)
|
||||
|
||||
retryMatch := caddyhttp.MatcherSets{
|
||||
caddyhttp.MatcherSet{
|
||||
newExpressionMatcher(t, "{http.reverse_proxy.is_transport_error} == true"),
|
||||
},
|
||||
}
|
||||
|
||||
// RoundRobin picks index 1 first (broken), then 0 (good)
|
||||
upstreams := []*Upstream{
|
||||
{Host: new(Host), Dial: goodServer.Listener.Addr().String()},
|
||||
{Host: new(Host), Dial: broken},
|
||||
}
|
||||
|
||||
h := minimalHandlerWithRetryMatch(1, retryMatch, upstreams...)
|
||||
|
||||
req := httptest.NewRequest(http.MethodPost, "http://example.com/", nil)
|
||||
req = prepareTestRequest(req)
|
||||
rec := httptest.NewRecorder()
|
||||
|
||||
err := h.ServeHTTP(rec, req, caddyhttp.HandlerFunc(func(w http.ResponseWriter, r *http.Request) error {
|
||||
return nil
|
||||
}))
|
||||
|
||||
gotStatus := rec.Code
|
||||
if err != nil {
|
||||
if herr, ok := err.(caddyhttp.HandlerError); ok {
|
||||
gotStatus = herr.StatusCode
|
||||
}
|
||||
}
|
||||
|
||||
// POST transport error should be retried because is_transport_error matched
|
||||
if gotStatus != http.StatusOK {
|
||||
t.Errorf("status: got %d, want %d (transport error should have been retried)", gotStatus, http.StatusOK)
|
||||
}
|
||||
}
|
||||
|
||||
// TestTransportErrorPlaceholderNotSetForResponses verifies that the
|
||||
// is_transport_error placeholder is NOT set when evaluating response
|
||||
// matchers, so {rp.is_transport_error} is false for response retries
|
||||
func TestTransportErrorPlaceholderNotSetForResponses(t *testing.T) {
|
||||
var hits atomic.Int32
|
||||
|
||||
// Server returns 502 - but the matcher only checks is_transport_error
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
hits.Add(1)
|
||||
w.WriteHeader(http.StatusBadGateway)
|
||||
}))
|
||||
t.Cleanup(server.Close)
|
||||
|
||||
// Only matches transport errors, not response errors
|
||||
retryMatch := caddyhttp.MatcherSets{
|
||||
caddyhttp.MatcherSet{
|
||||
newExpressionMatcher(t, "{http.reverse_proxy.is_transport_error} == true"),
|
||||
},
|
||||
}
|
||||
|
||||
upstreams := []*Upstream{
|
||||
{Host: new(Host), Dial: server.Listener.Addr().String()},
|
||||
{Host: new(Host), Dial: server.Listener.Addr().String()},
|
||||
}
|
||||
|
||||
h := minimalHandlerWithRetryMatch(2, retryMatch, upstreams...)
|
||||
|
||||
req := httptest.NewRequest(http.MethodGet, "http://example.com/", nil)
|
||||
req = prepareTestRequest(req)
|
||||
rec := httptest.NewRecorder()
|
||||
|
||||
_ = h.ServeHTTP(rec, req, caddyhttp.HandlerFunc(func(w http.ResponseWriter, r *http.Request) error {
|
||||
return nil
|
||||
}))
|
||||
|
||||
// Should hit only once - is_transport_error is false during response
|
||||
// evaluation so the 502 is NOT retried
|
||||
if hits.Load() != 1 {
|
||||
t.Errorf("upstream hits: got %d, want 1 (is_transport_error should be false for responses)", hits.Load())
|
||||
}
|
||||
}
|
||||
|
||||
// TestRetryMatchAllowsExpressionMixedWithOtherMatchers verifies that
|
||||
// lb_retry_match accepts a block mixing expression with other matchers
|
||||
func TestRetryMatchAllowsExpressionMixedWithOtherMatchers(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
input string
|
||||
}{
|
||||
{
|
||||
name: "expression alone",
|
||||
input: `reverse_proxy localhost:9080 {
|
||||
lb_retry_match {
|
||||
expression ` + "`{rp.status_code} in [502, 503]`" + `
|
||||
}
|
||||
}`,
|
||||
},
|
||||
{
|
||||
name: "method alone",
|
||||
input: `reverse_proxy localhost:9080 {
|
||||
lb_retry_match {
|
||||
method PUT
|
||||
}
|
||||
}`,
|
||||
},
|
||||
{
|
||||
name: "expression mixed with method",
|
||||
input: `reverse_proxy localhost:9080 {
|
||||
lb_retry_match {
|
||||
method POST
|
||||
expression ` + "`{rp.status_code} in [502, 503]`" + `
|
||||
}
|
||||
}`,
|
||||
},
|
||||
{
|
||||
name: "expression mixed with path",
|
||||
input: `reverse_proxy localhost:9080 {
|
||||
lb_retry_match {
|
||||
path /api*
|
||||
expression ` + "`{rp.status_code} == 502`" + `
|
||||
}
|
||||
}`,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
h := &Handler{}
|
||||
d := caddyfile.NewTestDispenser(tc.input)
|
||||
err := h.UnmarshalCaddyfile(d)
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@ -670,8 +670,12 @@ func (h *Handler) proxyLoopIteration(r *http.Request, origReq *http.Request, w h
|
||||
return true, succ.error
|
||||
}
|
||||
|
||||
// remember this failure (if enabled)
|
||||
h.countFailure(upstream)
|
||||
// remember this failure (if enabled); response-based retries
|
||||
// are not counted as failures since the upstream did respond
|
||||
// successfully - only the response content triggered a retry
|
||||
if _, isRetryableResponse := proxyErr.(retryableResponseError); !isRetryableResponse {
|
||||
h.countFailure(upstream)
|
||||
}
|
||||
|
||||
// if we've tried long enough, break
|
||||
if !h.LoadBalancing.tryAgain(h.ctx, start, retries, proxyErr, r, h.logger) {
|
||||
@ -1055,6 +1059,45 @@ func (h *Handler) reverseProxy(rw http.ResponseWriter, req *http.Request, origRe
|
||||
res.Body, _ = h.bufferedBody(res.Body, h.ResponseBuffers)
|
||||
}
|
||||
|
||||
// set response placeholders so they can be used in retry match
|
||||
// expressions and handle_response routes; clear stale header
|
||||
// placeholders from a previous attempt first so they don't
|
||||
// leak into the next retry evaluation
|
||||
repl.DeleteByPrefix("http.reverse_proxy.header.")
|
||||
for field, value := range res.Header {
|
||||
repl.Set("http.reverse_proxy.header."+field, strings.Join(value, ","))
|
||||
}
|
||||
repl.Set("http.reverse_proxy.status_code", res.StatusCode)
|
||||
repl.Set("http.reverse_proxy.status_text", res.Status)
|
||||
|
||||
// check if the response matches a retry match entry; if so,
|
||||
// close the body and return a retryable error so the request
|
||||
// is retried with the next upstream. Only evaluate matcher sets
|
||||
// that contain at least one expression matcher, since those are
|
||||
// the ones that can reference response data ({rp.status_code},
|
||||
// {rp.header.*}). Pure request-only matchers (method, path, etc.)
|
||||
// are skipped to avoid retrying every response that matches a
|
||||
// request condition
|
||||
if h.LoadBalancing != nil && len(h.LoadBalancing.RetryMatch) > 0 {
|
||||
for _, matcherSet := range h.LoadBalancing.RetryMatch {
|
||||
if !matcherSetHasExpressionMatcher(matcherSet) {
|
||||
continue
|
||||
}
|
||||
match, err := matcherSet.MatchWithError(req)
|
||||
if err != nil {
|
||||
h.logger.Error("error matching request for retry", zap.Error(err))
|
||||
break
|
||||
}
|
||||
if match {
|
||||
res.Body.Close()
|
||||
return retryableResponseError{
|
||||
error: fmt.Errorf("upstream response matched retry_match (status %d)", res.StatusCode),
|
||||
statusCode: res.StatusCode,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// see if any response handler is configured for this response from the backend
|
||||
for i, rh := range h.HandleResponse {
|
||||
if rh.Match != nil && !rh.Match.Match(res.StatusCode, res.Header) {
|
||||
@ -1074,14 +1117,6 @@ func (h *Handler) reverseProxy(rw http.ResponseWriter, req *http.Request, origRe
|
||||
break
|
||||
}
|
||||
|
||||
// set up the replacer so that parts of the original response can be
|
||||
// used for routing decisions
|
||||
for field, value := range res.Header {
|
||||
repl.Set("http.reverse_proxy.header."+field, strings.Join(value, ","))
|
||||
}
|
||||
repl.Set("http.reverse_proxy.status_code", res.StatusCode)
|
||||
repl.Set("http.reverse_proxy.status_text", res.Status)
|
||||
|
||||
if c := logger.Check(zapcore.DebugLevel, "handling response"); c != nil {
|
||||
c.Write(zap.Int("handler", i))
|
||||
}
|
||||
@ -1266,18 +1301,29 @@ func (lb LoadBalancing) tryAgain(ctx caddy.Context, start time.Time, retries int
|
||||
// specifically a dialer error, we need to be careful
|
||||
if proxyErr != nil {
|
||||
_, isDialError := proxyErr.(DialError)
|
||||
_, isRetryableResponse := proxyErr.(retryableResponseError)
|
||||
herr, isHandlerError := proxyErr.(caddyhttp.HandlerError)
|
||||
|
||||
// if the error occurred after a connection was established,
|
||||
// we have to assume the upstream received the request, and
|
||||
// retries need to be carefully decided, because some requests
|
||||
// are not idempotent
|
||||
if !isDialError && (!isHandlerError || !errors.Is(herr, errNoUpstream)) {
|
||||
// are not idempotent; retryableResponseError is excluded here
|
||||
// because its retry decision was already made in reverseProxy()
|
||||
// when the response matchers were evaluated
|
||||
if !isDialError && !isRetryableResponse && (!isHandlerError || !errors.Is(herr, errNoUpstream)) {
|
||||
if lb.RetryMatch == nil && req.Method != "GET" {
|
||||
// by default, don't retry requests if they aren't GET
|
||||
return false
|
||||
}
|
||||
|
||||
// set transport error flag so CEL expressions can use
|
||||
// {rp.is_transport_error} to decide whether to retry
|
||||
repl, _ := req.Context().Value(caddy.ReplacerCtxKey).(*caddy.Replacer)
|
||||
if repl != nil {
|
||||
repl.Set("http.reverse_proxy.is_transport_error", true)
|
||||
defer repl.Delete("http.reverse_proxy.is_transport_error")
|
||||
}
|
||||
|
||||
match, err := lb.RetryMatch.AnyMatchWithError(req)
|
||||
if err != nil {
|
||||
logger.Error("error matching request for retry", zap.Error(err))
|
||||
@ -1507,6 +1553,12 @@ func removeConnectionHeaders(h http.Header) {
|
||||
|
||||
// statusError returns an error value that has a status code.
|
||||
func statusError(err error) error {
|
||||
// if a response-based retry was exhausted, use the actual upstream
|
||||
// status code instead of a generic 502
|
||||
if rre, ok := err.(retryableResponseError); ok {
|
||||
return caddyhttp.Error(rre.statusCode, err)
|
||||
}
|
||||
|
||||
// errors proxying usually mean there is a problem with the upstream(s)
|
||||
statusCode := http.StatusBadGateway
|
||||
|
||||
@ -1558,13 +1610,15 @@ type LoadBalancing struct {
|
||||
// to spin if all backends are down and latency is very low.
|
||||
TryInterval caddy.Duration `json:"try_interval,omitempty"`
|
||||
|
||||
// A list of matcher sets that restricts with which requests retries are
|
||||
// allowed. A request must match any of the given matcher sets in order
|
||||
// to be retried if the connection to the upstream succeeded but the
|
||||
// subsequent round-trip failed. If the connection to the upstream failed,
|
||||
// a retry is always allowed. If unspecified, only GET requests will be
|
||||
// allowed to be retried. Note that a retry is done with the next available
|
||||
// host according to the load balancing policy.
|
||||
// A list of matcher sets that controls retry behavior. Matcher sets
|
||||
// without expression matchers (e.g. method, path) restrict which
|
||||
// requests are retried on transport errors - if unspecified, only
|
||||
// GET requests will be retried. Matcher sets with CEL expression
|
||||
// matchers are evaluated against upstream responses and can
|
||||
// reference {rp.status_code}, {rp.header.*}, and
|
||||
// {rp.is_transport_error}. Dial errors are always retried
|
||||
// regardless of this setting. Retries use the next available
|
||||
// upstream per the load balancing policy
|
||||
RetryMatchRaw caddyhttp.RawMatcherSets `json:"retry_match,omitempty" caddy:"namespace=http.matchers"`
|
||||
|
||||
SelectionPolicy Selector `json:"-"`
|
||||
@ -1662,10 +1716,34 @@ type RequestHeaderOpsTransport interface {
|
||||
RequestHeaderOps() *headers.HeaderOps
|
||||
}
|
||||
|
||||
// matcherSetHasExpressionMatcher reports whether a matcher set contains
|
||||
// at least one expression matcher. Expression matchers can reference
|
||||
// response data via placeholders like {rp.status_code}. Matcher sets
|
||||
// without expression matchers only test request properties and should
|
||||
// not be evaluated for response-based retry decisions
|
||||
func matcherSetHasExpressionMatcher(matcherSet caddyhttp.MatcherSet) bool {
|
||||
for _, m := range matcherSet {
|
||||
if _, ok := m.(*caddyhttp.MatchExpression); ok {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// roundtripSucceededError is an error type that is returned if the
|
||||
// roundtrip succeeded, but an error occurred after-the-fact.
|
||||
type roundtripSucceededError struct{ error }
|
||||
|
||||
// retryableResponseError is returned when the upstream response matched
|
||||
// a retry_match entry, indicating the request should be retried with the
|
||||
// next upstream. It preserves the original status code so that if retries
|
||||
// are exhausted, the actual upstream status is reported instead of a
|
||||
// generic 502
|
||||
type retryableResponseError struct {
|
||||
error
|
||||
statusCode int
|
||||
}
|
||||
|
||||
// bodyReadCloser is a reader that, upon closing, will return
|
||||
// its buffer to the pool and close the underlying body reader.
|
||||
type bodyReadCloser struct {
|
||||
|
||||
12
replacer.go
12
replacer.go
@ -121,6 +121,18 @@ func (r *Replacer) Delete(variable string) {
|
||||
r.mapMutex.Unlock()
|
||||
}
|
||||
|
||||
// DeleteByPrefix removes all static variables with
|
||||
// keys starting with the given prefix
|
||||
func (r *Replacer) DeleteByPrefix(prefix string) {
|
||||
r.mapMutex.Lock()
|
||||
for key := range r.static {
|
||||
if strings.HasPrefix(key, prefix) {
|
||||
delete(r.static, key)
|
||||
}
|
||||
}
|
||||
r.mapMutex.Unlock()
|
||||
}
|
||||
|
||||
// fromStatic provides values from r.static.
|
||||
func (r *Replacer) fromStatic(key string) (any, bool) {
|
||||
r.mapMutex.RLock()
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user