mirror of
https://github.com/caddyserver/caddy.git
synced 2026-05-26 16:52:40 -04:00
Compare commits
16 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| b892bd2acf | |||
| 3bcfeee97a | |||
| ed9afb05d8 | |||
| 7668108b5d | |||
| e6d44851b1 | |||
| f7d16df78e | |||
| 7ac7ca3ff4 | |||
| 9f586657e8 | |||
| 07ad9534fb | |||
| 030ade0f98 | |||
| da8322bc6e | |||
| e2104d3235 | |||
| 6cc2f7b581 | |||
| 9d3e9e7826 | |||
| 50de66ce12 | |||
| 61d163217f |
@@ -61,12 +61,14 @@ require (
|
|||||||
github.com/google/go-tpm v0.9.0 // indirect
|
github.com/google/go-tpm v0.9.0 // indirect
|
||||||
github.com/google/go-tspi v0.3.0 // indirect
|
github.com/google/go-tspi v0.3.0 // indirect
|
||||||
github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0 // indirect
|
github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0 // indirect
|
||||||
|
github.com/hashicorp/golang-lru/v2 v2.0.2 // indirect
|
||||||
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect
|
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect
|
||||||
github.com/pmezard/go-difflib v1.0.0 // indirect
|
github.com/pmezard/go-difflib v1.0.0 // indirect
|
||||||
github.com/quic-go/qpack v0.5.1 // indirect
|
github.com/quic-go/qpack v0.5.1 // indirect
|
||||||
github.com/smallstep/go-attestation v0.4.4-0.20240109183208-413678f90935 // indirect
|
github.com/smallstep/go-attestation v0.4.4-0.20240109183208-413678f90935 // indirect
|
||||||
github.com/smallstep/pkcs7 v0.0.0-20231024181729-3b98ecc1ca81 // indirect
|
github.com/smallstep/pkcs7 v0.0.0-20231024181729-3b98ecc1ca81 // indirect
|
||||||
github.com/smallstep/scep v0.0.0-20231024192529-aee96d7ad34d // indirect
|
github.com/smallstep/scep v0.0.0-20231024192529-aee96d7ad34d // indirect
|
||||||
|
github.com/valyala/bytebufferpool v1.0.0 // indirect
|
||||||
github.com/x448/float16 v0.8.4 // indirect
|
github.com/x448/float16 v0.8.4 // indirect
|
||||||
github.com/zeebo/blake3 v0.2.4 // indirect
|
github.com/zeebo/blake3 v0.2.4 // indirect
|
||||||
go.opentelemetry.io/contrib/propagators/aws v1.17.0 // indirect
|
go.opentelemetry.io/contrib/propagators/aws v1.17.0 // indirect
|
||||||
@@ -127,6 +129,7 @@ require (
|
|||||||
github.com/prometheus/client_model v0.5.0
|
github.com/prometheus/client_model v0.5.0
|
||||||
github.com/prometheus/common v0.48.0 // indirect
|
github.com/prometheus/common v0.48.0 // indirect
|
||||||
github.com/prometheus/procfs v0.12.0 // indirect
|
github.com/prometheus/procfs v0.12.0 // indirect
|
||||||
|
github.com/rosedblabs/wal v1.3.6
|
||||||
github.com/rs/xid v1.5.0 // indirect
|
github.com/rs/xid v1.5.0 // indirect
|
||||||
github.com/russross/blackfriday/v2 v2.1.0 // indirect
|
github.com/russross/blackfriday/v2 v2.1.0 // indirect
|
||||||
github.com/shopspring/decimal v1.4.0 // indirect
|
github.com/shopspring/decimal v1.4.0 // indirect
|
||||||
|
|||||||
@@ -240,6 +240,8 @@ github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:Fecb
|
|||||||
github.com/grpc-ecosystem/grpc-gateway v1.5.0/go.mod h1:RSKVYQBd5MCa4OVpNdGskqpgL2+G+NZTnrVHpWWfpdw=
|
github.com/grpc-ecosystem/grpc-gateway v1.5.0/go.mod h1:RSKVYQBd5MCa4OVpNdGskqpgL2+G+NZTnrVHpWWfpdw=
|
||||||
github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0 h1:asbCHRVmodnJTuQ3qamDwqVOIjwqUPTYmYuemVOx+Ys=
|
github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0 h1:asbCHRVmodnJTuQ3qamDwqVOIjwqUPTYmYuemVOx+Ys=
|
||||||
github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0/go.mod h1:ggCgvZ2r7uOoQjOyu2Y1NhHmEPPzzuhWgcza5M1Ji1I=
|
github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0/go.mod h1:ggCgvZ2r7uOoQjOyu2Y1NhHmEPPzzuhWgcza5M1Ji1I=
|
||||||
|
github.com/hashicorp/golang-lru/v2 v2.0.2 h1:Dwmkdr5Nc/oBiXgJS3CDHNhJtIHkuZ3DZF5twqnfBdU=
|
||||||
|
github.com/hashicorp/golang-lru/v2 v2.0.2/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
|
||||||
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
|
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
|
||||||
github.com/hexops/gotextdiff v1.0.3 h1:gitA9+qJrrTCsiCl7+kh75nPqQt1cx4ZkudSTLoUqJM=
|
github.com/hexops/gotextdiff v1.0.3 h1:gitA9+qJrrTCsiCl7+kh75nPqQt1cx4ZkudSTLoUqJM=
|
||||||
github.com/hexops/gotextdiff v1.0.3/go.mod h1:pSWU5MAI3yDq+fZBTazCSJysOMbxWL1BSow5/V2vxeg=
|
github.com/hexops/gotextdiff v1.0.3/go.mod h1:pSWU5MAI3yDq+fZBTazCSJysOMbxWL1BSow5/V2vxeg=
|
||||||
@@ -394,6 +396,8 @@ github.com/quic-go/quic-go v0.54.0/go.mod h1:e68ZEaCdyviluZmy44P6Iey98v/Wfz6HCjQ
|
|||||||
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
|
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
|
||||||
github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII=
|
github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII=
|
||||||
github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o=
|
github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o=
|
||||||
|
github.com/rosedblabs/wal v1.3.6 h1:oxZYTPX/u4JuGDW98wQ1YamWqerlrlSUFKhgP6Gd/Ao=
|
||||||
|
github.com/rosedblabs/wal v1.3.6/go.mod h1:wdq54KJUyVTOv1uddMc6Cdh2d/YCIo8yjcwJAb1RCEM=
|
||||||
github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ=
|
github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ=
|
||||||
github.com/rs/xid v1.5.0 h1:mKX4bl4iPYJtEIxp6CYiUuLQ/8DYMoz0PUdtGgMFRVc=
|
github.com/rs/xid v1.5.0 h1:mKX4bl4iPYJtEIxp6CYiUuLQ/8DYMoz0PUdtGgMFRVc=
|
||||||
github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
|
github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
|
||||||
@@ -496,6 +500,8 @@ github.com/tarm/serial v0.0.0-20180830185346-98f6abe2eb07/go.mod h1:kDXzergiv9cb
|
|||||||
github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=
|
github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=
|
||||||
github.com/urfave/cli v1.22.14 h1:ebbhrRiGK2i4naQJr+1Xj92HXZCrK7MsyTS/ob3HnAk=
|
github.com/urfave/cli v1.22.14 h1:ebbhrRiGK2i4naQJr+1Xj92HXZCrK7MsyTS/ob3HnAk=
|
||||||
github.com/urfave/cli v1.22.14/go.mod h1:X0eDS6pD6Exaclxm99NJ3FiCDRED7vIHpx2mDOHLvkA=
|
github.com/urfave/cli v1.22.14/go.mod h1:X0eDS6pD6Exaclxm99NJ3FiCDRED7vIHpx2mDOHLvkA=
|
||||||
|
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
|
||||||
|
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
|
||||||
github.com/viant/assertly v0.4.8/go.mod h1:aGifi++jvCrUaklKEKT0BU95igDNaqkvz+49uaYMPRU=
|
github.com/viant/assertly v0.4.8/go.mod h1:aGifi++jvCrUaklKEKT0BU95igDNaqkvz+49uaYMPRU=
|
||||||
github.com/viant/toolbox v0.24.0/go.mod h1:OxMCG57V0PXuIP2HNQrtJf2CjqdmbrOx5EkMILuUhzM=
|
github.com/viant/toolbox v0.24.0/go.mod h1:OxMCG57V0PXuIP2HNQrtJf2CjqdmbrOx5EkMILuUhzM=
|
||||||
github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM=
|
github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM=
|
||||||
|
|||||||
@@ -122,7 +122,6 @@ func TestPreferOrder(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
func TestValidate(t *testing.T) {
|
func TestValidate(t *testing.T) {
|
||||||
type testCase struct {
|
type testCase struct {
|
||||||
name string
|
name string
|
||||||
|
|||||||
+420
-87
@@ -15,24 +15,32 @@
|
|||||||
package logging
|
package logging
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
"log/slog"
|
||||||
"net"
|
"net"
|
||||||
"os"
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/rosedblabs/wal"
|
||||||
|
|
||||||
"github.com/caddyserver/caddy/v2"
|
"github.com/caddyserver/caddy/v2"
|
||||||
"github.com/caddyserver/caddy/v2/caddyconfig/caddyfile"
|
"github.com/caddyserver/caddy/v2/caddyconfig/caddyfile"
|
||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
caddy.RegisterModule(NetWriter{})
|
caddy.RegisterModule(&NetWriter{})
|
||||||
}
|
}
|
||||||
|
|
||||||
// NetWriter implements a log writer that outputs to a network socket. If
|
// NetWriter implements a log writer that outputs to a network socket. If
|
||||||
// the socket goes down, it will dump logs to stderr while it attempts to
|
// the socket goes down, it will dump logs to stderr while it attempts to
|
||||||
// reconnect.
|
// reconnect. Logs are written to a WAL first and then asynchronously
|
||||||
|
// flushed to the network to avoid blocking HTTP request handling.
|
||||||
type NetWriter struct {
|
type NetWriter struct {
|
||||||
// The address of the network socket to which to connect.
|
// The address of the network socket to which to connect.
|
||||||
Address string `json:"address,omitempty"`
|
Address string `json:"address,omitempty"`
|
||||||
@@ -45,11 +53,26 @@ type NetWriter struct {
|
|||||||
// to stderr instead until a connection can be re-established.
|
// to stderr instead until a connection can be re-established.
|
||||||
SoftStart bool `json:"soft_start,omitempty"`
|
SoftStart bool `json:"soft_start,omitempty"`
|
||||||
|
|
||||||
addr caddy.NetworkAddress
|
// How often to attempt reconnection when the network connection fails.
|
||||||
|
ReconnectInterval caddy.Duration `json:"reconnect_interval,omitempty"`
|
||||||
|
|
||||||
|
// Buffer size for the WAL flush channel.
|
||||||
|
BufferSize int `json:"buffer_size,omitempty"`
|
||||||
|
|
||||||
|
logger *slog.Logger
|
||||||
|
addr caddy.NetworkAddress
|
||||||
|
wal *wal.WAL
|
||||||
|
walDir string
|
||||||
|
flushCtx context.Context
|
||||||
|
flushCtxCancel context.CancelFunc
|
||||||
|
flushWg sync.WaitGroup
|
||||||
|
lastProcessedOffset int64
|
||||||
|
mu sync.RWMutex
|
||||||
|
walMu sync.Mutex // synchronizes WAL read/write operations
|
||||||
}
|
}
|
||||||
|
|
||||||
// CaddyModule returns the Caddy module information.
|
// CaddyModule returns the Caddy module information.
|
||||||
func (NetWriter) CaddyModule() caddy.ModuleInfo {
|
func (*NetWriter) CaddyModule() caddy.ModuleInfo {
|
||||||
return caddy.ModuleInfo{
|
return caddy.ModuleInfo{
|
||||||
ID: "caddy.logging.writers.net",
|
ID: "caddy.logging.writers.net",
|
||||||
New: func() caddy.Module { return new(NetWriter) },
|
New: func() caddy.Module { return new(NetWriter) },
|
||||||
@@ -58,6 +81,7 @@ func (NetWriter) CaddyModule() caddy.ModuleInfo {
|
|||||||
|
|
||||||
// Provision sets up the module.
|
// Provision sets up the module.
|
||||||
func (nw *NetWriter) Provision(ctx caddy.Context) error {
|
func (nw *NetWriter) Provision(ctx caddy.Context) error {
|
||||||
|
nw.logger = slog.Default()
|
||||||
repl := caddy.NewReplacer()
|
repl := caddy.NewReplacer()
|
||||||
address, err := repl.ReplaceOrErr(nw.Address, true, true)
|
address, err := repl.ReplaceOrErr(nw.Address, true, true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -77,37 +101,411 @@ func (nw *NetWriter) Provision(ctx caddy.Context) error {
|
|||||||
return fmt.Errorf("timeout cannot be less than 0")
|
return fmt.Errorf("timeout cannot be less than 0")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if nw.DialTimeout == 0 {
|
||||||
|
nw.DialTimeout = caddy.Duration(10 * time.Second)
|
||||||
|
}
|
||||||
|
|
||||||
|
if nw.ReconnectInterval == 0 {
|
||||||
|
nw.ReconnectInterval = caddy.Duration(10 * time.Second)
|
||||||
|
}
|
||||||
|
|
||||||
|
if nw.BufferSize <= 0 {
|
||||||
|
nw.BufferSize = 1000
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (nw NetWriter) String() string {
|
func (nw *NetWriter) String() string {
|
||||||
return nw.addr.String()
|
return nw.addr.String()
|
||||||
}
|
}
|
||||||
|
|
||||||
// WriterKey returns a unique key representing this nw.
|
// WriterKey returns a unique key representing this nw.
|
||||||
func (nw NetWriter) WriterKey() string {
|
func (nw *NetWriter) WriterKey() string {
|
||||||
return nw.addr.String()
|
return nw.addr.String()
|
||||||
}
|
}
|
||||||
|
|
||||||
// OpenWriter opens a new network connection.
|
// OpenWriter opens a new network connection and sets up the WAL.
|
||||||
func (nw NetWriter) OpenWriter() (io.WriteCloser, error) {
|
func (nw *NetWriter) OpenWriter() (io.WriteCloser, error) {
|
||||||
reconn := &redialerConn{
|
// Set up WAL directory
|
||||||
nw: nw,
|
baseDir := caddy.AppDataDir()
|
||||||
timeout: time.Duration(nw.DialTimeout),
|
nw.walDir = filepath.Join(baseDir, "wal", "netwriter", strings.ReplaceAll(nw.addr.String(), ":", "-"))
|
||||||
|
if err := os.MkdirAll(nw.walDir, 0o755); err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to create WAL directory: %v", err)
|
||||||
}
|
}
|
||||||
conn, err := reconn.dial()
|
|
||||||
|
// Open WAL
|
||||||
|
opts := wal.DefaultOptions
|
||||||
|
opts.DirPath = nw.walDir
|
||||||
|
opts.SegmentSize = 64 * 1024 * 1024 // 64MB segments
|
||||||
|
w, err := wal.Open(opts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if !nw.SoftStart {
|
return nil, fmt.Errorf("failed to open WAL: %v", err)
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
// don't block config load if remote is down or some other external problem;
|
|
||||||
// we can dump logs to stderr for now (see issue #5520)
|
|
||||||
fmt.Fprintf(os.Stderr, "[ERROR] net log writer failed to connect: %v (will retry connection and print errors here in the meantime)\n", err)
|
|
||||||
}
|
}
|
||||||
reconn.connMu.Lock()
|
nw.wal = w
|
||||||
reconn.Conn = conn
|
|
||||||
reconn.connMu.Unlock()
|
// Load last processed offset from metadata file if it exists
|
||||||
return reconn, nil
|
nw.loadLastProcessedOffset()
|
||||||
|
|
||||||
|
// If SoftStart is disabled, test the connection immediately
|
||||||
|
if !nw.SoftStart {
|
||||||
|
testConn, err := net.DialTimeout(nw.addr.Network, nw.addr.JoinHostPort(0), time.Duration(nw.DialTimeout))
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to connect to log destination (SoftStart disabled): %v", err)
|
||||||
|
}
|
||||||
|
testConn.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create the writer wrapper
|
||||||
|
writer := &netWriterConn{
|
||||||
|
nw: nw,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start the background flusher
|
||||||
|
nw.flushCtx, nw.flushCtxCancel = context.WithCancel(context.Background())
|
||||||
|
nw.flushWg.Add(1)
|
||||||
|
go nw.backgroundFlusher()
|
||||||
|
|
||||||
|
return writer, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// loadLastProcessedOffset loads the last processed offset from a metadata file
|
||||||
|
func (nw *NetWriter) loadLastProcessedOffset() {
|
||||||
|
metaFile := filepath.Join(nw.walDir, "last_processed")
|
||||||
|
data, err := os.ReadFile(metaFile)
|
||||||
|
if err != nil {
|
||||||
|
// Use -1 to indicate "no entries processed yet"
|
||||||
|
nw.lastProcessedOffset = -1
|
||||||
|
nw.logger.Debug("no last processed offset file found, starting from beginning", "file", metaFile, "error", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var offset int64
|
||||||
|
if _, err := fmt.Sscanf(string(data), "%d", &offset); err != nil {
|
||||||
|
// Use -1 to indicate "no entries processed yet"
|
||||||
|
nw.lastProcessedOffset = -1
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
nw.lastProcessedOffset = offset
|
||||||
|
nw.logger.Debug("loaded last processed offset", "offset", offset)
|
||||||
|
}
|
||||||
|
|
||||||
|
// saveLastProcessedOffset saves the last processed offset to a metadata file
|
||||||
|
func (nw *NetWriter) saveLastProcessedOffset(cp *wal.ChunkPosition) {
|
||||||
|
// Create a unique offset by combining segment, block, and chunk offset
|
||||||
|
offset := (int64(cp.SegmentId) << 32) | (int64(cp.BlockNumber) << 16) | (cp.ChunkOffset)
|
||||||
|
|
||||||
|
nw.mu.Lock()
|
||||||
|
nw.lastProcessedOffset = offset
|
||||||
|
nw.mu.Unlock()
|
||||||
|
|
||||||
|
metaFile := filepath.Join(nw.walDir, "last_processed")
|
||||||
|
data := fmt.Sprintf("%d", offset)
|
||||||
|
if err := os.WriteFile(metaFile, []byte(data), 0o600); err != nil {
|
||||||
|
nw.logger.Error("failed to save last processed offset", "error", err)
|
||||||
|
} else {
|
||||||
|
nw.logger.Debug("saved last processed offset", "offset", offset)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// backgroundFlusher runs in the background and flushes WAL entries to the network
|
||||||
|
func (nw *NetWriter) backgroundFlusher() {
|
||||||
|
defer nw.flushWg.Done()
|
||||||
|
nw.logger.Debug("background flusher started")
|
||||||
|
|
||||||
|
var conn net.Conn
|
||||||
|
var connMu sync.RWMutex
|
||||||
|
|
||||||
|
// Function to establish connection
|
||||||
|
dial := func() error {
|
||||||
|
newConn, err := net.DialTimeout(nw.addr.Network, nw.addr.JoinHostPort(0), time.Duration(nw.DialTimeout))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
connMu.Lock()
|
||||||
|
if conn != nil {
|
||||||
|
conn.Close()
|
||||||
|
}
|
||||||
|
conn = newConn
|
||||||
|
connMu.Unlock()
|
||||||
|
|
||||||
|
nw.logger.Info("connected to log destination", "address", nw.addr.String())
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Function to write data to connection
|
||||||
|
writeToConn := func(data []byte) error {
|
||||||
|
connMu.RLock()
|
||||||
|
currentConn := conn
|
||||||
|
connMu.RUnlock()
|
||||||
|
|
||||||
|
if currentConn == nil {
|
||||||
|
return errors.New("no connection")
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err := currentConn.Write(data)
|
||||||
|
if err != nil {
|
||||||
|
// Connection failed, clear it so reconnection logic kicks in
|
||||||
|
connMu.Lock()
|
||||||
|
if conn == currentConn {
|
||||||
|
conn.Close()
|
||||||
|
conn = nil
|
||||||
|
}
|
||||||
|
connMu.Unlock()
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Try initial connection
|
||||||
|
if err := dial(); err != nil {
|
||||||
|
if !nw.SoftStart {
|
||||||
|
nw.logger.Error("failed to connect to log destination", "error", err)
|
||||||
|
} else {
|
||||||
|
nw.logger.Warn("failed to connect to log destination, will retry", "error", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Process any existing entries in the WAL immediately
|
||||||
|
nw.processWALEntries(writeToConn)
|
||||||
|
|
||||||
|
ticker := time.NewTicker(100 * time.Millisecond) // Check for new entries every 100ms
|
||||||
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
reconnectTicker := time.NewTicker(time.Duration(nw.ReconnectInterval))
|
||||||
|
defer reconnectTicker.Stop()
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-nw.flushCtx.Done():
|
||||||
|
// Flush remaining entries before shutting down
|
||||||
|
nw.flushRemainingWALEntries(writeToConn)
|
||||||
|
|
||||||
|
connMu.Lock()
|
||||||
|
if conn != nil {
|
||||||
|
conn.Close()
|
||||||
|
}
|
||||||
|
connMu.Unlock()
|
||||||
|
return
|
||||||
|
|
||||||
|
case <-ticker.C:
|
||||||
|
// Process available WAL entries
|
||||||
|
nw.processWALEntries(writeToConn)
|
||||||
|
|
||||||
|
case <-reconnectTicker.C:
|
||||||
|
// Try to reconnect if we don't have a connection
|
||||||
|
connMu.RLock()
|
||||||
|
hasConn := conn != nil
|
||||||
|
connMu.RUnlock()
|
||||||
|
|
||||||
|
nw.logger.Debug("reconnect ticker fired", "hasConn", hasConn)
|
||||||
|
if !hasConn {
|
||||||
|
if err := dial(); err != nil {
|
||||||
|
nw.logger.Debug("reconnection attempt failed", "error", err)
|
||||||
|
} else {
|
||||||
|
// Successfully reconnected, process any buffered WAL entries
|
||||||
|
nw.logger.Info("reconnected, processing buffered WAL entries")
|
||||||
|
nw.processWALEntries(writeToConn)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// processWALEntries processes all available entries in the WAL using a fresh reader
|
||||||
|
func (nw *NetWriter) processWALEntries(writeToConn func([]byte) error) {
|
||||||
|
// Synchronize WAL access to prevent race conditions with writers
|
||||||
|
nw.walMu.Lock()
|
||||||
|
// Create a fresh reader to see all current entries
|
||||||
|
reader := nw.wal.NewReader()
|
||||||
|
nw.walMu.Unlock()
|
||||||
|
|
||||||
|
processed := 0
|
||||||
|
skipped := 0
|
||||||
|
nw.logger.Debug("processing available WAL entries")
|
||||||
|
for {
|
||||||
|
nw.walMu.Lock()
|
||||||
|
data, cp, err := reader.Next()
|
||||||
|
nw.walMu.Unlock()
|
||||||
|
|
||||||
|
if err == io.EOF {
|
||||||
|
if processed > 0 {
|
||||||
|
nw.logger.Debug("processed WAL entries", "processed", processed, "skipped", skipped)
|
||||||
|
}
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
if err == wal.ErrClosed {
|
||||||
|
nw.logger.Debug("WAL closed during processing")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
nw.logger.Error("error reading from WAL", "error", err)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if we've already processed this entry
|
||||||
|
nw.mu.RLock()
|
||||||
|
lastProcessedOffset := nw.lastProcessedOffset
|
||||||
|
nw.mu.RUnlock()
|
||||||
|
|
||||||
|
// Create current entry offset for comparison
|
||||||
|
currentOffset := (int64(cp.SegmentId) << 32) | (int64(cp.BlockNumber) << 16) | (cp.ChunkOffset)
|
||||||
|
nw.logger.Debug("found WAL entry", "segmentId", cp.SegmentId, "blockNumber", cp.BlockNumber, "chunkOffset", cp.ChunkOffset, "currentOffset", currentOffset, "lastProcessedOffset", lastProcessedOffset, "size", len(data))
|
||||||
|
|
||||||
|
if currentOffset <= lastProcessedOffset {
|
||||||
|
// Already processed, skip
|
||||||
|
nw.logger.Debug("skipping already processed entry", "currentOffset", currentOffset, "lastProcessedOffset", lastProcessedOffset)
|
||||||
|
skipped++
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := nw.processWALEntry(data, cp, writeToConn); err != nil {
|
||||||
|
nw.logger.Error("error processing WAL entry", "error", err)
|
||||||
|
// Don't break here - we want to continue processing other entries
|
||||||
|
} else {
|
||||||
|
processed++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// processWALEntry processes a single WAL entry
|
||||||
|
func (nw *NetWriter) processWALEntry(data []byte, cp *wal.ChunkPosition, writeToConn func([]byte) error) error {
|
||||||
|
if err := writeToConn(data); err != nil {
|
||||||
|
// Connection failed, dump to stderr as fallback
|
||||||
|
os.Stderr.Write(data)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Mark this entry as processed
|
||||||
|
nw.saveLastProcessedOffset(cp)
|
||||||
|
nw.logger.Debug("processed WAL entry", "segmentId", cp.SegmentId, "blockNumber", cp.BlockNumber, "chunkOffset", cp.ChunkOffset, "data", string(data))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// flushRemainingWALEntries flushes all remaining entries during shutdown
|
||||||
|
func (nw *NetWriter) flushRemainingWALEntries(writeToConn func([]byte) error) {
|
||||||
|
nw.logger.Info("flushing remaining WAL entries during shutdown")
|
||||||
|
|
||||||
|
// Synchronize WAL access to prevent race conditions with writers
|
||||||
|
nw.walMu.Lock()
|
||||||
|
// Create a fresh reader for shutdown processing
|
||||||
|
reader := nw.wal.NewReader()
|
||||||
|
nw.walMu.Unlock()
|
||||||
|
|
||||||
|
count := 0
|
||||||
|
for {
|
||||||
|
nw.walMu.Lock()
|
||||||
|
data, cp, err := reader.Next()
|
||||||
|
nw.walMu.Unlock()
|
||||||
|
|
||||||
|
if err == io.EOF {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
nw.logger.Error("error reading from WAL during shutdown flush", "error", err)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if we've already processed this entry
|
||||||
|
nw.mu.RLock()
|
||||||
|
lastProcessedOffset := nw.lastProcessedOffset
|
||||||
|
nw.mu.RUnlock()
|
||||||
|
|
||||||
|
// Create current entry offset for comparison
|
||||||
|
currentOffset := (int64(cp.SegmentId) << 32) | (int64(cp.BlockNumber) << 16) | (cp.ChunkOffset)
|
||||||
|
|
||||||
|
if currentOffset <= lastProcessedOffset {
|
||||||
|
// Already processed, skip
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// During shutdown, we try harder to deliver logs
|
||||||
|
maxRetries := 3
|
||||||
|
for i := 0; i < maxRetries; i++ {
|
||||||
|
if err := writeToConn(data); err != nil {
|
||||||
|
if i == maxRetries-1 {
|
||||||
|
// Final attempt failed, dump to stderr
|
||||||
|
os.Stderr.Write(data)
|
||||||
|
nw.logger.Error("failed to send log entry during shutdown, dumped to stderr", "error", err)
|
||||||
|
} else {
|
||||||
|
time.Sleep(time.Second)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
nw.saveLastProcessedOffset(cp)
|
||||||
|
nw.logger.Debug("flushed WAL entry during shutdown", "segmentId", cp.SegmentId, "blockNumber", cp.BlockNumber, "chunkOffset", cp.ChunkOffset)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
count++
|
||||||
|
}
|
||||||
|
|
||||||
|
if count > 0 {
|
||||||
|
nw.logger.Info("flushed WAL entries during shutdown", "count", count)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// netWriterConn implements io.WriteCloser and writes to the WAL
|
||||||
|
type netWriterConn struct {
|
||||||
|
nw *NetWriter
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write writes data to the WAL (non-blocking)
|
||||||
|
func (w *netWriterConn) Write(p []byte) (n int, err error) {
|
||||||
|
if w.nw.wal == nil {
|
||||||
|
w.nw.logger.Error("WAL not initialized")
|
||||||
|
return 0, errors.New("WAL not initialized")
|
||||||
|
}
|
||||||
|
|
||||||
|
w.nw.logger.Debug("writing to WAL", "size", len(p))
|
||||||
|
|
||||||
|
// Synchronize WAL access to prevent race conditions
|
||||||
|
w.nw.walMu.Lock()
|
||||||
|
defer w.nw.walMu.Unlock()
|
||||||
|
|
||||||
|
// Write to WAL - this should be fast and non-blocking
|
||||||
|
_, err = w.nw.wal.Write(p)
|
||||||
|
if err != nil {
|
||||||
|
w.nw.logger.Error("failed to write to WAL", "error", err)
|
||||||
|
return 0, fmt.Errorf("failed to write to WAL: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Sync WAL to ensure data is available for reading
|
||||||
|
if err = w.nw.wal.Sync(); err != nil {
|
||||||
|
w.nw.logger.Error("failed to sync WAL", "error", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
w.nw.logger.Debug("wrote data to WAL", "size", len(p))
|
||||||
|
return len(p), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close closes the writer and flushes all remaining data
|
||||||
|
func (w *netWriterConn) Close() error {
|
||||||
|
if w.nw.flushCtxCancel != nil {
|
||||||
|
w.nw.flushCtxCancel()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait for background flusher to complete
|
||||||
|
w.nw.flushWg.Wait()
|
||||||
|
|
||||||
|
var errs []error
|
||||||
|
|
||||||
|
// Sync and close WAL with synchronization
|
||||||
|
if w.nw.wal != nil {
|
||||||
|
w.nw.walMu.Lock()
|
||||||
|
if err := w.nw.wal.Sync(); err != nil {
|
||||||
|
errs = append(errs, fmt.Errorf("WAL sync error: %v", err))
|
||||||
|
}
|
||||||
|
if err := w.nw.wal.Close(); err != nil {
|
||||||
|
errs = append(errs, fmt.Errorf("WAL close error: %v", err))
|
||||||
|
}
|
||||||
|
w.nw.walMu.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(errs) > 0 {
|
||||||
|
return errors.Join(errs...)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// UnmarshalCaddyfile sets up the handler from Caddyfile tokens. Syntax:
|
// UnmarshalCaddyfile sets up the handler from Caddyfile tokens. Syntax:
|
||||||
@@ -153,71 +551,6 @@ func (nw *NetWriter) UnmarshalCaddyfile(d *caddyfile.Dispenser) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// redialerConn wraps an underlying Conn so that if any
|
|
||||||
// writes fail, the connection is redialed and the write
|
|
||||||
// is retried.
|
|
||||||
type redialerConn struct {
|
|
||||||
net.Conn
|
|
||||||
connMu sync.RWMutex
|
|
||||||
nw NetWriter
|
|
||||||
timeout time.Duration
|
|
||||||
lastRedial time.Time
|
|
||||||
}
|
|
||||||
|
|
||||||
// Write wraps the underlying Conn.Write method, but if that fails,
|
|
||||||
// it will re-dial the connection anew and try writing again.
|
|
||||||
func (reconn *redialerConn) Write(b []byte) (n int, err error) {
|
|
||||||
reconn.connMu.RLock()
|
|
||||||
conn := reconn.Conn
|
|
||||||
reconn.connMu.RUnlock()
|
|
||||||
if conn != nil {
|
|
||||||
if n, err = conn.Write(b); err == nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// problem with the connection - lock it and try to fix it
|
|
||||||
reconn.connMu.Lock()
|
|
||||||
defer reconn.connMu.Unlock()
|
|
||||||
|
|
||||||
// if multiple concurrent writes failed on the same broken conn, then
|
|
||||||
// one of them might have already re-dialed by now; try writing again
|
|
||||||
if reconn.Conn != nil {
|
|
||||||
if n, err = reconn.Conn.Write(b); err == nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// there's still a problem, so try to re-attempt dialing the socket
|
|
||||||
// if some time has passed in which the issue could have potentially
|
|
||||||
// been resolved - we don't want to block at every single log
|
|
||||||
// emission (!) - see discussion in #4111
|
|
||||||
if time.Since(reconn.lastRedial) > 10*time.Second {
|
|
||||||
reconn.lastRedial = time.Now()
|
|
||||||
conn2, err2 := reconn.dial()
|
|
||||||
if err2 != nil {
|
|
||||||
// logger socket still offline; instead of discarding the log, dump it to stderr
|
|
||||||
os.Stderr.Write(b)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if n, err = conn2.Write(b); err == nil {
|
|
||||||
if reconn.Conn != nil {
|
|
||||||
reconn.Conn.Close()
|
|
||||||
}
|
|
||||||
reconn.Conn = conn2
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// last redial attempt was too recent; just dump to stderr for now
|
|
||||||
os.Stderr.Write(b)
|
|
||||||
}
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
func (reconn *redialerConn) dial() (net.Conn, error) {
|
|
||||||
return net.DialTimeout(reconn.nw.addr.Network, reconn.nw.addr.JoinHostPort(0), reconn.timeout)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Interface guards
|
// Interface guards
|
||||||
var (
|
var (
|
||||||
_ caddy.Provisioner = (*NetWriter)(nil)
|
_ caddy.Provisioner = (*NetWriter)(nil)
|
||||||
|
|||||||
File diff suppressed because it is too large
Load Diff
Reference in New Issue
Block a user