Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 13 additions & 8 deletions cmd/gateway/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,22 @@ import (

func main() {
cfg := config.Load("gateway")
log := logger.Must(cfg.ServiceName, cfg.Env)
log := logger.Must(cfg.ServiceName, cfg.Env, logger.SampleConfig{
Initial: cfg.LogSampleInitial,
Thereafter: cfg.LogSampleThereafter,
})
defer func() { _ = log.Sync() }()

shutdownTracing := tracing.MustInit(context.Background(), tracing.Config{
Enabled: cfg.OTelEnabled,
ServiceName: cfg.OTelServiceName,
Endpoint: cfg.OTelEndpoint,
Env: cfg.Env,
Version: cfg.ServiceVersion,
Sampler: cfg.OTelSampler,
SamplerRatio: cfg.OTelSamplerRatio,
Enabled: cfg.OTelEnabled,
ServiceName: cfg.OTelServiceName,
Endpoint: cfg.OTelEndpoint,
Env: cfg.Env,
Version: cfg.ServiceVersion,
Sampler: cfg.OTelSampler,
SamplerRatio: cfg.OTelSamplerRatio,
HighVolumeEvents: cfg.TraceHighVolumeEvents,
HighVolumeRatio: cfg.TraceHighVolumeSampleRatio,
}, log)
defer func() { _ = shutdownTracing(context.Background()) }()

Expand Down
21 changes: 13 additions & 8 deletions cmd/leaderboard/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,22 @@ import (

func main() {
cfg := config.Load("leaderboard")
log := logger.Must(cfg.ServiceName, cfg.Env)
log := logger.Must(cfg.ServiceName, cfg.Env, logger.SampleConfig{
Initial: cfg.LogSampleInitial,
Thereafter: cfg.LogSampleThereafter,
})
defer func() { _ = log.Sync() }()

shutdownTracing := tracing.MustInit(context.Background(), tracing.Config{
Enabled: cfg.OTelEnabled,
ServiceName: cfg.OTelServiceName,
Endpoint: cfg.OTelEndpoint,
Env: cfg.Env,
Version: cfg.ServiceVersion,
Sampler: cfg.OTelSampler,
SamplerRatio: cfg.OTelSamplerRatio,
Enabled: cfg.OTelEnabled,
ServiceName: cfg.OTelServiceName,
Endpoint: cfg.OTelEndpoint,
Env: cfg.Env,
Version: cfg.ServiceVersion,
Sampler: cfg.OTelSampler,
SamplerRatio: cfg.OTelSamplerRatio,
HighVolumeEvents: cfg.TraceHighVolumeEvents,
HighVolumeRatio: cfg.TraceHighVolumeSampleRatio,
}, log)
defer func() { _ = shutdownTracing(context.Background()) }()

Expand Down
21 changes: 13 additions & 8 deletions cmd/matchmaking/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,22 @@ import (

func main() {
cfg := config.Load("matchmaking")
log := logger.Must(cfg.ServiceName, cfg.Env)
log := logger.Must(cfg.ServiceName, cfg.Env, logger.SampleConfig{
Initial: cfg.LogSampleInitial,
Thereafter: cfg.LogSampleThereafter,
})
defer func() { _ = log.Sync() }()

shutdownTracing := tracing.MustInit(context.Background(), tracing.Config{
Enabled: cfg.OTelEnabled,
ServiceName: cfg.OTelServiceName,
Endpoint: cfg.OTelEndpoint,
Env: cfg.Env,
Version: cfg.ServiceVersion,
Sampler: cfg.OTelSampler,
SamplerRatio: cfg.OTelSamplerRatio,
Enabled: cfg.OTelEnabled,
ServiceName: cfg.OTelServiceName,
Endpoint: cfg.OTelEndpoint,
Env: cfg.Env,
Version: cfg.ServiceVersion,
Sampler: cfg.OTelSampler,
SamplerRatio: cfg.OTelSamplerRatio,
HighVolumeEvents: cfg.TraceHighVolumeEvents,
HighVolumeRatio: cfg.TraceHighVolumeSampleRatio,
}, log)
defer func() { _ = shutdownTracing(context.Background()) }()

Expand Down
21 changes: 13 additions & 8 deletions cmd/outbox-relay/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,22 @@ import (

func main() {
cfg := config.Load("outbox-relay")
log := logger.Must(cfg.ServiceName, cfg.Env)
log := logger.Must(cfg.ServiceName, cfg.Env, logger.SampleConfig{
Initial: cfg.LogSampleInitial,
Thereafter: cfg.LogSampleThereafter,
})
defer func() { _ = log.Sync() }()

shutdownTracing := tracing.MustInit(context.Background(), tracing.Config{
Enabled: cfg.OTelEnabled,
ServiceName: cfg.OTelServiceName,
Endpoint: cfg.OTelEndpoint,
Env: cfg.Env,
Version: cfg.ServiceVersion,
Sampler: cfg.OTelSampler,
SamplerRatio: cfg.OTelSamplerRatio,
Enabled: cfg.OTelEnabled,
ServiceName: cfg.OTelServiceName,
Endpoint: cfg.OTelEndpoint,
Env: cfg.Env,
Version: cfg.ServiceVersion,
Sampler: cfg.OTelSampler,
SamplerRatio: cfg.OTelSamplerRatio,
HighVolumeEvents: cfg.TraceHighVolumeEvents,
HighVolumeRatio: cfg.TraceHighVolumeSampleRatio,
}, log)
defer func() { _ = shutdownTracing(context.Background()) }()

Expand Down
21 changes: 13 additions & 8 deletions cmd/player/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,22 @@ import (

func main() {
cfg := config.Load("player")
log := logger.Must(cfg.ServiceName, cfg.Env)
log := logger.Must(cfg.ServiceName, cfg.Env, logger.SampleConfig{
Initial: cfg.LogSampleInitial,
Thereafter: cfg.LogSampleThereafter,
})
defer func() { _ = log.Sync() }()

shutdownTracing := tracing.MustInit(context.Background(), tracing.Config{
Enabled: cfg.OTelEnabled,
ServiceName: cfg.OTelServiceName,
Endpoint: cfg.OTelEndpoint,
Env: cfg.Env,
Version: cfg.ServiceVersion,
Sampler: cfg.OTelSampler,
SamplerRatio: cfg.OTelSamplerRatio,
Enabled: cfg.OTelEnabled,
ServiceName: cfg.OTelServiceName,
Endpoint: cfg.OTelEndpoint,
Env: cfg.Env,
Version: cfg.ServiceVersion,
Sampler: cfg.OTelSampler,
SamplerRatio: cfg.OTelSamplerRatio,
HighVolumeEvents: cfg.TraceHighVolumeEvents,
HighVolumeRatio: cfg.TraceHighVolumeSampleRatio,
}, log)
defer func() { _ = shutdownTracing(context.Background()) }()

Expand Down
21 changes: 13 additions & 8 deletions cmd/presence/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,22 @@ import (

func main() {
cfg := config.Load("presence")
log := logger.Must(cfg.ServiceName, cfg.Env)
log := logger.Must(cfg.ServiceName, cfg.Env, logger.SampleConfig{
Initial: cfg.LogSampleInitial,
Thereafter: cfg.LogSampleThereafter,
})
defer func() { _ = log.Sync() }()

shutdownTracing := tracing.MustInit(context.Background(), tracing.Config{
Enabled: cfg.OTelEnabled,
ServiceName: cfg.OTelServiceName,
Endpoint: cfg.OTelEndpoint,
Env: cfg.Env,
Version: cfg.ServiceVersion,
Sampler: cfg.OTelSampler,
SamplerRatio: cfg.OTelSamplerRatio,
Enabled: cfg.OTelEnabled,
ServiceName: cfg.OTelServiceName,
Endpoint: cfg.OTelEndpoint,
Env: cfg.Env,
Version: cfg.ServiceVersion,
Sampler: cfg.OTelSampler,
SamplerRatio: cfg.OTelSamplerRatio,
HighVolumeEvents: cfg.TraceHighVolumeEvents,
HighVolumeRatio: cfg.TraceHighVolumeSampleRatio,
}, log)
defer func() { _ = shutdownTracing(context.Background()) }()

Expand Down
21 changes: 13 additions & 8 deletions cmd/websocket/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,22 @@ import (

func main() {
cfg := config.Load("websocket")
log := logger.Must(cfg.ServiceName, cfg.Env)
log := logger.Must(cfg.ServiceName, cfg.Env, logger.SampleConfig{
Initial: cfg.LogSampleInitial,
Thereafter: cfg.LogSampleThereafter,
})
defer func() { _ = log.Sync() }()

shutdownTracing := tracing.MustInit(context.Background(), tracing.Config{
Enabled: cfg.OTelEnabled,
ServiceName: cfg.OTelServiceName,
Endpoint: cfg.OTelEndpoint,
Env: cfg.Env,
Version: cfg.ServiceVersion,
Sampler: cfg.OTelSampler,
SamplerRatio: cfg.OTelSamplerRatio,
Enabled: cfg.OTelEnabled,
ServiceName: cfg.OTelServiceName,
Endpoint: cfg.OTelEndpoint,
Env: cfg.Env,
Version: cfg.ServiceVersion,
Sampler: cfg.OTelSampler,
SamplerRatio: cfg.OTelSamplerRatio,
HighVolumeEvents: cfg.TraceHighVolumeEvents,
HighVolumeRatio: cfg.TraceHighVolumeSampleRatio,
}, log)
defer func() { _ = shutdownTracing(context.Background()) }()

Expand Down
18 changes: 18 additions & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,17 @@ type Config struct {
OTelEndpoint string
OTelSampler string
OTelSamplerRatio float64

// Observability sampling. Logs and traces are both observability load, so
// their sampling knobs live together. Logger sampling thins the flood of
// successful/fast 2xx access logs (errors and slow requests always bypass the
// sampler); high-volume trace sampling drops most consumer spans for chatty
// event types while ALWAYS keeping the trace-context propagation intact.
LogSampleInitial int // first N identical entries/sec logged in full
LogSampleThereafter int // then 1 in M of the rest
LogSlowRequestThreshold time.Duration // requests slower than this always log
TraceHighVolumeEvents []string // event types whose consumer spans are sampled down
TraceHighVolumeSampleRatio float64 // keep-ratio for those high-volume spans
}

// Load reads configuration for the named service from the environment.
Expand Down Expand Up @@ -145,6 +156,13 @@ func Load(serviceName string) *Config {
OTelEndpoint: getEnv("OTEL_EXPORTER_OTLP_ENDPOINT", "localhost:4317"),
OTelSampler: getEnv("OTEL_TRACES_SAMPLER", "parentbased_always_on"),
OTelSamplerRatio: getEnvFloat("OTEL_TRACES_SAMPLER_ARG", 1.0),

// Observability sampling (logs + traces, kept side by side).
LogSampleInitial: getEnvInt("LOG_SAMPLE_INITIAL", 100),
LogSampleThereafter: getEnvInt("LOG_SAMPLE_THEREAFTER", 100),
LogSlowRequestThreshold: getEnvDuration("LOG_SLOW_REQUEST_THRESHOLD", time.Second),
TraceHighVolumeEvents: splitCSV(getEnv("TRACE_HIGHVOLUME_EVENTS", "LeaderboardUpdated")),
TraceHighVolumeSampleRatio: getEnvFloat("TRACE_HIGHVOLUME_SAMPLE_RATIO", 0.01),
}
}

Expand Down
11 changes: 8 additions & 3 deletions pkg/events/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,10 +317,15 @@ func (b *NATSBus) process(ctx context.Context, topic string, handler Handler, ms
return
}

// Continue the producer's trace across the async boundary (same mechanism
// as RedisBus / ReceiveSpan, just inlined so we can wrap ACK timing).
// Continue the producer's trace across the async boundary. The context is
// extracted UNCONDITIONALLY (this never opens a span), so trace propagation
// is preserved even when the consumer span below is sampled out for a
// high-volume event type. StartConsumerSpan then either opens a recording
// span or, for a sampled-out high-volume event, returns the propagating
// context + (non-recording) span — keeping the trace chain intact while
// dropping the exporter cost. See tracing.StartConsumerSpan.
hctx := tracing.ResumeFromCarrier(ctx, e.Carrier)
hctx, span := tracing.Tracer().Start(hctx, "events.consume "+topic,
hctx, span := tracing.StartConsumerSpan(hctx, "events.consume "+topic, e.Type,
trace.WithSpanKind(trace.SpanKindConsumer),
trace.WithAttributes(
semconv.MessagingSystemKey.String("nats"),
Expand Down
5 changes: 4 additions & 1 deletion pkg/events/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,11 @@ func (b *RedisBus) Publish(ctx context.Context, topic string, e Event) error {
// distributed trace. Returns the new context and span; the caller must End the
// span. Transport-agnostic: relies only on the OTel propagator and the Carrier.
func ReceiveSpan(ctx context.Context, e Event, name string) (context.Context, trace.Span) {
// Extract unconditionally (no span here) so propagation survives even when
// the consumer span is sampled out for a high-volume event type, then let
// StartConsumerSpan apply the per-event-type sampling.
ctx = tracing.ResumeFromCarrier(ctx, e.Carrier)
return tracing.Tracer().Start(ctx, name,
return tracing.StartConsumerSpan(ctx, name, e.Type,
trace.WithSpanKind(trace.SpanKindConsumer),
trace.WithAttributes(
semconv.MessagingSystemKey.String("redis"),
Expand Down
74 changes: 69 additions & 5 deletions pkg/logger/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,30 @@
package logger

import (
"time"

"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

// SampleConfig tunes the log sampler that New installs for entries below Warn.
// Within each Tick window the first Initial entries with an identical message
// pass in full; after that, one in every Thereafter passes. Entries at Warn and
// above are routed around this sampler (see sampledCore.Check). When Initial is
// zero or negative, New does not install the sampler at all.
// NOTE(review): whether Initial <= 0 is actually the development default is
// not visible here — confirm against the config defaults.
type SampleConfig struct {
// Initial is how many identical entries per Tick are logged before thinning.
Initial int
// Thereafter keeps one in every Thereafter entries once Initial is exceeded.
Thereafter int
// Tick is the sampling window; New substitutes time.Second when it is <= 0.
Tick time.Duration
}

// New builds a zap logger. Production gets JSON output for log aggregation;
// development gets a human-readable console encoder.
func New(serviceName, env string) (*zap.Logger, error) {
// development gets a human-readable console encoder. When sample.Initial > 0 the
// returned logger samples high-volume Info logs (see sampledCore) so a flood of
// successful 2xx access logs does not dominate CPU/IO, while still emitting
// every Warn+ entry.
func New(serviceName, env string, sample SampleConfig) (*zap.Logger, error) {
var cfg zap.Config
if env == "production" {
cfg = zap.NewProductionConfig()
Expand All @@ -19,15 +36,62 @@ func New(serviceName, env string) (*zap.Logger, error) {
}
cfg.EncoderConfig.TimeKey = "ts"
cfg.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder
return cfg.Build(zap.Fields(zap.String("service", serviceName)))

opts := []zap.Option{}
if sample.Initial > 0 {
tick := sample.Tick
if tick <= 0 {
tick = time.Second
}
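// Wrap the core so only Info-and-below goes through this sampler; Warn and
// above skip it. zap's own sampler applies to every level, so it cannot be
// used directly without dropping error logs.
// NOTE(review): zap.NewProductionConfig() already enables that all-level
// sampler (Sampling: Initial 100, Thereafter 100), and cfg.Build applies it
// underneath this wrapper. In production, Warn+ entries can therefore still
// be dropped unless cfg.Sampling is set to nil before Build — confirm.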
opts = append(opts, zap.WrapCore(func(core zapcore.Core) zapcore.Core {
return newSampledCore(core, tick, sample.Initial, sample.Thereafter)
}))
}

return cfg.Build(append(opts, zap.Fields(zap.String("service", serviceName)))...)
}

// Must is a convenience wrapper that panics on logger construction failure —
// a service that cannot log should not start.
func Must(serviceName, env string) *zap.Logger {
l, err := New(serviceName, env)
func Must(serviceName, env string, sample SampleConfig) *zap.Logger {
	built, buildErr := New(serviceName, env, sample)
	if buildErr == nil {
		return built
	}
	// Construction failed: refuse to start rather than run without logs.
	panic(buildErr)
}

// sampledCore splits entries by level: anything below Warn is offered to a zap
// sampler, while Warn and above go straight to the underlying core. This is the
// "bypass" the architecture review asked for — high-volume Info access logs are
// thinned, and Warn/Error lines never pass through this sampler.
type sampledCore struct {
zapcore.Core // underlying core; Check sends Warn-and-above entries here directly
sampled zapcore.Core // sampler-wrapped counterpart of Core; takes entries below Warn
}

// newSampledCore pairs core with a sampler built around that same core. tick,
// first and thereafter are handed to zapcore.NewSamplerWithOptions unchanged.
func newSampledCore(core zapcore.Core, tick time.Duration, first, thereafter int) zapcore.Core {
	sampler := zapcore.NewSamplerWithOptions(core, tick, first, thereafter)
	return &sampledCore{Core: core, sampled: sampler}
}

// Check picks the path for an entry by level: entries below Warn are offered to
// the sampler-wrapped core, everything else goes to the underlying core and so
// never touches the sampler.
func (c *sampledCore) Check(ent zapcore.Entry, ce *zapcore.CheckedEntry) *zapcore.CheckedEntry {
	if ent.Level < zapcore.WarnLevel {
		return c.sampled.Check(ent, ce)
	}
	return c.Core.Check(ent, ce)
}

// With attaches fields to both the direct core and the sampler-wrapped core and
// returns a new sampledCore holding the two clones, so contextual fields (e.g.
// the service name) show up whichever path Check selects for an entry.
func (c *sampledCore) With(fields []zapcore.Field) zapcore.Core {
	direct := c.Core.With(fields)
	viaSampler := c.sampled.With(fields)
	return &sampledCore{Core: direct, sampled: viaSampler}
}
Loading
Loading