Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 122 additions & 0 deletions logger/async_writer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package logger

import (
"io"
"sync"
"sync/atomic"
)

// AsyncWriter wraps an io.Writer and drains writes on a background
// goroutine through a buffered channel. Callers on request hot paths
// don't stall on pipe backpressure when stdout's consumer (filebeat,
// the container runtime, kubectl logs) falls behind.
//
// When the buffer is full, Write drops the line and increments a
// counter exposed via Dropped(). Call Close() during shutdown to flush
// queued lines.
type AsyncWriter struct {
w io.Writer
ch chan []byte
flushCh chan chan struct{}
done chan struct{}
closed atomic.Bool
closeOnce sync.Once
dropped atomic.Uint64
}

// NewAsyncWriter returns an AsyncWriter that drains to w using a buffer
// of capacity queued writes. A goroutine is spawned to drain the
// buffer; call Close to stop it.
func NewAsyncWriter(w io.Writer, capacity int) *AsyncWriter {
a := &AsyncWriter{
w: w,
ch: make(chan []byte, capacity),
flushCh: make(chan chan struct{}),
done: make(chan struct{}),
}
go a.run()
return a
}

func (a *AsyncWriter) run() {
defer close(a.done)
for {
select {
case buf, ok := <-a.ch:
if !ok {
return
}
_, _ = a.w.Write(buf)
case done := <-a.flushCh:
a.drainPending()
close(done)
}
}
}

// drainPending writes everything currently queued in a.ch and returns.
// Called only from run, so there's no concurrent reader of a.ch.
func (a *AsyncWriter) drainPending() {
for {
select {
case buf, ok := <-a.ch:
if !ok {
return
}
_, _ = a.w.Write(buf)
default:
return
}
}
}

// Write queues p for the drain goroutine. It never blocks: if the
// buffer is full the write is dropped and counted.
//
// slog reuses the slice it hands to handlers, so we copy before
// queueing.
func (a *AsyncWriter) Write(p []byte) (int, error) {
if a.closed.Load() {
return 0, io.ErrClosedPipe
}
buf := make([]byte, len(p))
copy(buf, p)
select {
case a.ch <- buf:
default:
a.dropped.Add(1)
}
return len(p), nil
}

// Dropped returns the number of writes dropped because the buffer was
// full.
func (a *AsyncWriter) Dropped() uint64 {
return a.dropped.Load()
}

// Flush blocks until every write queued before this call has been
// written to the underlying writer. Concurrent writes that race with
// Flush make no guarantee. Safe to call from any goroutine.
func (a *AsyncWriter) Flush() {
if a.closed.Load() {
return
}
done := make(chan struct{})
select {
case a.flushCh <- done:
<-done
case <-a.done:
}
}

// Close stops accepting writes, drains queued lines to the underlying
// writer, and stops the drain goroutine. Safe to call multiple times.
func (a *AsyncWriter) Close() error {
a.closeOnce.Do(func() {
a.closed.Store(true)
close(a.ch)
})
<-a.done
return nil
}
166 changes: 166 additions & 0 deletions logger/async_writer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
package logger_test

