api: Add all in-flight requests /reverse_proxy/upstreams (Fixes #7277) (#7517)

This refactors the initial approach in PR #7281, replacing the UsagePool
with a dedicated package-level sync.Map and atomic.Int64 to track
in-flight requests without global lock contention.

It also introduces a lookup map in the admin API to fix a potential
O(n^2) iteration over upstreams, ensuring that draining upstreams
are correctly exposed across config reloads without leaking memory.

Co-authored-by: Y.Horie <u5.horie@gmail.com>

reverseproxy: optimize in-flight tracking and admin API

- Replaced sync.RWMutex with sync.Map and atomic.Int64 to avoid lock contention under high RPS.
- Introduced a lookup map in the admin API to fix a potential O(n^2) iteration over upstreams.
This commit is contained in:
Paulo Henrique 2026-03-03 19:14:55 -03:00 committed by GitHub
parent 7b34e3107e
commit 88616e86e6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 49 additions and 2 deletions

View File

@ -73,6 +73,7 @@ func (adminUpstreams) handleUpstreams(w http.ResponseWriter, r *http.Request) er
// Collect the results to respond with
results := []upstreamStatus{}
knownHosts := make(map[string]struct{})
// Iterate over the upstream pool (needs to be fast)
var rangeErr error
@ -95,6 +96,8 @@ func (adminUpstreams) handleUpstreams(w http.ResponseWriter, r *http.Request) er
return false
}
knownHosts[address] = struct{}{}
results = append(results, upstreamStatus{
Address: address,
NumRequests: upstream.NumRequests(),
@ -103,7 +106,17 @@ func (adminUpstreams) handleUpstreams(w http.ResponseWriter, r *http.Request) er
return true
})
// If an error happened during the range, return it
currentInFlight := getInFlightRequests()
for address, count := range currentInFlight {
if _, exists := knownHosts[address]; !exists && count > 0 {
results = append(results, upstreamStatus{
Address: address,
NumRequests: int(count),
Fails: 0,
})
}
}
if rangeErr != nil {
return rangeErr
}

View File

@ -32,6 +32,7 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"go.uber.org/zap"
@ -46,6 +47,31 @@ import (
"github.com/caddyserver/caddy/v2/modules/caddyhttp/rewrite"
)
// inFlightRequests uses sync.Map with atomic.Int64 for lock-free updates on the hot path
var inFlightRequests sync.Map
func incInFlightRequest(address string) {
v, _ := inFlightRequests.LoadOrStore(address, new(atomic.Int64))
v.(*atomic.Int64).Add(1)
}
func decInFlightRequest(address string) {
if v, ok := inFlightRequests.Load(address); ok {
if v.(*atomic.Int64).Add(-1) <= 0 {
inFlightRequests.Delete(address)
}
}
}
func getInFlightRequests() map[string]int64 {
copyMap := make(map[string]int64)
inFlightRequests.Range(func(key, value any) bool {
copyMap[key.(string)] = value.(*atomic.Int64).Load()
return true
})
return copyMap
}
func init() {
caddy.RegisterModule(Handler{})
}
@ -904,8 +930,16 @@ func (h Handler) addForwardedHeaders(req *http.Request) error {
// Go standard library which was used as the foundation.)
func (h *Handler) reverseProxy(rw http.ResponseWriter, req *http.Request, origReq *http.Request, repl *caddy.Replacer, di DialInfo, next caddyhttp.Handler) error {
_ = di.Upstream.Host.countRequest(1)
// Increment the in-flight request count
incInFlightRequest(di.Address)
//nolint:errcheck
defer di.Upstream.Host.countRequest(-1)
defer func() {
di.Upstream.Host.countRequest(-1)
// Decrement the in-flight request count
decInFlightRequest(di.Address)
}()
// point the request to this upstream
h.directRequest(req, di)