From 1eb3f1df9e4ceeed265f492506adfbc51daf2f48 Mon Sep 17 00:00:00 2001 From: Mads Jon Nielsen Date: Wed, 20 May 2026 11:01:02 +0200 Subject: [PATCH] logger: write to stdout asynchronously Wraps os.Stdout in a shared AsyncWriter that drains on a single background goroutine through a buffered channel, so callers don't stall when stdout's consumer (filebeat, the container runtime, kubectl logs) falls behind. Drops + counts when the buffer is full. The change is transparent: logger.New() and logger.NewWithLevel() keep their signatures, all existing callers become async on upgrade. Added logger.Flush() for graceful shutdown and logger.Dropped() for observability. Also fixes a non-deterministic bug in periodic.Run: when ctx.Done() and ticker.C are both ready, select can pick the tick first and run fn after cancellation. Re-checks ctx.Err() after consuming a tick. --- logger/async_writer.go | 122 ++++++++++++++++++++++++++ logger/async_writer_test.go | 166 ++++++++++++++++++++++++++++++++++++ logger/logger.go | 50 ++++++++++- periodic/periodic.go | 6 ++ readme.md | 10 +++ 5 files changed, 350 insertions(+), 4 deletions(-) create mode 100644 logger/async_writer.go create mode 100644 logger/async_writer_test.go diff --git a/logger/async_writer.go b/logger/async_writer.go new file mode 100644 index 0000000..82d218d --- /dev/null +++ b/logger/async_writer.go @@ -0,0 +1,122 @@ +package logger + +import ( + "io" + "sync" + "sync/atomic" +) + +// AsyncWriter wraps an io.Writer and drains writes on a background +// goroutine through a buffered channel. Callers on request hot paths +// don't stall on pipe backpressure when stdout's consumer (filebeat, +// the container runtime, kubectl logs) falls behind. +// +// When the buffer is full, Write drops the line and increments a +// counter exposed via Dropped(). Call Close() during shutdown to flush +// queued lines. +type AsyncWriter struct { + w io.Writer + ch chan []byte + flushCh chan chan struct{} + done chan struct{} + closed atomic.Bool + closeOnce sync.Once + dropped atomic.Uint64 +} + +// NewAsyncWriter returns an AsyncWriter that drains to w using a buffer +// of capacity queued writes. A goroutine is spawned to drain the +// buffer; call Close to stop it. +func NewAsyncWriter(w io.Writer, capacity int) *AsyncWriter { + a := &AsyncWriter{ + w: w, + ch: make(chan []byte, capacity), + flushCh: make(chan chan struct{}), + done: make(chan struct{}), + } + go a.run() + return a +} + +func (a *AsyncWriter) run() { + defer close(a.done) + for { + select { + case buf, ok := <-a.ch: + if !ok { + return + } + _, _ = a.w.Write(buf) + case done := <-a.flushCh: + a.drainPending() + close(done) + } + } +} + +// drainPending writes everything currently queued in a.ch and returns. +// Called only from run, so there's no concurrent reader of a.ch. +func (a *AsyncWriter) drainPending() { + for { + select { + case buf, ok := <-a.ch: + if !ok { + return + } + _, _ = a.w.Write(buf) + default: + return + } + } +} + +// Write queues p for the drain goroutine. It never blocks: if the +// buffer is full the write is dropped and counted. +// +// slog reuses the slice it hands to handlers, so we copy before +// queueing. +func (a *AsyncWriter) Write(p []byte) (int, error) { + if a.closed.Load() { + return 0, io.ErrClosedPipe + } + buf := make([]byte, len(p)) + copy(buf, p) + select { + case a.ch <- buf: + default: + a.dropped.Add(1) + } + return len(p), nil +} + +// Dropped returns the number of writes dropped because the buffer was +// full. +func (a *AsyncWriter) Dropped() uint64 { + return a.dropped.Load() +} + +// Flush blocks until every write queued before this call has been +// written to the underlying writer. Concurrent writes that race with +// Flush make no guarantee. Safe to call from any goroutine. +func (a *AsyncWriter) Flush() { + if a.closed.Load() { + return + } + done := make(chan struct{}) + select { + case a.flushCh <- done: + <-done + case <-a.done: + } +} + +// Close stops accepting writes, drains queued lines to the underlying +// writer, and stops the drain goroutine. Safe to call multiple times. +func (a *AsyncWriter) Close() error { + a.closeOnce.Do(func() { + a.closed.Store(true) + close(a.ch) + }) + <-a.done + return nil +} diff --git a/logger/async_writer_test.go b/logger/async_writer_test.go new file mode 100644 index 0000000..9456166 --- /dev/null +++ b/logger/async_writer_test.go @@ -0,0 +1,166 @@ +package logger_test + +import ( + "bytes" + "io" + "sync" + "testing" + + "github.com/cego/go-lib/v2/logger" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestAsyncWriter(t *testing.T) { + t.Run("forwards writes to underlying writer", func(t *testing.T) { + var buf safeBuffer + aw := logger.NewAsyncWriter(&buf, 16) + + n, err := aw.Write([]byte("hello\n")) + require.NoError(t, err) + assert.Equal(t, 6, n) + + require.NoError(t, aw.Close()) + assert.Equal(t, "hello\n", buf.String()) + assert.Equal(t, uint64(0), aw.Dropped()) + }) + + t.Run("Close flushes queued writes", func(t *testing.T) { + blocker := newBlockingWriter() + aw := logger.NewAsyncWriter(blocker, 16) + + for i := range 5 { + _, err := aw.Write([]byte{byte('a' + i)}) + require.NoError(t, err) + } + + blocker.unblock() + require.NoError(t, aw.Close()) + assert.Equal(t, "abcde", blocker.String()) + }) + + t.Run("drops writes when buffer is full", func(t *testing.T) { + blocker := newBlockingWriter() + aw := logger.NewAsyncWriter(blocker, 2) + + // First write is consumed by the drain goroutine immediately + // and blocks inside the underlying writer; the channel then + // fills with two more before subsequent writes drop. + _, _ = aw.Write([]byte("1")) + blocker.waitForFirstWrite() + _, _ = aw.Write([]byte("2")) + _, _ = aw.Write([]byte("3")) + _, _ = aw.Write([]byte("dropped-a")) + _, _ = aw.Write([]byte("dropped-b")) + + assert.Equal(t, uint64(2), aw.Dropped()) + + blocker.unblock() + require.NoError(t, aw.Close()) + }) + + t.Run("Write after Close returns ErrClosedPipe", func(t *testing.T) { + var buf safeBuffer + aw := logger.NewAsyncWriter(&buf, 4) + require.NoError(t, aw.Close()) + + _, err := aw.Write([]byte("nope")) + assert.ErrorIs(t, err, io.ErrClosedPipe) + }) + + t.Run("Flush drains pending writes without closing", func(t *testing.T) { + var buf safeBuffer + aw := logger.NewAsyncWriter(&buf, 16) + + for i := range 5 { + _, err := aw.Write([]byte{byte('a' + i)}) + require.NoError(t, err) + } + + aw.Flush() + assert.Equal(t, "abcde", buf.String()) + + _, err := aw.Write([]byte("f")) + require.NoError(t, err) + aw.Flush() + assert.Equal(t, "abcdef", buf.String()) + + require.NoError(t, aw.Close()) + }) + + t.Run("Close is idempotent", func(t *testing.T) { + var buf safeBuffer + aw := logger.NewAsyncWriter(&buf, 4) + require.NoError(t, aw.Close()) + require.NoError(t, aw.Close()) + }) + + t.Run("copies caller buffer", func(t *testing.T) { + blocker := newBlockingWriter() + aw := logger.NewAsyncWriter(blocker, 4) + + p := []byte("first") + _, _ = aw.Write(p) + blocker.waitForFirstWrite() + copy(p, "XXXXX") + + blocker.unblock() + require.NoError(t, aw.Close()) + assert.Equal(t, "first", blocker.String()) + }) +} + +// safeBuffer is bytes.Buffer with a mutex so concurrent reads from the +// test goroutine and writes from the drain goroutine don't race. +type safeBuffer struct { + mu sync.Mutex + buf bytes.Buffer +} + +func (b *safeBuffer) Write(p []byte) (int, error) { + b.mu.Lock() + defer b.mu.Unlock() + return b.buf.Write(p) +} + +func (b *safeBuffer) String() string { + b.mu.Lock() + defer b.mu.Unlock() + return b.buf.String() +} + +// blockingWriter stalls the first Write until unblock() is called, so +// tests can deterministically fill the AsyncWriter's channel. +type blockingWriter struct { + mu sync.Mutex + buf bytes.Buffer + gate chan struct{} + firstCh chan struct{} + once sync.Once +} + +func newBlockingWriter() *blockingWriter { + return &blockingWriter{ + gate: make(chan struct{}), + firstCh: make(chan struct{}), + } +} + +func (b *blockingWriter) Write(p []byte) (int, error) { + b.once.Do(func() { + close(b.firstCh) + <-b.gate + }) + b.mu.Lock() + defer b.mu.Unlock() + return b.buf.Write(p) +} + +func (b *blockingWriter) waitForFirstWrite() { <-b.firstCh } +func (b *blockingWriter) unblock() { close(b.gate) } + +func (b *blockingWriter) String() string { + b.mu.Lock() + defer b.mu.Unlock() + return b.buf.String() +} diff --git a/logger/logger.go b/logger/logger.go index 5f8a1f2..8af51ab 100644 --- a/logger/logger.go +++ b/logger/logger.go @@ -2,16 +2,58 @@ package logger import ( "encoding/json" + "io" "log/slog" "net" "net/http" "os" "runtime/debug" + "sync" "time" "github.com/cego/go-lib/v2/headers" ) +// defaultAsyncCapacity is the buffer depth of the shared stdout +// AsyncWriter. Picked so a short stdout stall (a few hundred ms at +// typical service log rates) doesn't drop lines, while a sustained +// stall doesn't grow memory without bound. +const defaultAsyncCapacity = 4096 + +var ( + stdoutAsyncOnce sync.Once + stdoutAsync *AsyncWriter +) + +// stdoutWriter returns the process-wide AsyncWriter wrapping os.Stdout, +// lazily initialised. All loggers built by this package share it so +// they share one drain goroutine and one buffer. +func stdoutWriter() *AsyncWriter { + stdoutAsyncOnce.Do(func() { + stdoutAsync = NewAsyncWriter(os.Stdout, defaultAsyncCapacity) + }) + return stdoutAsync +} + +// Flush blocks until all log lines queued so far have been written to +// stdout. Call it from your graceful shutdown path so the final lines +// reach the log collector before the process exits. +func Flush() { + if w := stdoutAsync; w != nil { + w.Flush() + } +} + +// Dropped returns the number of log lines dropped because the async +// buffer was saturated. A non-zero value means stdout's consumer is +// not keeping up. +func Dropped() uint64 { + if w := stdoutAsync; w != nil { + return w.Dropped() + } + return 0 +} + type Logger interface { Debug(message string, args ...any) Info(message string, args ...any) @@ -19,14 +61,14 @@ type Logger interface { } func New() *slog.Logger { - return newSlogger(slog.LevelDebug) + return newSlogger(stdoutWriter(), slog.LevelDebug) } func NewWithLevel(level slog.Level) *slog.Logger { - return newSlogger(level) + return newSlogger(stdoutWriter(), level) } -func newSlogger(level slog.Level) *slog.Logger { +func newSlogger(w io.Writer, level slog.Level) *slog.Logger { opts := &slog.HandlerOptions{ Level: level, ReplaceAttr: func(groups []string, a slog.Attr) slog.Attr { @@ -43,7 +85,7 @@ func newSlogger(level slog.Level) *slog.Logger { return a }, } - return slog.New(slog.NewJSONHandler(os.Stdout, opts)) + return slog.New(slog.NewJSONHandler(w, opts)) } func GetSlogAttrFromError(err error) slog.Attr { diff --git a/periodic/periodic.go b/periodic/periodic.go index 30d56d1..0d629c6 100644 --- a/periodic/periodic.go +++ b/periodic/periodic.go @@ -28,6 +28,12 @@ func Run(ctx context.Context, interval time.Duration, jitter time.Duration, fn f case <-ctx.Done(): return case <-ticker.C: + // select picks non-deterministically when both + // channels are ready, so re-check cancellation + // before firing. + if ctx.Err() != nil { + return + } fn() } } diff --git a/readme.md b/readme.md index 3a66097..f93391f 100644 --- a/readme.md +++ b/readme.md @@ -24,6 +24,13 @@ import ( ``` ## Using Logger + +Writes go through a shared background goroutine so request handlers +don't stall when stdout's consumer (filebeat, the container runtime, +`kubectl logs`) falls behind. Call `logger.Flush()` from your graceful +shutdown path; `logger.Dropped()` reports lines dropped because the +buffer was saturated. + ```go l := logger.New() @@ -42,6 +49,9 @@ l := logger.NewWithLevel(slog.LevelInfo) // Set as global slog default slog.SetDefault(l) +// Drain queued lines before exit +defer logger.Flush() + // Testing with mock logger l := logger.NewMock() r := renderer.New(l)