diff --git a/cmd/outbox-relay/main.go b/cmd/outbox-relay/main.go index 85b9075..ca9ae11 100644 --- a/cmd/outbox-relay/main.go +++ b/cmd/outbox-relay/main.go @@ -68,6 +68,7 @@ func main() { BatchSize: cfg.OutboxBatchSize, PollInterval: cfg.OutboxPollInterval, Workers: cfg.OutboxWorkers, + MaxAttempts: cfg.OutboxMaxAttempts, }, m, log) ctx, stop := server.ShutdownContext() diff --git a/config/prometheus/alerts.yml b/config/prometheus/alerts.yml new file mode 100644 index 0000000..47ef89d --- /dev/null +++ b/config/prometheus/alerts.yml @@ -0,0 +1,53 @@ +# Alerting rules for the transactional outbox relay. These turn the relay's +# instruments (see docs/outbox.md §9) into actionable signals: a growing backlog +# means the relay is falling behind or NATS is unreachable; a dead-lettered row +# means an event will never be delivered without operator action. +groups: + - name: outbox-relay + rules: + # Any dead-lettered row is page-worthy: the event is permanently stuck in + # FAILED and a human must inspect last_error and re-queue or drop it. + - alert: OutboxEventsDeadLettered + expr: increase(gamemesh_outbox_events_dead_lettered_total[5m]) > 0 + for: 0m + labels: + severity: critical + component: outbox-relay + annotations: + summary: "Outbox rows dead-lettered (service {{ $labels.service }})" + description: > + {{ $value | printf "%.0f" }} outbox row(s) moved to FAILED in the last + 5m. They will never be published without operator action — inspect + last_error and re-queue or drop them. + + # A persistently rising backlog means the relay cannot keep up or NATS is + # down. Rows are durable (no loss), so this is a warning, not a page. + - alert: OutboxBacklogGrowing + expr: gamemesh_outbox_events_pending > 500 + for: 10m + labels: + severity: warning + component: outbox-relay + annotations: + summary: "Outbox backlog growing (service {{ $labels.service }})" + description: > + {{ $value | printf "%.0f" }} PENDING outbox rows for over 10m. 
The + relay is falling behind or NATS is unreachable. Events are safe but + delayed — check NATS connectivity and relay throughput/replicas. + + # Publishes failing while nothing is succeeding = NATS is down. Distinct + # from a backlog: this fires fast on the failure rate, not the queue depth. + - alert: OutboxPublishStalled + expr: | + rate(gamemesh_outbox_publish_failures_total[5m]) > 0 + and rate(gamemesh_outbox_events_published_total[5m]) == 0 + for: 5m + labels: + severity: critical + component: outbox-relay + annotations: + summary: "Outbox publishing stalled (service {{ $labels.service }})" + description: > + Publish attempts are failing and nothing is succeeding for 5m — NATS + is likely down. Rows stay PENDING and retry; if this persists they + will eventually dead-letter (see OutboxEventsDeadLettered). diff --git a/config/prometheus/prometheus.yml b/config/prometheus/prometheus.yml index 62c1a86..9ee5cfb 100644 --- a/config/prometheus/prometheus.yml +++ b/config/prometheus/prometheus.yml @@ -5,6 +5,10 @@ global: scrape_interval: 5s evaluation_interval: 5s +# Alerting rules (outbox relay SLOs). Evaluated every evaluation_interval. +rule_files: + - /etc/prometheus/alerts.yml + scrape_configs: - job_name: gateway static_configs: diff --git a/deployments/k8s/15-outbox-relay.yaml b/deployments/k8s/15-outbox-relay.yaml new file mode 100644 index 0000000..7be6549 --- /dev/null +++ b/deployments/k8s/15-outbox-relay.yaml @@ -0,0 +1,90 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: outbox-relay + namespace: gamemesh +spec: + # Three replicas for HA. The relay claims rows with FOR UPDATE SKIP LOCKED + # (internal/outbox/store.go), so replicas never publish the same outbox row — + # running more than one removes the relay as a single point of failure between + # persistence and event transport. 
+ replicas: 3 + selector: + matchLabels: + app: outbox-relay + template: + metadata: + labels: + app: outbox-relay + annotations: + prometheus.io/scrape: "true" + prometheus.io/port: "8085" + prometheus.io/path: /metrics + spec: + containers: + - name: outbox-relay + image: gamemesh/outbox-relay:latest + imagePullPolicy: IfNotPresent + ports: + - containerPort: 8085 + env: + - name: HTTP_PORT + value: "8085" + - name: NATS_URL + value: nats://nats:4222 + - name: OUTBOX_ENABLED + value: "true" + - name: OUTBOX_BATCH_SIZE + value: "100" + - name: OUTBOX_POLL_INTERVAL + value: "1s" + - name: OUTBOX_WORKERS + value: "4" + # Dead-letter a row after this many failed publish attempts + # (0 = retry forever). Caps poison rows from wedging a worker. + - name: OUTBOX_MAX_ATTEMPTS + value: "5" + - name: POSTGRES_DSN + value: host=postgres user=$(POSTGRES_USER) password=$(POSTGRES_PASSWORD) dbname=$(POSTGRES_DB) port=5432 sslmode=disable + envFrom: + - configMapRef: + name: gamemesh-config + - secretRef: + name: gamemesh-secrets + readinessProbe: + httpGet: { path: /healthz, port: 8085 } + initialDelaySeconds: 3 + periodSeconds: 5 + livenessProbe: + httpGet: { path: /healthz, port: 8085 } + initialDelaySeconds: 10 + periodSeconds: 10 + resources: + requests: { cpu: 100m, memory: 64Mi } + limits: { cpu: 500m, memory: 128Mi } +--- +apiVersion: v1 +kind: Service +metadata: + name: outbox-relay + namespace: gamemesh +spec: + selector: + app: outbox-relay + ports: + - port: 8085 + targetPort: 8085 +--- +# Keep at least 2 relays running during eviction-based disruptions (node drains, +# autoscaler scale-down, node upgrades); otherwise all three replicas could be evicted +# at once, stalling publishing until they reschedule. A PDB does not gate Deployment rollouts. 
+apiVersion: policy/v1 +kind: PodDisruptionBudget +metadata: + name: outbox-relay + namespace: gamemesh +spec: + minAvailable: 2 + selector: + matchLabels: + app: outbox-relay diff --git a/deployments/k8s/30-monitoring.yaml b/deployments/k8s/30-monitoring.yaml index 15d6fe7..11f010e 100644 --- a/deployments/k8s/30-monitoring.yaml +++ b/deployments/k8s/30-monitoring.yaml @@ -37,6 +37,9 @@ data: prometheus.yml: | global: scrape_interval: 5s + evaluation_interval: 5s + rule_files: + - /etc/prometheus/alerts.yml scrape_configs: - job_name: gamemesh-pods kubernetes_sd_configs: @@ -54,6 +57,39 @@ data: target_label: __address__ - source_labels: [__meta_kubernetes_pod_label_app] target_label: app + alerts.yml: | + # Outbox relay alerting rules (mirror of config/prometheus/alerts.yml). + groups: + - name: outbox-relay + rules: + - alert: OutboxEventsDeadLettered + expr: increase(gamemesh_outbox_events_dead_lettered_total[5m]) > 0 + for: 0m + labels: { severity: critical, component: outbox-relay } + annotations: + summary: "Outbox rows dead-lettered (service {{ $labels.service }})" + description: > + {{ $value | printf "%.0f" }} outbox row(s) moved to FAILED in the + last 5m. They will never publish without operator action. + - alert: OutboxBacklogGrowing + expr: gamemesh_outbox_events_pending > 500 + for: 10m + labels: { severity: warning, component: outbox-relay } + annotations: + summary: "Outbox backlog growing (service {{ $labels.service }})" + description: > + {{ $value | printf "%.0f" }} PENDING rows for over 10m — relay + behind or NATS unreachable. Events safe but delayed. + - alert: OutboxPublishStalled + expr: | + rate(gamemesh_outbox_publish_failures_total[5m]) > 0 + and rate(gamemesh_outbox_events_published_total[5m]) == 0 + for: 5m + labels: { severity: critical, component: outbox-relay } + annotations: + summary: "Outbox publishing stalled (service {{ $labels.service }})" + description: > + Publishes failing and none succeeding for 5m — NATS likely down. 
--- apiVersion: apps/v1 kind: Deployment diff --git a/docker-compose.yml b/docker-compose.yml index 07304fd..2756cb8 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -202,6 +202,9 @@ services: OUTBOX_BATCH_SIZE: ${OUTBOX_BATCH_SIZE:-100} OUTBOX_POLL_INTERVAL: ${OUTBOX_POLL_INTERVAL:-1s} OUTBOX_WORKERS: ${OUTBOX_WORKERS:-4} + # Dead-letter a row after this many failed publish attempts (0 = retry + # forever). 5 lets transient NATS blips self-heal but caps poison rows. + OUTBOX_MAX_ATTEMPTS: ${OUTBOX_MAX_ATTEMPTS:-5} depends_on: postgres: condition: service_healthy @@ -250,7 +253,8 @@ services: ports: - "9090:9090" volumes: - - ./config/prometheus/prometheus.yml:/etc/prometheus/prometheus.yml:ro + # Mount the whole dir so prometheus.yml and alerts.yml are both available. + - ./config/prometheus:/etc/prometheus:ro depends_on: - gateway diff --git a/docs/outbox.md b/docs/outbox.md index 2821e24..e474281 100644 --- a/docs/outbox.md +++ b/docs/outbox.md @@ -126,7 +126,9 @@ CREATE INDEX idx_outbox_pending - **Partial index on `PENDING`.** The relay's only hot query is "oldest unpublished rows." A partial index stays small as `PUBLISHED` rows accumulate, keeping the poll `O(batch)` regardless of total table size. -- **Two states, no `FAILED`.** See §6. +- **Three states (`PENDING` → `PUBLISHED`, or `PENDING` → `FAILED`).** A failed + publish stays `PENDING` and retries; a row that exhausts `OUTBOX_MAX_ATTEMPTS` + is dead-lettered to `FAILED`. See §6. --- @@ -150,7 +152,9 @@ transaction**: 2. **Publish** the batch across a **bounded worker pool** of `OUTBOX_WORKERS` (`internal/outbox/relay.go: publishAll`) — never one goroutine per event. 3. **Mark** successfully published rows `PUBLISHED` (stamping `published_at`) and - **bump `attempt_count`** on the failures, which stay `PENDING`. 
+ **bump `attempt_count`** on the failures, which stay `PENDING` — unless a row + reaches `OUTBOX_MAX_ATTEMPTS`, in which case it is moved to `FAILED` + (dead-lettered) in the same statement so it is never polled again. 4. **Commit.** The rows stayed locked for the whole cycle, so no other replica touched them. @@ -167,6 +171,7 @@ in-flight batch's transaction completes or rolls back cleanly. | `OUTBOX_BATCH_SIZE` | `100` | Rows claimed per poll. | | `OUTBOX_POLL_INTERVAL` | `1s` | Delay between polls when idle. | | `OUTBOX_WORKERS` | `4` | Bounded publish concurrency / batch. | +| `OUTBOX_MAX_ATTEMPTS` | `0` | Dead-letter a row to `FAILED` after this many failed publishes (`0` = retry forever). | The player service always **writes** to the outbox regardless of `OUTBOX_ENABLED`; the flag only governs the relay. @@ -191,20 +196,26 @@ other event. The outbox is invisible past the publish. insert (in business tx) PENDING ───────────────────────► (relay publishes) ──success──► PUBLISHED ▲ │ - └───────────── failure ───────────────┘ (attempt_count++ , stays PENDING) + ├───────── failure (attempt < max) ───┘ (attempt_count++ , stays PENDING) + │ + └───────── failure (attempt = max) ──────────────────────────► FAILED (dead-letter) ``` -**Only two states.** A failed publish is simply a `PENDING` row that is retried -on the next poll, so a transient NATS outage **self-heals** with no operator -action and nothing to reconcile. There is intentionally **no `FAILED` state**: -it would add a manual recovery step for a condition that resolves itself. -`attempt_count` is enough to alert on a genuinely poisoned row (e.g. one that -keeps failing) without a state machine. +**Self-healing, with a poison-row backstop.** A failed publish is normally just a +`PENDING` row that is retried on the next poll, so a transient NATS outage +**self-heals** with no operator action. To stop a genuinely poisoned row (one +that can *never* publish — e.g. 
malformed payload) from being retried forever and +tying up a worker, a row is moved to `FAILED` once `attempt_count` reaches +`OUTBOX_MAX_ATTEMPTS`. `FAILED` rows drop out of the `PENDING` poll, carry a +`last_error`, and increment `gamemesh_outbox_events_dead_lettered_total` so an +operator can alert, inspect, and re-queue them. Setting `OUTBOX_MAX_ATTEMPTS=0` +disables the backstop and restores the original retry-forever behaviour. | Failure | Outcome | | ------------------------------------------------ | ----------------------------------------------------------------------- | | Crash between `COMMIT` and any publish | Row is durable & `PENDING`; relay publishes it on next poll. **No loss.** | | NATS down when relay polls | `Publish` fails; row stays `PENDING`, `attempt_count++`; retried later. | +| Row fails `OUTBOX_MAX_ATTEMPTS` times | Row moved to `FAILED` (dead-letter); no longer polled; metric + `last_error` for triage. | | Crash **after** NATS publish, **before** mark | Row stays `PENDING`; relay republishes → **duplicate** (see §7). | | Business `INSERT` fails (e.g. duplicate username)| Whole tx rolls back; **no orphan outbox row**. | | Two relay replicas poll simultaneously | `SKIP LOCKED` hands each row to exactly one replica. | @@ -278,11 +289,25 @@ following the existing `gamemesh_*` convention with a `service` const label: | `gamemesh_outbox_events_pending` | Gauge | Current `PENDING` backlog (per poll). | | `gamemesh_outbox_events_published_total` | Counter | Rows successfully relayed. | | `gamemesh_outbox_publish_failures_total` | Counter | Publish attempts that failed (retried). | +| `gamemesh_outbox_events_dead_lettered_total`| Counter| Rows moved to `FAILED` after `OUTBOX_MAX_ATTEMPTS` (poison rows). | | `gamemesh_outbox_publish_duration_seconds`| Histogram| Per-row publish latency. 
| **Operational signals:** a steadily rising `pending` gauge means the relay is falling behind or NATS is unreachable; a rising `publish_failures_total` with a -flat `published_total` means NATS is down (rows are safe, just delayed). +flat `published_total` means NATS is down (rows are safe, just delayed). Any +increase in `dead_lettered_total` is a **page-worthy** signal — an event will +never be delivered without operator action, so alert on +`increase(gamemesh_outbox_events_dead_lettered_total[5m]) > 0`. + +**Alert rules.** These signals are shipped as Prometheus rules in +`config/prometheus/alerts.yml` (compose) and the `prometheus-config` ConfigMap in +`deployments/k8s/30-monitoring.yaml` (k8s): + +| Alert | Severity | Fires when | +| ------------------------- | -------- | ---------------------------------------------------------------- | +| `OutboxEventsDeadLettered`| critical | any row dead-letters in 5m (permanent loss without action). | +| `OutboxBacklogGrowing` | warning | `pending > 500` for 10m (relay behind or NATS unreachable). | +| `OutboxPublishStalled` | critical | publishes failing and none succeeding for 5m (NATS down). | --- @@ -297,9 +322,12 @@ flat `published_total` means NATS is down (rows are safe, just delayed). throughput. - **Ordering.** The relay processes oldest-first but does not guarantee strict global ordering under concurrency. Consumers must not assume cross-event order. -- **Kubernetes.** The compose deployment runs the relay; the k8s manifests under - `deployments/k8s/` are not updated in this change and would need an analogous - Deployment to run the relay in-cluster. +- **Kubernetes.** `deployments/k8s/15-outbox-relay.yaml` runs the relay in-cluster + with **3 replicas** and a `PodDisruptionBudget` of `minAvailable: 2`, so node + drains and autoscaler evictions never take down the whole relay at once. 
The replicas are safe + by construction — `FOR UPDATE SKIP LOCKED` hands each outbox row to exactly one + of them — which removes the relay as a single point of failure between + persistence and event transport. --- diff --git a/internal/outbox/publisher.go b/internal/outbox/publisher.go index 1a0df36..61efcc9 100644 --- a/internal/outbox/publisher.go +++ b/internal/outbox/publisher.go @@ -4,9 +4,7 @@ import ( "context" "github.com/google/uuid" - "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/propagation" "go.opentelemetry.io/otel/trace" "gorm.io/gorm" @@ -56,10 +54,7 @@ func (p *Publisher) Publish(ctx context.Context, topic string, e events.Event) e ) defer span.End() - if e.Carrier == nil { - e.Carrier = make(map[string]string) - } - otel.GetTextMapPropagator().Inject(ctx, propagation.MapCarrier(e.Carrier)) + e.Carrier = tracing.InjectCarrier(ctx, e.Carrier) id, err := uuid.Parse(e.ID) if err != nil { diff --git a/internal/outbox/relay.go b/internal/outbox/relay.go index c3dd7f8..bc08c16 100644 --- a/internal/outbox/relay.go +++ b/internal/outbox/relay.go @@ -7,8 +7,7 @@ import ( "time" "github.com/google/uuid" - "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/propagation" + "go.opentelemetry.io/otel/attribute" "go.uber.org/zap" "github.com/alpnuhoglu/gamemesh/pkg/events" @@ -20,15 +19,28 @@ import ( // Postgres; tests substitute an in-memory fake so the relay's publish/retry // logic is exercised without a database. type Batcher interface { - RunBatch(ctx context.Context, limit int, publish PublishFunc) (int, error) + RunBatch(ctx context.Context, limit, maxAttempts int, publish PublishFunc) (polled, deadLettered int, err error) CountPending(ctx context.Context) (int64, error) } +// BusPublisher is the narrow publish seam the relay needs — just the one method +// it calls. 
It is satisfied by events.Publisher (NATSBus/RedisBus) without the +// relay depending on the wider Publish+Close interface, so the relay's coupling +// to the event bus is exactly "I can publish one event" and nothing more. (Named +// BusPublisher to avoid colliding with outbox.Publisher, the outbox write side.) +type BusPublisher interface { + Publish(ctx context.Context, topic string, e events.Event) error +} + // RelayConfig tunes the relay loop. type RelayConfig struct { BatchSize int // rows fetched per poll PollInterval time.Duration // delay between polls when idle Workers int // bounded publish concurrency per batch + // MaxAttempts dead-letters a row (status FAILED) once its publish attempts + // reach this count, so a poison row stops being retried forever. 0 disables + // dead-lettering and rows retry indefinitely (the original behaviour). + MaxAttempts int } func (c RelayConfig) withDefaults() RelayConfig { @@ -41,6 +53,8 @@ func (c RelayConfig) withDefaults() RelayConfig { if c.Workers <= 0 { c.Workers = 4 } + // MaxAttempts intentionally has no default: 0 preserves the retry-forever + // behaviour, which a deployment opts out of by setting OUTBOX_MAX_ATTEMPTS. return c } @@ -50,14 +64,14 @@ func (c RelayConfig) withDefaults() RelayConfig { // and restart independently of the business services. type Relay struct { store Batcher - bus events.Publisher + bus BusPublisher cfg RelayConfig m *metrics.Metrics log *zap.Logger } // NewRelay wires a relay. m may be nil (metrics skipped). -func NewRelay(store Batcher, bus events.Publisher, cfg RelayConfig, m *metrics.Metrics, log *zap.Logger) *Relay { +func NewRelay(store Batcher, bus BusPublisher, cfg RelayConfig, m *metrics.Metrics, log *zap.Logger) *Relay { return &Relay{store: store, bus: bus, cfg: cfg.withDefaults(), m: m, log: log} } @@ -100,17 +114,27 @@ func (r *Relay) Run(ctx context.Context) error { // transaction (owned by the store). 
The PENDING rows are locked with // FOR UPDATE SKIP LOCKED for the duration so concurrent relay replicas never // touch the same rows. Successfully published rows are marked PUBLISHED; failed -// ones get attempt_count bumped and stay PENDING for the next cycle (retry). -// Returns the number of rows polled. +// ones get attempt_count bumped and stay PENDING for the next cycle (retry), +// unless they reach MaxAttempts, in which case they are dead-lettered (FAILED) +// and counted on the dead-letter metric. Returns the number of rows polled. func (r *Relay) processBatch(ctx context.Context) (int, error) { pollCtx, span := tracing.Tracer().Start(ctx, "outbox.poll") defer span.End() - polled, err := r.store.RunBatch(pollCtx, r.cfg.BatchSize, func(ctx context.Context, rows []Event) ([]uuid.UUID, []uuid.UUID, error) { - published, failed := r.publishAll(ctx, rows) - return published, failed, nil - }) + polled, deadLettered, err := r.store.RunBatch(pollCtx, r.cfg.BatchSize, r.cfg.MaxAttempts, + func(ctx context.Context, rows []Event) ([]uuid.UUID, []uuid.UUID, error) { + published, failed := r.publishAll(ctx, rows) + return published, failed, nil + }) tracing.RecordError(span, err) + if deadLettered > 0 { + span.SetAttributes(attribute.Int("outbox.dead_lettered", deadLettered)) + if r.m != nil { + r.m.OutboxEventsDeadLetteredTotal.Add(float64(deadLettered)) + } + r.log.Warn("outbox rows dead-lettered after max attempts", + zap.Int("count", deadLettered), zap.Int("max_attempts", r.cfg.MaxAttempts)) + } return polled, err } @@ -172,7 +196,7 @@ func (r *Relay) publishOne(ctx context.Context, row Event) error { if len(row.Carrier) > 0 { _ = json.Unmarshal(row.Carrier, &carrier) } - ctx = otel.GetTextMapPropagator().Extract(ctx, propagation.MapCarrier(carrier)) + ctx = tracing.ResumeFromCarrier(ctx, carrier) ctx, span := tracing.Tracer().Start(ctx, "outbox.publish "+row.Topic) defer span.End() diff --git a/internal/outbox/relay_test.go b/internal/outbox/relay_test.go index 
2702be5..dcddba4 100644 --- a/internal/outbox/relay_test.go +++ b/internal/outbox/relay_test.go @@ -34,7 +34,7 @@ func newFakeBatcher(rows ...Event) *fakeBatcher { return &fakeBatcher{rows: rows, attempt: map[uuid.UUID]int{}} } -func (f *fakeBatcher) RunBatch(ctx context.Context, limit int, publish PublishFunc) (int, error) { +func (f *fakeBatcher) RunBatch(ctx context.Context, limit, maxAttempts int, publish PublishFunc) (polled, deadLettered int, err error) { f.mu.Lock() batch := f.rows if len(batch) > limit { @@ -46,12 +46,12 @@ func (f *fakeBatcher) RunBatch(ctx context.Context, limit int, publish PublishFu f.mu.Unlock() if len(rows) == 0 { - return 0, nil + return 0, 0, nil } published, failed, err := publish(ctx, rows) if err != nil { - return len(rows), err + return len(rows), 0, err } f.mu.Lock() @@ -60,18 +60,25 @@ func (f *fakeBatcher) RunBatch(ctx context.Context, limit int, publish PublishFu for _, id := range published { done[id] = true } + // dead collects failed rows whose bumped attempt_count reached maxAttempts: + // they leave the PENDING set (dead-lettered) just like the SQL store does. + dead := map[uuid.UUID]bool{} for _, id := range failed { f.attempt[id]++ + if maxAttempts > 0 && f.attempt[id] >= maxAttempts { + dead[id] = true + deadLettered++ + } } - // Drop published rows from the PENDING set; failed rows stay for retry. + // Drop published and dead-lettered rows; transiently failed rows stay PENDING. 
var remaining []Event for _, r := range f.rows { - if !done[r.ID] { + if !done[r.ID] && !dead[r.ID] { remaining = append(remaining, r) } } f.rows = remaining - return len(rows), nil + return len(rows), deadLettered, nil } func (f *fakeBatcher) CountPending(context.Context) (int64, error) { @@ -168,6 +175,40 @@ func TestRelayRetriesPublishFailure(t *testing.T) { assert.Equal(t, int64(0), pending) } +// Scenario 3a: a row that keeps failing is dead-lettered once it reaches +// MaxAttempts — it leaves the PENDING set so it is never retried again, and the +// dead-letter metric is incremented for an operator to alert on. +func TestRelayDeadLettersAfterMaxAttempts(t *testing.T) { + row := pendingRow(t, events.TypePlayerRegistered, events.PlayerRegisteredPayload{PlayerID: "p1"}) + fb := newFakeBatcher(row) + pub := &capturingPublisher{failNext: 100} // every publish fails + m := metrics.New("test-relay-deadletter") + relay := NewRelay(fb, pub, RelayConfig{BatchSize: 10, MaxAttempts: 3}, m, zap.NewNop()) + + // Two failing ticks: row stays PENDING, attempts 1 then 2. + for i := 1; i <= 2; i++ { + _, err := relay.processBatch(context.Background()) + require.NoError(t, err) + pending, _ := fb.CountPending(context.Background()) + assert.Equal(t, int64(1), pending, "row still pending before max attempts") + assert.Equal(t, i, fb.attempt[row.ID]) + } + + // Third failing tick reaches MaxAttempts: the row is dead-lettered. + _, err := relay.processBatch(context.Background()) + require.NoError(t, err) + pending, _ := fb.CountPending(context.Background()) + assert.Equal(t, int64(0), pending, "dead-lettered row must leave the PENDING set") + assert.Equal(t, 0, pub.count(), "nothing was ever published") + + // A further tick is a no-op: there is nothing left to poll, so the poison row + // can never wedge a worker again. 
+ _, err = relay.processBatch(context.Background()) + require.NoError(t, err) + pending, _ = fb.CountPending(context.Background()) + assert.Equal(t, int64(0), pending) +} + // Scenario 4: duplicate-safe replay. If the process "crashes" after publishing // but before marking (modeled by re-running the same row), the event is sent // again — duplicates are possible, and the Event.ID is stable so a consumer can diff --git a/internal/outbox/store.go b/internal/outbox/store.go index 0f89c77..791583a 100644 --- a/internal/outbox/store.go +++ b/internal/outbox/store.go @@ -19,13 +19,16 @@ import ( "github.com/alpnuhoglu/gamemesh/pkg/tracing" ) -// Status values for an outbox row. The lifecycle is intentionally just two -// states: a failed publish is simply a PENDING row that gets retried on the -// next poll, so a transient NATS outage self-heals with no operator action and -// no FAILED state to reconcile. +// Status values for an outbox row. A failed publish stays PENDING and is retried +// on the next poll, so a transient NATS outage self-heals with no operator +// action. A row that keeps failing past MaxAttempts is moved to FAILED (a +// dead-letter state): it is no longer polled, so a single poison row can never +// be retried forever and wedge a worker. FAILED rows are surfaced via metrics +// for an operator to inspect and replay. const ( StatusPending = "PENDING" StatusPublished = "PUBLISHED" + StatusFailed = "FAILED" ) // Event is the GORM model backing the outbox_events table. The struct fields @@ -46,6 +49,9 @@ type Event struct { CreatedAt time.Time `gorm:"not null;default:now()"` PublishedAt *time.Time `gorm:"column:published_at"` AttemptCount int `gorm:"column:attempt_count;not null;default:0"` + // LastError is written when a row is dead-lettered to FAILED. The store currently + // sets a fixed "max attempts exceeded" marker here, not the underlying publish error. + LastError string `gorm:"column:last_error"` } // TableName pins the table name to match the SQL migrations. 
@@ -96,13 +102,15 @@ type PublishFunc func(ctx context.Context, rows []Event) (published, failed []uu // transaction. It locks up to limit of the oldest PENDING rows with // FOR UPDATE SKIP LOCKED (so concurrent relay replicas never grab the same // rows), hands them to publish, then marks the successful ones PUBLISHED and -// bumps attempt_count on the failures (which stay PENDING and retry next cycle). -// Returns the number of rows polled. Keeping the transaction logic here is the -// only place that knows about gorm, which keeps the relay unit-testable behind -// the Batcher interface. -func (s *Store) RunBatch(ctx context.Context, limit int, publish PublishFunc) (int, error) { - var polled int - err := s.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error { +// bumps attempt_count on the failures. A failure whose bumped attempt_count +// reaches maxAttempts is moved to FAILED (dead-lettered) instead of staying +// PENDING, so a poison row stops being retried; maxAttempts <= 0 disables this +// and rows retry forever (the original behaviour). Returns the number of rows +// polled and the number dead-lettered in this batch. Keeping the transaction +// logic here makes the store the only place that knows about gorm, which keeps the +// relay unit-testable behind the Batcher interface. +func (s *Store) RunBatch(ctx context.Context, limit, maxAttempts int, publish PublishFunc) (polledN, deadLetteredN int, err error) { + txErr := s.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error { var rows []Event if err := tx. Clauses(clause.Locking{Strength: "UPDATE", Options: "SKIP LOCKED"}). 
@@ -112,8 +120,8 @@ func (s *Store) RunBatch(ctx context.Context, limit int, publish PublishFunc) (i Find(&rows).Error; err != nil { return err } - polled = len(rows) - if polled == 0 { + polledN = len(rows) + if polledN == 0 { return nil } @@ -129,13 +137,20 @@ func (s *Store) RunBatch(ctx context.Context, limit int, publish PublishFunc) (i tracing.RecordError(span, err) return err } - if err := incrementAttempt(markCtx, tx, failed); err != nil { + dead, err := incrementAttempt(markCtx, tx, failed, maxAttempts) + if err != nil { tracing.RecordError(span, err) return err } + deadLetteredN = dead + span.SetAttributes(attribute.Int("outbox.dead_lettered", dead)) return nil }) - return polled, err + if txErr != nil { + // A failed transaction commits nothing, so no rows were dead-lettered. + return polledN, 0, txErr + } + return polledN, deadLetteredN, nil } func markPublished(ctx context.Context, tx *gorm.DB, ids []uuid.UUID) error { @@ -149,14 +164,52 @@ func markPublished(ctx context.Context, tx *gorm.DB, ids []uuid.UUID) error { Updates(map[string]any{"status": StatusPublished, "published_at": now}).Error } -func incrementAttempt(ctx context.Context, tx *gorm.DB, ids []uuid.UUID) error { +// incrementAttempt bumps attempt_count on every failed row and, when maxAttempts +// is positive, dead-letters the rows whose bumped count reaches it: those flip to +// FAILED so the PENDING-only poll skips them on the next cycle. Both updates run +// in one statement via a CASE so the bump and the status flip stay atomic and a +// row is never read back between them. Returns the number of rows dead-lettered. +func incrementAttempt(ctx context.Context, tx *gorm.DB, ids []uuid.UUID, maxAttempts int) (deadLettered int, err error) { if len(ids) == 0 { - return nil + return 0, nil } - return tx.WithContext(ctx). 
+ + updates := map[string]any{ + "attempt_count": gorm.Expr("attempt_count + 1"), + } + // maxAttempts <= 0 disables dead-lettering: rows just keep retrying (the + // original self-healing behaviour for indefinite transient outages). + if maxAttempts > 0 { + updates["status"] = gorm.Expr( + "CASE WHEN attempt_count + 1 >= ? THEN ? ELSE status END", + maxAttempts, StatusFailed, + ) + updates["last_error"] = gorm.Expr( + "CASE WHEN attempt_count + 1 >= ? THEN ? ELSE last_error END", + maxAttempts, "publish failed: max attempts exceeded", + ) + } + + if err := tx.WithContext(ctx). Model(&Event{}). Where("id IN ?", ids). - UpdateColumn("attempt_count", gorm.Expr("attempt_count + 1")).Error + Updates(updates).Error; err != nil { + return 0, err + } + + if maxAttempts <= 0 { + return 0, nil + } + // Count how many of these rows ended up FAILED so the relay can record the + // dead-letter metric. Scoped to the just-touched ids inside the same tx. + var n int64 + if err := tx.WithContext(ctx). + Model(&Event{}). + Where("id IN ? AND status = ?", ids, StatusFailed). + Count(&n).Error; err != nil { + return 0, err + } + return int(n), nil } // CountPending returns the number of PENDING rows, used to publish the backlog diff --git a/migrations/0004_outbox_dead_letter.down.sql b/migrations/0004_outbox_dead_letter.down.sql new file mode 100644 index 0000000..ea13b59 --- /dev/null +++ b/migrations/0004_outbox_dead_letter.down.sql @@ -0,0 +1,11 @@ +-- Revert dead-letter support. Any FAILED rows must be resolved first (re-queued +-- to PENDING or deleted), otherwise the restored two-state check would reject +-- them. 
+DROP INDEX IF EXISTS idx_outbox_failed; + +ALTER TABLE outbox_events DROP COLUMN IF EXISTS last_error; + +ALTER TABLE outbox_events DROP CONSTRAINT IF EXISTS ck_outbox_status; +ALTER TABLE outbox_events + ADD CONSTRAINT ck_outbox_status + CHECK (status IN ('PENDING', 'PUBLISHED')); diff --git a/migrations/0004_outbox_dead_letter.up.sql b/migrations/0004_outbox_dead_letter.up.sql new file mode 100644 index 0000000..0f31858 --- /dev/null +++ b/migrations/0004_outbox_dead_letter.up.sql @@ -0,0 +1,21 @@ +-- Dead-letter support for the transactional outbox. A row that keeps failing to +-- publish past OUTBOX_MAX_ATTEMPTS is moved to the FAILED state instead of being +-- retried forever, so a single poison row can never wedge a relay worker. FAILED +-- rows drop out of the PENDING poll and are surfaced for operator inspection. + +-- 1. Allow the new FAILED status. Drop the two-state check and re-add a +-- three-state one (PENDING is the only status that is ever polled). +ALTER TABLE outbox_events DROP CONSTRAINT IF EXISTS ck_outbox_status; +ALTER TABLE outbox_events + ADD CONSTRAINT ck_outbox_status + CHECK (status IN ('PENDING', 'PUBLISHED', 'FAILED')); + +-- 2. Record why a dead-lettered row never published, for operator triage. +ALTER TABLE outbox_events + ADD COLUMN IF NOT EXISTS last_error TEXT NOT NULL DEFAULT ''; + +-- 3. Small index over the dead-letter queue so an operator (or a replay job) can +-- list FAILED rows cheaply without scanning the published history. +CREATE INDEX IF NOT EXISTS idx_outbox_failed + ON outbox_events (created_at) + WHERE status = 'FAILED'; diff --git a/pkg/config/config.go b/pkg/config/config.go index dede8c2..af99c41 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -38,6 +38,10 @@ type Config struct { OutboxBatchSize int OutboxPollInterval time.Duration OutboxWorkers int + // OutboxMaxAttempts dead-letters a row (status FAILED) once its publish + // attempts reach this count, so a poison row stops being retried forever. 
+ // 0 (the default) disables dead-lettering: rows retry indefinitely. + OutboxMaxAttempts int JWTSecret string JWTExpiry time.Duration @@ -94,6 +98,7 @@ func Load(serviceName string) *Config { OutboxBatchSize: getEnvInt("OUTBOX_BATCH_SIZE", 100), OutboxPollInterval: getEnvDuration("OUTBOX_POLL_INTERVAL", time.Second), OutboxWorkers: getEnvInt("OUTBOX_WORKERS", 4), + OutboxMaxAttempts: getEnvInt("OUTBOX_MAX_ATTEMPTS", 0), JWTSecret: getEnv("JWT_SECRET", "insecure-dev-secret-do-not-use-in-prod"), JWTExpiry: getEnvDuration("JWT_EXPIRY", 24*time.Hour), diff --git a/pkg/events/nats.go b/pkg/events/nats.go index eb1a728..fb7f71f 100644 --- a/pkg/events/nats.go +++ b/pkg/events/nats.go @@ -11,9 +11,7 @@ import ( "github.com/nats-io/nats.go" "github.com/nats-io/nats.go/jetstream" - "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/propagation" semconv "go.opentelemetry.io/otel/semconv/v1.26.0" "go.opentelemetry.io/otel/trace" "go.uber.org/zap" @@ -156,10 +154,7 @@ func (b *NATSBus) Publish(ctx context.Context, topic string, e Event) error { ) defer span.End() - if e.Carrier == nil { - e.Carrier = make(map[string]string) - } - otel.GetTextMapPropagator().Inject(ctx, propagation.MapCarrier(e.Carrier)) + e.Carrier = tracing.InjectCarrier(ctx, e.Carrier) raw, err := json.Marshal(e) if err != nil { @@ -319,10 +314,7 @@ func (b *NATSBus) process(ctx context.Context, topic string, handler Handler, ms // Continue the producer's trace across the async boundary (same mechanism // as RedisBus / ReceiveSpan, just inlined so we can wrap ACK timing). 
- hctx := ctx - if e.Carrier != nil { - hctx = otel.GetTextMapPropagator().Extract(ctx, propagation.MapCarrier(e.Carrier)) - } + hctx := tracing.ResumeFromCarrier(ctx, e.Carrier) hctx, span := tracing.Tracer().Start(hctx, "events.consume "+topic, trace.WithSpanKind(trace.SpanKindConsumer), trace.WithAttributes( diff --git a/pkg/events/redis.go b/pkg/events/redis.go index aaf5492..626fe24 100644 --- a/pkg/events/redis.go +++ b/pkg/events/redis.go @@ -5,9 +5,7 @@ import ( "encoding/json" "github.com/redis/go-redis/v9" - "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/propagation" semconv "go.opentelemetry.io/otel/semconv/v1.26.0" "go.opentelemetry.io/otel/trace" "go.uber.org/zap" @@ -51,10 +49,7 @@ func (b *RedisBus) Publish(ctx context.Context, topic string, e Event) error { defer span.End() // Inject W3C trace context into the carrier (transport-agnostic). - if e.Carrier == nil { - e.Carrier = make(map[string]string) - } - otel.GetTextMapPropagator().Inject(ctx, propagation.MapCarrier(e.Carrier)) + e.Carrier = tracing.InjectCarrier(ctx, e.Carrier) raw, err := json.Marshal(e) if err != nil { @@ -79,9 +74,7 @@ func (b *RedisBus) Publish(ctx context.Context, topic string, e Event) error { // distributed trace. Returns the new context and span; the caller must End the // span. Transport-agnostic: relies only on the OTel propagator and the Carrier. 
func ReceiveSpan(ctx context.Context, e Event, name string) (context.Context, trace.Span) { - if e.Carrier != nil { - ctx = otel.GetTextMapPropagator().Extract(ctx, propagation.MapCarrier(e.Carrier)) - } + ctx = tracing.ResumeFromCarrier(ctx, e.Carrier) return tracing.Tracer().Start(ctx, name, trace.WithSpanKind(trace.SpanKindConsumer), trace.WithAttributes( diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go index afed622..27334b6 100644 --- a/pkg/metrics/metrics.go +++ b/pkg/metrics/metrics.go @@ -37,10 +37,11 @@ type Metrics struct { // Transactional outbox relay instruments. Pending is a gauge of the current // backlog (PENDING rows); the rest track the relay's publish path. - OutboxEventsPending prometheus.Gauge - OutboxEventsPublishedTotal prometheus.Counter - OutboxPublishFailuresTotal prometheus.Counter - OutboxPublishDuration prometheus.Histogram + OutboxEventsPending prometheus.Gauge + OutboxEventsPublishedTotal prometheus.Counter + OutboxPublishFailuresTotal prometheus.Counter + OutboxEventsDeadLetteredTotal prometheus.Counter + OutboxPublishDuration prometheus.Histogram } // New creates and registers all instruments, labelled with the service name @@ -128,6 +129,11 @@ func New(service string) *Metrics { Help: "Total outbox publish attempts that failed (row stays PENDING and is retried).", ConstLabels: constLabels, }), + OutboxEventsDeadLetteredTotal: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "gamemesh_outbox_events_dead_lettered_total", + Help: "Total outbox rows moved to FAILED after exceeding max publish attempts (poison rows; need operator attention).", + ConstLabels: constLabels, + }), OutboxPublishDuration: prometheus.NewHistogram(prometheus.HistogramOpts{ Name: "gamemesh_outbox_publish_duration_seconds", Help: "Latency of publishing a single outbox row to the event bus.", @@ -140,7 +146,8 @@ func New(service string) *Metrics { m.RequestsTotal, m.ErrorsTotal, m.RequestDuration, m.WSConnections, m.MatchmakingQueueSize, 
m.MatchesCreated, m.LeaderboardUpdates, m.EventsPublishedTotal, m.EventsConsumedTotal, m.EventsFailedTotal, m.EventProcessingDuration, - m.OutboxEventsPending, m.OutboxEventsPublishedTotal, m.OutboxPublishFailuresTotal, m.OutboxPublishDuration, + m.OutboxEventsPending, m.OutboxEventsPublishedTotal, m.OutboxPublishFailuresTotal, + m.OutboxEventsDeadLetteredTotal, m.OutboxPublishDuration, ) return m } diff --git a/pkg/tracing/tracing.go b/pkg/tracing/tracing.go index d516e7b..74d5c2a 100644 --- a/pkg/tracing/tracing.go +++ b/pkg/tracing/tracing.go @@ -150,3 +150,27 @@ func RecordError(span trace.Span, err error) { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) } + +// InjectCarrier writes the active W3C trace context from ctx into carrier (a +// string map suitable for the event's Carrier field), allocating it if nil, and +// returns it. This is the producer half of async trace propagation: every +// transport (Redis, NATS) and the outbox publisher use it so the inject logic +// lives in exactly one place. See docs/outbox.md §8 and pkg/events/events.go. +func InjectCarrier(ctx context.Context, carrier map[string]string) map[string]string { + if carrier == nil { + carrier = make(map[string]string) + } + otel.GetTextMapPropagator().Inject(ctx, propagation.MapCarrier(carrier)) + return carrier +} + +// ResumeFromCarrier extracts the W3C trace context stored in carrier and returns +// a ctx whose parent is the originating request's span. This is the consumer +// half: the relay (after a DB round-trip) and the bus subscribers use it so a +// span survives the async boundary. A nil/empty carrier yields ctx unchanged. 
+func ResumeFromCarrier(ctx context.Context, carrier map[string]string) context.Context { + if len(carrier) == 0 { + return ctx + } + return otel.GetTextMapPropagator().Extract(ctx, propagation.MapCarrier(carrier)) +} diff --git a/tests/integration/outbox_test.go b/tests/integration/outbox_test.go index 8af267b..d9cd4b8 100644 --- a/tests/integration/outbox_test.go +++ b/tests/integration/outbox_test.go @@ -97,7 +97,8 @@ func TestStoreRunBatchMarksPublished(t *testing.T) { require.Equal(t, int64(1), pendingBefore) var publishedCount int - n, err := store.RunBatch(ctx, 10, func(_ context.Context, rows []outbox.Event) ([]uuid.UUID, []uuid.UUID, error) { + // maxAttempts 0: dead-lettering disabled, exercises the plain publish path. + n, dead, err := store.RunBatch(ctx, 10, 0, func(_ context.Context, rows []outbox.Event) ([]uuid.UUID, []uuid.UUID, error) { publishedCount = len(rows) ids := make([]uuid.UUID, len(rows)) for i, r := range rows { @@ -107,9 +108,59 @@ func TestStoreRunBatchMarksPublished(t *testing.T) { }) require.NoError(t, err) assert.Equal(t, 1, n) + assert.Equal(t, 0, dead) assert.Equal(t, 1, publishedCount) pendingAfter, err := store.CountPending(ctx) require.NoError(t, err) assert.Equal(t, int64(0), pendingAfter, "row must be marked PUBLISHED") } + +// A row that keeps failing is dead-lettered (status FAILED) once attempt_count +// reaches maxAttempts, exercising the real SQL CASE update: it drops out of the +// PENDING poll and carries a last_error for operator triage. +func TestStoreRunBatchDeadLetters(t *testing.T) { + db := startPostgres(t) + ctx := context.Background() + store := outbox.NewStore(db) + + e, err := events.New(events.TypePlayerRegistered, events.PlayerRegisteredPayload{PlayerID: "p1"}) + require.NoError(t, err) + require.NoError(t, outbox.NewPublisher(store).Publish(ctx, events.TopicPlayer, e)) + + // failPublish reports every row as failed so attempt_count climbs each batch. 
+ failPublish := func(_ context.Context, rows []outbox.Event) ([]uuid.UUID, []uuid.UUID, error) { + ids := make([]uuid.UUID, len(rows)) + for i, r := range rows { + ids[i] = r.ID + } + return nil, ids, nil + } + + // maxAttempts 2: first failure bumps to 1 (stays PENDING), second reaches 2 + // and dead-letters. + _, dead, err := store.RunBatch(ctx, 10, 2, failPublish) + require.NoError(t, err) + assert.Equal(t, 0, dead, "not dead-lettered before max attempts") + pending, err := store.CountPending(ctx) + require.NoError(t, err) + assert.Equal(t, int64(1), pending) + + _, dead, err = store.RunBatch(ctx, 10, 2, failPublish) + require.NoError(t, err) + assert.Equal(t, 1, dead, "row dead-lettered on reaching max attempts") + pending, err = store.CountPending(ctx) + require.NoError(t, err) + assert.Equal(t, int64(0), pending, "FAILED row must leave the PENDING set") + + // The row now carries FAILED status and a last_error for triage. + var row outbox.Event + require.NoError(t, db.WithContext(ctx).Where("id = ?", e.ID).First(&row).Error) + assert.Equal(t, outbox.StatusFailed, row.Status) + assert.NotEmpty(t, row.LastError) + + // A further batch is a no-op: the poison row is never polled again. + n, _, err := store.RunBatch(ctx, 10, 2, failPublish) + require.NoError(t, err) + assert.Equal(t, 0, n) +}