From 1455bdc27e1895a994e522c7768696d81f4dff0c Mon Sep 17 00:00:00 2001 From: Preetam Dwivedi Date: Sun, 1 Mar 2026 23:40:41 -0800 Subject: [PATCH] feat(metrics): Add core/metrics utility package Add reusable metrics helpers for tally.Scope that standardize metric emission across controllers. Includes Op lifecycle (Begin/Complete) for operation tracking with automatic counters, timers, histograms, and error-aware tagging via core/errs integration. Named helpers provide consistent {name}.{sub} sub-scope patterns for ad-hoc counters, timers, histograms, and gauges. --- core/README.md | 2 + core/metrics/BUILD.bazel | 23 +++ core/metrics/README.md | 121 +++++++++++++++ core/metrics/metrics.go | 173 +++++++++++++++++++++ core/metrics/metrics_test.go | 283 +++++++++++++++++++++++++++++++++++ 5 files changed, 602 insertions(+) create mode 100644 core/metrics/BUILD.bazel create mode 100644 core/metrics/README.md create mode 100644 core/metrics/metrics.go create mode 100644 core/metrics/metrics_test.go diff --git a/core/README.md b/core/README.md index 27be75ed..c21235f6 100644 --- a/core/README.md +++ b/core/README.md @@ -5,3 +5,5 @@ Shared infrastructure packages reused across services. These are internal buildi ## Packages - **consumer/** — Queue message consumption framework. Manages subscription lifecycle, message routing to controllers, automatic ack/nack, error classification (retryable vs. poison pill), and graceful shutdown. Services register `Controller` implementations and the consumer handles the rest. +- **errs/** — Error classification framework. Classifies errors by origin (user vs. infra) and retryability. Extensions return plain errors; service controllers classify them. +- **metrics/** — Metrics utility helpers for `tally.Scope`. Provides standardized counters, timers, and histograms with error-aware tagging via `core/errs` integration. 
diff --git a/core/metrics/BUILD.bazel b/core/metrics/BUILD.bazel new file mode 100644 index 00000000..f01a8cc3 --- /dev/null +++ b/core/metrics/BUILD.bazel @@ -0,0 +1,23 @@ +load("@rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "metrics", + srcs = ["metrics.go"], + importpath = "github.com/uber/submitqueue/core/metrics", + visibility = ["//visibility:public"], + deps = [ + "//core/errs", + "@com_github_uber_go_tally_v4//:tally", + ], +) + +go_test( + name = "metrics_test", + srcs = ["metrics_test.go"], + embed = [":metrics"], + deps = [ + "//core/errs", + "@com_github_stretchr_testify//assert", + "@com_github_uber_go_tally_v4//:tally", + ], +) diff --git a/core/metrics/README.md b/core/metrics/README.md new file mode 100644 index 00000000..b0b6df1b --- /dev/null +++ b/core/metrics/README.md @@ -0,0 +1,121 @@ +# Metrics Utilities (`core/metrics`) + +The `metrics` package provides reusable helpers for emitting counters, timers, histograms, and gauges on a `tally.Scope`. It standardizes metric names across controllers and integrates with `core/errs` for automatic error classification tags. + +## Design + +**Free functions on `tally.Scope`** — no wrapper types. Existing constructors accept `tally.Scope` and don't need to change. + +**Operation lifecycle** — `Begin` and `Complete` tie the full metrics lifecycle together. `Begin` captures the start time and emits `{name}.called`; `Complete` emits succeeded/failed counters, a latency timer, and a latency histogram. This prevents mismatched or forgotten metrics calls. + +**Error-aware tagging** — `ErrorTags` integrates with `core/errs` to produce `error_origin=user|infra`, `retryable=true|false`, and `dependency=true` tags automatically. `Complete` uses these to tag latency metrics on failure. + +**Consistent naming** — all Named helpers follow the `{name}.{sub}` sub-scope pattern, producing structured metric paths like `process.called`, `publish.attempts`, `consumer.pending_messages`. 
+ +## Operation Lifecycle + +For any operation with a clear start/end, use `Begin`/`Complete`: + +| Function | Emits | +|----------|-------| +| `Begin(scope, name, ...tags)` | `{name}.called` counter +1, returns `Op` | +| `op.Complete(err)` | `{name}.succeeded` or `{name}.failed` counter, `{name}.latency` timer, `{name}.latency_histogram` histogram — the timer and histogram are tagged with `result=success\|error`, plus error classification tags on failure; the counters carry no `result` tag | + +```go +// RPC controller +func (c *LandController) Land(ctx context.Context, req *pb.LandRequest) (resp *pb.LandResponse, retErr error) { + op := metrics.Begin(c.scope, "land") + defer func() { op.Complete(retErr) }() + + // ... business logic ... + return &pb.LandResponse{Sqid: request.ID}, nil +} + +// Queue controller +func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (retErr error) { + op := metrics.Begin(c.scope, "process") + defer func() { op.Complete(retErr) }() + + // ... business logic ... + return nil +} +``` + +On success, `Complete` emits: +- `{name}.succeeded` counter +1 +- `{name}.latency` timer tagged `result=success` +- `{name}.latency_histogram` histogram tagged `result=success` + +On failure, `Complete` emits: +- `{name}.failed` counter +1 +- `{name}.latency` timer tagged `result=error`, `error_origin=user|infra`, `retryable=true|false`, and optionally `dependency=true` +- `{name}.latency_histogram` histogram with the same tags + +## Named Helpers + +For ad-hoc metrics that don't fit the Begin/Complete lifecycle. 
All follow the `{name}.{sub}` sub-scope pattern: + +| Function | Emits | Example | +|----------|-------|---------| +| `NamedCounter(scope, name, counter, value, ...tags)` | `{name}.{counter}` counter | `publish.attempts` | +| `NamedTimer(scope, name, timer, duration, ...tags)` | `{name}.{timer}` timer | `publish.queue_latency` | +| `NamedHistogram(scope, name, histogram, buckets, ...tags)` | `{name}.{histogram}` histogram | `process.duration` | +| `NamedGauge(scope, name, gauge, value, ...tags)` | `{name}.{gauge}` gauge | `consumer.pending_messages` | + +```go +// Count a specific sub-event +metrics.NamedCounter(c.scope, "publish", "attempts", 1) + +// Record a specific sub-latency +metrics.NamedTimer(c.scope, "publish", "queue_latency", elapsed) + +// Track current queue depth (goes up and down) +metrics.NamedGauge(c.scope, "consumer", "pending_messages", float64(len(pending))) + +// Create a reusable histogram (store on struct, call RecordDuration per invocation) +h := metrics.NamedHistogram(c.scope, "process", "duration", tally.DurationBuckets{...}) +h.RecordDuration(elapsed) +``` + +## Error Tags + +`ErrorTags` classifies errors using `core/errs` and returns tags for dimensional filtering: + +| Tag | Values | Source | +|-----|--------|--------| +| `error_origin` | `user`, `infra` | `errs.IsUserError` | +| `retryable` | `true`, `false` | `errs.IsRetryable` | +| `dependency` | `true` (only when applicable) | `errs.IsDependencyError` | + +```go +tags := metrics.ErrorTags(err) +// Generic error: [{error_origin, infra}, {retryable, false}] +// User error: [{error_origin, user}, {retryable, false}] +// Retryable error: [{error_origin, infra}, {retryable, true}] +// Dependency error: [{error_origin, infra}, {retryable, false}, {dependency, true}] +``` + +## Tags + +Use `NewTag` to pass additional dimensional tags to any helper: + +```go +op := metrics.Begin(c.scope, "process", metrics.NewTag("queue", req.Queue)) +defer func() { op.Complete(retErr) }() + 
+metrics.NamedCounter(c.scope, "publish", "attempts", 1, metrics.NewTag("topic", c.topic)) +``` + +## Latency Buckets + +`Complete` uses default latency buckets (5ms to 4h) automatically, suitable for both fast RPCs and long-running operations like builds and merges: + +``` +5ms, 10ms, 25ms, 50ms, 100ms, 250ms, 500ms, 1s, 2.5s, 5s, 10s, 30s, 1m, 2m, 5m, 10m, 30m, 1h, 2h, 4h +``` + +For custom histograms, pass your own buckets to `NamedHistogram`: + +```go +h := metrics.NamedHistogram(c.scope, "build", "duration", tally.DurationBuckets{...}) +``` diff --git a/core/metrics/metrics.go b/core/metrics/metrics.go new file mode 100644 index 00000000..60307506 --- /dev/null +++ b/core/metrics/metrics.go @@ -0,0 +1,173 @@ +package metrics + +import ( + "time" + + "github.com/uber-go/tally/v4" + "github.com/uber/submitqueue/core/errs" +) + +// Tag is a key-value pair attached to a metric for dimensional filtering. +type Tag struct { + // Key is the tag name (e.g., "controller", "topic"). + Key string + // Value is the tag value (e.g., "land", "request"). + Value string +} + +// NewTag creates a Tag with the given key and value. +func NewTag(key, value string) Tag { + return Tag{Key: key, Value: value} +} + +// defaultLatencyBuckets provides pre-defined duration buckets for common latency histograms. +// Bucket bounds run from 5ms to 4h in strictly increasing order, a range suitable for RPC calls, queue processing, +// and long-running operations like builds and merges. +var defaultLatencyBuckets = tally.DurationBuckets{ + 5 * time.Millisecond, + 10 * time.Millisecond, + 25 * time.Millisecond, + 50 * time.Millisecond, + 100 * time.Millisecond, + 250 * time.Millisecond, + 500 * time.Millisecond, + 1 * time.Second, + 2500 * time.Millisecond, + 5 * time.Second, + 10 * time.Second, + 30 * time.Second, + 1 * time.Minute, + 2 * time.Minute, + 5 * time.Minute, + 10 * time.Minute, + 30 * time.Minute, + 1 * time.Hour, + 2 * time.Hour, + 4 * time.Hour, +} + +// Op tracks the lifecycle of a named operation. 
It captures the start time on +// creation, emits a {name}.called counter, and records the outcome (succeeded/failed +// counters, plus a latency timer and histogram that carry error classification tags on failure) when Complete is called. +// +// Usage: +// +// func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (retErr error) { +// op := metrics.Begin(c.scope, "process") +// defer func() { op.Complete(retErr) }() +// // ... business logic ... +// } +type Op struct { + // scope is the tally scope with tags and sub-scope already applied. + scope tally.Scope + // start is the time the operation began. + start time.Time +} + +// Begin starts a new operation. It emits a {name}.called counter and captures +// the start time. Any tags are applied before the {name} sub-scope is taken, so every metric the returned Op emits carries them. Call Complete on the returned Op to record the outcome. +func Begin(scope tally.Scope, name string, tags ...Tag) Op { + sub := tagged(scope, tags).SubScope(name) + sub.Counter("called").Inc(1) + return Op{scope: sub, start: time.Now()} +} + +// Complete records the outcome of the operation. It emits a {name}.succeeded or +// {name}.failed counter based on err, and records elapsed time on both +// {name}.latency (timer) and {name}.latency_histogram (histogram with +// defaultLatencyBuckets for percentile distributions), both tagged with result=success|error (the counters carry no result tag). +// On failure, error classification tags (error_origin, retryable, dependency) +// are added to both the timer and histogram. 
+func (o Op) Complete(err error) { + elapsed := time.Since(o.start) + + if err == nil { + o.scope.Counter("succeeded").Inc(1) + s := o.scope.Tagged(map[string]string{"result": "success"}) + s.Timer("latency").Record(elapsed) + s.Histogram("latency_histogram", defaultLatencyBuckets).RecordDuration(elapsed) + return + } + + o.scope.Counter("failed").Inc(1) + + latencyTags := map[string]string{"result": "error"} + for _, t := range ErrorTags(err) { + latencyTags[t.Key] = t.Value + } + s := o.scope.Tagged(latencyTags) + s.Timer("latency").Record(elapsed) + s.Histogram("latency_histogram", defaultLatencyBuckets).RecordDuration(elapsed) +} + +// NamedCounter increments the {name}.{counter} counter by value. Any tags are applied to scope before the {name} sub-scope is taken. +func NamedCounter(scope tally.Scope, name string, counter string, value int64, tags ...Tag) { + tagged(scope, tags).SubScope(name).Counter(counter).Inc(value) +} + +// NamedTimer records d on the {name}.{timer} timer. Any tags are applied to scope before the {name} sub-scope is taken. +func NamedTimer(scope tally.Scope, name string, timer string, d time.Duration, tags ...Tag) { + tagged(scope, tags).SubScope(name).Timer(timer).Record(d) +} + +// NamedHistogram returns a tally.Histogram at {name}.{histogram} with the given +// bucket configuration, applying any tags to scope first. Store the returned histogram and call RecordDuration or +// RecordValue on each invocation. +func NamedHistogram(scope tally.Scope, name string, histogram string, buckets tally.Buckets, tags ...Tag) tally.Histogram { + return tagged(scope, tags).SubScope(name).Histogram(histogram, buckets) +} + +// NamedGauge updates the {name}.{gauge} gauge to value, applying any tags to scope first. Gauges represent a +// current point-in-time measurement that can go up or down, such as queue depth, +// active connections, or in-flight requests. +func NamedGauge(scope tally.Scope, name string, gauge string, value float64, tags ...Tag) { + tagged(scope, tags).SubScope(name).Gauge(gauge).Update(value) +} + +// ErrorTags returns classification tags for an error using core/errs. 
+// For a non-nil error it always returns error_origin (user|infra) and retryable (true|false), in that order, and +// appends dependency=true only when errs.IsDependencyError reports true. Returns nil for a nil error. +func ErrorTags(err error) []Tag { + if err == nil { + return nil + } + + origin := "infra" + if errs.IsUserError(err) { + origin = "user" + } + + retryable := "false" + if errs.IsRetryable(err) { + retryable = "true" + } + + tags := []Tag{ + {Key: "error_origin", Value: origin}, + {Key: "retryable", Value: retryable}, + } + + if errs.IsDependencyError(err) { + tags = append(tags, Tag{Key: "dependency", Value: "true"}) + } + + return tags +} + +// tagsToMap converts a slice of Tag to a map for tally. If a key repeats, the later Tag's value wins. +func tagsToMap(tags []Tag) map[string]string { + m := make(map[string]string, len(tags)) + for _, t := range tags { + m[t.Key] = t.Value + } + return m +} + +// tagged applies tags to a scope if any are provided, otherwise returns the +// scope unchanged. +func tagged(scope tally.Scope, tags []Tag) tally.Scope { + if len(tags) == 0 { + return scope + } + return scope.Tagged(tagsToMap(tags)) +} diff --git a/core/metrics/metrics_test.go b/core/metrics/metrics_test.go new file mode 100644 index 00000000..ed58a7a9 --- /dev/null +++ b/core/metrics/metrics_test.go @@ -0,0 +1,283 @@ +package metrics + +import ( + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/uber-go/tally/v4" + "github.com/uber/submitqueue/core/errs" +) + +func TestBegin_EmitsCalled(t *testing.T) { + scope := tally.NewTestScope("", nil) + _ = Begin(scope, "process") + + snapshot := scope.Snapshot() + counters := snapshot.Counters() + c, ok := counters["process.called+"] + assert.True(t, ok, "expected process.called counter") + assert.Equal(t, int64(1), c.Value()) +} + +func TestComplete(t *testing.T) { + tests := []struct { + name string + err error + expectSucceeded bool + expectResultTag string + expectOrigin string + expectRetryable string + expectDependency bool + }{ + { + name: "nil error records success", + err: nil, + expectSucceeded: true, + 
expectResultTag: "success", + }, + { + name: "generic error records failure with infra origin", + err: fmt.Errorf("something broke"), + expectSucceeded: false, + expectResultTag: "error", + expectOrigin: "infra", + expectRetryable: "false", + }, + { + name: "retryable error records retryable=true", + err: errs.NewRetryableError(fmt.Errorf("timeout")), + expectSucceeded: false, + expectResultTag: "error", + expectOrigin: "infra", + expectRetryable: "true", + }, + { + name: "user error records error_origin=user", + err: errs.NewUserError(fmt.Errorf("bad input")), + expectSucceeded: false, + expectResultTag: "error", + expectOrigin: "user", + expectRetryable: "false", + }, + { + name: "dependency error records dependency=true", + err: errs.NewDependencyError(fmt.Errorf("db down")), + expectSucceeded: false, + expectResultTag: "error", + expectOrigin: "infra", + expectRetryable: "false", + expectDependency: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + scope := tally.NewTestScope("", nil) + op := Begin(scope, "process") + op.Complete(tt.err) + + snapshot := scope.Snapshot() + counters := snapshot.Counters() + + // Begin increments process.called whether the operation later succeeds or fails + c, ok := counters["process.called+"] + assert.True(t, ok, "expected process.called counter") + assert.Equal(t, int64(1), c.Value()) + + if tt.expectSucceeded { + c, ok := counters["process.succeeded+"] + assert.True(t, ok, "expected process.succeeded counter") + assert.Equal(t, int64(1), c.Value()) + + timers := snapshot.Timers() + timer, ok := timers["process.latency+result=success"] + assert.True(t, ok, "expected process.latency timer with result=success") + assert.NotEmpty(t, timer.Values()) + + histograms := snapshot.Histograms() + _, ok = histograms["process.latency_histogram+result=success"] + assert.True(t, ok, "expected process.latency_histogram with result=success") + } else { + c, ok := counters["process.failed+"] + assert.True(t, ok, "expected process.failed counter") + 
assert.Equal(t, int64(1), c.Value()) + + // Build the expected snapshot key suffix in the order dependency, error_origin, result, retryable (tally sorts tags alphabetically) + tagSuffix := "" + if tt.expectDependency { + tagSuffix += "dependency=true," + } + tagSuffix += "error_origin=" + tt.expectOrigin + tagSuffix += ",result=" + tt.expectResultTag + tagSuffix += ",retryable=" + tt.expectRetryable + + timerKey := "process.latency+" + tagSuffix + timers := snapshot.Timers() + timer, ok := timers[timerKey] + assert.True(t, ok, "expected timer key %s, got keys: %v", timerKey, timerKeys(timers)) + if ok { + assert.NotEmpty(t, timer.Values()) + } + + histogramKey := "process.latency_histogram+" + tagSuffix + histograms := snapshot.Histograms() + _, ok = histograms[histogramKey] + assert.True(t, ok, "expected histogram key %s, got keys: %v", histogramKey, histogramKeys(histograms)) + } + }) + } +} + +func TestBegin_WithTags(t *testing.T) { + scope := tally.NewTestScope("", nil) + op := Begin(scope, "process", NewTag("env", "prod")) + op.Complete(nil) + + snapshot := scope.Snapshot() + counters := snapshot.Counters() + + c, ok := counters["process.called+env=prod"] + assert.True(t, ok, "expected tagged called counter, got keys: %v", counterKeys(counters)) + assert.Equal(t, int64(1), c.Value()) + + c, ok = counters["process.succeeded+env=prod"] + assert.True(t, ok, "expected tagged succeeded counter, got keys: %v", counterKeys(counters)) + assert.Equal(t, int64(1), c.Value()) +} + +func TestNamedCounter(t *testing.T) { + scope := tally.NewTestScope("", nil) + NamedCounter(scope, "publish", "attempts", 5) + + snapshot := scope.Snapshot() + counters := snapshot.Counters() + c, ok := counters["publish.attempts+"] + assert.True(t, ok, "expected publish.attempts counter") + assert.Equal(t, int64(5), c.Value()) +} + +func TestNamedTimer(t *testing.T) { + scope := tally.NewTestScope("", nil) + NamedTimer(scope, "publish", "queue_latency", 42*time.Millisecond) + + snapshot := scope.Snapshot() + timers := snapshot.Timers() + timer, ok := 
timers["publish.queue_latency+"] + assert.True(t, ok, "expected publish.queue_latency timer") + assert.Equal(t, []time.Duration{42 * time.Millisecond}, timer.Values()) +} + +func TestNamedHistogram(t *testing.T) { + scope := tally.NewTestScope("", nil) + h := NamedHistogram(scope, "process", "duration", defaultLatencyBuckets) + assert.NotNil(t, h) + + h.RecordDuration(50 * time.Millisecond) + + snapshot := scope.Snapshot() + histograms := snapshot.Histograms() + _, ok := histograms["process.duration+"] + assert.True(t, ok, "expected process.duration histogram") +} + +func TestNamedGauge(t *testing.T) { + scope := tally.NewTestScope("", nil) + NamedGauge(scope, "consumer", "pending_messages", 42) + + snapshot := scope.Snapshot() + gauges := snapshot.Gauges() + g, ok := gauges["consumer.pending_messages+"] + assert.True(t, ok, "expected consumer.pending_messages gauge") + assert.Equal(t, float64(42), g.Value()) +} + +func TestDefaultLatencyBuckets_Sorted(t *testing.T) { + for i := 1; i < len(defaultLatencyBuckets); i++ { + assert.Greater(t, defaultLatencyBuckets[i], defaultLatencyBuckets[i-1], + "defaultLatencyBuckets[%d] (%v) must be greater than defaultLatencyBuckets[%d] (%v)", + i, defaultLatencyBuckets[i], i-1, defaultLatencyBuckets[i-1]) + } +} + +func TestErrorTags(t *testing.T) { + tests := []struct { + name string + err error + expected []Tag + }{ + { + name: "nil error returns nil", + err: nil, + expected: nil, + }, + { + name: "generic error returns infra non-retryable", + err: fmt.Errorf("fail"), + expected: []Tag{ + {Key: "error_origin", Value: "infra"}, + {Key: "retryable", Value: "false"}, + }, + }, + { + name: "user error returns user origin", + err: errs.NewUserError(fmt.Errorf("bad")), + expected: []Tag{ + {Key: "error_origin", Value: "user"}, + {Key: "retryable", Value: "false"}, + }, + }, + { + name: "retryable error returns retryable=true", + err: errs.NewRetryableError(fmt.Errorf("timeout")), + expected: []Tag{ + {Key: "error_origin", Value: 
"infra"}, + {Key: "retryable", Value: "true"}, + }, + }, + { + name: "dependency error includes dependency tag", + err: errs.NewDependencyError(fmt.Errorf("db down")), + expected: []Tag{ + {Key: "error_origin", Value: "infra"}, + {Key: "retryable", Value: "false"}, + {Key: "dependency", Value: "true"}, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tags := ErrorTags(tt.err) + assert.Equal(t, tt.expected, tags) + }) + } +} + +// timerKeys returns the keys of a timer snapshot map, in unspecified (map iteration) order, for assertion failure messages. +func timerKeys(m map[string]tally.TimerSnapshot) []string { + keys := make([]string, 0, len(m)) + for k := range m { + keys = append(keys, k) + } + return keys +} + +// counterKeys returns the keys of a counter snapshot map, in unspecified (map iteration) order, for assertion failure messages. +func counterKeys(m map[string]tally.CounterSnapshot) []string { + keys := make([]string, 0, len(m)) + for k := range m { + keys = append(keys, k) + } + return keys +} + +// histogramKeys returns the keys of a histogram snapshot map, in unspecified (map iteration) order, for assertion failure messages. +func histogramKeys(m map[string]tally.HistogramSnapshot) []string { + keys := make([]string, 0, len(m)) + for k := range m { + keys = append(keys, k) + } + return keys +}