diff --git a/modules/caddyhttp/encode/encode.go b/modules/caddyhttp/encode/encode.go index 0474768f0..ac995c37b 100644 --- a/modules/caddyhttp/encode/encode.go +++ b/modules/caddyhttp/encode/encode.go @@ -422,20 +422,13 @@ func (rw *responseWriter) Close() error { var err error if rw.w != nil { err = rw.w.Close() - rw.releaseEncoder() + rw.w.Reset(nil) + rw.config.writerPools[rw.encodingName].Put(rw.w) + rw.w = nil } return err } -func (rw *responseWriter) releaseEncoder() { - if rw.w == nil { - return - } - rw.w.Reset(nil) - rw.config.writerPools[rw.encodingName].Put(rw.w) - rw.w = nil -} - // Unwrap returns the underlying ResponseWriter. func (rw *responseWriter) Unwrap() http.ResponseWriter { return rw.ResponseWriter diff --git a/modules/caddyhttp/reverseproxy/reverseproxy.go b/modules/caddyhttp/reverseproxy/reverseproxy.go index da1e15643..57962ddad 100644 --- a/modules/caddyhttp/reverseproxy/reverseproxy.go +++ b/modules/caddyhttp/reverseproxy/reverseproxy.go @@ -191,6 +191,9 @@ type Handler struct { // Connections using upstreams that are removed are closed during cleanup. // By default this is false, preserving legacy behavior where upgraded // connections are closed on reload (optionally delayed by stream_close_delay). + // Only http1.1 websocket connections are affected; websockets for h2/h3 are not affected. + // If true, bytes transferred for http1.1 in the access logs will be zero, but those stats + // can be found in the stream logs for http1/2/3 regardless of whether this is enabled. StreamRetainOnReload bool `json:"stream_retain_on_reload,omitempty"` // Controls logging behavior for upgraded stream lifecycle events. 
@@ -1239,9 +1242,7 @@ func (h *Handler) finalizeResponse( ) error { // deal with 101 Switching Protocols responses: (WebSocket, h2c, etc) if res.StatusCode == http.StatusSwitchingProtocols { - var wg sync.WaitGroup - h.handleUpgradeResponse(logger, &wg, rw, req, res, upstreamAddr) - wg.Wait() + h.handleUpgradeResponse(logger, rw, req, res, upstreamAddr) return nil } diff --git a/modules/caddyhttp/reverseproxy/streaming.go b/modules/caddyhttp/reverseproxy/streaming.go index 407c4acf9..3835653df 100644 --- a/modules/caddyhttp/reverseproxy/streaming.go +++ b/modules/caddyhttp/reverseproxy/streaming.go @@ -40,12 +40,12 @@ import ( "github.com/caddyserver/caddy/v2/modules/caddyhttp" ) -type h2ReadWriteCloser struct { +type extendedConnectReadWriteCloser struct { io.ReadCloser http.ResponseWriter } -func (rwc h2ReadWriteCloser) Write(p []byte) (n int, err error) { +func (rwc extendedConnectReadWriteCloser) Write(p []byte) (n int, err error) { n, err = rwc.ResponseWriter.Write(p) if err != nil { return 0, err @@ -59,7 +59,7 @@ func (rwc h2ReadWriteCloser) Write(p []byte) (n int, err error) { return n, nil } -func (h *Handler) handleUpgradeResponse(logger *zap.Logger, wg *sync.WaitGroup, rw http.ResponseWriter, req *http.Request, res *http.Response, upstreamAddr string) { +func (h *Handler) handleUpgradeResponse(logger *zap.Logger, rw http.ResponseWriter, req *http.Request, res *http.Response, upstreamAddr string) { reqUpType := upgradeType(req.Header) resUpType := upgradeType(res.Header) @@ -99,15 +99,25 @@ func (h *Handler) handleUpgradeResponse(logger *zap.Logger, wg *sync.WaitGroup, bufferSize := h.StreamBufferSize streamTimeout := time.Duration(h.StreamTimeout) + if h.StreamRetainOnReload { + // the return value should be true as it's not hijacked yet, but some middleware may wrap response writers incorrectly + if !caddyhttp.DetachResponseWriterAfterHijack(rw, true) { + if c := logger.Check(zap.DebugLevel, "detaching connection failed"); c != nil { + 
c.Write(zap.String("tip", "check if your response writers have an Unwrap method or if already hijacked")) + } + } + } + var ( - conn io.ReadWriteCloser - brw *bufio.ReadWriter - isExtendedConnect bool + conn io.ReadWriteCloser + brw *bufio.ReadWriter + detached = h.StreamRetainOnReload ) // websocket over http2 or http3 if extended connect is enabled, assuming backend doesn't support this, the request will be modified to http1.1 upgrade // TODO: once we can reliably detect backend support this, it can be removed for those backends if body, ok := caddyhttp.GetVar(req.Context(), "extended_connect_websocket_body").(io.ReadCloser); ok { - isExtendedConnect = true + // websocket over extended connect can't be detached. rw and req.Body are only valid while the handler goroutine is running + detached = false req.Body = body rw.Header().Del("Upgrade") rw.Header().Del("Connection") @@ -126,7 +136,7 @@ func (h *Handler) handleUpgradeResponse(logger *zap.Logger, wg *sync.WaitGroup, } return } - conn = h2ReadWriteCloser{req.Body, rw} + conn = extendedConnectReadWriteCloser{req.Body, rw} // bufio is not needed, use minimal buffer brw = bufio.NewReadWriter(bufio.NewReaderSize(conn, 1), bufio.NewWriterSize(conn, 1)) } else { @@ -202,35 +212,20 @@ func (h *Handler) handleUpgradeResponse(logger *zap.Logger, wg *sync.WaitGroup, start := time.Now() - if isExtendedConnect { - h.handleExtendedConnectUpgradeTunnel(streamLogger, streamLevel, wg, conn, backConn, deleteFrontConn, deleteBackConn, bufferSize, streamTimeout, start, finishMetrics, streamFields) + if !detached { + h.handleUpgradeTunnel(streamLogger, streamLevel, conn, backConn, deleteFrontConn, deleteBackConn, bufferSize, streamTimeout, start, finishMetrics, streamFields) } else { - h.handleDetachedUpgradeTunnel(streamLogger, streamLevel, conn, backConn, deleteFrontConn, deleteBackConn, bufferSize, streamTimeout, start, finishMetrics, streamFields) - // Return immediately without touching wg. 
finalizeResponse's - // wg.Wait() returns at once since wg was never incremented. + // start a new goroutine + go h.handleUpgradeTunnel(streamLogger, streamLevel, conn, backConn, deleteFrontConn, deleteBackConn, bufferSize, streamTimeout, start, finishMetrics, streamFields) } } -func (h *Handler) handleExtendedConnectUpgradeTunnel( - streamLogger *zap.Logger, - streamLevel zapcore.Level, - wg *sync.WaitGroup, - conn io.ReadWriteCloser, - backConn io.ReadWriteCloser, - deleteFrontConn func(), - deleteBackConn func(), - bufferSize int, - streamTimeout time.Duration, - start time.Time, - finishMetrics func(result string, duration time.Duration, toBackend, fromBackend int64), - streamFields []zap.Field, -) { - // Extended CONNECT: ServeHTTP must block because rw and req.Body are - // only valid while the handler goroutine is running. Defers clean up - // when the select below fires and this function returns. +// handleUpgradeTunnel returns when transfer is done. +func (h *Handler) handleUpgradeTunnel(streamLogger *zap.Logger, streamLevel zapcore.Level, conn io.ReadWriteCloser, backConn io.ReadWriteCloser, deleteFrontConn func(), deleteBackConn func(), bufferSize int, streamTimeout time.Duration, start time.Time, finishMetrics func(result string, duration time.Duration, toBackend int64, fromBackend int64), streamFields []zap.Field) { defer deleteBackConn() defer deleteFrontConn() var ( + wg sync.WaitGroup toBackend int64 fromBackend int64 result string @@ -243,7 +238,7 @@ func (h *Handler) handleExtendedConnectUpgradeTunnel( spc := switchProtocolCopier{ user: conn, backend: backConn, - wg: wg, + wg: &wg, bufferSize: bufferSize, sent: &toBackend, received: &fromBackend, @@ -290,90 +285,6 @@ func (h *Handler) handleExtendedConnectUpgradeTunnel( } } -func (h *Handler) handleDetachedUpgradeTunnel( - streamLogger *zap.Logger, - streamLevel zapcore.Level, - conn io.ReadWriteCloser, - backConn io.ReadWriteCloser, - deleteFrontConn func(), - deleteBackConn func(), - bufferSize 
int, - streamTimeout time.Duration, - start time.Time, - finishMetrics func(result string, duration time.Duration, toBackend, fromBackend int64), - streamFields []zap.Field, -) { - // HTTP/1.1 hijacked connection: launch a detached goroutine so that - // ServeHTTP can return immediately, allowing the Handler to be GC'd - // after a config reload. The goroutine captures only tunnel (a small - // *tunnelState), logger, conn/backConn, and scalar config values. - go func() { - var ( - toBackend int64 - fromBackend int64 - result = "closed" - ) - defer deleteBackConn() - defer deleteFrontConn() - defer func() { - finishMetrics(result, time.Since(start), toBackend, fromBackend) - if c := streamLogger.Check(streamLevel, "connection closed"); c != nil { - fields := append([]zap.Field{}, streamFields...) - fields = append(fields, - zap.Duration("duration", time.Since(start)), - zap.Int64("bytes_to_backend", toBackend), - zap.Int64("bytes_from_backend", fromBackend), - ) - c.Write(fields...) - } - }() - - var innerWg sync.WaitGroup - // when a stream timeout is encountered, no error will be read from errc - // a buffer size of 2 will allow both the read and write goroutines to send the error and exit - // see: https://github.com/caddyserver/caddy/issues/7418 - errc := make(chan error, 2) - spc := switchProtocolCopier{ - user: conn, - backend: backConn, - wg: &innerWg, - bufferSize: bufferSize, - sent: &toBackend, - received: &fromBackend, - } - innerWg.Add(2) - - var timeoutc <-chan time.Time - if streamTimeout > 0 { - timer := time.NewTimer(streamTimeout) - defer timer.Stop() - timeoutc = timer.C - } - - go spc.copyToBackend(errc) - go spc.copyFromBackend(errc) - select { - case err := <-errc: - result = classifyStreamResult(err) - if c := streamLogger.Check(streamLevel, "streaming error"); c != nil { - c.Write(zap.Error(err)) - } - case t := <-timeoutc: - result = "timeout" - if c := streamLogger.Check(streamLevel, "stream timed out"); c != nil { - 
c.Write(zap.Time("timeout", t)) - } - } - - // Close both ends to unblock the still-running copy goroutine, - // then wait for it to finish so byte counts are accurate before - // the deferred log fires. - conn.Close() - backConn.Close() - innerWg.Wait() - }() -} - func classifyStreamResult(err error) string { if err == nil || errors.Is(err, io.EOF) || errors.Is(err, net.ErrClosed) || errors.Is(err, context.Canceled) { return "closed"