diff --git a/modules/caddyhttp/reverseproxy/admin.go b/modules/caddyhttp/reverseproxy/admin.go index 7e72a4cdb..18215f0ae 100644 --- a/modules/caddyhttp/reverseproxy/admin.go +++ b/modules/caddyhttp/reverseproxy/admin.go @@ -73,6 +73,7 @@ func (adminUpstreams) handleUpstreams(w http.ResponseWriter, r *http.Request) er // Collect the results to respond with results := []upstreamStatus{} + knownHosts := make(map[string]struct{}) // Iterate over the upstream pool (needs to be fast) var rangeErr error @@ -95,6 +96,8 @@ func (adminUpstreams) handleUpstreams(w http.ResponseWriter, r *http.Request) er return false } + knownHosts[address] = struct{}{} + results = append(results, upstreamStatus{ Address: address, NumRequests: upstream.NumRequests(), @@ -103,7 +106,17 @@ func (adminUpstreams) handleUpstreams(w http.ResponseWriter, r *http.Request) er return true }) - // If an error happened during the range, return it + currentInFlight := getInFlightRequests() + for address, count := range currentInFlight { + if _, exists := knownHosts[address]; !exists && count > 0 { + results = append(results, upstreamStatus{ + Address: address, + NumRequests: int(count), + Fails: 0, + }) + } + } + if rangeErr != nil { return rangeErr } diff --git a/modules/caddyhttp/reverseproxy/reverseproxy.go b/modules/caddyhttp/reverseproxy/reverseproxy.go index fb42828bf..d83c3e709 100644 --- a/modules/caddyhttp/reverseproxy/reverseproxy.go +++ b/modules/caddyhttp/reverseproxy/reverseproxy.go @@ -32,6 +32,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "time" "go.uber.org/zap" @@ -46,6 +47,31 @@ import ( "github.com/caddyserver/caddy/v2/modules/caddyhttp/rewrite" ) +// inFlightRequests uses sync.Map with atomic.Int64 for lock-free updates on the hot path +var inFlightRequests sync.Map + +func incInFlightRequest(address string) { + v, _ := inFlightRequests.LoadOrStore(address, new(atomic.Int64)) + v.(*atomic.Int64).Add(1) +} + +func decInFlightRequest(address string) { + if v, ok := inFlightRequests.Load(address); ok { + if v.(*atomic.Int64).Add(-1) <= 0 { + inFlightRequests.Delete(address) + } + } +} + +func getInFlightRequests() map[string]int64 { + copyMap := make(map[string]int64) + inFlightRequests.Range(func(key, value any) bool { + copyMap[key.(string)] = value.(*atomic.Int64).Load() + return true + }) + return copyMap +} + func init() { caddy.RegisterModule(Handler{}) } @@ -904,8 +930,16 @@ func (h Handler) addForwardedHeaders(req *http.Request) error { // Go standard library which was used as the foundation.) func (h *Handler) reverseProxy(rw http.ResponseWriter, req *http.Request, origReq *http.Request, repl *caddy.Replacer, di DialInfo, next caddyhttp.Handler) error { _ = di.Upstream.Host.countRequest(1) + + // Increment the in-flight request count + incInFlightRequest(di.Address) + //nolint:errcheck - defer di.Upstream.Host.countRequest(-1) + defer func() { + di.Upstream.Host.countRequest(-1) + // Decrement the in-flight request count + decInFlightRequest(di.Address) + }() // point the request to this upstream h.directRequest(req, di)