diff --git a/cmd/gateway/main.go b/cmd/gateway/main.go index 544506d..74936da 100644 --- a/cmd/gateway/main.go +++ b/cmd/gateway/main.go @@ -19,17 +19,22 @@ import ( func main() { cfg := config.Load("gateway") - log := logger.Must(cfg.ServiceName, cfg.Env) + log := logger.Must(cfg.ServiceName, cfg.Env, logger.SampleConfig{ + Initial: cfg.LogSampleInitial, + Thereafter: cfg.LogSampleThereafter, + }) defer func() { _ = log.Sync() }() shutdownTracing := tracing.MustInit(context.Background(), tracing.Config{ - Enabled: cfg.OTelEnabled, - ServiceName: cfg.OTelServiceName, - Endpoint: cfg.OTelEndpoint, - Env: cfg.Env, - Version: cfg.ServiceVersion, - Sampler: cfg.OTelSampler, - SamplerRatio: cfg.OTelSamplerRatio, + Enabled: cfg.OTelEnabled, + ServiceName: cfg.OTelServiceName, + Endpoint: cfg.OTelEndpoint, + Env: cfg.Env, + Version: cfg.ServiceVersion, + Sampler: cfg.OTelSampler, + SamplerRatio: cfg.OTelSamplerRatio, + HighVolumeEvents: cfg.TraceHighVolumeEvents, + HighVolumeRatio: cfg.TraceHighVolumeSampleRatio, }, log) defer func() { _ = shutdownTracing(context.Background()) }() diff --git a/cmd/leaderboard/main.go b/cmd/leaderboard/main.go index 62bc08f..39e810b 100644 --- a/cmd/leaderboard/main.go +++ b/cmd/leaderboard/main.go @@ -19,17 +19,22 @@ import ( func main() { cfg := config.Load("leaderboard") - log := logger.Must(cfg.ServiceName, cfg.Env) + log := logger.Must(cfg.ServiceName, cfg.Env, logger.SampleConfig{ + Initial: cfg.LogSampleInitial, + Thereafter: cfg.LogSampleThereafter, + }) defer func() { _ = log.Sync() }() shutdownTracing := tracing.MustInit(context.Background(), tracing.Config{ - Enabled: cfg.OTelEnabled, - ServiceName: cfg.OTelServiceName, - Endpoint: cfg.OTelEndpoint, - Env: cfg.Env, - Version: cfg.ServiceVersion, - Sampler: cfg.OTelSampler, - SamplerRatio: cfg.OTelSamplerRatio, + Enabled: cfg.OTelEnabled, + ServiceName: cfg.OTelServiceName, + Endpoint: cfg.OTelEndpoint, + Env: cfg.Env, + Version: cfg.ServiceVersion, + Sampler: cfg.OTelSampler, + 
SamplerRatio: cfg.OTelSamplerRatio, + HighVolumeEvents: cfg.TraceHighVolumeEvents, + HighVolumeRatio: cfg.TraceHighVolumeSampleRatio, }, log) defer func() { _ = shutdownTracing(context.Background()) }() diff --git a/cmd/matchmaking/main.go b/cmd/matchmaking/main.go index 285591a..9b82173 100644 --- a/cmd/matchmaking/main.go +++ b/cmd/matchmaking/main.go @@ -19,17 +19,22 @@ import ( func main() { cfg := config.Load("matchmaking") - log := logger.Must(cfg.ServiceName, cfg.Env) + log := logger.Must(cfg.ServiceName, cfg.Env, logger.SampleConfig{ + Initial: cfg.LogSampleInitial, + Thereafter: cfg.LogSampleThereafter, + }) defer func() { _ = log.Sync() }() shutdownTracing := tracing.MustInit(context.Background(), tracing.Config{ - Enabled: cfg.OTelEnabled, - ServiceName: cfg.OTelServiceName, - Endpoint: cfg.OTelEndpoint, - Env: cfg.Env, - Version: cfg.ServiceVersion, - Sampler: cfg.OTelSampler, - SamplerRatio: cfg.OTelSamplerRatio, + Enabled: cfg.OTelEnabled, + ServiceName: cfg.OTelServiceName, + Endpoint: cfg.OTelEndpoint, + Env: cfg.Env, + Version: cfg.ServiceVersion, + Sampler: cfg.OTelSampler, + SamplerRatio: cfg.OTelSamplerRatio, + HighVolumeEvents: cfg.TraceHighVolumeEvents, + HighVolumeRatio: cfg.TraceHighVolumeSampleRatio, }, log) defer func() { _ = shutdownTracing(context.Background()) }() diff --git a/cmd/outbox-relay/main.go b/cmd/outbox-relay/main.go index ca9ae11..40972a1 100644 --- a/cmd/outbox-relay/main.go +++ b/cmd/outbox-relay/main.go @@ -25,17 +25,22 @@ import ( func main() { cfg := config.Load("outbox-relay") - log := logger.Must(cfg.ServiceName, cfg.Env) + log := logger.Must(cfg.ServiceName, cfg.Env, logger.SampleConfig{ + Initial: cfg.LogSampleInitial, + Thereafter: cfg.LogSampleThereafter, + }) defer func() { _ = log.Sync() }() shutdownTracing := tracing.MustInit(context.Background(), tracing.Config{ - Enabled: cfg.OTelEnabled, - ServiceName: cfg.OTelServiceName, - Endpoint: cfg.OTelEndpoint, - Env: cfg.Env, - Version: cfg.ServiceVersion, - 
Sampler: cfg.OTelSampler, - SamplerRatio: cfg.OTelSamplerRatio, + Enabled: cfg.OTelEnabled, + ServiceName: cfg.OTelServiceName, + Endpoint: cfg.OTelEndpoint, + Env: cfg.Env, + Version: cfg.ServiceVersion, + Sampler: cfg.OTelSampler, + SamplerRatio: cfg.OTelSamplerRatio, + HighVolumeEvents: cfg.TraceHighVolumeEvents, + HighVolumeRatio: cfg.TraceHighVolumeSampleRatio, }, log) defer func() { _ = shutdownTracing(context.Background()) }() diff --git a/cmd/player/main.go b/cmd/player/main.go index 7d34913..8306e81 100644 --- a/cmd/player/main.go +++ b/cmd/player/main.go @@ -24,17 +24,22 @@ import ( func main() { cfg := config.Load("player") - log := logger.Must(cfg.ServiceName, cfg.Env) + log := logger.Must(cfg.ServiceName, cfg.Env, logger.SampleConfig{ + Initial: cfg.LogSampleInitial, + Thereafter: cfg.LogSampleThereafter, + }) defer func() { _ = log.Sync() }() shutdownTracing := tracing.MustInit(context.Background(), tracing.Config{ - Enabled: cfg.OTelEnabled, - ServiceName: cfg.OTelServiceName, - Endpoint: cfg.OTelEndpoint, - Env: cfg.Env, - Version: cfg.ServiceVersion, - Sampler: cfg.OTelSampler, - SamplerRatio: cfg.OTelSamplerRatio, + Enabled: cfg.OTelEnabled, + ServiceName: cfg.OTelServiceName, + Endpoint: cfg.OTelEndpoint, + Env: cfg.Env, + Version: cfg.ServiceVersion, + Sampler: cfg.OTelSampler, + SamplerRatio: cfg.OTelSamplerRatio, + HighVolumeEvents: cfg.TraceHighVolumeEvents, + HighVolumeRatio: cfg.TraceHighVolumeSampleRatio, }, log) defer func() { _ = shutdownTracing(context.Background()) }() diff --git a/cmd/presence/main.go b/cmd/presence/main.go index 0fb1465..7450178 100644 --- a/cmd/presence/main.go +++ b/cmd/presence/main.go @@ -21,17 +21,22 @@ import ( func main() { cfg := config.Load("presence") - log := logger.Must(cfg.ServiceName, cfg.Env) + log := logger.Must(cfg.ServiceName, cfg.Env, logger.SampleConfig{ + Initial: cfg.LogSampleInitial, + Thereafter: cfg.LogSampleThereafter, + }) defer func() { _ = log.Sync() }() shutdownTracing := 
tracing.MustInit(context.Background(), tracing.Config{ - Enabled: cfg.OTelEnabled, - ServiceName: cfg.OTelServiceName, - Endpoint: cfg.OTelEndpoint, - Env: cfg.Env, - Version: cfg.ServiceVersion, - Sampler: cfg.OTelSampler, - SamplerRatio: cfg.OTelSamplerRatio, + Enabled: cfg.OTelEnabled, + ServiceName: cfg.OTelServiceName, + Endpoint: cfg.OTelEndpoint, + Env: cfg.Env, + Version: cfg.ServiceVersion, + Sampler: cfg.OTelSampler, + SamplerRatio: cfg.OTelSamplerRatio, + HighVolumeEvents: cfg.TraceHighVolumeEvents, + HighVolumeRatio: cfg.TraceHighVolumeSampleRatio, }, log) defer func() { _ = shutdownTracing(context.Background()) }() diff --git a/cmd/websocket/main.go b/cmd/websocket/main.go index 0b8654a..e7e20b4 100644 --- a/cmd/websocket/main.go +++ b/cmd/websocket/main.go @@ -22,17 +22,22 @@ import ( func main() { cfg := config.Load("websocket") - log := logger.Must(cfg.ServiceName, cfg.Env) + log := logger.Must(cfg.ServiceName, cfg.Env, logger.SampleConfig{ + Initial: cfg.LogSampleInitial, + Thereafter: cfg.LogSampleThereafter, + }) defer func() { _ = log.Sync() }() shutdownTracing := tracing.MustInit(context.Background(), tracing.Config{ - Enabled: cfg.OTelEnabled, - ServiceName: cfg.OTelServiceName, - Endpoint: cfg.OTelEndpoint, - Env: cfg.Env, - Version: cfg.ServiceVersion, - Sampler: cfg.OTelSampler, - SamplerRatio: cfg.OTelSamplerRatio, + Enabled: cfg.OTelEnabled, + ServiceName: cfg.OTelServiceName, + Endpoint: cfg.OTelEndpoint, + Env: cfg.Env, + Version: cfg.ServiceVersion, + Sampler: cfg.OTelSampler, + SamplerRatio: cfg.OTelSamplerRatio, + HighVolumeEvents: cfg.TraceHighVolumeEvents, + HighVolumeRatio: cfg.TraceHighVolumeSampleRatio, }, log) defer func() { _ = shutdownTracing(context.Background()) }() diff --git a/pkg/config/config.go b/pkg/config/config.go index 5f16ede..dc5068e 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -80,6 +80,17 @@ type Config struct { OTelEndpoint string OTelSampler string OTelSamplerRatio float64 + + // 
Observability sampling. Logs and traces are both observability load, so + // their sampling knobs live together. Logger sampling thins the flood of + // successful/fast 2xx access logs (errors and slow requests always bypass the + // sampler); high-volume trace sampling drops most consumer spans for chatty + // event types while ALWAYS keeping the trace-context propagation intact. + LogSampleInitial int // first N identical entries/sec logged in full + LogSampleThereafter int // then 1 in M of the rest + LogSlowRequestThreshold time.Duration // requests slower than this always log + TraceHighVolumeEvents []string // event types whose consumer spans are sampled down + TraceHighVolumeSampleRatio float64 // keep-ratio for those high-volume spans } // Load reads configuration for the named service from the environment. @@ -145,6 +156,13 @@ func Load(serviceName string) *Config { OTelEndpoint: getEnv("OTEL_EXPORTER_OTLP_ENDPOINT", "localhost:4317"), OTelSampler: getEnv("OTEL_TRACES_SAMPLER", "parentbased_always_on"), OTelSamplerRatio: getEnvFloat("OTEL_TRACES_SAMPLER_ARG", 1.0), + + // Observability sampling (logs + traces, kept side by side). + LogSampleInitial: getEnvInt("LOG_SAMPLE_INITIAL", 100), + LogSampleThereafter: getEnvInt("LOG_SAMPLE_THEREAFTER", 100), + LogSlowRequestThreshold: getEnvDuration("LOG_SLOW_REQUEST_THRESHOLD", time.Second), + TraceHighVolumeEvents: splitCSV(getEnv("TRACE_HIGHVOLUME_EVENTS", "LeaderboardUpdated")), + TraceHighVolumeSampleRatio: getEnvFloat("TRACE_HIGHVOLUME_SAMPLE_RATIO", 0.01), } } diff --git a/pkg/events/nats.go b/pkg/events/nats.go index 9ff5855..8050c62 100644 --- a/pkg/events/nats.go +++ b/pkg/events/nats.go @@ -317,10 +317,15 @@ func (b *NATSBus) process(ctx context.Context, topic string, handler Handler, ms return } - // Continue the producer's trace across the async boundary (same mechanism - // as RedisBus / ReceiveSpan, just inlined so we can wrap ACK timing). + // Continue the producer's trace across the async boundary. 
The context is + // extracted UNCONDITIONALLY (this never opens a span), so trace propagation + // is preserved even when the consumer span below is sampled out for a + // high-volume event type. StartConsumerSpan then either opens a recording + // span or, for a sampled-out high-volume event, returns the propagating + // context + (non-recording) span — keeping the trace chain intact while + // dropping the exporter cost. See tracing.StartConsumerSpan. hctx := tracing.ResumeFromCarrier(ctx, e.Carrier) - hctx, span := tracing.Tracer().Start(hctx, "events.consume "+topic, + hctx, span := tracing.StartConsumerSpan(hctx, "events.consume "+topic, e.Type, trace.WithSpanKind(trace.SpanKindConsumer), trace.WithAttributes( semconv.MessagingSystemKey.String("nats"), diff --git a/pkg/events/redis.go b/pkg/events/redis.go index 626fe24..24d0d5a 100644 --- a/pkg/events/redis.go +++ b/pkg/events/redis.go @@ -74,8 +74,11 @@ func (b *RedisBus) Publish(ctx context.Context, topic string, e Event) error { // distributed trace. Returns the new context and span; the caller must End the // span. Transport-agnostic: relies only on the OTel propagator and the Carrier. func ReceiveSpan(ctx context.Context, e Event, name string) (context.Context, trace.Span) { + // Extract unconditionally (no span here) so propagation survives even when + // the consumer span is sampled out for a high-volume event type, then let + // StartConsumerSpan apply the per-event-type sampling. 
ctx = tracing.ResumeFromCarrier(ctx, e.Carrier) - return tracing.Tracer().Start(ctx, name, + return tracing.StartConsumerSpan(ctx, name, e.Type, trace.WithSpanKind(trace.SpanKindConsumer), trace.WithAttributes( semconv.MessagingSystemKey.String("redis"), diff --git a/pkg/logger/logger.go b/pkg/logger/logger.go index 1a9930b..12b4961 100644 --- a/pkg/logger/logger.go +++ b/pkg/logger/logger.go @@ -4,13 +4,30 @@ package logger import ( + "time" + "go.uber.org/zap" "go.uber.org/zap/zapcore" ) +// SampleConfig tunes the access-log sampler. The first Initial entries (per +// identical message, per Tick; one second when Tick <= 0) pass in full, then +// one in every Thereafter passes. It only ever thins Info-and-below — +// Warn/Error/etc. always bypass it (see New), so errors and slow requests are +// never dropped. A zero/negative Initial disables sampling entirely (every +// entry passes), which is what the zero value SampleConfig{} gives. +type SampleConfig struct { + Initial int + Thereafter int + Tick time.Duration +} + // New builds a zap logger. Production gets JSON output for log aggregation; -// development gets a human-readable console encoder. -func New(serviceName, env string) (*zap.Logger, error) { +// development gets a human-readable console encoder. When sample.Initial > 0 the +// returned logger samples high-volume Info logs (see sampledCore) so a flood of +// successful 2xx access logs does not dominate CPU/IO, while still emitting +// every Warn+ entry. 
+func New(serviceName, env string, sample SampleConfig) (*zap.Logger, error) { var cfg zap.Config if env == "production" { cfg = zap.NewProductionConfig() @@ -19,15 +36,62 @@ func New(serviceName, env string) (*zap.Logger, error) { } cfg.EncoderConfig.TimeKey = "ts" cfg.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder - return cfg.Build(zap.Fields(zap.String("service", serviceName))) + + opts := []zap.Option{} + if sample.Initial > 0 { + tick := sample.Tick + if tick <= 0 { + tick = time.Second + } + // Wrap the core so only Info-and-below is sampled; Warn and above pass + // through untouched. zap's own sampler samples ALL levels up to Error, so + // it cannot be used directly without dropping error logs. + opts = append(opts, zap.WrapCore(func(core zapcore.Core) zapcore.Core { + return newSampledCore(core, tick, sample.Initial, sample.Thereafter) + })) + } + + return cfg.Build(append(opts, zap.Fields(zap.String("service", serviceName)))...) } // Must is a convenience wrapper that panics on logger construction failure — // a service that cannot log should not start. -func Must(serviceName, env string) *zap.Logger { - l, err := New(serviceName, env) +func Must(serviceName, env string, sample SampleConfig) *zap.Logger { + l, err := New(serviceName, env, sample) if err != nil { panic(err) } return l } + +// sampledCore routes Info-and-below through a zap sampler and Warn-and-above +// straight to the underlying core. This is the "bypass" the architecture review +// asked for: high-volume successful access logs (Info) are thinned, while the +// rare, valuable Warn/Error lines (errors, slow requests) are never sampled. 
+type sampledCore struct { + zapcore.Core // underlying core: handles Warn and above directly + sampled zapcore.Core // sampler-wrapped core: handles Info and below +} + +func newSampledCore(core zapcore.Core, tick time.Duration, first, thereafter int) zapcore.Core { + return &sampledCore{ + Core: core, + sampled: zapcore.NewSamplerWithOptions(core, tick, first, thereafter), + } +} + +func (c *sampledCore) Check(ent zapcore.Entry, ce *zapcore.CheckedEntry) *zapcore.CheckedEntry { + if ent.Level >= zapcore.WarnLevel { + return c.Core.Check(ent, ce) // bypass the sampler entirely + } + return c.sampled.Check(ent, ce) +} + +// With must clone BOTH wrapped cores so contextual fields (e.g. the service +// name) are present whichever path an entry takes. +func (c *sampledCore) With(fields []zapcore.Field) zapcore.Core { + return &sampledCore{ + Core: c.Core.With(fields), + sampled: c.sampled.With(fields), + } +} diff --git a/pkg/logger/logger_test.go b/pkg/logger/logger_test.go index ec082f3..5953178 100644 --- a/pkg/logger/logger_test.go +++ b/pkg/logger/logger_test.go @@ -2,21 +2,64 @@ package logger import ( "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + "go.uber.org/zap/zaptest/observer" ) func TestNewDevelopmentAndProduction(t *testing.T) { - dev, err := New("test", "development") + dev, err := New("test", "development", SampleConfig{}) require.NoError(t, err) assert.NotNil(t, dev) - prod, err := New("test", "production") + prod, err := New("test", "production", SampleConfig{Initial: 100, Thereafter: 100}) require.NoError(t, err) assert.NotNil(t, prod) } func TestMust(t *testing.T) { - assert.NotPanics(t, func() { Must("test", "development") }) + assert.NotPanics(t, func() { Must("test", "development", SampleConfig{}) }) +} + +// TestSampledCoreThinsInfoButNeverDropsWarnAndAbove is the critical guarantee: +// the sampler must thin high-volume Info logs while letting 
EVERY Warn/Error +// through, so errors and slow requests are never lost. +func TestSampledCoreThinsInfoButNeverDropsWarnAndAbove(t *testing.T) { + obs, logs := observer.New(zapcore.DebugLevel) + // first=2, thereafter=0 → after the first 2 identical entries per tick, the + // rest are dropped (thereafter==0 drops all beyond first). + core := newSampledCore(obs, time.Minute, 2, 0) + log := zap.New(core) + + // 10 identical Info messages: only the first 2 should survive sampling. + for i := 0; i < 10; i++ { + log.Info("same info message") + } + // 10 identical Error messages: ALL must survive (sampler bypassed). + for i := 0; i < 10; i++ { + log.Error("same error message") + } + // 10 identical Warn messages: ALL must survive too. + for i := 0; i < 10; i++ { + log.Warn("same warn message") + } + + assert.Equal(t, 2, logs.FilterMessage("same info message").Len(), "Info must be sampled") + assert.Equal(t, 10, logs.FilterMessage("same error message").Len(), "Error must never be sampled") + assert.Equal(t, 10, logs.FilterMessage("same warn message").Len(), "Warn must never be sampled") +} + +// TestSampleDisabledLogsEverything verifies that Initial<=0 leaves all logs +// untouched (the development default). +func TestSampleDisabledLogsEverything(t *testing.T) { + obs, logs := observer.New(zapcore.DebugLevel) + log := zap.New(obs) // no sampling wrapper + for i := 0; i < 10; i++ { + log.Info("noisy") + } + assert.Equal(t, 10, logs.Len()) } diff --git a/pkg/middleware/middleware.go b/pkg/middleware/middleware.go index c194861..fedf759 100644 --- a/pkg/middleware/middleware.go +++ b/pkg/middleware/middleware.go @@ -49,19 +49,30 @@ func RequestID() gin.HandlerFunc { // Logger emits one structured log line per request with request ID, user ID // (when authenticated), latency and error details. -func Logger(log *zap.Logger) gin.HandlerFunc { +// +// Sampling: the *successful, fast* request flood is thinned by the zap sampler +// installed in logger.New (these go out at Info). 
Errors (status >= 400) and +// slow requests (latency >= slowThreshold) are logged at Warn/Error, which the +// sampler leaves untouched — so the rare, valuable lines always survive while +// the high-volume 2xx noise is sampled. slowThreshold <= 0 disables the slow +// bypass (every 2xx is sampled). +func Logger(log *zap.Logger, slowThreshold time.Duration) gin.HandlerFunc { return func(c *gin.Context) { start := time.Now() c.Next() + latency := time.Since(start) - fields := []zap.Field{ + // Pre-allocate to the worst-case field count (6 base + user_id + 2 trace + // + error) so the appends below never grow/realloc the backing array. + fields := make([]zap.Field, 0, 10) + fields = append(fields, zap.String("request_id", c.GetString(CtxRequestID)), zap.String("method", c.Request.Method), zap.String("path", c.Request.URL.Path), zap.Int("status", c.Writer.Status()), - zap.Duration("latency", time.Since(start)), + zap.Duration("latency", latency), zap.String("client_ip", c.ClientIP()), - } + ) if uid := c.GetString(CtxUserID); uid != "" { fields = append(fields, zap.String("user_id", uid)) } else if uid := c.GetHeader(HeaderUserID); uid != "" { @@ -71,12 +82,22 @@ func Logger(log *zap.Logger) gin.HandlerFunc { // span is active (otelgin runs before this middleware), so operators // can pivot from a log line straight to the trace in Jaeger. fields = append(fields, tracing.LogFields(c.Request.Context())...) - if len(c.Errors) > 0 { + + // Errors and slow requests bypass the sampler (logged at Error/Warn); + // successful, fast requests go at Info where the sampler thins them. + switch { + case len(c.Errors) > 0: fields = append(fields, zap.String("error", c.Errors.String())) log.Error("http request", fields...) - return + case c.Writer.Status() >= http.StatusInternalServerError: + log.Error("http request", fields...) + case c.Writer.Status() >= http.StatusBadRequest: + log.Warn("http request", fields...) 
+ case slowThreshold > 0 && latency >= slowThreshold: + log.Warn("slow http request", fields...) + default: + log.Info("http request", fields...) } - log.Info("http request", fields...) } } diff --git a/pkg/middleware/middleware_test.go b/pkg/middleware/middleware_test.go index eaf09db..bcead3e 100644 --- a/pkg/middleware/middleware_test.go +++ b/pkg/middleware/middleware_test.go @@ -11,6 +11,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/zap" + "go.uber.org/zap/zapcore" + "go.uber.org/zap/zaptest/observer" "github.com/alpnuhoglu/gamemesh/pkg/auth" "github.com/alpnuhoglu/gamemesh/pkg/metrics" @@ -42,12 +44,46 @@ func TestRequestIDGeneratedAndPropagated(t *testing.T) { } func TestLoggerDoesNotInterfere(t *testing.T) { - r := newEngine(RequestID(), Logger(zap.NewNop())) + r := newEngine(RequestID(), Logger(zap.NewNop(), time.Second)) w := httptest.NewRecorder() r.ServeHTTP(w, httptest.NewRequest(http.MethodGet, "/test", nil)) assert.Equal(t, http.StatusOK, w.Code) } +// TestLoggerLevelRouting verifies the sampler-bypass contract at the middleware +// layer: a 2xx logs at Info (sampler-eligible), a 5xx at Error and a slow 2xx at +// Warn (both sampler-bypassed). 
+func TestLoggerLevelRouting(t *testing.T) { + gin.SetMode(gin.TestMode) + + cases := []struct { + name string + path string + slowAfter time.Duration + handler gin.HandlerFunc + wantLevel zapcore.Level + }{ + {"ok 2xx", "/ok", time.Second, func(c *gin.Context) { c.JSON(200, gin.H{}) }, zapcore.InfoLevel}, + {"server 5xx", "/err", time.Second, func(c *gin.Context) { c.JSON(500, gin.H{}) }, zapcore.ErrorLevel}, + {"slow 2xx", "/slow", time.Nanosecond, func(c *gin.Context) { time.Sleep(time.Millisecond); c.JSON(200, gin.H{}) }, zapcore.WarnLevel}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + obs, logs := observer.New(zapcore.DebugLevel) + log := zap.New(obs) + r := gin.New() + r.Use(RequestID(), Logger(log, tc.slowAfter)) + r.GET(tc.path, tc.handler) + + w := httptest.NewRecorder() + r.ServeHTTP(w, httptest.NewRequest(http.MethodGet, tc.path, nil)) + + require.Equal(t, 1, logs.Len()) + assert.Equal(t, tc.wantLevel, logs.All()[0].Level) + }) + } +} + func TestMetricsCountsRequests(t *testing.T) { m := metrics.New("test-middleware") r := newEngine(Metrics(m)) diff --git a/pkg/server/server.go b/pkg/server/server.go index 8a65b9c..e27b903 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -37,7 +37,7 @@ func NewEngine(cfg *config.Config, log *zap.Logger, m *metrics.Metrics) *gin.Eng // request with http.method, http.route (low-cardinality template), // status and latency. 
otelgin.Middleware(cfg.OTelServiceName), - middleware.Logger(log), + middleware.Logger(log, cfg.LogSlowRequestThreshold), middleware.Metrics(m), middleware.CORS(cfg.AllowedOrigins), ) diff --git a/pkg/tracing/sampling_test.go b/pkg/tracing/sampling_test.go new file mode 100644 index 0000000..65b7d17 --- /dev/null +++ b/pkg/tracing/sampling_test.go @@ -0,0 +1,97 @@ +package tracing + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/trace" +) + +// withParentSpanContext returns a context carrying a valid (remote) span +// context, mimicking what ResumeFromCarrier produces after extracting an +// event's Carrier — without needing a real upstream service. +func withParentSpanContext(t *testing.T) context.Context { + t.Helper() + tid, err := trace.TraceIDFromHex("0123456789abcdef0123456789abcdef") + require.NoError(t, err) + sid, err := trace.SpanIDFromHex("0123456789abcdef") + require.NoError(t, err) + sc := trace.NewSpanContext(trace.SpanContextConfig{ + TraceID: tid, + SpanID: sid, + TraceFlags: trace.FlagsSampled, + Remote: true, + }) + return trace.ContextWithSpanContext(context.Background(), sc) +} + +func TestStartConsumerSpan_NonHighVolumeAlwaysRecords(t *testing.T) { + configureHighVolume([]string{"LeaderboardUpdated"}, 0.0) // drop ALL high-volume + t.Cleanup(func() { configureHighVolume(nil, 1) }) + + ctx := withParentSpanContext(t) + // A non-listed event type is never sampled down: it must start a real span + // whose context differs from the parent (a new span was pushed). + newCtx, span := StartConsumerSpan(ctx, "events.consume", "MatchFound") + defer span.End() + + parent := trace.SpanContextFromContext(ctx) + child := trace.SpanContextFromContext(newCtx) + // Same trace, but a span is present and the chain continues. 
+ assert.Equal(t, parent.TraceID(), child.TraceID()) + assert.True(t, child.IsValid()) +} + +func TestStartConsumerSpan_HighVolumeSampledOutPreservesContext(t *testing.T) { + // ratio 0.0 → every high-volume event is sampled OUT (no recording span). + configureHighVolume([]string{"LeaderboardUpdated"}, 0.0) + t.Cleanup(func() { configureHighVolume(nil, 1) }) + + ctx := withParentSpanContext(t) + parent := trace.SpanContextFromContext(ctx) + + newCtx, span := StartConsumerSpan(ctx, "events.consume", "LeaderboardUpdated") + defer span.End() + + child := trace.SpanContextFromContext(newCtx) + + // THE CRITICAL ASSERTION: even though no recording span was opened, the + // trace context (trace_id) is preserved and still valid — so downstream + // Inject keeps the distributed trace chain intact. Propagation is NOT broken + // by span sampling. + assert.True(t, child.IsValid(), "trace context must survive span sampling") + assert.Equal(t, parent.TraceID(), child.TraceID(), "trace_id must propagate unchanged") + assert.Equal(t, parent.SpanID(), child.SpanID(), "no new span pushed when sampled out") +} + +func TestStartConsumerSpan_HighVolumeRatioOneRecords(t *testing.T) { + // ratio >= 1 disables high-volume sampling entirely (everything records). + configureHighVolume([]string{"LeaderboardUpdated"}, 1.0) + t.Cleanup(func() { configureHighVolume(nil, 1) }) + + ctx := withParentSpanContext(t) + newCtx, span := StartConsumerSpan(ctx, "events.consume", "LeaderboardUpdated") + defer span.End() + + assert.True(t, trace.SpanContextFromContext(newCtx).IsValid()) +} + +func TestSampleHighVolume_DeterministicForTraceID(t *testing.T) { + configureHighVolume([]string{"x"}, 0.5) + t.Cleanup(func() { configureHighVolume(nil, 1) }) + + ctx := withParentSpanContext(t) + // The decision must be stable for a given trace_id (TraceIDRatioBased is + // deterministic), so repeated calls agree — a trace is recorded consistently + // at every hop or not at all. 
first := sampleHighVolume(ctx) + for i := 0; i < 5; i++ { + assert.Equal(t, first, sampleHighVolume(ctx)) + } +} + +// Compile-time check that TraceIDRatioBased yields an sdktrace.Sampler, the type configureHighVolume stores (also keeps the sdktrace import in use). +var _ sdktrace.Sampler = sdktrace.TraceIDRatioBased(0.5) diff --git a/pkg/tracing/tracing.go b/pkg/tracing/tracing.go index 74d5c2a..7e71cf8 100644 --- a/pkg/tracing/tracing.go +++ b/pkg/tracing/tracing.go @@ -38,6 +38,13 @@ type Config struct { // spec so production can be tuned purely via env. Sampler string SamplerRatio float64 // used by the ratio-based samplers + + // HighVolumeEvents lists event types (e.g. "LeaderboardUpdated") whose + // consumer spans are sampled down to HighVolumeRatio. The trace context is + // ALWAYS propagated regardless — only the recording span is sampled. See + // StartConsumerSpan. + HighVolumeEvents []string + HighVolumeRatio float64 } // Init configures global tracing and returns a shutdown function that flushes @@ -51,6 +58,8 @@ func Init(ctx context.Context, cfg Config, log *zap.Logger) (func(context.Contex propagation.Baggage{}, )) + configureHighVolume(cfg.HighVolumeEvents, cfg.HighVolumeRatio) + if !cfg.Enabled || cfg.Endpoint == "" { // No-op provider: otel.Tracer(...).Start returns non-recording spans. log.Info("tracing disabled", zap.Bool("otel_enabled", cfg.Enabled), zap.String("endpoint", cfg.Endpoint)) @@ -127,6 +136,69 @@ func Tracer() trace.Tracer { return otel.Tracer(tracerName) } +// highVolume holds the event-type span-sampling policy. It is written once by +// Init (before any consumer runs) and only read afterwards, so no lock is +// needed. A nil highVolumeRatio means "no high-volume sampling configured". +var ( + highVolumeEvents map[string]struct{} + highVolumeRatio sdktrace.Sampler +) + +// configureHighVolume records which event types get their consumer spans +// sampled down and at what ratio. A ratio >= 1 or an empty list disables it. 
+func configureHighVolume(events []string, ratio float64) {
+	if len(events) == 0 || ratio >= 1.0 {
+		highVolumeEvents = nil
+		highVolumeRatio = nil
+		return
+	}
+	set := make(map[string]struct{}, len(events))
+	for _, e := range events {
+		set[e] = struct{}{}
+	}
+	highVolumeEvents = set
+	highVolumeRatio = sdktrace.TraceIDRatioBased(ratio)
+}
+
+// StartConsumerSpan starts a consumer span for a freshly-extracted context,
+// applying high-volume sampling by event type.
+//
+// CRITICAL — propagation vs. sampling: ctx is assumed to already carry the
+// upstream trace context (the caller extracts the Carrier first). For a
+// high-volume eventType whose trace_id falls outside HighVolumeRatio, this does
+// NOT open a recording span: it returns ctx re-wrapped around the same
+// SpanContext together with a fresh non-recording span, so downstream Inject
+// keeps the trace chain intact and the caller's End()/RecordError() can never
+// touch a live span it does not own. Otherwise it starts a normal recording
+// span. Either way the returned span is non-nil.
+func StartConsumerSpan(ctx context.Context, name, eventType string, opts ...trace.SpanStartOption) (context.Context, trace.Span) {
+	if _, hot := highVolumeEvents[eventType]; hot && !sampleHighVolume(ctx) {
+		// Sampled out: propagate the SpanContext only, via a non-recording span.
+		ctx = trace.ContextWithSpanContext(ctx, trace.SpanContextFromContext(ctx))
+		return ctx, trace.SpanFromContext(ctx)
+	}
+	return Tracer().Start(ctx, name, opts...)
+}
+
+// sampleHighVolume returns true when this trace should keep a recording span for
+// a high-volume event, using the same deterministic trace_id ratio the SDK uses
+// — so a sampled-in trace is recorded consistently at every hop. It also returns
+// true when no high-volume sampler is configured (nothing to sample against).
+func sampleHighVolume(ctx context.Context) bool {
+	sampler, sc := highVolumeRatio, trace.SpanContextFromContext(ctx)
+	if sampler == nil || !sc.IsValid() {
+		// Unconfigured sampler or no upstream trace context → record (rare;
+		// lets a locally-rooted trace still be observed) instead of panicking.
+		return true
+	}
+	res := sampler.ShouldSample(sdktrace.SamplingParameters{
+		ParentContext: ctx,
+		TraceID:       sc.TraceID(),
+		Name:          "events.consume",
+	})
+	return res.Decision == sdktrace.RecordAndSample
+}
+
 // LogFields returns zap fields carrying the active trace/span IDs, or nil when
 // no recording span is in context. Used by the request logger and key service
 // logs to correlate logs with traces in Jaeger.