Merged
2 changes: 2 additions & 0 deletions core/README.md
@@ -5,3 +5,5 @@ Shared infrastructure packages reused across services. These are internal buildi
## Packages

- **consumer/** — Queue message consumption framework. Manages subscription lifecycle, message routing to controllers, automatic ack/nack, error classification (retryable vs. poison pill), and graceful shutdown. Services register `Controller` implementations and the consumer handles the rest.
- **errs/** — Error classification framework. Classifies errors by origin (user vs. infra) and retryability. Extensions return plain errors; service controllers classify them.
- **metrics/** — Metrics utility helpers for `tally.Scope`. Provides standardized counters, timers, and histograms with error-aware tagging via `core/errs` integration.
23 changes: 23 additions & 0 deletions core/metrics/BUILD.bazel
@@ -0,0 +1,23 @@
load("@rules_go//go:def.bzl", "go_library", "go_test")

go_library(
    name = "metrics",
    srcs = ["metrics.go"],
    importpath = "github.com/uber/submitqueue/core/metrics",
    visibility = ["//visibility:public"],
    deps = [
        "//core/errs",
        "@com_github_uber_go_tally_v4//:tally",
    ],
)

go_test(
    name = "metrics_test",
    srcs = ["metrics_test.go"],
    embed = [":metrics"],
    deps = [
        "//core/errs",
        "@com_github_stretchr_testify//assert",
        "@com_github_uber_go_tally_v4//:tally",
    ],
)
121 changes: 121 additions & 0 deletions core/metrics/README.md
@@ -0,0 +1,121 @@
# Metrics Utilities (`core/metrics`)

The `metrics` package provides reusable helpers for emitting counters, timers, histograms, and gauges on a `tally.Scope`. It standardizes metric names across controllers and integrates with `core/errs` for automatic error classification tags.

## Design

**Free functions on `tally.Scope`** — no wrapper types. Existing constructors accept `tally.Scope` and don't need to change.

**Operation lifecycle** — `Begin` and `Complete` tie the full metrics lifecycle together. `Begin` captures the start time and emits `{name}.called`; `Complete` emits succeeded/failed counters, a latency timer, and a latency histogram. This prevents mismatched or forgotten metrics calls.

**Error-aware tagging** — `ErrorTags` integrates with `core/errs` to produce `error_origin=user|infra`, `retryable=true|false`, and `dependency=true` tags automatically. `Complete` uses these to tag latency metrics on failure.

**Consistent naming** — all Named helpers follow the `{name}.{sub}` sub-scope pattern, producing structured metric paths like `process.called`, `publish.attempts`, `consumer.pending_messages`.

## Operation Lifecycle

For any operation with a clear start/end, use `Begin`/`Complete`:

| Function | Emits |
|----------|-------|
| `Begin(scope, name, ...tags)` | `{name}.called` counter +1, returns `Op` |
| `op.Complete(err)` | `{name}.succeeded` or `{name}.failed` counter, `{name}.latency` timer, `{name}.latency_histogram` histogram; the timer and histogram are tagged with `result=success\|error`, plus error classification tags on failure |

```go
// RPC controller
func (c *LandController) Land(ctx context.Context, req *pb.LandRequest) (resp *pb.LandResponse, retErr error) {
	op := metrics.Begin(c.scope, "land")
	defer func() { op.Complete(retErr) }()

	// ... business logic ...
	return &pb.LandResponse{Sqid: request.ID}, nil
}

// Queue controller
func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (retErr error) {
	op := metrics.Begin(c.scope, "process")
	defer func() { op.Complete(retErr) }()

	// ... business logic ...
	return nil
}
```

On success, `Complete` emits:
- `{name}.succeeded` counter +1
- `{name}.latency` timer tagged `result=success`
- `{name}.latency_histogram` histogram tagged `result=success`

On failure, `Complete` emits:
- `{name}.failed` counter +1
- `{name}.latency` timer tagged `result=error`, `error_origin=user|infra`, `retryable=true|false`, and optionally `dependency=true`
- `{name}.latency_histogram` histogram with the same tags
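
The examples above wrap `op.Complete(retErr)` in a closure rather than deferring it directly. Go evaluates the arguments of a deferred call when the `defer` statement executes, so `defer op.Complete(retErr)` would always see the initial `nil`. The sketch below illustrates the difference; `op` here is a hypothetical stand-in that only remembers the error it was given, not the real `metrics.Op`.

```go
package main

import (
	"errors"
	"fmt"
)

// op is a hypothetical stand-in for metrics.Op. It only stores the error
// passed to Complete so the example runs without tally.
type op struct{ seen *error }

func (o op) Complete(err error) { *o.seen = err }

// closureDefer mirrors the README pattern: the closure reads retErr when it
// runs, after the return statement has assigned the named result.
func closureDefer(seen *error) (retErr error) {
	o := op{seen: seen}
	defer func() { o.Complete(retErr) }()
	return errors.New("boom")
}

// directDefer shows the pitfall: the argument retErr is evaluated at the
// defer statement, while it is still nil.
func directDefer(seen *error) (retErr error) {
	o := op{seen: seen}
	defer o.Complete(retErr)
	return errors.New("boom")
}

func main() {
	var a, b error
	_ = closureDefer(&a)
	_ = directDefer(&b)
	fmt.Println(a) // boom
	fmt.Println(b) // <nil>
}
```

With the direct form, every call would be counted under `{name}.succeeded` even when the function returned an error.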

## Named Helpers

For ad-hoc metrics that don't fit the Begin/Complete lifecycle. All follow the `{name}.{sub}` sub-scope pattern:

| Function | Emits | Example |
|----------|-------|---------|
| `NamedCounter(scope, name, counter, value, ...tags)` | `{name}.{counter}` counter | `publish.attempts` |
| `NamedTimer(scope, name, timer, duration, ...tags)` | `{name}.{timer}` timer | `publish.queue_latency` |
| `NamedHistogram(scope, name, histogram, buckets, ...tags)` | Returns the `{name}.{histogram}` histogram; nothing is recorded until you call `RecordDuration` or `RecordValue` on it | `process.duration` |
| `NamedGauge(scope, name, gauge, value, ...tags)` | `{name}.{gauge}` gauge | `consumer.pending_messages` |

```go
// Count a specific sub-event
metrics.NamedCounter(c.scope, "publish", "attempts", 1)

// Record a specific sub-latency
metrics.NamedTimer(c.scope, "publish", "queue_latency", elapsed)

// Track current queue depth (goes up and down)
metrics.NamedGauge(c.scope, "consumer", "pending_messages", float64(len(pending)))

// Create a reusable histogram (store on struct, call RecordDuration per invocation)
h := metrics.NamedHistogram(c.scope, "process", "duration", tally.DurationBuckets{...})
h.RecordDuration(elapsed)
```

## Error Tags

`ErrorTags` classifies errors using `core/errs` and returns tags for dimensional filtering:

| Tag | Values | Source |
|-----|--------|--------|
| `error_origin` | `user`, `infra` | `errs.IsUserError` |
| `retryable` | `true`, `false` | `errs.IsRetryable` |
| `dependency` | `true` (only when applicable) | `errs.IsDependencyError` |

```go
tags := metrics.ErrorTags(err)
// Generic error: [{error_origin, infra}, {retryable, false}]
// User error: [{error_origin, user}, {retryable, false}]
// Retryable error: [{error_origin, infra}, {retryable, true}]
// Dependency error: [{error_origin, infra}, {retryable, false}, {dependency, true}]
```

## Tags

Use `NewTag` to pass additional dimensional tags to any helper:

```go
op := metrics.Begin(c.scope, "process", metrics.NewTag("queue", req.Queue))
defer func() { op.Complete(retErr) }()

metrics.NamedCounter(c.scope, "publish", "attempts", 1, metrics.NewTag("topic", c.topic))
```
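
When the same key is passed more than once, the last value wins, because the tags are folded into a map before being applied to the scope. The following standalone sketch copies the `Tag` type and the `tagsToMap` helper from `metrics.go` to show that behavior; the tag values are made up for illustration.

```go
package main

import "fmt"

// Tag mirrors the Tag struct in core/metrics.
type Tag struct {
	Key   string
	Value string
}

// tagsToMap follows the helper in metrics.go: entries are written in order,
// so a later tag overwrites an earlier one with the same key.
func tagsToMap(tags []Tag) map[string]string {
	m := make(map[string]string, len(tags))
	for _, t := range tags {
		m[t.Key] = t.Value
	}
	return m
}

func main() {
	m := tagsToMap([]Tag{
		{Key: "queue", Value: "land"},
		{Key: "topic", Value: "request"},
		{Key: "queue", Value: "merge"}, // duplicate key, illustrative value
	})
	fmt.Println(len(m), m["queue"], m["topic"]) // 2 merge request
}
```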

## Latency Buckets

`Complete` uses default latency buckets (5ms to 4h) automatically, suitable for both fast RPCs and long-running operations like builds and merges:

```
5ms, 10ms, 25ms, 50ms, 100ms, 250ms, 500ms, 1s, 2.5s, 5s, 10s, 30s, 1m, 2m, 5m, 10m, 30m, 1h, 2h, 4h
```

For custom histograms, pass your own buckets to `NamedHistogram`:

```go
h := metrics.NamedHistogram(c.scope, "build", "duration", tally.DurationBuckets{...})
```
173 changes: 173 additions & 0 deletions core/metrics/metrics.go
@@ -0,0 +1,173 @@
package metrics

import (
	"time"

	"github.com/uber-go/tally/v4"
	"github.com/uber/submitqueue/core/errs"
)

// Tag is a key-value pair attached to a metric for dimensional filtering.
type Tag struct {
	// Key is the tag name (e.g., "controller", "topic").
	Key string
	// Value is the tag value (e.g., "land", "request").
	Value string
}

// NewTag creates a Tag with the given key and value.
func NewTag(key, value string) Tag {
	return Tag{Key: key, Value: value}
}

// defaultLatencyBuckets provides pre-defined duration buckets for common latency histograms.
// Covers 5ms to 4h, a range suitable for RPC calls, queue processing,
// and long-running operations like builds and merges.
var defaultLatencyBuckets = tally.DurationBuckets{
	5 * time.Millisecond,
	10 * time.Millisecond,
	25 * time.Millisecond,
	50 * time.Millisecond,
	100 * time.Millisecond,
	250 * time.Millisecond,
	500 * time.Millisecond,
	1 * time.Second,
	2500 * time.Millisecond,
	5 * time.Second,
	10 * time.Second,
	30 * time.Second,
	1 * time.Minute,
	2 * time.Minute,
	5 * time.Minute,
	10 * time.Minute,
	30 * time.Minute,
	1 * time.Hour,
	2 * time.Hour,
	4 * time.Hour,
}

// Op tracks the lifecycle of a named operation. It captures the start time on
// creation, emits a {name}.called counter, and records the outcome when Complete
// is called: a succeeded/failed counter, plus a latency timer and latency
// histogram tagged with the result and, on failure, error classification tags.
//
// Usage:
//
//	func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (retErr error) {
//		op := metrics.Begin(c.scope, "process")
//		defer func() { op.Complete(retErr) }()
//		// ... business logic ...
//		return nil
//	}
type Op struct {
	// scope is the tally scope with tags and sub-scope already applied.
	scope tally.Scope
	// start is the time the operation began.
	start time.Time
}

// Begin starts a new operation. It emits a {name}.called counter and captures
// the start time. Call Complete on the returned Op to record the outcome.
func Begin(scope tally.Scope, name string, tags ...Tag) Op {
	sub := tagged(scope, tags).SubScope(name)
	sub.Counter("called").Inc(1)
	return Op{scope: sub, start: time.Now()}
}

// Complete records the outcome of the operation. It emits a {name}.succeeded or
// {name}.failed counter based on err, and records elapsed time on both
// {name}.latency (timer) and {name}.latency_histogram (histogram with
// defaultLatencyBuckets for percentile distributions), tagged with result=success|error.
// On failure, error classification tags (error_origin, retryable, dependency)
// are added to both the timer and histogram.
func (o Op) Complete(err error) {
	elapsed := time.Since(o.start)

	if err == nil {
		o.scope.Counter("succeeded").Inc(1)
		s := o.scope.Tagged(map[string]string{"result": "success"})
		s.Timer("latency").Record(elapsed)
		s.Histogram("latency_histogram", defaultLatencyBuckets).RecordDuration(elapsed)
		return
	}

	o.scope.Counter("failed").Inc(1)

	latencyTags := map[string]string{"result": "error"}
	for _, t := range ErrorTags(err) {
		latencyTags[t.Key] = t.Value
	}
	s := o.scope.Tagged(latencyTags)
	s.Timer("latency").Record(elapsed)
	s.Histogram("latency_histogram", defaultLatencyBuckets).RecordDuration(elapsed)
}

// NamedCounter increments the {name}.{counter} counter by value.
func NamedCounter(scope tally.Scope, name string, counter string, value int64, tags ...Tag) {
	tagged(scope, tags).SubScope(name).Counter(counter).Inc(value)
}

// NamedTimer records a duration on the {name}.{timer} timer.
func NamedTimer(scope tally.Scope, name string, timer string, d time.Duration, tags ...Tag) {
	tagged(scope, tags).SubScope(name).Timer(timer).Record(d)
}

// NamedHistogram returns a tally.Histogram at {name}.{histogram} with the given
// bucket configuration. Store the returned histogram and call RecordDuration or
// RecordValue on each invocation.
func NamedHistogram(scope tally.Scope, name string, histogram string, buckets tally.Buckets, tags ...Tag) tally.Histogram {
	return tagged(scope, tags).SubScope(name).Histogram(histogram, buckets)
}

// NamedGauge updates the {name}.{gauge} gauge to value. Gauges represent a
// current point-in-time measurement that can go up or down, such as queue depth,
// active connections, or in-flight requests.
func NamedGauge(scope tally.Scope, name string, gauge string, value float64, tags ...Tag) {
	tagged(scope, tags).SubScope(name).Gauge(gauge).Update(value)
}

// ErrorTags returns classification tags for an error using core/errs.
// Returns error_origin (user|infra), retryable (true|false), and
// dependency (true) tags. Returns nil for a nil error.
func ErrorTags(err error) []Tag {
	if err == nil {
		return nil
	}

	origin := "infra"
	if errs.IsUserError(err) {
		origin = "user"
	}

	retryable := "false"
	if errs.IsRetryable(err) {
		retryable = "true"
	}

	tags := []Tag{
		{Key: "error_origin", Value: origin},
		{Key: "retryable", Value: retryable},
	}

	if errs.IsDependencyError(err) {
		tags = append(tags, Tag{Key: "dependency", Value: "true"})
	}

	return tags
}

// tagsToMap converts a slice of Tag to a map for tally.
func tagsToMap(tags []Tag) map[string]string {
	m := make(map[string]string, len(tags))
	for _, t := range tags {
		m[t.Key] = t.Value
	}
	return m
}

// tagged applies tags to a scope if any are provided, otherwise returns the
// scope unchanged.
func tagged(scope tally.Scope, tags []Tag) tally.Scope {
	if len(tags) == 0 {
		return scope
	}
	return scope.Tagged(tagsToMap(tags))
}