mirror of
https://github.com/caddyserver/caddy.git
synced 2025-07-08 18:55:03 -04:00
506 lines
13 KiB
Go
506 lines
13 KiB
Go
// Copyright 2015 Matthew Holt and The Caddy Authors
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package logging
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"log/slog"
|
|
"net"
|
|
"os"
|
|
"path/filepath"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/rosedblabs/wal"
|
|
|
|
"github.com/caddyserver/caddy/v2"
|
|
"github.com/caddyserver/caddy/v2/caddyconfig/caddyfile"
|
|
)
|
|
|
|
func init() {
|
|
caddy.RegisterModule(&NetWriter{})
|
|
}
|
|
|
|
// NetWriter implements a log writer that outputs to a network socket. If
|
|
// the socket goes down, it will dump logs to stderr while it attempts to
|
|
// reconnect. Logs are written to a WAL first and then asynchronously
|
|
// flushed to the network to avoid blocking HTTP request handling.
|
|
type NetWriter struct {
|
|
// The address of the network socket to which to connect.
|
|
Address string `json:"address,omitempty"`
|
|
|
|
// The timeout to wait while connecting to the socket.
|
|
DialTimeout caddy.Duration `json:"dial_timeout,omitempty"`
|
|
|
|
// If enabled, allow connections errors when first opening the
|
|
// writer. The error and subsequent log entries will be reported
|
|
// to stderr instead until a connection can be re-established.
|
|
SoftStart bool `json:"soft_start,omitempty"`
|
|
|
|
// How often to attempt reconnection when the network connection fails.
|
|
ReconnectInterval caddy.Duration `json:"reconnect_interval,omitempty"`
|
|
|
|
// Buffer size for the WAL flush channel.
|
|
BufferSize int `json:"buffer_size,omitempty"`
|
|
|
|
logger *slog.Logger
|
|
addr caddy.NetworkAddress
|
|
wal *wal.WAL
|
|
walDir string
|
|
flushCtx context.Context
|
|
flushCtxCancel context.CancelFunc
|
|
flushWg sync.WaitGroup
|
|
lastProcessedChunk uint32
|
|
mu sync.RWMutex
|
|
}
|
|
|
|
// CaddyModule returns the Caddy module information.
|
|
func (*NetWriter) CaddyModule() caddy.ModuleInfo {
|
|
return caddy.ModuleInfo{
|
|
ID: "caddy.logging.writers.net",
|
|
New: func() caddy.Module { return new(NetWriter) },
|
|
}
|
|
}
|
|
|
|
// Provision sets up the module.
|
|
func (nw *NetWriter) Provision(ctx caddy.Context) error {
|
|
nw.logger = slog.Default()
|
|
repl := caddy.NewReplacer()
|
|
address, err := repl.ReplaceOrErr(nw.Address, true, true)
|
|
if err != nil {
|
|
return fmt.Errorf("invalid host in address: %v", err)
|
|
}
|
|
|
|
nw.addr, err = caddy.ParseNetworkAddress(address)
|
|
if err != nil {
|
|
return fmt.Errorf("parsing network address '%s': %v", address, err)
|
|
}
|
|
|
|
if nw.addr.PortRangeSize() != 1 {
|
|
return fmt.Errorf("multiple ports not supported")
|
|
}
|
|
|
|
if nw.DialTimeout < 0 {
|
|
return fmt.Errorf("timeout cannot be less than 0")
|
|
}
|
|
|
|
if nw.DialTimeout == 0 {
|
|
nw.DialTimeout = caddy.Duration(10 * time.Second)
|
|
}
|
|
|
|
if nw.ReconnectInterval == 0 {
|
|
nw.ReconnectInterval = caddy.Duration(10 * time.Second)
|
|
}
|
|
|
|
if nw.BufferSize <= 0 {
|
|
nw.BufferSize = 1000
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (nw *NetWriter) String() string {
|
|
return nw.addr.String()
|
|
}
|
|
|
|
// WriterKey returns a unique key representing this nw.
|
|
func (nw *NetWriter) WriterKey() string {
|
|
return nw.addr.String()
|
|
}
|
|
|
|
// OpenWriter opens a new network connection and sets up the WAL.
|
|
func (nw *NetWriter) OpenWriter() (io.WriteCloser, error) {
|
|
// Set up WAL directory
|
|
nw.walDir = filepath.Join(caddy.AppDataDir(), "wal", "netwriter", nw.addr.String())
|
|
if err := os.MkdirAll(nw.walDir, 0o755); err != nil {
|
|
return nil, fmt.Errorf("failed to create WAL directory: %v", err)
|
|
}
|
|
|
|
// Open WAL
|
|
opts := wal.DefaultOptions
|
|
opts.DirPath = nw.walDir
|
|
opts.SegmentSize = 64 * 1024 * 1024 // 64MB segments
|
|
w, err := wal.Open(opts)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to open WAL: %v", err)
|
|
}
|
|
nw.wal = w
|
|
|
|
// Load last processed chunk position from metadata file if it exists
|
|
nw.loadLastProcessedChunk()
|
|
|
|
// Create the writer wrapper
|
|
writer := &netWriterConn{
|
|
nw: nw,
|
|
}
|
|
|
|
// Start the background flusher
|
|
nw.flushCtx, nw.flushCtxCancel = context.WithCancel(context.Background())
|
|
nw.flushWg.Add(1)
|
|
go nw.backgroundFlusher()
|
|
|
|
return writer, nil
|
|
}
|
|
|
|
// loadLastProcessedChunk loads the last processed chunk position from a metadata file
|
|
func (nw *NetWriter) loadLastProcessedChunk() {
|
|
metaFile := filepath.Join(nw.walDir, "last_processed")
|
|
data, err := os.ReadFile(metaFile)
|
|
if err != nil {
|
|
nw.lastProcessedChunk = 0
|
|
return
|
|
}
|
|
|
|
var chunk uint32
|
|
if _, err := fmt.Sscanf(string(data), "%d", &chunk); err != nil {
|
|
nw.lastProcessedChunk = 0
|
|
return
|
|
}
|
|
|
|
nw.lastProcessedChunk = chunk
|
|
nw.logger.Info("loaded last processed chunk", "block", chunk)
|
|
}
|
|
|
|
// saveLastProcessedChunk saves the last processed chunk position to a metadata file
|
|
func (nw *NetWriter) saveLastProcessedChunk(chunk uint32) {
|
|
nw.mu.Lock()
|
|
nw.lastProcessedChunk = chunk
|
|
nw.mu.Unlock()
|
|
|
|
metaFile := filepath.Join(nw.walDir, "last_processed")
|
|
data := fmt.Sprintf("%d", chunk)
|
|
if err := os.WriteFile(metaFile, []byte(data), 0o600); err != nil {
|
|
nw.logger.Error("failed to save last processed chunk", "error", err)
|
|
}
|
|
}
|
|
|
|
// backgroundFlusher runs in the background and flushes WAL entries to the network
|
|
func (nw *NetWriter) backgroundFlusher() {
|
|
defer nw.flushWg.Done()
|
|
|
|
var conn net.Conn
|
|
var connMu sync.RWMutex
|
|
|
|
// Function to establish connection
|
|
dial := func() error {
|
|
newConn, err := net.DialTimeout(nw.addr.Network, nw.addr.JoinHostPort(0), time.Duration(nw.DialTimeout))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
connMu.Lock()
|
|
if conn != nil {
|
|
conn.Close()
|
|
}
|
|
conn = newConn
|
|
connMu.Unlock()
|
|
|
|
nw.logger.Info("connected to log destination", "address", nw.addr.String())
|
|
return nil
|
|
}
|
|
|
|
// Function to write data to connection
|
|
writeToConn := func(data []byte) error {
|
|
connMu.RLock()
|
|
currentConn := conn
|
|
connMu.RUnlock()
|
|
|
|
if currentConn == nil {
|
|
return errors.New("no connection")
|
|
}
|
|
|
|
_, err := currentConn.Write(data)
|
|
return err
|
|
}
|
|
|
|
// Try initial connection
|
|
if err := dial(); err != nil {
|
|
if !nw.SoftStart {
|
|
nw.logger.Error("failed to connect to log destination", "error", err)
|
|
} else {
|
|
nw.logger.Warn("failed to connect to log destination, will retry", "error", err)
|
|
}
|
|
}
|
|
|
|
// Set up WAL reader
|
|
reader := nw.wal.NewReader()
|
|
|
|
// Skip already processed entries
|
|
nw.mu.RLock()
|
|
lastChunk := nw.lastProcessedChunk
|
|
nw.mu.RUnlock()
|
|
|
|
if lastChunk > 0 {
|
|
nw.logger.Info("skipping already processed entries", "lastProcessedBlock", lastChunk)
|
|
// Skip already processed entries
|
|
skipped := 0
|
|
for {
|
|
data, cp, err := reader.Next()
|
|
if err == io.EOF {
|
|
break
|
|
}
|
|
if err != nil {
|
|
nw.logger.Error("error reading WAL during skip", "error", err)
|
|
break
|
|
}
|
|
|
|
// Skip entries that have already been processed
|
|
if cp.BlockNumber <= lastChunk {
|
|
skipped++
|
|
continue
|
|
}
|
|
|
|
// This is a new entry, process it
|
|
if err := nw.processWALEntry(data, cp, writeToConn); err != nil {
|
|
nw.logger.Error("error processing WAL entry", "error", err)
|
|
}
|
|
}
|
|
nw.logger.Info("skipped processed entries", "count", skipped)
|
|
}
|
|
|
|
ticker := time.NewTicker(100 * time.Millisecond) // Check for new entries every 100ms
|
|
defer ticker.Stop()
|
|
|
|
reconnectTicker := time.NewTicker(time.Duration(nw.ReconnectInterval))
|
|
defer reconnectTicker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-nw.flushCtx.Done():
|
|
// Flush remaining entries before shutting down
|
|
nw.flushRemainingEntries(reader, writeToConn)
|
|
|
|
connMu.Lock()
|
|
if conn != nil {
|
|
conn.Close()
|
|
}
|
|
connMu.Unlock()
|
|
return
|
|
|
|
case <-ticker.C:
|
|
// Process available WAL entries
|
|
nw.processAvailableEntries(reader, writeToConn)
|
|
|
|
case <-reconnectTicker.C:
|
|
// Try to reconnect if we don't have a connection
|
|
connMu.RLock()
|
|
hasConn := conn != nil
|
|
connMu.RUnlock()
|
|
|
|
if !hasConn {
|
|
if err := dial(); err != nil {
|
|
nw.logger.Debug("reconnection attempt failed", "error", err)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// processAvailableEntries processes all available entries in the WAL
|
|
func (nw *NetWriter) processAvailableEntries(reader *wal.Reader, writeToConn func([]byte) error) {
|
|
for {
|
|
data, cp, err := reader.Next()
|
|
if err == io.EOF {
|
|
break
|
|
}
|
|
if err != nil {
|
|
if err == wal.ErrClosed {
|
|
return
|
|
}
|
|
nw.logger.Error("error reading from WAL", "error", err)
|
|
break
|
|
}
|
|
|
|
// Check if we've already processed this block
|
|
nw.mu.RLock()
|
|
lastProcessed := nw.lastProcessedChunk
|
|
nw.mu.RUnlock()
|
|
|
|
if cp.BlockNumber <= lastProcessed {
|
|
// Already processed, skip
|
|
continue
|
|
}
|
|
|
|
if err := nw.processWALEntry(data, cp, writeToConn); err != nil {
|
|
nw.logger.Error("error processing WAL entry", "error", err)
|
|
// Don't break here - we want to continue processing other entries
|
|
}
|
|
}
|
|
}
|
|
|
|
// processWALEntry processes a single WAL entry
|
|
func (nw *NetWriter) processWALEntry(data []byte, cp *wal.ChunkPosition, writeToConn func([]byte) error) error {
|
|
if err := writeToConn(data); err != nil {
|
|
// Connection failed, dump to stderr as fallback
|
|
os.Stderr.Write(data)
|
|
return err
|
|
}
|
|
|
|
// Mark this block as processed
|
|
nw.saveLastProcessedChunk(cp.BlockNumber)
|
|
nw.logger.Debug("processed WAL entry", "blockNumber", cp.BlockNumber)
|
|
return nil
|
|
}
|
|
|
|
// flushRemainingEntries flushes all remaining entries during shutdown
|
|
func (nw *NetWriter) flushRemainingEntries(reader *wal.Reader, writeToConn func([]byte) error) {
|
|
nw.logger.Info("flushing remaining WAL entries during shutdown")
|
|
|
|
count := 0
|
|
for {
|
|
data, cp, err := reader.Next()
|
|
if err == io.EOF {
|
|
break
|
|
}
|
|
if err != nil {
|
|
nw.logger.Error("error reading from WAL during shutdown flush", "error", err)
|
|
break
|
|
}
|
|
|
|
// Check if we've already processed this block
|
|
nw.mu.RLock()
|
|
lastProcessed := nw.lastProcessedChunk
|
|
nw.mu.RUnlock()
|
|
|
|
if cp.BlockNumber <= lastProcessed {
|
|
// Already processed, skip
|
|
continue
|
|
}
|
|
|
|
// During shutdown, we try harder to deliver logs
|
|
maxRetries := 3
|
|
for i := 0; i < maxRetries; i++ {
|
|
if err := writeToConn(data); err != nil {
|
|
if i == maxRetries-1 {
|
|
// Final attempt failed, dump to stderr
|
|
os.Stderr.Write(data)
|
|
nw.logger.Error("failed to send log entry during shutdown, dumped to stderr", "error", err)
|
|
} else {
|
|
time.Sleep(time.Second)
|
|
}
|
|
} else {
|
|
nw.saveLastProcessedChunk(cp.BlockNumber)
|
|
nw.logger.Debug("flushed WAL entry during shutdown", "blockNumber", cp.BlockNumber)
|
|
break
|
|
}
|
|
}
|
|
count++
|
|
}
|
|
|
|
if count > 0 {
|
|
nw.logger.Info("flushed WAL entries during shutdown", "count", count)
|
|
}
|
|
}
|
|
|
|
// netWriterConn implements io.WriteCloser and writes to the WAL
|
|
type netWriterConn struct {
|
|
nw *NetWriter
|
|
}
|
|
|
|
// Write writes data to the WAL (non-blocking)
|
|
func (w *netWriterConn) Write(p []byte) (n int, err error) {
|
|
if w.nw.wal == nil {
|
|
return 0, errors.New("WAL not initialized")
|
|
}
|
|
|
|
// Write to WAL - this should be fast and non-blocking
|
|
_, err = w.nw.wal.Write(p)
|
|
if err != nil {
|
|
return 0, fmt.Errorf("failed to write to WAL: %v", err)
|
|
}
|
|
|
|
return len(p), nil
|
|
}
|
|
|
|
// Close closes the writer and flushes all remaining data
|
|
func (w *netWriterConn) Close() error {
|
|
if w.nw.flushCtxCancel != nil {
|
|
w.nw.flushCtxCancel()
|
|
}
|
|
|
|
// Wait for background flusher to complete
|
|
w.nw.flushWg.Wait()
|
|
|
|
var errs []error
|
|
|
|
// Sync and close WAL
|
|
if w.nw.wal != nil {
|
|
if err := w.nw.wal.Sync(); err != nil {
|
|
errs = append(errs, fmt.Errorf("WAL sync error: %v", err))
|
|
}
|
|
if err := w.nw.wal.Close(); err != nil {
|
|
errs = append(errs, fmt.Errorf("WAL close error: %v", err))
|
|
}
|
|
}
|
|
|
|
if len(errs) > 0 {
|
|
return errors.Join(errs...)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// UnmarshalCaddyfile sets up the handler from Caddyfile tokens. Syntax:
|
|
//
|
|
// net <address> {
|
|
// dial_timeout <duration>
|
|
// soft_start
|
|
// }
|
|
func (nw *NetWriter) UnmarshalCaddyfile(d *caddyfile.Dispenser) error {
|
|
d.Next() // consume writer name
|
|
if !d.NextArg() {
|
|
return d.ArgErr()
|
|
}
|
|
nw.Address = d.Val()
|
|
if d.NextArg() {
|
|
return d.ArgErr()
|
|
}
|
|
for d.NextBlock(0) {
|
|
switch d.Val() {
|
|
case "dial_timeout":
|
|
if !d.NextArg() {
|
|
return d.ArgErr()
|
|
}
|
|
timeout, err := caddy.ParseDuration(d.Val())
|
|
if err != nil {
|
|
return d.Errf("invalid duration: %s", d.Val())
|
|
}
|
|
if d.NextArg() {
|
|
return d.ArgErr()
|
|
}
|
|
nw.DialTimeout = caddy.Duration(timeout)
|
|
|
|
case "soft_start":
|
|
if d.NextArg() {
|
|
return d.ArgErr()
|
|
}
|
|
nw.SoftStart = true
|
|
|
|
default:
|
|
return d.Errf("unrecognized subdirective '%s'", d.Val())
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Interface guards
|
|
var (
|
|
_ caddy.Provisioner = (*NetWriter)(nil)
|
|
_ caddy.WriterOpener = (*NetWriter)(nil)
|
|
_ caddyfile.Unmarshaler = (*NetWriter)(nil)
|
|
)
|