logging: implement WAL for the net writer

Signed-off-by: Mohammed Al Sahaf <msaa1990@gmail.com>
Mohammed Al Sahaf 2025-06-02 19:46:26 +03:00
parent 50de66ce12
commit 9d3e9e7826
2 changed files with 1789 additions and 135 deletions


@ -19,9 +19,10 @@ import (
"errors"
"fmt"
"io"
"log"
"log/slog"
"net"
"os"
"path/filepath"
"sync"
"time"
@ -32,12 +33,13 @@ import (
)
func init() {
caddy.RegisterModule(&NetWriter{})
}
// NetWriter implements a log writer that outputs to a network socket. If
// the socket goes down, it will dump logs to stderr while it attempts to
// reconnect. Logs are written to a WAL first and then asynchronously
// flushed to the network to avoid blocking HTTP request handling.
type NetWriter struct {
// The address of the network socket to which to connect.
Address string `json:"address,omitempty"`
@ -50,15 +52,25 @@ type NetWriter struct {
// to stderr instead until a connection can be re-established.
SoftStart bool `json:"soft_start,omitempty"`
// How often to attempt reconnection when the network connection fails.
ReconnectInterval caddy.Duration `json:"reconnect_interval,omitempty"`
// Buffer size for the WAL flush channel.
BufferSize int `json:"buffer_size,omitempty"`
logger *slog.Logger
addr caddy.NetworkAddress
wal *wal.WAL
walDir string
flushCtx context.Context
flushCtxCancel context.CancelFunc
flushWg sync.WaitGroup
lastProcessedChunk uint32
mu sync.RWMutex
}
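// For illustration, a log writer using this module might be configured like
// so in Caddy JSON (the values are examples, not defaults introduced by this
// change; field names mirror the struct tags above):
//
//	"writer": {
//	    "output": "net",
//	    "address": "udp/localhost:9999",
//	    "soft_start": true,
//	    "reconnect_interval": "30s",
//	    "buffer_size": 1000
//	}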
// CaddyModule returns the Caddy module information.
func (*NetWriter) CaddyModule() caddy.ModuleInfo {
return caddy.ModuleInfo{
ID: "caddy.logging.writers.net",
New: func() caddy.Module { return new(NetWriter) },
@ -67,6 +79,7 @@ func (NetWriter) CaddyModule() caddy.ModuleInfo {
// Provision sets up the module.
func (nw *NetWriter) Provision(ctx caddy.Context) error {
nw.logger = slog.Default()
repl := caddy.NewReplacer()
address, err := repl.ReplaceOrErr(nw.Address, true, true)
if err != nil {
@ -86,77 +99,359 @@ func (nw *NetWriter) Provision(ctx caddy.Context) error {
return fmt.Errorf("timeout cannot be less than 0")
}
if nw.DialTimeout == 0 {
nw.DialTimeout = caddy.Duration(10 * time.Second)
}
if nw.ReconnectInterval == 0 {
nw.ReconnectInterval = caddy.Duration(10 * time.Second)
}
if nw.BufferSize <= 0 {
nw.BufferSize = 1000
}
return nil
}
func (nw *NetWriter) String() string {
return nw.addr.String()
}
// WriterKey returns a unique key representing this nw.
func (nw *NetWriter) WriterKey() string {
return nw.addr.String()
}
// OpenWriter opens a new network connection and sets up the WAL.
func (nw *NetWriter) OpenWriter() (io.WriteCloser, error) {
// Set up WAL directory
nw.walDir = filepath.Join(caddy.AppDataDir(), "wal", "netwriter", nw.addr.String())
if err := os.MkdirAll(nw.walDir, 0o755); err != nil {
return nil, fmt.Errorf("failed to create WAL directory: %v", err)
}
// Open WAL
opts := wal.DefaultOptions
opts.DirPath = nw.walDir
opts.SegmentSize = 64 * 1024 * 1024 // 64MB segments
w, err := wal.Open(opts)
if err != nil {
return nil, fmt.Errorf("failed to open WAL: %v", err)
}
nw.wal = w
// Load last processed chunk position from metadata file if it exists
nw.loadLastProcessedChunk()
// Create the writer wrapper
writer := &netWriterConn{
nw: nw,
}
// Start the background flusher
nw.flushCtx, nw.flushCtxCancel = context.WithCancel(context.Background())
nw.flushWg.Add(1)
go nw.backgroundFlusher()
return writer, nil
}
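// The resulting data flow, as a sketch (descriptive only, not code from this
// change):
//
//	caller goroutine:  netWriterConn.Write --append--> WAL on disk
//	flusher goroutine: WAL reader --read/send--> net.Conn (or stderr on failure)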
// loadLastProcessedChunk loads the last processed chunk position from a metadata file
func (nw *NetWriter) loadLastProcessedChunk() {
metaFile := filepath.Join(nw.walDir, "last_processed")
data, err := os.ReadFile(metaFile)
if err != nil {
nw.lastProcessedChunk = 0
return
}
var chunk uint32
if _, err := fmt.Sscanf(string(data), "%d", &chunk); err != nil {
nw.lastProcessedChunk = 0
return
}
nw.lastProcessedChunk = chunk
nw.logger.Info("loaded last processed chunk", "block", chunk)
}
// saveLastProcessedChunk saves the last processed chunk position to a metadata file
func (nw *NetWriter) saveLastProcessedChunk(chunk uint32) {
nw.mu.Lock()
nw.lastProcessedChunk = chunk
nw.mu.Unlock()
metaFile := filepath.Join(nw.walDir, "last_processed")
data := fmt.Sprintf("%d", chunk)
if err := os.WriteFile(metaFile, []byte(data), 0o644); err != nil {
nw.logger.Error("failed to save last processed chunk", "error", err)
}
}
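// For example, after block 42 is delivered the metadata file holds the ASCII
// string "42", which loadLastProcessedChunk parses back on the next start-up
// (illustrative values):
//
//	// written by saveLastProcessedChunk(42):
//	//   <walDir>/last_processed -> "42"
//	var chunk uint32
//	fmt.Sscanf("42", "%d", &chunk) // chunk == 42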
// backgroundFlusher runs in the background and flushes WAL entries to the network
func (nw *NetWriter) backgroundFlusher() {
defer nw.flushWg.Done()
var conn net.Conn
var connMu sync.RWMutex
// Function to establish connection
dial := func() error {
newConn, err := net.DialTimeout(nw.addr.Network, nw.addr.JoinHostPort(0), time.Duration(nw.DialTimeout))
if err != nil {
return err
}
connMu.Lock()
if conn != nil {
conn.Close()
}
conn = newConn
connMu.Unlock()
nw.logger.Info("connected to log destination", "address", nw.addr.String())
return nil
}
// Function to write data to connection
writeToConn := func(data []byte) error {
connMu.RLock()
currentConn := conn
connMu.RUnlock()
if currentConn == nil {
return errors.New("no connection")
}
_, err := currentConn.Write(data)
return err
}
// Try initial connection
if err := dial(); err != nil {
if !nw.SoftStart {
nw.logger.Error("failed to connect to log destination", "error", err)
} else {
nw.logger.Warn("failed to connect to log destination, will retry", "error", err)
}
}
// Set up WAL reader
reader := nw.wal.NewReader()
// Skip already processed entries
nw.mu.RLock()
lastChunk := nw.lastProcessedChunk
nw.mu.RUnlock()
if lastChunk > 0 {
nw.logger.Info("skipping already processed entries", "lastProcessedBlock", lastChunk)
skipped := 0
for {
data, cp, err := reader.Next()
if err == io.EOF {
break
}
if err != nil {
nw.logger.Error("error reading WAL during skip", "error", err)
break
}
// Skip entries that have already been processed
if cp.BlockNumber <= lastChunk {
skipped++
continue
}
// This is a new entry, process it
if err := nw.processWALEntry(data, cp, writeToConn); err != nil {
nw.logger.Error("error processing WAL entry", "error", err)
}
}
nw.logger.Info("skipped processed entries", "count", skipped)
}
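// Note: the skip pass above re-reads the WAL from its start and discards
// entries up to the checkpoint, so start-up cost grows with the amount of
// WAL data retained on disk.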
ticker := time.NewTicker(100 * time.Millisecond) // Check for new entries every 100ms
defer ticker.Stop()
reconnectTicker := time.NewTicker(time.Duration(nw.ReconnectInterval))
defer reconnectTicker.Stop()
for {
select {
case <-nw.flushCtx.Done():
// Flush remaining entries before shutting down
nw.flushRemainingEntries(reader, writeToConn)
connMu.Lock()
if conn != nil {
conn.Close()
}
connMu.Unlock()
return
case <-ticker.C:
// Process available WAL entries
nw.processAvailableEntries(reader, writeToConn)
case <-reconnectTicker.C:
// Try to reconnect if we don't have a connection
connMu.RLock()
hasConn := conn != nil
connMu.RUnlock()
if !hasConn {
if err := dial(); err != nil {
nw.logger.Debug("reconnection attempt failed", "error", err)
}
}
}
}
}
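// With the defaults applied in Provision, the flusher wakes every 100ms to
// drain new WAL entries and attempts a redial every 10s while disconnected,
// so a freshly written entry normally reaches the destination within about
// one poll interval (assuming the defaults are left unchanged).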
// processAvailableEntries processes all available entries in the WAL
func (nw *NetWriter) processAvailableEntries(reader *wal.Reader, writeToConn func([]byte) error) {
for {
data, cp, err := reader.Next()
if err == io.EOF {
break
}
if err != nil {
if err == wal.ErrClosed {
return
}
nw.logger.Error("error reading from WAL", "error", err)
break
}
// Check if we've already processed this block
nw.mu.RLock()
lastProcessed := nw.lastProcessedChunk
nw.mu.RUnlock()
if cp.BlockNumber <= lastProcessed {
// Already processed, skip
continue
}
if err := nw.processWALEntry(data, cp, writeToConn); err != nil {
nw.logger.Error("error processing WAL entry", "error", err)
// Don't break here - we want to continue processing other entries
}
}
}
// processWALEntry processes a single WAL entry
func (nw *NetWriter) processWALEntry(data []byte, cp *wal.ChunkPosition, writeToConn func([]byte) error) error {
if err := writeToConn(data); err != nil {
// Connection failed, dump to stderr as fallback
os.Stderr.Write(data)
return err
}
// Mark this block as processed
nw.saveLastProcessedChunk(cp.BlockNumber)
nw.logger.Debug("processed WAL entry", "blockNumber", cp.BlockNumber)
return nil
}
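// Note that the checkpoint only advances on successful network delivery: an
// entry that falls back to stderr is not re-read within this process (the
// reader has already moved past it), and it may only be retried after a
// restart, and only if no later entry advanced the checkpoint past its block.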
// flushRemainingEntries flushes all remaining entries during shutdown
func (nw *NetWriter) flushRemainingEntries(reader *wal.Reader, writeToConn func([]byte) error) {
nw.logger.Info("flushing remaining WAL entries during shutdown")
count := 0
for {
data, cp, err := reader.Next()
if err == io.EOF {
break
}
if err != nil {
nw.logger.Error("error reading from WAL during shutdown flush", "error", err)
break
}
// Check if we've already processed this block
nw.mu.RLock()
lastProcessed := nw.lastProcessedChunk
nw.mu.RUnlock()
if cp.BlockNumber <= lastProcessed {
// Already processed, skip
continue
}
// During shutdown, we try harder to deliver logs
maxRetries := 3
for i := 0; i < maxRetries; i++ {
if err := writeToConn(data); err != nil {
if i == maxRetries-1 {
// Final attempt failed, dump to stderr
os.Stderr.Write(data)
nw.logger.Error("failed to send log entry during shutdown, dumped to stderr", "error", err)
} else {
time.Sleep(time.Second)
}
} else {
nw.saveLastProcessedChunk(cp.BlockNumber)
nw.logger.Debug("flushed WAL entry during shutdown", "blockNumber", cp.BlockNumber)
break
}
}
count++
}
if count > 0 {
nw.logger.Info("flushed WAL entries during shutdown", "count", count)
}
}
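// In the worst case the shutdown flush spends roughly two seconds per
// undeliverable entry (three attempts, with a one-second pause after each of
// the first two failures) before falling back to stderr.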
// netWriterConn implements io.WriteCloser and writes to the WAL
type netWriterConn struct {
nw *NetWriter
}
// Write writes data to the WAL (non-blocking)
func (w *netWriterConn) Write(p []byte) (n int, err error) {
if w.nw.wal == nil {
return 0, errors.New("WAL not initialized")
}
// Write to WAL - this should be fast and non-blocking
_, err = w.nw.wal.Write(p)
if err != nil {
return 0, fmt.Errorf("failed to write to WAL: %v", err)
}
return len(p), nil
}
// Close closes the writer and flushes all remaining data
func (w *netWriterConn) Close() error {
if w.nw.flushCtxCancel != nil {
w.nw.flushCtxCancel()
}
// Wait for background flusher to complete
w.nw.flushWg.Wait()
var errs []error
// Sync and close WAL
if w.nw.wal != nil {
if err := w.nw.wal.Sync(); err != nil {
errs = append(errs, fmt.Errorf("WAL sync error: %v", err))
}
if err := w.nw.wal.Close(); err != nil {
errs = append(errs, fmt.Errorf("WAL close error: %v", err))
}
}
if len(errs) > 0 {
return errors.Join(errs...)
}
return nil
}
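// A sketch of the writer's lifecycle from the caller's side (assumed usage;
// in practice Caddy's logging package drives these calls):
//
//	nw := &NetWriter{Address: "localhost:9999", SoftStart: true}
//	// ... Provision is called by Caddy with a real caddy.Context ...
//	w, err := nw.OpenWriter() // opens the WAL and starts backgroundFlusher
//	if err != nil {
//	    return err
//	}
//	_, _ = w.Write([]byte(`{"msg":"hello"}` + "\n")) // fast append to the WAL
//	err = w.Close() // drains the WAL to the socket, then syncs and closes it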
// UnmarshalCaddyfile sets up the handler from Caddyfile tokens. Syntax:
@ -202,87 +497,6 @@ func (nw *NetWriter) UnmarshalCaddyfile(d *caddyfile.Dispenser) error {
return nil
}
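// An assumed Caddyfile form (the full syntax block is elided from this hunk;
// subdirective names are inferred from the JSON field names above):
//
//	output net localhost:9999 {
//	    dial_timeout 10s
//	    soft_start
//	}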
// redialerConn wraps an underlying Conn so that if any
// writes fail, the connection is redialed and the write
// is retried.
type redialerConn struct {
net.Conn
connMu sync.RWMutex
nw *NetWriter
timeout time.Duration
lastRedial time.Time
}
func (reconn *redialerConn) Write(b []byte) (n int, err error) {
log.Printf("writing '%d' bytes to wal: %s", len(b), b)
cp, err := reconn.nw.w.Write(b)
log.Printf("wrote to WAL: %+v", cp)
return len(b), err
}
// Write wraps the underlying Conn.Write method, but if that fails,
// it will re-dial the connection anew and try writing again.
func (reconn *redialerConn) write(b []byte) (n int, err error) {
reconn.connMu.RLock()
conn := reconn.Conn
reconn.connMu.RUnlock()
if conn != nil {
if n, err = conn.Write(b); err == nil {
return
}
}
// problem with the connection - lock it and try to fix it
reconn.connMu.Lock()
defer reconn.connMu.Unlock()
// if multiple concurrent writes failed on the same broken conn, then
// one of them might have already re-dialed by now; try writing again
if reconn.Conn != nil {
if n, err = reconn.Conn.Write(b); err == nil {
return
}
}
// there's still a problem, so try to re-attempt dialing the socket
// if some time has passed in which the issue could have potentially
// been resolved - we don't want to block at every single log
// emission (!) - see discussion in #4111
if time.Since(reconn.lastRedial) > 10*time.Second {
reconn.lastRedial = time.Now()
conn2, err2 := reconn.dial()
if err2 != nil {
// logger socket still offline; instead of discarding the log, dump it to stderr
os.Stderr.Write(b)
err = err2
return
}
if n, err = conn2.Write(b); err == nil {
if reconn.Conn != nil {
reconn.Conn.Close()
}
reconn.Conn = conn2
}
}
return
}
func (reconn *redialerConn) Close() error {
reconn.nw.w.Sync()
reconn.nw.walReaderCtxCancel()
return errors.Join(
reconn.nw.w.Sync(),
reconn.nw.w.Close(),
reconn.nw.w.Delete(),
reconn.Conn.Close(),
)
}
func (reconn *redialerConn) dial() (net.Conn, error) {
return net.DialTimeout(reconn.nw.addr.Network, reconn.nw.addr.JoinHostPort(0), reconn.timeout)
}
// Interface guards
var (
_ caddy.Provisioner = (*NetWriter)(nil)

File diff suppressed because it is too large