import (
"bytes"
"io"
"sync"
"testing"

"github.com/cego/go-lib/v2/logger"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestAsyncWriter(t *testing.T) {
t.Run("forwards writes to underlying writer", func(t *testing.T) {
var buf safeBuffer
aw := logger.NewAsyncWriter(&buf, 16)

n, err := aw.Write([]byte("hello\n"))
require.NoError(t, err)
assert.Equal(t, 6, n)

require.NoError(t, aw.Close())
assert.Equal(t, "hello\n", buf.String())
assert.Equal(t, uint64(0), aw.Dropped())
})

t.Run("Close flushes queued writes", func(t *testing.T) {
blocker := newBlockingWriter()
aw := logger.NewAsyncWriter(blocker, 16)

for i := range 5 {
_, err := aw.Write([]byte{byte('a' + i)})
require.NoError(t, err)
}

blocker.unblock()
require.NoError(t, aw.Close())
assert.Equal(t, "abcde", blocker.String())
})

t.Run("drops writes when buffer is full", func(t *testing.T) {
blocker := newBlockingWriter()
aw := logger.NewAsyncWriter(blocker, 2)

// First write is consumed by the drain goroutine immediately
// and blocks inside the underlying writer; the channel then
// fills with two more before subsequent writes drop.
_, _ = aw.Write([]byte("1"))
blocker.waitForFirstWrite()
_, _ = aw.Write([]byte("2"))
_, _ = aw.Write([]byte("3"))
_, _ = aw.Write([]byte("dropped-a"))
_, _ = aw.Write([]byte("dropped-b"))

assert.Equal(t, uint64(2), aw.Dropped())

blocker.unblock()
require.NoError(t, aw.Close())
})

t.Run("Write after Close returns ErrClosedPipe", func(t *testing.T) {
var buf safeBuffer
aw := logger.NewAsyncWriter(&buf, 4)
require.NoError(t, aw.Close())

_, err := aw.Write([]byte("nope"))
assert.ErrorIs(t, err, io.ErrClosedPipe)
})

t.Run("Flush drains pending writes without closing", func(t *testing.T) {
var buf safeBuffer
aw := logger.NewAsyncWriter(&buf, 16)

for i := range 5 {
_, err := aw.Write([]byte{byte('a' + i)})
require.NoError(t, err)
}

aw.Flush()
assert.Equal(t, "abcde", buf.String())

_, err := aw.Write([]byte("f"))
require.NoError(t, err)
aw.Flush()
assert.Equal(t, "abcdef", buf.String())

require.NoError(t, aw.Close())
})

t.Run("Close is idempotent", func(t *testing.T) {
var buf safeBuffer
aw := logger.NewAsyncWriter(&buf, 4)
require.NoError(t, aw.Close())
require.NoError(t, aw.Close())
})

t.Run("copies caller buffer", func(t *testing.T) {
blocker := newBlockingWriter()
aw := logger.NewAsyncWriter(blocker, 4)

p := []byte("first")
_, _ = aw.Write(p)
blocker.waitForFirstWrite()
copy(p, "XXXXX")

blocker.unblock()
require.NoError(t, aw.Close())
assert.Equal(t, "first", blocker.String())
})
}

// safeBuffer is bytes.Buffer with a mutex so concurrent reads from the
// test goroutine and writes from the drain goroutine don't race.
type safeBuffer struct {
mu sync.Mutex
buf bytes.Buffer
}

func (b *safeBuffer) Write(p []byte) (int, error) {
b.mu.Lock()
defer b.mu.Unlock()
return b.buf.Write(p)
}

func (b *safeBuffer) String() string {
b.mu.Lock()
defer b.mu.Unlock()
return b.buf.String()
}

// blockingWriter stalls the first Write until unblock() is called, so
// tests can deterministically fill the AsyncWriter's channel.
type blockingWriter struct {
mu sync.Mutex
buf bytes.Buffer
gate chan struct{}
firstCh chan struct{}
once sync.Once
}

func newBlockingWriter() *blockingWriter {
return &blockingWriter{
gate: make(chan struct{}),
firstCh: make(chan struct{}),
}
}

func (b *blockingWriter) Write(p []byte) (int, error) {
b.once.Do(func() {
close(b.firstCh)
<-b.gate
})
b.mu.Lock()
defer b.mu.Unlock()
return b.buf.Write(p)
}

func (b *blockingWriter) waitForFirstWrite() { <-b.firstCh }
func (b *blockingWriter) unblock() { close(b.gate) }

func (b *blockingWriter) String() string {
b.mu.Lock()
defer b.mu.Unlock()
return b.buf.String()
}
50 changes: 46 additions & 4 deletions logger/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,31 +2,73 @@ package logger

import (
"encoding/json"
"io"
"log/slog"
"net"
"net/http"
"os"
"runtime/debug"
"sync"
"time"

"github.com/cego/go-lib/v2/headers"
)

// defaultAsyncCapacity is the buffer depth of the shared stdout
// AsyncWriter. Picked so a short stdout stall (a few hundred ms at
// typical service log rates) doesn't drop lines, while a sustained
// stall doesn't grow memory without bound.
const defaultAsyncCapacity = 4096

var (
stdoutAsyncOnce sync.Once
stdoutAsync *AsyncWriter
)

// stdoutWriter returns the process-wide AsyncWriter wrapping os.Stdout,
// lazily initialised. All loggers built by this package share it so
// they share one drain goroutine and one buffer.
func stdoutWriter() *AsyncWriter {
stdoutAsyncOnce.Do(func() {
stdoutAsync = NewAsyncWriter(os.Stdout, defaultAsyncCapacity)
})
return stdoutAsync
}

// Flush blocks until all log lines queued so far have been written to
// stdout. Call it from your graceful shutdown path so the final lines
// reach the log collector before the process exits.
func Flush() {
if w := stdoutAsync; w != nil {
w.Flush()
}
}

// Dropped returns the number of log lines dropped because the async
// buffer was saturated. A non-zero value means stdout's consumer is
// not keeping up.
func Dropped() uint64 {
if w := stdoutAsync; w != nil {
return w.Dropped()
}
return 0
}

type Logger interface {
Debug(message string, args ...any)
Info(message string, args ...any)
Error(message string, args ...any)
}

func New() *slog.Logger {
return newSlogger(slog.LevelDebug)
return newSlogger(stdoutWriter(), slog.LevelDebug)
}

func NewWithLevel(level slog.Level) *slog.Logger {
return newSlogger(level)
return newSlogger(stdoutWriter(), level)
}

func newSlogger(level slog.Level) *slog.Logger {
func newSlogger(w io.Writer, level slog.Level) *slog.Logger {
opts := &slog.HandlerOptions{
Level: level,
ReplaceAttr: func(groups []string, a slog.Attr) slog.Attr {
Expand All @@ -43,7 +85,7 @@ func newSlogger(level slog.Level) *slog.Logger {
return a
},
}
return slog.New(slog.NewJSONHandler(os.Stdout, opts))
return slog.New(slog.NewJSONHandler(w, opts))
}

func GetSlogAttrFromError(err error) slog.Attr {
Expand Down
6 changes: 6 additions & 0 deletions periodic/periodic.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@ func Run(ctx context.Context, interval time.Duration, jitter time.Duration, fn f
case <-ctx.Done():
return
case <-ticker.C:
// select picks non-deterministically when both
// channels are ready, so re-check cancellation
// before firing.
if ctx.Err() != nil {
return
}
fn()
}
}
Expand Down
Loading
Loading