From db5393b768119d9844575eb6bd394042259b0e57 Mon Sep 17 00:00:00 2001 From: AlpNuhoglu Date: Wed, 17 Jun 2026 15:48:04 +0300 Subject: [PATCH 1/3] feat(outbox): implement transactional outbox and async event relay Eliminates the dual-write problem in the player service by collapsing business logic and event writes into a single Postgres transaction. Events are asynchronously polled and dispatched to NATS via a dedicated `outbox-relay` service. - **Atomicity**: Injected `OutboxPublisher` to route player events into the `outbox_events` table within the same ACID transaction. - **Relay Architecture**: Implemented safe concurrent polling using `FOR UPDATE SKIP LOCKED` with a bounded worker pool in `outbox-relay`. - **Observability**: Captured and replayed W3C trace contexts inside JSONB `carrier` to ensure unbroken distributed tracing spans. - **Resilience**: Assured at-least-once delivery; consumers de-duplicate on stable `Event.ID`. Added integration tests for rollbacks and retries. - **Scope Limitation**: Documented that Redis-backed services (Matchmaking/Leaderboard) remain direct-publish due to lack of shared ACID transactions. --- .env.example | 13 + cmd/outbox-relay/main.go | 92 +++++ cmd/player/main.go | 9 +- config/prometheus/prometheus.yml | 3 + deployments/docker/Dockerfile.outbox-relay | 14 + docker-compose.yml | 28 ++ docs/outbox.md | 328 ++++++++++++++++++ internal/outbox/publisher.go | 78 +++++ internal/outbox/relay.go | 220 ++++++++++++ internal/outbox/relay_test.go | 246 +++++++++++++ internal/outbox/store.go | 173 +++++++++ internal/player/repository.go | 43 ++- internal/player/service.go | 28 +- internal/player/service_test.go | 40 ++- migrations/0003_create_outbox_events.down.sql | 1 + migrations/0003_create_outbox_events.up.sql | 21 ++ pkg/config/config.go | 15 + pkg/events/events.go | 23 ++ pkg/events/nats.go | 5 + pkg/metrics/metrics.go | 29 ++ scripts/db/schema.sql | 19 + tests/integration/api_test.go | 2 +- tests/integration/outbox_test.go | 109 ++++++ tests/integration/postgres_test.go | 13 +- 24 files changed, 1539 insertions(+), 13 deletions(-) create mode 100644 cmd/outbox-relay/main.go create mode 100644 deployments/docker/Dockerfile.outbox-relay create mode 100644 docs/outbox.md create mode 100644 internal/outbox/publisher.go create mode 100644 internal/outbox/relay.go create mode 100644 internal/outbox/relay_test.go create mode 100644 internal/outbox/store.go create mode 100644 migrations/0003_create_outbox_events.down.sql create mode 100644 migrations/0003_create_outbox_events.up.sql create mode 100644 tests/integration/outbox_test.go diff --git a/.env.example b/.env.example index 4b1c9f6..c07ecdf 100644 --- a/.env.example +++ b/.env.example @@ -29,6 +29,19 @@ NATS_URL=nats://nats:4222 # Bounded worker pool size for the JetStream consumers (per service). EVENT_WORKERS=8 +# --- Transactional outbox (outbox-relay service) --- +# The player service writes domain events to the outbox_events table in the same +# Postgres transaction as the business write; the outbox-relay process polls and +# publishes them to NATS, eliminating the dual-write problem. +# OUTBOX_ENABLED gates the relay loop (the player service always writes events). +OUTBOX_ENABLED=true +# Rows fetched per poll. +OUTBOX_BATCH_SIZE=100 +# Delay between polls when the backlog is drained. +OUTBOX_POLL_INTERVAL=1s +# Bounded publish-worker pool per batch. +OUTBOX_WORKERS=4 + # --- Service URLs (as seen from the API gateway) --- PLAYER_SERVICE_URL=http://player:8081 MATCHMAKING_SERVICE_URL=http://matchmaking:8082 diff --git a/cmd/outbox-relay/main.go b/cmd/outbox-relay/main.go new file mode 100644 index 0000000..85b9075 --- /dev/null +++ b/cmd/outbox-relay/main.go @@ -0,0 +1,92 @@ +// The outbox-relay service is the publish side of the transactional outbox. It +// polls outbox_events (written by the player service in the same transaction as +// the business state) and publishes committed rows to NATS JetStream. Running it +// as a dedicated process keeps the reliability layer decoupled from the business +// services: it scales, restarts and fails independently, and the player service +// never needs a NATS connection. +package main + +import ( + "context" + + "go.uber.org/zap" + "gorm.io/driver/postgres" + "gorm.io/gorm" + gormlogger "gorm.io/gorm/logger" + + "github.com/alpnuhoglu/gamemesh/internal/outbox" + "github.com/alpnuhoglu/gamemesh/pkg/config" + "github.com/alpnuhoglu/gamemesh/pkg/events" + "github.com/alpnuhoglu/gamemesh/pkg/logger" + "github.com/alpnuhoglu/gamemesh/pkg/metrics" + "github.com/alpnuhoglu/gamemesh/pkg/server" + "github.com/alpnuhoglu/gamemesh/pkg/tracing" +) + +func main() { + cfg := config.Load("outbox-relay") + log := logger.Must(cfg.ServiceName, cfg.Env) + defer func() { _ = log.Sync() }() + + shutdownTracing := tracing.MustInit(context.Background(), tracing.Config{ + Enabled: cfg.OTelEnabled, + ServiceName: cfg.OTelServiceName, + Endpoint: cfg.OTelEndpoint, + Env: cfg.Env, + Version: cfg.ServiceVersion, + Sampler: cfg.OTelSampler, + SamplerRatio: cfg.OTelSamplerRatio, + }, log) + defer func() { _ = shutdownTracing(context.Background()) }() + + db, err := gorm.Open(postgres.Open(cfg.PostgresDSN), &gorm.Config{ + Logger: gormlogger.Default.LogMode(gormlogger.Silent), + }) + if err != nil { + log.Fatal("failed to connect to postgres", zap.Error(err)) + } + if err := tracing.InstrumentGorm(db); err != nil { + log.Fatal("failed to instrument gorm", zap.Error(err)) + } + + m := metrics.New(cfg.ServiceName) + + // The relay publishes to NATS; it never falls back to Redis Pub/Sub because + // outbox durability would be pointless behind a fire-and-forget transport. + bus, err := events.NewBus(events.Config{ + Transport: "nats", + DurableName: cfg.ServiceName, + Workers: cfg.EventWorkers, + }, nil, cfg.NATSURL, m, log) + if err != nil { + log.Fatal("failed to init event bus", zap.Error(err)) + } + defer func() { _ = bus.Close() }() + + store := outbox.NewStore(db) + relay := outbox.NewRelay(store, bus, outbox.RelayConfig{ + BatchSize: cfg.OutboxBatchSize, + PollInterval: cfg.OutboxPollInterval, + Workers: cfg.OutboxWorkers, + }, m, log) + + ctx, stop := server.ShutdownContext() + defer stop() + + if !cfg.OutboxEnabled { + log.Warn("OUTBOX_ENABLED=false; relay idle, serving metrics only") + } else { + go func() { + if err := relay.Run(ctx); err != nil { + log.Fatal("outbox relay failed", zap.Error(err)) + } + }() + } + + // Serve /metrics (and /healthz via the shared engine) so Prometheus can scrape + // the relay and Compose/K8s health checks work like every other service. + engine := server.NewEngine(cfg, log, m) + if err := server.Run(engine, cfg.HTTPPort, log); err != nil { + log.Fatal("server exited", zap.Error(err)) + } +} diff --git a/cmd/player/main.go b/cmd/player/main.go index db1ff34..d23d74b 100644 --- a/cmd/player/main.go +++ b/cmd/player/main.go @@ -12,6 +12,7 @@ import ( "github.com/redis/go-redis/v9" + "github.com/alpnuhoglu/gamemesh/internal/outbox" "github.com/alpnuhoglu/gamemesh/internal/player" "github.com/alpnuhoglu/gamemesh/pkg/auth" "github.com/alpnuhoglu/gamemesh/pkg/config" @@ -49,7 +50,7 @@ func main() { // SQL migrations under /migrations are the source of truth; AutoMigrate // is a dev convenience that keeps `docker compose up` zero-step. if cfg.AutoMigrate { - if err := db.AutoMigrate(&player.Player{}, &player.Stats{}); err != nil { + if err := db.AutoMigrate(&player.Player{}, &player.Stats{}, &outbox.OutboxEvent{}); err != nil { log.Fatal("auto-migration failed", zap.Error(err)) } } @@ -64,7 +65,11 @@ func main() { } tokens := auth.NewTokenManager(cfg.JWTSecret, cfg.JWTExpiry, cfg.JWTIssuer) - repo := player.NewRepository(db) + // The player service writes domain events to the outbox (NOT to NATS): the + // dedicated outbox-relay process publishes them. So the player service has no + // NATS dependency — the OutboxPublisher writes rows on the business tx. + outboxPub := outbox.NewPublisher(outbox.NewStore(db)) + repo := player.NewRepository(db, outboxPub) sessions := player.NewSessionStore(rdb) svc := player.NewService(repo, sessions, tokens, log) handler := player.NewHandler(svc) diff --git a/config/prometheus/prometheus.yml b/config/prometheus/prometheus.yml index 5471d3d..62c1a86 100644 --- a/config/prometheus/prometheus.yml +++ b/config/prometheus/prometheus.yml @@ -21,3 +21,6 @@ scrape_configs: - job_name: websocket static_configs: - targets: ["websocket:8084"] + - job_name: outbox-relay + static_configs: + - targets: ["outbox-relay:8085"] diff --git a/deployments/docker/Dockerfile.outbox-relay b/deployments/docker/Dockerfile.outbox-relay new file mode 100644 index 0000000..43bbb61 --- /dev/null +++ b/deployments/docker/Dockerfile.outbox-relay @@ -0,0 +1,14 @@ +# syntax=docker/dockerfile:1 +FROM golang:1.25-alpine AS build +WORKDIR /src +COPY go.mod go.sum ./ +RUN go mod download +COPY . . +RUN CGO_ENABLED=0 go build -trimpath -ldflags="-s -w" -o /out/service ./cmd/outbox-relay + +FROM alpine:3.20 +RUN adduser -D -u 10001 app +USER app +COPY --from=build /out/service /service +EXPOSE 8085 +ENTRYPOINT ["/service"] diff --git a/docker-compose.yml b/docker-compose.yml index 5f4876b..07304fd 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -185,6 +185,34 @@ services: <<: *svc-healthcheck test: ["CMD", "wget", "-qO-", "http://localhost:8084/healthz"] + # Transactional outbox relay: the publish side of the outbox pattern. It polls + # outbox_events (written by the player service in the same Postgres transaction + # as the business state) and publishes committed rows to NATS JetStream. A + # dedicated process so it scales/restarts independently of the player API and + # keeps the player service free of any NATS dependency. + outbox-relay: + build: + context: . + dockerfile: deployments/docker/Dockerfile.outbox-relay + environment: + <<: *service-env + HTTP_PORT: "8085" + POSTGRES_DSN: host=postgres user=${POSTGRES_USER:-gamemesh} password=${POSTGRES_PASSWORD:-gamemesh-dev-password} dbname=${POSTGRES_DB:-gamemesh} port=5432 sslmode=disable + OUTBOX_ENABLED: ${OUTBOX_ENABLED:-true} + OUTBOX_BATCH_SIZE: ${OUTBOX_BATCH_SIZE:-100} + OUTBOX_POLL_INTERVAL: ${OUTBOX_POLL_INTERVAL:-1s} + OUTBOX_WORKERS: ${OUTBOX_WORKERS:-4} + depends_on: + postgres: + condition: service_healthy + nats: + condition: service_healthy + otel-collector: + condition: service_started + healthcheck: + <<: *svc-healthcheck + test: ["CMD", "wget", "-qO-", "http://localhost:8085/healthz"] + # Jaeger all-in-one: receives OTLP directly (v1.35+) and serves the trace UI # on :16686. In-memory storage — fine for local dev; swap for Badger/ES in prod. jaeger: diff --git a/docs/outbox.md b/docs/outbox.md new file mode 100644 index 0000000..2821e24 --- /dev/null +++ b/docs/outbox.md @@ -0,0 +1,328 @@ +# Transactional Outbox Pattern + +GameMesh writes business state to PostgreSQL and broadcasts domain events over +NATS JetStream. This document explains the **dual-write problem** that arises +when those two writes are independent, and the **transactional outbox** that +eliminates it — guaranteeing that a committed database transaction can never +lose its corresponding event. + +> **Companion docs:** [messaging.md](messaging.md) (the NATS JetStream +> transport) and [observability.md](observability.md) (tracing/metrics). This +> doc assumes the event contract and `events.Publisher`/`Subscriber` +> abstractions described there. + +--- + +## 1. The dual-write problem + +A service that both **persists state** and **publishes an event** is performing +two writes to two systems that do not share a transaction: + +``` +BEGIN + INSERT players ... +COMMIT ← Postgres durably committed + +publisher.Publish(MatchFound) ← separate write to NATS +``` + +Any of these now loses the event while the state is already durable: + +- the process **crashes** between `COMMIT` and `Publish`; +- NATS is **briefly unavailable** when `Publish` runs; +- the publish call **times out** or the network drops. + +The database says the player exists; no `PlayerRegistered` event was ever sent. +Downstream consumers (analytics, notifications, projections) silently diverge +from the source of truth. Reversing the order is no better — publishing first +then committing can emit an event for a transaction that later rolls back, which +is a *phantom* event. **There is no ordering of two independent writes that is +safe.** + +--- + +## 2. Why the outbox pattern exists + +The outbox pattern collapses the two writes into **one transaction against one +store**. Instead of publishing to NATS inline, the producer inserts the event +into an `outbox_events` table **in the same transaction** as the business write: + +``` +BEGIN + INSERT players ... + INSERT player_stats ... + INSERT outbox_events (PlayerRegistered) ← same transaction +COMMIT +``` + +Now the event and the state commit **atomically**: either both are durable or +neither is. A separate **relay** process then reads committed outbox rows and +publishes them to NATS, retrying until each succeeds. The database becomes the +single source of truth for "an event must be sent." + +``` +HTTP Register + │ + ▼ +player.Service.Register ──(one Postgres tx)── players + player_stats + outbox_events + │ COMMIT + ▼ + outbox-relay ──poll PENDING──► events.Publisher ──► NATS JetStream ──► consumers + ──mark PUBLISHED─┘ +``` + +### Where it applies in GameMesh (and where it does not) + +A real outbox requires the business write and the outbox row to commit in **one +transaction in one store**. In GameMesh only the **player service** is backed by +PostgreSQL, so it is the only place a *genuine* transactional outbox is +achievable. It now emits `PlayerRegistered` and `PlayerUpdated` through the +outbox. + +The **matchmaking** and **leaderboard** services keep their state in **Redis**, +which cannot share an ACID transaction with a Postgres outbox. Their +`MatchFound` / `LeaderboardUpdated` events are therefore **out of scope** for the +outbox and continue to publish directly to NATS. Bringing them under the outbox +would require relocating their authoritative state into Postgres — a larger +change deliberately not undertaken here. This limitation is called out honestly +rather than papered over with a non-atomic "outbox" that wouldn't actually +guarantee anything. + +--- + +## 3. Data model + +`migrations/0003_create_outbox_events.up.sql` (also folded into +`scripts/db/schema.sql` for the compose first-boot path, and registered with +GORM `AutoMigrate` via `outbox.OutboxEvent`): + +| Column | Type | Purpose | +| --------------- | ------------- | -------------------------------------------------------------- | +| `id` | `UUID` PK | **Equals `events.Event.ID`** — the stable consumer dedup key. | +| `event_type` | `TEXT` | e.g. `PlayerRegistered`. | +| `topic` | `TEXT` | e.g. `events.player` — the publish destination. | +| `payload` | `JSONB` | The domain payload (`events.Event.Payload`). | +| `carrier` | `JSONB` | W3C trace headers captured at write time (trace continuity). | +| `status` | `TEXT` | `PENDING` → `PUBLISHED`. | +| `created_at` | `TIMESTAMPTZ` | Poll ordering (oldest first) and audit. | +| `published_at` | `TIMESTAMPTZ` | `NULL` until relayed; audit trail. | +| `attempt_count` | `INTEGER` | Bumped on each failed publish; surfaces poison rows. | + +```sql +CREATE INDEX idx_outbox_pending + ON outbox_events (created_at) + WHERE status = 'PENDING'; +``` + +### Schema rationale + +- **`id = Event.ID`.** Reusing the event UUID as the primary key means the same + identity flows DB → NATS → consumer. A consumer can dedup on `Event.ID` with + zero extra columns and no coordination. +- **`carrier JSONB`.** The trace context is captured *inside the originating + request's span* at insert time and replayed by the relay, so the eventual NATS + publish links back to the HTTP request even though it happens later, in another + process. Without this, the trace would break at the async boundary. +- **Partial index on `PENDING`.** The relay's only hot query is "oldest + unpublished rows." A partial index stays small as `PUBLISHED` rows accumulate, + keeping the poll `O(batch)` regardless of total table size. +- **Two states, no `FAILED`.** See §6. + +--- + +## 4. Relay architecture + +The relay is a **dedicated process** (`cmd/outbox-relay`, its own container in +`docker-compose.yml`), not a goroutine inside the player service. + +**Why dedicated:** it scales and restarts independently of the player API; +`FOR UPDATE SKIP LOCKED` lets multiple replicas run safely; relay load never +competes with request handling; and the player service needs **no NATS +connection at all** — it only writes rows. The cost is one more container, which +is the standard production trade-off. + +Each poll cycle (`internal/outbox/store.go: RunBatch`) runs inside **one +transaction**: + +1. **Claim** up to `OUTBOX_BATCH_SIZE` of the oldest `PENDING` rows with + `SELECT ... FOR UPDATE SKIP LOCKED`. `SKIP LOCKED` means concurrent relay + replicas never grab the same rows — horizontal scaling is free. +2. **Publish** the batch across a **bounded worker pool** of `OUTBOX_WORKERS` + (`internal/outbox/relay.go: publishAll`) — never one goroutine per event. +3. **Mark** successfully published rows `PUBLISHED` (stamping `published_at`) and + **bump `attempt_count`** on the failures, which stay `PENDING`. +4. **Commit.** The rows stayed locked for the whole cycle, so no other replica + touched them. + +When a poll returns a full batch the relay loops immediately (draining a +backlog); otherwise it waits `OUTBOX_POLL_INTERVAL`. Shutdown is driven by +`server.ShutdownContext()` (SIGINT/SIGTERM): the loop stops polling and the +in-flight batch's transaction completes or rolls back cleanly. + +### Configuration + +| Env var | Default | Meaning | +| ---------------------- | ------- | ------------------------------------ | +| `OUTBOX_ENABLED` | `true` | Gate the relay loop. | +| `OUTBOX_BATCH_SIZE` | `100` | Rows claimed per poll. | +| `OUTBOX_POLL_INTERVAL` | `1s` | Delay between polls when idle. | +| `OUTBOX_WORKERS` | `4` | Bounded publish concurrency / batch. | + +The player service always **writes** to the outbox regardless of +`OUTBOX_ENABLED`; the flag only governs the relay. + +--- + +## 5. NATS interaction + +The relay reuses the existing `events.NATSBus` (`Transport: "nats"`) unchanged. +For each row it reconstructs an `events.Event` from the stored columns — +including the carrier — and calls `bus.Publish(ctx, topic, e)`. From NATS's +perspective this is an ordinary publish to the `events.player` stream; the +`PLAYER` stream is created idempotently on boot like the others (see +[messaging.md](messaging.md)). Consumers subscribe exactly as they do for any +other event. The outbox is invisible past the publish. + +--- + +## 6. Event lifecycle and failure scenarios + +``` + insert (in business tx) +PENDING ───────────────────────► (relay publishes) ──success──► PUBLISHED + ▲ │ + └───────────── failure ───────────────┘ (attempt_count++ , stays PENDING) +``` + +**Only two states.** A failed publish is simply a `PENDING` row that is retried +on the next poll, so a transient NATS outage **self-heals** with no operator +action and nothing to reconcile. There is intentionally **no `FAILED` state**: +it would add a manual recovery step for a condition that resolves itself. +`attempt_count` is enough to alert on a genuinely poisoned row (e.g. one that +keeps failing) without a state machine. + +| Failure | Outcome | +| ------------------------------------------------ | ----------------------------------------------------------------------- | +| Crash between `COMMIT` and any publish | Row is durable & `PENDING`; relay publishes it on next poll. **No loss.** | +| NATS down when relay polls | `Publish` fails; row stays `PENDING`, `attempt_count++`; retried later. | +| Crash **after** NATS publish, **before** mark | Row stays `PENDING`; relay republishes → **duplicate** (see §7). | +| Business `INSERT` fails (e.g. duplicate username)| Whole tx rolls back; **no orphan outbox row**. | +| Two relay replicas poll simultaneously | `SKIP LOCKED` hands each row to exactly one replica. | + +--- + +## 7. Idempotency + +**Publishing is at-least-once, not exactly-once.** Exactly-once would require +distributed consensus (2PC) between Postgres and NATS, which neither supports +cheaply and which the outbox deliberately avoids. The crash window in §6 (publish +succeeded, mark didn't) means the relay can republish a row — so **duplicates are +possible by design**. Losing an event is unacceptable; a duplicate is tolerable, +because **consumers are idempotent**. + +### Why this isn't a new burden + +Every consumer already runs under JetStream's at-least-once delivery (durable +consumers redeliver on `AckWait` expiry). The outbox adds another duplicate +source but no new *requirement* — idempotent consumption was always mandatory. + +### Consumer audit + +- **`MatchFound`** (matchmaking → WS): the WS bridge sends the message to the + matched players' connections. Re-sending the same notification is harmless — + the in-memory hub operations are naturally idempotent. +- **`LeaderboardUpdated`** (leaderboard → WS): broadcast to all clients. A + repeated broadcast carries the same score/rank snapshot; harmless. +- **`PlayerRegistered` / `PlayerUpdated`** (new, via the outbox): no consumer + exists yet — the relay makes them durably available for future projections. + **Prescribed dedup strategy:** key on `Event.ID` (== outbox `id`). A consumer + that records processed IDs (or upserts by a natural key) collapses duplicates. + +We deliberately **do not** build a dedup table or inbox now — that would be +over-engineering for events with no consumer. The stable `Event.ID` is the seam +that makes dedup trivial when a consumer arrives. + +--- + +## 8. Distributed tracing + +The trace must stay unbroken across the async hand-off: HTTP request → outbox +insert → (later, another process) relay publish → NATS → consumer. + +Spans created (all on the shared `tracing.Tracer()`): + +| Span | Where | Notes | +| ---------------------- | ------------------------------ | -------------------------------------- | +| `outbox.insert `| `OutboxPublisher.Publish` | Inside the business transaction. | +| `outbox.poll` | `Relay.processBatch` | One per poll cycle. | +| `outbox.publish `| `Relay.publishOne` | Continues the originating trace. | +| `outbox.mark_published`| `Store.RunBatch` | The status update. | + +**How continuity is preserved:** `OutboxPublisher.Publish` injects the current +trace context into `Event.Carrier` and stores it in the `carrier` column — at +insert time, so it captures the *request's* span. The relay reads `carrier` back, +`Extract`s it into the context before starting `outbox.publish`, and the existing +`NATSBus.Publish` producer span (and the downstream consumer span) therefore +share the **same trace ID** as the original HTTP request. This is verified by +`TestRelayPropagatesTrace` in `internal/outbox/relay_test.go`. + +--- + +## 9. Metrics + +Prometheus instruments (Prometheus scrapes the relay at `outbox-relay:8085`), +following the existing `gamemesh_*` convention with a `service` const label: + +| Metric | Type | Meaning | +| ---------------------------------------- | --------- | ---------------------------------------- | +| `gamemesh_outbox_events_pending` | Gauge | Current `PENDING` backlog (per poll). | +| `gamemesh_outbox_events_published_total` | Counter | Rows successfully relayed. | +| `gamemesh_outbox_publish_failures_total` | Counter | Publish attempts that failed (retried). | +| `gamemesh_outbox_publish_duration_seconds`| Histogram| Per-row publish latency. | + +**Operational signals:** a steadily rising `pending` gauge means the relay is +falling behind or NATS is unreachable; a rising `publish_failures_total` with a +flat `published_total` means NATS is down (rows are safe, just delayed). + +--- + +## 10. Operational considerations + +- **Retention.** `PUBLISHED` rows are kept for audit/debugging. Add a periodic + `DELETE FROM outbox_events WHERE status='PUBLISHED' AND published_at < now() - interval '7 days'` + job in production (not implemented here) to bound table growth. The partial + index keeps the poll fast regardless, so this is housekeeping, not correctness. +- **Scaling the relay.** Raise replicas freely — `FOR UPDATE SKIP LOCKED` + partitions work across them. Tune `OUTBOX_WORKERS`/`OUTBOX_BATCH_SIZE` for + throughput. +- **Ordering.** The relay processes oldest-first but does not guarantee strict + global ordering under concurrency. Consumers must not assume cross-event order. +- **Kubernetes.** The compose deployment runs the relay; the k8s manifests under + `deployments/k8s/` are not updated in this change and would need an analogous + Deployment to run the relay in-cluster. + +--- + +## 11. Verification + +```bash +go build ./... # compiles +go vet ./... # clean +go test ./... # unit tests (incl. relay scenarios) pass +``` + +Unit tests (`internal/outbox/relay_test.go`, no DB required): relay publishes a +pending event, retries on publish failure, is duplicate-safe across the crash +window, and propagates the trace. Service-level +(`internal/player/service_test.go`): `Register` emits a `PlayerRegistered` event +through the outbox path. + +Integration tests (`tests/integration/outbox_test.go`, Testcontainers Postgres, +run with `-tags integration`): the outbox row is written **with** the business +rows; a failed business transaction leaves **no orphan** outbox row (atomicity); +and `Store.RunBatch` claims, publishes and marks rows `PUBLISHED`. + +End-to-end (manual): `docker compose up`, `POST /auth/register`, then confirm in +Jaeger that one trace spans `player.register` → `outbox.insert` → (relay) +`outbox.publish` → NATS, and that `gamemesh_outbox_events_published_total` +increments in Prometheus. diff --git a/internal/outbox/publisher.go b/internal/outbox/publisher.go new file mode 100644 index 0000000..1a0df36 --- /dev/null +++ b/internal/outbox/publisher.go @@ -0,0 +1,78 @@ +package outbox + +import ( + "context" + + "github.com/google/uuid" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/propagation" + "go.opentelemetry.io/otel/trace" + "gorm.io/gorm" + + "github.com/alpnuhoglu/gamemesh/pkg/events" + "github.com/alpnuhoglu/gamemesh/pkg/tracing" +) + +// Publisher implements events.Publisher by writing the event to the outbox_events +// table instead of sending it to NATS. Because it inserts on a transaction handle +// supplied by the caller, the row commits atomically with the business write. +// +// Service code keeps depending only on events.Publisher, so swapping a direct +// NATS publisher for this one requires no change to business logic — the +// reliability guarantee is purely an infrastructure concern. +type Publisher struct { + store *Store + // tx is the transaction handle the row is written on. A request-scoped + // Publisher is created via WithTx inside db.Transaction so the insert shares + // the business transaction. The zero-tx Publisher (store's db) is only used + // for tests/standalone inserts. + tx *gorm.DB +} + +// NewPublisher returns a Publisher bound to a store. Callers wrap a transaction +// with WithTx before publishing so the insert joins the business transaction. +func NewPublisher(store *Store) *Publisher { + return &Publisher{store: store, tx: store.DB()} +} + +// WithTx returns a Publisher that writes on the given transaction handle. Used +// inside db.Transaction(func(tx *gorm.DB) error {...}) so the outbox insert and +// the business writes commit together (or roll back together). +func (p *Publisher) WithTx(tx *gorm.DB) *Publisher { + return &Publisher{store: p.store, tx: tx} +} + +// Publish writes the event to the outbox. It captures the current trace context +// into the event's Carrier (mirroring NATSBus.Publish) so the relay can resume +// the originating request's trace when it later publishes to NATS. This does NOT +// talk to NATS — the relay does that asynchronously after the transaction commits. +func (p *Publisher) Publish(ctx context.Context, topic string, e events.Event) error { + ctx, span := tracing.Tracer().Start(ctx, "outbox.insert "+topic, + trace.WithAttributes( + attribute.String("messaging.event.type", e.Type), + attribute.String("messaging.destination.name", topic), + ), + ) + defer span.End() + + if e.Carrier == nil { + e.Carrier = make(map[string]string) + } + otel.GetTextMapPropagator().Inject(ctx, propagation.MapCarrier(e.Carrier)) + + id, err := uuid.Parse(e.ID) + if err != nil { + // Event.ID is always a generated UUID (events.New); fall back to a fresh + // one rather than failing the business transaction on a malformed ID. + id = uuid.New() + } + + err = p.store.Insert(ctx, p.tx, id, e.Type, topic, e.Payload, e.Carrier) + tracing.RecordError(span, err) + return err +} + +// Close satisfies events.Publisher. The DB lifecycle is owned by main, so this +// is a no-op. +func (p *Publisher) Close() error { return nil } diff --git a/internal/outbox/relay.go b/internal/outbox/relay.go new file mode 100644 index 0000000..84ff36c --- /dev/null +++ b/internal/outbox/relay.go @@ -0,0 +1,220 @@ +package outbox + +import ( + "context" + "encoding/json" + "sync" + "time" + + "github.com/google/uuid" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/propagation" + "go.uber.org/zap" + + "github.com/alpnuhoglu/gamemesh/pkg/events" + "github.com/alpnuhoglu/gamemesh/pkg/metrics" + "github.com/alpnuhoglu/gamemesh/pkg/tracing" +) + +// Batcher is the storage seam the relay depends on. *Store implements it against +// Postgres; tests substitute an in-memory fake so the relay's publish/retry +// logic is exercised without a database. +type Batcher interface { + RunBatch(ctx context.Context, limit int, publish PublishFunc) (int, error) + CountPending(ctx context.Context) (int64, error) +} + +// RelayConfig tunes the relay loop. +type RelayConfig struct { + BatchSize int // rows fetched per poll + PollInterval time.Duration // delay between polls when idle + Workers int // bounded publish concurrency per batch +} + +func (c RelayConfig) withDefaults() RelayConfig { + if c.BatchSize <= 0 { + c.BatchSize = 100 + } + if c.PollInterval <= 0 { + c.PollInterval = time.Second + } + if c.Workers <= 0 { + c.Workers = 4 + } + return c +} + +// Relay polls the outbox and publishes committed rows to the event bus. It is +// the only component that talks to NATS for outbox events; the producing service +// only ever writes to the table. Running it as a separate process lets it scale +// and restart independently of the business services. +type Relay struct { + store Batcher + bus events.Publisher + cfg RelayConfig + m *metrics.Metrics + log *zap.Logger +} + +// NewRelay wires a relay. m may be nil (metrics skipped). +func NewRelay(store Batcher, bus events.Publisher, cfg RelayConfig, m *metrics.Metrics, log *zap.Logger) *Relay { + return &Relay{store: store, bus: bus, cfg: cfg.withDefaults(), m: m, log: log} +} + +// Run polls until ctx is cancelled, draining the in-flight batch before +// returning. It loops back-to-back while a poll returns a full batch (catching +// up a backlog) and otherwise sleeps for PollInterval. +func (r *Relay) Run(ctx context.Context) error { + r.log.Info("outbox relay started", + zap.Int("batch_size", r.cfg.BatchSize), + zap.Duration("poll_interval", r.cfg.PollInterval), + zap.Int("workers", r.cfg.Workers)) + + timer := time.NewTimer(0) + defer timer.Stop() + + for { + select { + case <-ctx.Done(): + r.log.Info("outbox relay stopping") + return nil + case <-timer.C: + } + + n, err := r.processBatch(ctx) + if err != nil && ctx.Err() == nil { + r.log.Warn("outbox poll failed", zap.Error(err)) + } + r.updatePendingGauge(ctx) + + // Busy-loop while draining a backlog; otherwise wait a tick. + if n >= r.cfg.BatchSize { + timer.Reset(0) + } else { + timer.Reset(r.cfg.PollInterval) + } + } +} + +// processBatch runs one poll → publish → mark cycle inside a single DB +// transaction (owned by the store). The PENDING rows are locked with +// FOR UPDATE SKIP LOCKED for the duration so concurrent relay replicas never +// touch the same rows. Successfully published rows are marked PUBLISHED; failed +// ones get attempt_count bumped and stay PENDING for the next cycle (retry). +// Returns the number of rows polled. +func (r *Relay) processBatch(ctx context.Context) (int, error) { + pollCtx, span := tracing.Tracer().Start(ctx, "outbox.poll") + defer span.End() + + polled, err := r.store.RunBatch(pollCtx, r.cfg.BatchSize, func(ctx context.Context, rows []OutboxEvent) ([]uuid.UUID, []uuid.UUID, error) { + published, failed := r.publishAll(ctx, rows) + return published, failed, nil + }) + tracing.RecordError(span, err) + return polled, err +} + +// publishAll fans the batch out across a bounded worker pool (NOT one goroutine +// per event) and returns the IDs that published successfully and those that +// failed. +func (r *Relay) publishAll(ctx context.Context, rows []OutboxEvent) (published, failed []uuid.UUID) { + type result struct { + id uuid.UUID + err error + } + + jobs := make(chan OutboxEvent) + results := make(chan result) + + var wg sync.WaitGroup + workers := r.cfg.Workers + if workers > len(rows) { + workers = len(rows) + } + for i := 0; i < workers; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for row := range jobs { + results <- result{id: row.ID, err: r.publishOne(ctx, row)} + } + }() + } + + go func() { + for _, row := range rows { + jobs <- row + } + close(jobs) + }() + + go func() { + wg.Wait() + close(results) + }() + + for res := range results { + if res.err != nil { + failed = append(failed, res.id) + } else { + published = append(published, res.id) + } + } + return published, failed +} + +// publishOne reconstructs the events.Event (including the stored trace carrier) +// and publishes it. The carrier is extracted so the relay's publish span — and +// the downstream NATS producer span — link back to the originating request's +// trace even though publication happens later in this separate process. +func (r *Relay) publishOne(ctx context.Context, row OutboxEvent) error { + carrier := map[string]string{} + if len(row.Carrier) > 0 { + _ = json.Unmarshal(row.Carrier, &carrier) + } + ctx = otel.GetTextMapPropagator().Extract(ctx, propagation.MapCarrier(carrier)) + + ctx, span := tracing.Tracer().Start(ctx, "outbox.publish "+row.Topic) + defer span.End() + + e := events.Event{ + ID: row.ID.String(), + Type: row.EventType, + Timestamp: row.CreatedAt, + Payload: row.Payload, + Carrier: carrier, + } + + start := time.Now() + err := r.bus.Publish(ctx, row.Topic, e) + if r.m != nil { + r.m.OutboxPublishDuration.Observe(time.Since(start).Seconds()) + } + if err != nil { + tracing.RecordError(span, err) + if r.m != nil { + r.m.OutboxPublishFailuresTotal.Inc() + } + r.log.Warn("outbox publish failed; will retry", + zap.String("event_id", row.ID.String()), + zap.String("event_type", row.EventType), + zap.Int("attempt", row.AttemptCount+1), + zap.Error(err)) + return err + } + if r.m != nil { + r.m.OutboxEventsPublishedTotal.Inc() + } + return nil +} + +func (r *Relay) updatePendingGauge(ctx context.Context) { + if r.m == nil { + return + } + n, err := r.store.CountPending(ctx) + if err != nil { + return + } + r.m.OutboxEventsPending.Set(float64(n)) +} diff --git a/internal/outbox/relay_test.go b/internal/outbox/relay_test.go new file mode 100644 index 0000000..eaaf5bf --- /dev/null +++ b/internal/outbox/relay_test.go @@ -0,0 +1,246 @@ +package outbox + +import ( + "context" + "encoding/json" + "sync" + "testing" + "time" + + "github.com/google/uuid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/propagation" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/sdk/trace/tracetest" + "go.opentelemetry.io/otel/trace" + "go.uber.org/zap" + + "github.com/alpnuhoglu/gamemesh/pkg/events" + "github.com/alpnuhoglu/gamemesh/pkg/metrics" +) + +// fakeBatcher is an in-memory Batcher: it models the outbox table and the +// poll/publish/mark transaction without a database, so the relay's publish and +// retry logic can be tested in isolation. +type fakeBatcher struct { + mu sync.Mutex + rows []OutboxEvent // current PENDING rows, oldest first + attempt map[uuid.UUID]int // attempt_count per id +} + +func newFakeBatcher(rows ...OutboxEvent) *fakeBatcher { + return &fakeBatcher{rows: rows, attempt: map[uuid.UUID]int{}} +} + +func (f *fakeBatcher) RunBatch(ctx context.Context, limit int, publish PublishFunc) (int, error) { + f.mu.Lock() + batch := f.rows + if len(batch) > limit { + batch = batch[:limit] + } + // Copy so the publish callback can't mutate our slice mid-flight. + rows := make([]OutboxEvent, len(batch)) + copy(rows, batch) + f.mu.Unlock() + + if len(rows) == 0 { + return 0, nil + } + + published, failed, err := publish(ctx, rows) + if err != nil { + return len(rows), err + } + + f.mu.Lock() + defer f.mu.Unlock() + done := map[uuid.UUID]bool{} + for _, id := range published { + done[id] = true + } + for _, id := range failed { + f.attempt[id]++ + } + // Drop published rows from the PENDING set; failed rows stay for retry. + var remaining []OutboxEvent + for _, r := range f.rows { + if !done[r.ID] { + remaining = append(remaining, r) + } + } + f.rows = remaining + return len(rows), nil +} + +func (f *fakeBatcher) CountPending(context.Context) (int64, error) { + f.mu.Lock() + defer f.mu.Unlock() + return int64(len(f.rows)), nil +} + +// capturingPublisher records every event handed to it and can be told to fail. +type capturingPublisher struct { + mu sync.Mutex + events []events.Event + failNext int // number of upcoming Publish calls that should fail +} + +func (p *capturingPublisher) Publish(_ context.Context, _ string, e events.Event) error { + p.mu.Lock() + defer p.mu.Unlock() + if p.failNext > 0 { + p.failNext-- + return assertErr + } + p.events = append(p.events, e) + return nil +} +func (p *capturingPublisher) Close() error { return nil } +func (p *capturingPublisher) count() int { + p.mu.Lock() + defer p.mu.Unlock() + return len(p.events) +} + +var assertErr = &publishError{} + +type publishError struct{} + +func (*publishError) Error() string { return "simulated publish failure" } + +func pendingRow(t *testing.T, eventType string, payload any) OutboxEvent { + t.Helper() + raw, err := json.Marshal(payload) + require.NoError(t, err) + return OutboxEvent{ + ID: uuid.New(), + EventType: eventType, + Topic: events.TopicPlayer, + Payload: raw, + Carrier: []byte("{}"), + Status: StatusPending, + CreatedAt: time.Now().UTC(), + } +} + +// Scenario 2: the relay publishes a pending event exactly once and the row is +// removed from the PENDING set (marked PUBLISHED). +func TestRelayPublishesPendingEvent(t *testing.T) { + row := pendingRow(t, events.TypePlayerRegistered, events.PlayerRegisteredPayload{PlayerID: "p1"}) + fb := newFakeBatcher(row) + pub := &capturingPublisher{} + m := metrics.New("test-relay-publish") + relay := NewRelay(fb, pub, RelayConfig{BatchSize: 10}, m, zap.NewNop()) + + n, err := relay.processBatch(context.Background()) + require.NoError(t, err) + assert.Equal(t, 1, n) + assert.Equal(t, 1, pub.count(), "event must be published once") + + pending, _ := fb.CountPending(context.Background()) + assert.Equal(t, int64(0), pending, "published row must leave the PENDING set") +} + +// Scenario 3: a publish failure leaves the row PENDING with attempt_count bumped; +// a later tick succeeds and publishes it. +func TestRelayRetriesPublishFailure(t *testing.T) { + row := pendingRow(t, events.TypePlayerRegistered, events.PlayerRegisteredPayload{PlayerID: "p1"}) + fb := newFakeBatcher(row) + pub := &capturingPublisher{failNext: 1} + m := metrics.New("test-relay-retry") + relay := NewRelay(fb, pub, RelayConfig{BatchSize: 10}, m, zap.NewNop()) + + // First tick: publish fails, row stays PENDING. + _, err := relay.processBatch(context.Background()) + require.NoError(t, err) + assert.Equal(t, 0, pub.count(), "nothing published on the failing tick") + pending, _ := fb.CountPending(context.Background()) + assert.Equal(t, int64(1), pending, "failed row must remain PENDING for retry") + assert.Equal(t, 1, fb.attempt[row.ID], "attempt_count must be incremented") + + // Second tick: publish succeeds. + _, err = relay.processBatch(context.Background()) + require.NoError(t, err) + assert.Equal(t, 1, pub.count(), "event published on the retry tick") + pending, _ = fb.CountPending(context.Background()) + assert.Equal(t, int64(0), pending) +} + +// Scenario 4: duplicate-safe replay. If the process "crashes" after publishing +// but before marking (modeled by re-running the same row), the event is sent +// again — duplicates are possible, and the Event.ID is stable so a consumer can +// dedup on it. +func TestRelayDuplicateSafeReplay(t *testing.T) { + row := pendingRow(t, events.TypePlayerRegistered, events.PlayerRegisteredPayload{PlayerID: "p1"}) + pub := &capturingPublisher{} + m := metrics.New("test-relay-dup") + + // Two independent batchers both holding the same row simulate the row never + // being marked PUBLISHED between two publish cycles (the crash window). + relay := NewRelay(newFakeBatcher(row), pub, RelayConfig{BatchSize: 10}, m, zap.NewNop()) + _, err := relay.processBatch(context.Background()) + require.NoError(t, err) + relay2 := NewRelay(newFakeBatcher(row), pub, RelayConfig{BatchSize: 10}, m, zap.NewNop()) + _, err = relay2.processBatch(context.Background()) + require.NoError(t, err) + + require.Equal(t, 2, pub.count(), "the same event is published twice across the crash window") + assert.Equal(t, pub.events[0].ID, pub.events[1].ID, "stable Event.ID is the consumer dedup key") +} + +// Scenario 5: trace propagation through the relay. A carrier captured at write +// time is replayed so the relay's publish span continues the originating trace. +func TestRelayPropagatesTrace(t *testing.T) { + rec := setupRelayTracing(t) + + // Producer captures its trace context into a carrier (as OutboxPublisher does). + ctx, producer := otel.Tracer("test").Start(context.Background(), "register") + wantTrace := producer.SpanContext().TraceID() + carrier := map[string]string{} + otel.GetTextMapPropagator().Inject(ctx, propagation.MapCarrier(carrier)) + producer.End() + carrierJSON, err := json.Marshal(carrier) + require.NoError(t, err) + + row := pendingRow(t, events.TypePlayerRegistered, events.PlayerRegisteredPayload{PlayerID: "p1"}) + row.Carrier = carrierJSON + + pub := &capturingPublisher{} + relay := NewRelay(newFakeBatcher(row), pub, RelayConfig{BatchSize: 10}, nil, zap.NewNop()) + _, err = relay.processBatch(context.Background()) + require.NoError(t, err) + + // The republished event carries the original traceparent. + require.Equal(t, 1, pub.count()) + got := pub.events[0] + require.Contains(t, got.Carrier, "traceparent") + rctx := otel.GetTextMapPropagator().Extract(context.Background(), propagation.MapCarrier(got.Carrier)) + assert.Equal(t, wantTrace, trace.SpanContextFromContext(rctx).TraceID()) + + // And the relay's outbox.publish span is part of the same trace. + var found bool + for _, s := range rec.Ended() { + if s.Name() == "outbox.publish "+events.TopicPlayer { + found = true + assert.Equal(t, wantTrace, s.SpanContext().TraceID(), "publish span must continue the originating trace") + } + } + assert.True(t, found, "outbox.publish span must be recorded") +} + +func setupRelayTracing(t *testing.T) *tracetest.SpanRecorder { + t.Helper() + rec := tracetest.NewSpanRecorder() + tp := sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(rec)) + prevTP := otel.GetTracerProvider() + prevProp := otel.GetTextMapPropagator() + otel.SetTracerProvider(tp) + otel.SetTextMapPropagator(propagation.TraceContext{}) + t.Cleanup(func() { + otel.SetTracerProvider(prevTP) + otel.SetTextMapPropagator(prevProp) + }) + return rec +} diff --git a/internal/outbox/store.go b/internal/outbox/store.go new file mode 100644 index 0000000..3b73ea3 --- /dev/null +++ b/internal/outbox/store.go @@ -0,0 +1,173 @@ +// Package outbox implements the transactional outbox pattern: domain events are +// persisted to the outbox_events table in the SAME database transaction as the +// business write, then a separate relay publishes committed rows to NATS. This +// makes the database the single source of truth for "an event must be sent" and +// eliminates the dual-write problem (state committed but event lost). +package outbox + +import ( + "context" + "encoding/json" + "time" + + "github.com/google/uuid" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" + "gorm.io/gorm" + "gorm.io/gorm/clause" + + "github.com/alpnuhoglu/gamemesh/pkg/tracing" +) + +// Status values for an outbox row. The lifecycle is intentionally just two +// states: a failed publish is simply a PENDING row that gets retried on the +// next poll, so a transient NATS outage self-heals with no operator action and +// no FAILED state to reconcile. +const ( + StatusPending = "PENDING" + StatusPublished = "PUBLISHED" +) + +// OutboxEvent is the GORM model backing the outbox_events table. The struct +// fields mirror the migration in migrations/0003_create_outbox_events.up.sql. +type OutboxEvent struct { + // ID equals events.Event.ID so the same identity flows DB -> NATS -> + // consumer, giving consumers a natural dedup key with no extra columns. + ID uuid.UUID `gorm:"type:uuid;primaryKey"` + EventType string `gorm:"column:event_type;not null"` + Topic string `gorm:"not null"` + Payload []byte `gorm:"type:jsonb;not null"` + // Carrier holds the W3C trace headers captured at write time so the relay + // can continue the originating request's trace when it publishes later. + Carrier []byte `gorm:"type:jsonb;not null;default:'{}'"` + Status string `gorm:"not null;default:PENDING"` + CreatedAt time.Time `gorm:"not null;default:now()"` + PublishedAt *time.Time `gorm:"column:published_at"` + AttemptCount int `gorm:"column:attempt_count;not null;default:0"` +} + +// TableName pins the table name to match the SQL migrations. +func (OutboxEvent) TableName() string { return "outbox_events" } + +// Store provides access to outbox_events. The same Store type serves both the +// write side (Insert, on a caller-supplied tx) and the relay side (RunBatch, +// which claims/publishes/marks rows in one transaction, plus CountPending). +type Store struct { + db *gorm.DB +} + +// NewStore returns a Store bound to the given GORM handle. +func NewStore(db *gorm.DB) *Store { + return &Store{db: db} +} + +// Insert writes a single outbox row using the provided transaction handle. It +// MUST be called inside the business transaction (db.Transaction(...)) so the +// row commits atomically with the business state — that atomicity is the whole +// point of the pattern. The carrier map is JSON-encoded for the jsonb column. +func (s *Store) Insert(ctx context.Context, tx *gorm.DB, id uuid.UUID, eventType, topic string, payload []byte, carrier map[string]string) error { + carrierJSON, err := json.Marshal(carrier) + if err != nil { + return err + } + if len(carrierJSON) == 0 { + carrierJSON = []byte("{}") + } + row := OutboxEvent{ + ID: id, + EventType: eventType, + Topic: topic, + Payload: payload, + Carrier: carrierJSON, + Status: StatusPending, + } + return tx.WithContext(ctx).Create(&row).Error +} + +// PublishFunc publishes a batch of polled rows and reports which succeeded and +// which failed. The relay supplies this; the store calls it inside the batch +// transaction so the rows stay locked (FOR UPDATE SKIP LOCKED) until their +// status is updated. +type PublishFunc func(ctx context.Context, rows []OutboxEvent) (published, failed []uuid.UUID, err error) + +// RunBatch is the relay's poll → publish → mark cycle, all inside ONE +// transaction. It locks up to limit of the oldest PENDING rows with +// FOR UPDATE SKIP LOCKED (so concurrent relay replicas never grab the same +// rows), hands them to publish, then marks the successful ones PUBLISHED and +// bumps attempt_count on the failures (which stay PENDING and retry next cycle). +// Returns the number of rows polled. Keeping the transaction logic here is the +// only place that knows about gorm, which keeps the relay unit-testable behind +// the Batcher interface. +func (s *Store) RunBatch(ctx context.Context, limit int, publish PublishFunc) (int, error) { + var polled int + err := s.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error { + var rows []OutboxEvent + if err := tx. + Clauses(clause.Locking{Strength: "UPDATE", Options: "SKIP LOCKED"}). + Where("status = ?", StatusPending). + Order("created_at ASC"). + Limit(limit). + Find(&rows).Error; err != nil { + return err + } + polled = len(rows) + if polled == 0 { + return nil + } + + published, failed, err := publish(ctx, rows) + if err != nil { + return err + } + + markCtx, span := tracing.Tracer().Start(ctx, "outbox.mark_published", + trace.WithAttributes(attribute.Int("outbox.published", len(published)), attribute.Int("outbox.failed", len(failed)))) + defer span.End() + if err := markPublished(markCtx, tx, published); err != nil { + tracing.RecordError(span, err) + return err + } + if err := incrementAttempt(markCtx, tx, failed); err != nil { + tracing.RecordError(span, err) + return err + } + return nil + }) + return polled, err +} + +func markPublished(ctx context.Context, tx *gorm.DB, ids []uuid.UUID) error { + if len(ids) == 0 { + return nil + } + now := time.Now().UTC() + return tx.WithContext(ctx). + Model(&OutboxEvent{}). + Where("id IN ?", ids). + Updates(map[string]any{"status": StatusPublished, "published_at": now}).Error +} + +func incrementAttempt(ctx context.Context, tx *gorm.DB, ids []uuid.UUID) error { + if len(ids) == 0 { + return nil + } + return tx.WithContext(ctx). + Model(&OutboxEvent{}). + Where("id IN ?", ids). + UpdateColumn("attempt_count", gorm.Expr("attempt_count + 1")).Error +} + +// CountPending returns the number of PENDING rows, used to publish the backlog +// gauge each poll tick. +func (s *Store) CountPending(ctx context.Context) (int64, error) { + var n int64 + err := s.db.WithContext(ctx). + Model(&OutboxEvent{}). + Where("status = ?", StatusPending). + Count(&n).Error + return n, err +} + +// DB exposes the underlying handle so the relay can open its own transaction +// around a poll/publish/mark cycle. +func (s *Store) DB() *gorm.DB { return s.db } diff --git a/internal/player/repository.go b/internal/player/repository.go index a9277ff..aec4166 100644 --- a/internal/player/repository.go +++ b/internal/player/repository.go @@ -7,6 +7,9 @@ import ( "github.com/google/uuid" "github.com/jackc/pgx/v5/pgconn" "gorm.io/gorm" + + "github.com/alpnuhoglu/gamemesh/internal/outbox" + "github.com/alpnuhoglu/gamemesh/pkg/events" ) // Domain errors. The service layer maps these to HTTP statuses; the @@ -18,27 +21,59 @@ var ( // Repository abstracts player persistence (repository pattern). The GORM // implementation below is the only one in production; tests substitute mocks. +// +// The *WithOutbox methods are the transactional-outbox write path: they perform +// the business write AND insert the domain event into outbox_events inside a +// single Postgres transaction, so the event can never be lost once the write +// commits (and is never visible before it commits). The plain Create/Update are +// retained for callers that do not emit events. type Repository interface { Create(ctx context.Context, p *Player) error + CreateWithOutbox(ctx context.Context, p *Player, e events.Event, topic string) error GetByID(ctx context.Context, id uuid.UUID) (*Player, error) GetByUsername(ctx context.Context, username string) (*Player, error) GetByEmail(ctx context.Context, email string) (*Player, error) Update(ctx context.Context, p *Player) error + UpdateWithOutbox(ctx context.Context, p *Player, e events.Event, topic string) error } type gormRepository struct { - db *gorm.DB + db *gorm.DB + out *outbox.Publisher } -// NewRepository returns the PostgreSQL-backed repository. -func NewRepository(db *gorm.DB) Repository { - return &gormRepository{db: db} +// NewRepository returns the PostgreSQL-backed repository. out may be nil for +// callers (e.g. tests) that never use the *WithOutbox methods. +func NewRepository(db *gorm.DB, out *outbox.Publisher) Repository { + return &gormRepository{db: db, out: out} } func (r *gormRepository) Create(ctx context.Context, p *Player) error { return translate(r.db.WithContext(ctx).Create(p).Error) } +// CreateWithOutbox inserts the player (and cascaded stats) and the outbox row in +// one transaction. The outbox.Publisher is rebound to the tx handle so its +// insert joins this transaction — if either write fails, both roll back. +func (r *gormRepository) CreateWithOutbox(ctx context.Context, p *Player, e events.Event, topic string) error { + return translate(r.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error { + if err := tx.Create(p).Error; err != nil { + return err + } + return r.out.WithTx(tx).Publish(ctx, topic, e) + })) +} + +// UpdateWithOutbox saves the player and inserts the outbox row in one transaction. +func (r *gormRepository) UpdateWithOutbox(ctx context.Context, p *Player, e events.Event, topic string) error { + return translate(r.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error { + if err := tx.Save(p).Error; err != nil { + return err + } + return r.out.WithTx(tx).Publish(ctx, topic, e) + })) +} + func (r *gormRepository) GetByID(ctx context.Context, id uuid.UUID) (*Player, error) { var p Player err := r.db.WithContext(ctx).Preload("Stats").First(&p, "id = ?", id).Error diff --git a/internal/player/service.go b/internal/player/service.go index c15c618..8bbd646 100644 --- a/internal/player/service.go +++ b/internal/player/service.go @@ -8,6 +8,7 @@ import ( "go.uber.org/zap" "github.com/alpnuhoglu/gamemesh/pkg/auth" + "github.com/alpnuhoglu/gamemesh/pkg/events" ) // ErrInvalidCredentials is returned on any login failure. It is deliberately @@ -50,7 +51,22 @@ func (s *Service) Register(ctx context.Context, in RegisterInput) (*Player, erro PasswordHash: hash, Stats: &Stats{Rank: 1000}, } - if err := s.repo.Create(ctx, p); err != nil { + // Assign the ID up front (the model's BeforeCreate also would) so the event + // payload can reference it before the INSERT round-trip. + if p.ID == uuid.Nil { + p.ID = uuid.New() + } + e, err := events.New(events.TypePlayerRegistered, events.PlayerRegisteredPayload{ + PlayerID: p.ID.String(), + Username: p.Username, + Email: p.Email, + }) + if err != nil { + return nil, err + } + // Business row + outbox event commit atomically: the PlayerRegistered event + // can never be lost once registration succeeds (no dual-write). + if err := s.repo.CreateWithOutbox(ctx, p, e, events.TopicPlayer); err != nil { return nil, err } s.log.Info("player registered", zap.String("player_id", p.ID.String()), zap.String("username", p.Username)) @@ -114,7 +130,15 @@ func (s *Service) UpdateProfile(ctx context.Context, id uuid.UUID, in UpdateInpu if in.Email != nil { p.Email = *in.Email } - if err := s.repo.Update(ctx, p); err != nil { + e, err := events.New(events.TypePlayerUpdated, events.PlayerUpdatedPayload{ + PlayerID: p.ID.String(), + Username: p.Username, + Email: p.Email, + }) + if err != nil { + return nil, err + } + if err := s.repo.UpdateWithOutbox(ctx, p, e, events.TopicPlayer); err != nil { return nil, err } return p, nil diff --git a/internal/player/service_test.go b/internal/player/service_test.go index 6fd3a28..399f73d 100644 --- a/internal/player/service_test.go +++ b/internal/player/service_test.go @@ -2,6 +2,7 @@ package player import ( "context" + "encoding/json" "testing" "time" @@ -11,11 +12,15 @@ import ( "go.uber.org/zap" "github.com/alpnuhoglu/gamemesh/pkg/auth" + "github.com/alpnuhoglu/gamemesh/pkg/events" ) -// mockRepository is an in-memory Repository for unit tests. +// mockRepository is an in-memory Repository for unit tests. The *WithOutbox +// methods capture the emitted event so tests can assert the outbox payload +// without a database (the real atomicity is covered by the integration tests). type mockRepository struct { players map[uuid.UUID]*Player + events []events.Event } func newMockRepository() *mockRepository { @@ -68,6 +73,22 @@ func (r *mockRepository) Update(_ context.Context, p *Player) error { return nil } +func (r *mockRepository) CreateWithOutbox(ctx context.Context, p *Player, e events.Event, _ string) error { + if err := r.Create(ctx, p); err != nil { + return err + } + r.events = append(r.events, e) + return nil +} + +func (r *mockRepository) UpdateWithOutbox(ctx context.Context, p *Player, e events.Event, _ string) error { + if err := r.Update(ctx, p); err != nil { + return err + } + r.events = append(r.events, e) + return nil +} + // fakeSessions records session operations. type fakeSessions struct { saved map[string]string @@ -114,6 +135,23 @@ func TestRegisterHashesPassword(t *testing.T) { assert.Equal(t, 1000, p.Stats.Rank, "new players start at default rank") } +func TestRegisterEmitsOutboxEvent(t *testing.T) { + svc, repo, _ := newTestService() + + p, err := svc.Register(context.Background(), RegisterInput{ + Username: "alice", Email: "alice@example.com", Password: "password123", + }) + require.NoError(t, err) + + require.Len(t, repo.events, 1, "register must emit exactly one event through the outbox") + e := repo.events[0] + assert.Equal(t, events.TypePlayerRegistered, e.Type) + var payload events.PlayerRegisteredPayload + require.NoError(t, json.Unmarshal(e.Payload, &payload)) + assert.Equal(t, p.ID.String(), payload.PlayerID) + assert.Equal(t, "alice", payload.Username) +} + func TestRegisterDuplicate(t *testing.T) { svc, _, _ := newTestService() ctx := context.Background() diff --git a/migrations/0003_create_outbox_events.down.sql b/migrations/0003_create_outbox_events.down.sql new file mode 100644 index 0000000..eebeb85 --- /dev/null +++ b/migrations/0003_create_outbox_events.down.sql @@ -0,0 +1 @@ +DROP TABLE IF EXISTS outbox_events; diff --git a/migrations/0003_create_outbox_events.up.sql b/migrations/0003_create_outbox_events.up.sql new file mode 100644 index 0000000..04f6b41 --- /dev/null +++ b/migrations/0003_create_outbox_events.up.sql @@ -0,0 +1,21 @@ +-- Transactional outbox: domain events are inserted here in the SAME Postgres +-- transaction as the business rows (players, player_stats). A separate relay +-- process publishes committed rows to NATS, eliminating the dual-write problem. +CREATE TABLE IF NOT EXISTS outbox_events ( + id UUID PRIMARY KEY, -- = events.Event.ID, reused as the consumer dedup key + event_type TEXT NOT NULL, -- e.g. "PlayerRegistered" + topic TEXT NOT NULL, -- e.g. "events.player" + payload JSONB NOT NULL, -- domain payload (events.Event.Payload) + carrier JSONB NOT NULL DEFAULT '{}', -- W3C trace headers captured at write time + status TEXT NOT NULL DEFAULT 'PENDING', + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + published_at TIMESTAMPTZ, -- NULL until relayed to NATS + attempt_count INTEGER NOT NULL DEFAULT 0, + CONSTRAINT ck_outbox_status CHECK (status IN ('PENDING', 'PUBLISHED')) +); + +-- Relay hot path: fetch the oldest unpublished rows. The partial index stays +-- small as PUBLISHED rows accumulate, keeping the poll query O(batch). +CREATE INDEX IF NOT EXISTS idx_outbox_pending + ON outbox_events (created_at) + WHERE status = 'PENDING'; diff --git a/pkg/config/config.go b/pkg/config/config.go index f57bd0a..dede8c2 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -31,6 +31,14 @@ type Config struct { NATSURL string EventWorkers int + // Transactional outbox relay tuning. The relay polls outbox_events and + // publishes committed rows to NATS. OutboxEnabled gates the relay loop; + // the player service always writes to the outbox regardless. + OutboxEnabled bool + OutboxBatchSize int + OutboxPollInterval time.Duration + OutboxWorkers int + JWTSecret string JWTExpiry time.Duration JWTIssuer string @@ -82,6 +90,11 @@ func Load(serviceName string) *Config { NATSURL: getEnv("NATS_URL", "nats://localhost:4222"), EventWorkers: getEnvInt("EVENT_WORKERS", 8), + OutboxEnabled: getEnvBool("OUTBOX_ENABLED", true), + OutboxBatchSize: getEnvInt("OUTBOX_BATCH_SIZE", 100), + OutboxPollInterval: getEnvDuration("OUTBOX_POLL_INTERVAL", time.Second), + OutboxWorkers: getEnvInt("OUTBOX_WORKERS", 4), + JWTSecret: getEnv("JWT_SECRET", "insecure-dev-secret-do-not-use-in-prod"), JWTExpiry: getEnvDuration("JWT_EXPIRY", 24*time.Hour), JWTIssuer: getEnv("JWT_ISSUER", "gamemesh"), @@ -130,6 +143,8 @@ func defaultPort(serviceName string) string { return "8083" case "websocket": return "8084" + case "outbox-relay": + return "8085" default: return "8080" } diff --git a/pkg/events/events.go b/pkg/events/events.go index ef2276c..2834a6a 100644 --- a/pkg/events/events.go +++ b/pkg/events/events.go @@ -18,12 +18,20 @@ const ( TypePlayerLeft = "PlayerLeft" TypeMatchFound = "MatchFound" TypeLeaderboardUpdated = "LeaderboardUpdated" + // Player identity events. These flow through the transactional outbox: they + // are written to outbox_events in the SAME Postgres transaction as the + // players/player_stats rows, then relayed to NATS — so the event can never + // be lost once the registration/update commits (no dual-write). + TypePlayerRegistered = "PlayerRegistered" + TypePlayerUpdated = "PlayerUpdated" ) // Topics (channels) events are published on. const ( TopicMatchmaking = "events.matchmaking" TopicLeaderboard = "events.leaderboard" + // TopicPlayer carries identity lifecycle events emitted via the outbox. + TopicPlayer = "events.player" ) // Event is the wire format for all inter-service messages. @@ -68,6 +76,21 @@ type LeaderboardUpdatedPayload struct { Rank int64 `json:"rank"` } +// PlayerRegisteredPayload is emitted when a new player is created. Written to +// the outbox atomically with the players/player_stats rows. +type PlayerRegisteredPayload struct { + PlayerID string `json:"player_id"` + Username string `json:"username"` + Email string `json:"email"` +} + +// PlayerUpdatedPayload is emitted when a player edits their profile. +type PlayerUpdatedPayload struct { + PlayerID string `json:"player_id"` + Username string `json:"username"` + Email string `json:"email"` +} + // Publisher sends events to a topic. Implementations must be safe for // concurrent use. type Publisher interface { diff --git a/pkg/events/nats.go b/pkg/events/nats.go index 53442d4..eb1a728 100644 --- a/pkg/events/nats.go +++ b/pkg/events/nats.go @@ -76,6 +76,11 @@ var streamFor = map[string]stream{ subjects: []string{TopicLeaderboard, TopicLeaderboard + ".>"}, maxAge: 1 * time.Hour, // score updates are high-volume and disposable }, + TopicPlayer: { + name: "PLAYER", + subjects: []string{TopicPlayer, TopicPlayer + ".>"}, + maxAge: 72 * time.Hour, // identity events are low-volume, long replay window + }, } // NewNATSBus connects to NATS, ensures the streams exist and returns a bus. diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go index d462cd2..afed622 100644 --- a/pkg/metrics/metrics.go +++ b/pkg/metrics/metrics.go @@ -34,6 +34,13 @@ type Metrics struct { EventsConsumedTotal *prometheus.CounterVec EventsFailedTotal *prometheus.CounterVec EventProcessingDuration *prometheus.HistogramVec + + // Transactional outbox relay instruments. Pending is a gauge of the current + // backlog (PENDING rows); the rest track the relay's publish path. + OutboxEventsPending prometheus.Gauge + OutboxEventsPublishedTotal prometheus.Counter + OutboxPublishFailuresTotal prometheus.Counter + OutboxPublishDuration prometheus.Histogram } // New creates and registers all instruments, labelled with the service name @@ -106,12 +113,34 @@ func New(service string) *Metrics { ConstLabels: constLabels, Buckets: []float64{.001, .005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5}, }, []string{"topic", "type"}), + OutboxEventsPending: prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "gamemesh_outbox_events_pending", + Help: "Outbox rows awaiting publication (the relay backlog).", + ConstLabels: constLabels, + }), + OutboxEventsPublishedTotal: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "gamemesh_outbox_events_published_total", + Help: "Total outbox rows successfully relayed to the event bus.", + ConstLabels: constLabels, + }), + OutboxPublishFailuresTotal: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "gamemesh_outbox_publish_failures_total", + Help: "Total outbox publish attempts that failed (row stays PENDING and is retried).", + ConstLabels: constLabels, + }), + OutboxPublishDuration: prometheus.NewHistogram(prometheus.HistogramOpts{ + Name: "gamemesh_outbox_publish_duration_seconds", + Help: "Latency of publishing a single outbox row to the event bus.", + ConstLabels: constLabels, + Buckets: []float64{.001, .005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5}, + }), } reg.MustRegister( m.RequestsTotal, m.ErrorsTotal, m.RequestDuration, m.WSConnections, m.MatchmakingQueueSize, m.MatchesCreated, m.LeaderboardUpdates, m.EventsPublishedTotal, m.EventsConsumedTotal, m.EventsFailedTotal, m.EventProcessingDuration, + m.OutboxEventsPending, m.OutboxEventsPublishedTotal, m.OutboxPublishFailuresTotal, m.OutboxPublishDuration, ) return m } diff --git a/scripts/db/schema.sql b/scripts/db/schema.sql index 13e07e3..9ec193c 100644 --- a/scripts/db/schema.sql +++ b/scripts/db/schema.sql @@ -27,3 +27,22 @@ CREATE TABLE IF NOT EXISTS player_stats ( ); CREATE INDEX IF NOT EXISTS idx_player_stats_rank ON player_stats (rank); + +-- Transactional outbox: events written here in the same transaction as the +-- business rows, then relayed to NATS by the outbox-relay process. +CREATE TABLE IF NOT EXISTS outbox_events ( + id UUID PRIMARY KEY, + event_type TEXT NOT NULL, + topic TEXT NOT NULL, + payload JSONB NOT NULL, + carrier JSONB NOT NULL DEFAULT '{}', + status TEXT NOT NULL DEFAULT 'PENDING', + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + published_at TIMESTAMPTZ, + attempt_count INTEGER NOT NULL DEFAULT 0, + CONSTRAINT ck_outbox_status CHECK (status IN ('PENDING', 'PUBLISHED')) +); + +CREATE INDEX IF NOT EXISTS idx_outbox_pending + ON outbox_events (created_at) + WHERE status = 'PENDING'; diff --git a/tests/integration/api_test.go b/tests/integration/api_test.go index 469c2c8..c33b390 100644 --- a/tests/integration/api_test.go +++ b/tests/integration/api_test.go @@ -29,7 +29,7 @@ func TestPlayerAPIEndToEnd(t *testing.T) { tokens := auth.NewTokenManager("integration-secret", time.Hour, "gamemesh") svc := player.NewService( - player.NewRepository(db), + newRepo(db), player.NewSessionStore(rdb), tokens, zap.NewNop(), diff --git a/tests/integration/outbox_test.go b/tests/integration/outbox_test.go new file mode 100644 index 0000000..d7e3895 --- /dev/null +++ b/tests/integration/outbox_test.go @@ -0,0 +1,109 @@ +//go:build integration + +package integration + +import ( + "context" + "testing" + + "github.com/google/uuid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + + "github.com/alpnuhoglu/gamemesh/internal/outbox" + "github.com/alpnuhoglu/gamemesh/internal/player" + "github.com/alpnuhoglu/gamemesh/pkg/auth" + "github.com/alpnuhoglu/gamemesh/pkg/events" +) + +// Scenario 1: registering a player writes the business rows AND the outbox row +// in the same transaction. Both must be present after a successful register. +func TestOutboxRowWrittenWithBusinessRows(t *testing.T) { + db := startPostgres(t) + rdb := startRedis(t) + ctx := context.Background() + + tokens := auth.NewTokenManager("integration-secret", 0, "gamemesh") + svc := player.NewService(newRepo(db), player.NewSessionStore(rdb), tokens, zap.NewNop()) + + p, err := svc.Register(ctx, player.RegisterInput{ + Username: "atomicuser", Email: "atomic@example.com", Password: "password123", + }) + require.NoError(t, err) + + // Business rows committed. + var players int64 + require.NoError(t, db.Model(&player.Player{}).Where("id = ?", p.ID).Count(&players).Error) + assert.Equal(t, int64(1), players) + + // Outbox row committed in the same transaction, PENDING, correct type. + var rows []outbox.OutboxEvent + require.NoError(t, db.Where("event_type = ?", events.TypePlayerRegistered).Find(&rows).Error) + require.Len(t, rows, 1) + assert.Equal(t, events.TopicPlayer, rows[0].Topic) + assert.Equal(t, outbox.StatusPending, rows[0].Status) + assert.Equal(t, p.ID.String(), rows[0].ID.String(), "outbox id == event id (the dedup key)") +} + +// Atomicity: when the business INSERT fails (duplicate username), the outbox row +// must NOT be left behind — the whole transaction rolls back. +func TestOutboxRollsBackWithBusinessFailure(t *testing.T) { + db := startPostgres(t) + rdb := startRedis(t) + ctx := context.Background() + + tokens := auth.NewTokenManager("integration-secret", 0, "gamemesh") + svc := player.NewService(newRepo(db), player.NewSessionStore(rdb), tokens, zap.NewNop()) + + _, err := svc.Register(ctx, player.RegisterInput{ + Username: "dupe", Email: "dupe1@example.com", Password: "password123", + }) + require.NoError(t, err) + + // Second register with the same username violates the unique constraint. + _, err = svc.Register(ctx, player.RegisterInput{ + Username: "dupe", Email: "dupe2@example.com", Password: "password123", + }) + require.ErrorIs(t, err, player.ErrDuplicate) + + // Exactly one outbox row (from the first, successful register) — the failed + // transaction left no orphan event. + var n int64 + require.NoError(t, db.Model(&outbox.OutboxEvent{}).Count(&n).Error) + assert.Equal(t, int64(1), n, "failed business tx must not leave an outbox row") +} + +// The store's RunBatch claims pending rows, publishes them and marks them +// PUBLISHED — the relay's DB-backed path end to end. +func TestStoreRunBatchMarksPublished(t *testing.T) { + db := startPostgres(t) + ctx := context.Background() + store := outbox.NewStore(db) + + // Seed a pending row directly via the publisher (no business tx needed here). + e, err := events.New(events.TypePlayerRegistered, events.PlayerRegisteredPayload{PlayerID: "p1"}) + require.NoError(t, err) + require.NoError(t, outbox.NewPublisher(store).Publish(ctx, events.TopicPlayer, e)) + + pendingBefore, err := store.CountPending(ctx) + require.NoError(t, err) + require.Equal(t, int64(1), pendingBefore) + + var publishedCount int + n, err := store.RunBatch(ctx, 10, func(_ context.Context, rows []outbox.OutboxEvent) ([]uuid.UUID, []uuid.UUID, error) { + publishedCount = len(rows) + ids := make([]uuid.UUID, len(rows)) + for i, r := range rows { + ids[i] = r.ID + } + return ids, nil, nil + }) + require.NoError(t, err) + assert.Equal(t, 1, n) + assert.Equal(t, 1, publishedCount) + + pendingAfter, err := store.CountPending(ctx) + require.NoError(t, err) + assert.Equal(t, int64(0), pendingAfter, "row must be marked PUBLISHED") +} diff --git a/tests/integration/postgres_test.go b/tests/integration/postgres_test.go index e1cdad6..f4cde5d 100644 --- a/tests/integration/postgres_test.go +++ b/tests/integration/postgres_test.go @@ -18,6 +18,7 @@ import ( "gorm.io/gorm" gormlogger "gorm.io/gorm/logger" + "github.com/alpnuhoglu/gamemesh/internal/outbox" "github.com/alpnuhoglu/gamemesh/internal/player" ) @@ -43,13 +44,19 @@ func startPostgres(t *testing.T) *gorm.DB { Logger: gormlogger.Default.LogMode(gormlogger.Silent), }) require.NoError(t, err) - require.NoError(t, db.AutoMigrate(&player.Player{}, &player.Stats{})) + require.NoError(t, db.AutoMigrate(&player.Player{}, &player.Stats{}, &outbox.OutboxEvent{})) return db } +// newRepo builds a repository whose outbox writes target the same DB, so the +// *WithOutbox paths (used by the service layer) commit the event atomically. +func newRepo(db *gorm.DB) player.Repository { + return player.NewRepository(db, outbox.NewPublisher(outbox.NewStore(db))) +} + func TestPlayerRepositoryCRUD(t *testing.T) { db := startPostgres(t) - repo := player.NewRepository(db) + repo := newRepo(db) ctx := context.Background() p := &player.Player{ @@ -85,7 +92,7 @@ func TestPlayerRepositoryCRUD(t *testing.T) { func TestPlayerRepositoryConstraints(t *testing.T) { db := startPostgres(t) - repo := player.NewRepository(db) + repo := newRepo(db) ctx := context.Background() require.NoError(t, repo.Create(ctx, &player.Player{ From ba49b3d72f834eacc4bc1184ece16a67759310b1 Mon Sep 17 00:00:00 2001 From: AlpNuhoglu Date: Wed, 17 Jun 2026 16:08:02 +0300 Subject: [PATCH 2/3] fix(outbox): resolve revive stutter lint by renaming OutboxEvent to Event Eliminates redundant exported name stuttering in the `outbox` package by renaming `OutboxEvent` to `Event`, adhering to idiomatic Go naming conventions. - Updated type definition, TableName, and model refs in `store.go`. - Refactored internal channels and helper signatures in `relay.go`. - Updated GORM `AutoMigrate` call in `cmd/player/main.go`. - Refactored `relay_test.go`, `postgres_test.go`, and `outbox_test.go`. - Kept `pkg/metrics` fields unchanged as they do not stutter. --- cmd/player/main.go | 2 +- internal/outbox/relay.go | 8 ++++---- internal/outbox/relay_test.go | 12 ++++++------ internal/outbox/store.go | 22 ++++++++++++---------- tests/integration/outbox_test.go | 6 +++--- tests/integration/postgres_test.go | 2 +- 6 files changed, 27 insertions(+), 25 deletions(-) diff --git a/cmd/player/main.go b/cmd/player/main.go index d23d74b..7d34913 100644 --- a/cmd/player/main.go +++ b/cmd/player/main.go @@ -50,7 +50,7 @@ func main() { // SQL migrations under /migrations are the source of truth; AutoMigrate // is a dev convenience that keeps `docker compose up` zero-step. if cfg.AutoMigrate { - if err := db.AutoMigrate(&player.Player{}, &player.Stats{}, &outbox.OutboxEvent{}); err != nil { + if err := db.AutoMigrate(&player.Player{}, &player.Stats{}, &outbox.Event{}); err != nil { log.Fatal("auto-migration failed", zap.Error(err)) } } diff --git a/internal/outbox/relay.go b/internal/outbox/relay.go index 84ff36c..c3dd7f8 100644 --- a/internal/outbox/relay.go +++ b/internal/outbox/relay.go @@ -106,7 +106,7 @@ func (r *Relay) processBatch(ctx context.Context) (int, error) { pollCtx, span := tracing.Tracer().Start(ctx, "outbox.poll") defer span.End() - polled, err := r.store.RunBatch(pollCtx, r.cfg.BatchSize, func(ctx context.Context, rows []OutboxEvent) ([]uuid.UUID, []uuid.UUID, error) { + polled, err := r.store.RunBatch(pollCtx, r.cfg.BatchSize, func(ctx context.Context, rows []Event) ([]uuid.UUID, []uuid.UUID, error) { published, failed := r.publishAll(ctx, rows) return published, failed, nil }) @@ -117,13 +117,13 @@ func (r *Relay) processBatch(ctx context.Context) (int, error) { // publishAll fans the batch out across a bounded worker pool (NOT one goroutine // per event) and returns the IDs that published successfully and those that // failed. -func (r *Relay) publishAll(ctx context.Context, rows []OutboxEvent) (published, failed []uuid.UUID) { +func (r *Relay) publishAll(ctx context.Context, rows []Event) (published, failed []uuid.UUID) { type result struct { id uuid.UUID err error } - jobs := make(chan OutboxEvent) + jobs := make(chan Event) results := make(chan result) var wg sync.WaitGroup @@ -167,7 +167,7 @@ func (r *Relay) publishAll(ctx context.Context, rows []OutboxEvent) (published, // and publishes it. The carrier is extracted so the relay's publish span — and // the downstream NATS producer span — link back to the originating request's // trace even though publication happens later in this separate process. -func (r *Relay) publishOne(ctx context.Context, row OutboxEvent) error { +func (r *Relay) publishOne(ctx context.Context, row Event) error { carrier := map[string]string{} if len(row.Carrier) > 0 { _ = json.Unmarshal(row.Carrier, &carrier) diff --git a/internal/outbox/relay_test.go b/internal/outbox/relay_test.go index eaaf5bf..2702be5 100644 --- a/internal/outbox/relay_test.go +++ b/internal/outbox/relay_test.go @@ -26,11 +26,11 @@ import ( // retry logic can be tested in isolation. type fakeBatcher struct { mu sync.Mutex - rows []OutboxEvent // current PENDING rows, oldest first + rows []Event // current PENDING rows, oldest first attempt map[uuid.UUID]int // attempt_count per id } -func newFakeBatcher(rows ...OutboxEvent) *fakeBatcher { +func newFakeBatcher(rows ...Event) *fakeBatcher { return &fakeBatcher{rows: rows, attempt: map[uuid.UUID]int{}} } @@ -41,7 +41,7 @@ func (f *fakeBatcher) RunBatch(ctx context.Context, limit int, publish PublishFu batch = batch[:limit] } // Copy so the publish callback can't mutate our slice mid-flight. - rows := make([]OutboxEvent, len(batch)) + rows := make([]Event, len(batch)) copy(rows, batch) f.mu.Unlock() @@ -64,7 +64,7 @@ func (f *fakeBatcher) RunBatch(ctx context.Context, limit int, publish PublishFu f.attempt[id]++ } // Drop published rows from the PENDING set; failed rows stay for retry. - var remaining []OutboxEvent + var remaining []Event for _, r := range f.rows { if !done[r.ID] { remaining = append(remaining, r) @@ -110,11 +110,11 @@ type publishError struct{} func (*publishError) Error() string { return "simulated publish failure" } -func pendingRow(t *testing.T, eventType string, payload any) OutboxEvent { +func pendingRow(t *testing.T, eventType string, payload any) Event { t.Helper() raw, err := json.Marshal(payload) require.NoError(t, err) - return OutboxEvent{ + return Event{ ID: uuid.New(), EventType: eventType, Topic: events.TopicPlayer, diff --git a/internal/outbox/store.go b/internal/outbox/store.go index 3b73ea3..0f89c77 100644 --- a/internal/outbox/store.go +++ b/internal/outbox/store.go @@ -28,9 +28,11 @@ const ( StatusPublished = "PUBLISHED" ) -// OutboxEvent is the GORM model backing the outbox_events table. The struct -// fields mirror the migration in migrations/0003_create_outbox_events.up.sql. -type OutboxEvent struct { +// Event is the GORM model backing the outbox_events table. The struct fields +// mirror the migration in migrations/0003_create_outbox_events.up.sql. (Named +// Event rather than OutboxEvent to avoid the outbox.OutboxEvent stutter; it is +// distinct from events.Event, the wire format that flows over NATS.) +type Event struct { // ID equals events.Event.ID so the same identity flows DB -> NATS -> // consumer, giving consumers a natural dedup key with no extra columns. ID uuid.UUID `gorm:"type:uuid;primaryKey"` @@ -47,7 +49,7 @@ type OutboxEvent struct { } // TableName pins the table name to match the SQL migrations. -func (OutboxEvent) TableName() string { return "outbox_events" } +func (Event) TableName() string { return "outbox_events" } // Store provides access to outbox_events. The same Store type serves both the // write side (Insert, on a caller-supplied tx) and the relay side (RunBatch, @@ -73,7 +75,7 @@ func (s *Store) Insert(ctx context.Context, tx *gorm.DB, id uuid.UUID, eventType if len(carrierJSON) == 0 { carrierJSON = []byte("{}") } - row := OutboxEvent{ + row := Event{ ID: id, EventType: eventType, Topic: topic, @@ -88,7 +90,7 @@ func (s *Store) Insert(ctx context.Context, tx *gorm.DB, id uuid.UUID, eventType // which failed. The relay supplies this; the store calls it inside the batch // transaction so the rows stay locked (FOR UPDATE SKIP LOCKED) until their // status is updated. -type PublishFunc func(ctx context.Context, rows []OutboxEvent) (published, failed []uuid.UUID, err error) +type PublishFunc func(ctx context.Context, rows []Event) (published, failed []uuid.UUID, err error) // RunBatch is the relay's poll → publish → mark cycle, all inside ONE // transaction. It locks up to limit of the oldest PENDING rows with @@ -101,7 +103,7 @@ type PublishFunc func(ctx context.Context, rows []OutboxEvent) (published, faile func (s *Store) RunBatch(ctx context.Context, limit int, publish PublishFunc) (int, error) { var polled int err := s.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error { - var rows []OutboxEvent + var rows []Event if err := tx. Clauses(clause.Locking{Strength: "UPDATE", Options: "SKIP LOCKED"}). Where("status = ?", StatusPending). @@ -142,7 +144,7 @@ func markPublished(ctx context.Context, tx *gorm.DB, ids []uuid.UUID) error { } now := time.Now().UTC() return tx.WithContext(ctx). - Model(&OutboxEvent{}). + Model(&Event{}). Where("id IN ?", ids). Updates(map[string]any{"status": StatusPublished, "published_at": now}).Error } @@ -152,7 +154,7 @@ func incrementAttempt(ctx context.Context, tx *gorm.DB, ids []uuid.UUID) error { return nil } return tx.WithContext(ctx). - Model(&OutboxEvent{}). + Model(&Event{}). Where("id IN ?", ids). UpdateColumn("attempt_count", gorm.Expr("attempt_count + 1")).Error } @@ -162,7 +164,7 @@ func incrementAttempt(ctx context.Context, tx *gorm.DB, ids []uuid.UUID) error { func (s *Store) CountPending(ctx context.Context) (int64, error) { var n int64 err := s.db.WithContext(ctx). - Model(&OutboxEvent{}). + Model(&Event{}). Where("status = ?", StatusPending). Count(&n).Error return n, err diff --git a/tests/integration/outbox_test.go b/tests/integration/outbox_test.go index d7e3895..994cd1a 100644 --- a/tests/integration/outbox_test.go +++ b/tests/integration/outbox_test.go @@ -38,7 +38,7 @@ func TestOutboxRowWrittenWithBusinessRows(t *testing.T) { assert.Equal(t, int64(1), players) // Outbox row committed in the same transaction, PENDING, correct type. - var rows []outbox.OutboxEvent + var rows []outbox.Event require.NoError(t, db.Where("event_type = ?", events.TypePlayerRegistered).Find(&rows).Error) require.Len(t, rows, 1) assert.Equal(t, events.TopicPlayer, rows[0].Topic) @@ -70,7 +70,7 @@ func TestOutboxRollsBackWithBusinessFailure(t *testing.T) { // Exactly one outbox row (from the first, successful register) — the failed // transaction left no orphan event. var n int64 - require.NoError(t, db.Model(&outbox.OutboxEvent{}).Count(&n).Error) + require.NoError(t, db.Model(&outbox.Event{}).Count(&n).Error) assert.Equal(t, int64(1), n, "failed business tx must not leave an outbox row") } @@ -91,7 +91,7 @@ func TestStoreRunBatchMarksPublished(t *testing.T) { require.Equal(t, int64(1), pendingBefore) var publishedCount int - n, err := store.RunBatch(ctx, 10, func(_ context.Context, rows []outbox.OutboxEvent) ([]uuid.UUID, []uuid.UUID, error) { + n, err := store.RunBatch(ctx, 10, func(_ context.Context, rows []outbox.Event) ([]uuid.UUID, []uuid.UUID, error) { publishedCount = len(rows) ids := make([]uuid.UUID, len(rows)) for i, r := range rows { diff --git a/tests/integration/postgres_test.go b/tests/integration/postgres_test.go index f4cde5d..833517c 100644 --- a/tests/integration/postgres_test.go +++ b/tests/integration/postgres_test.go @@ -44,7 +44,7 @@ func startPostgres(t *testing.T) *gorm.DB { Logger: gormlogger.Default.LogMode(gormlogger.Silent), }) require.NoError(t, err) - require.NoError(t, db.AutoMigrate(&player.Player{}, &player.Stats{}, &outbox.OutboxEvent{})) + require.NoError(t, db.AutoMigrate(&player.Player{}, &player.Stats{}, &outbox.Event{})) return db } From f2a8876df738044ad03ea99101c70ccd4c317038 Mon Sep 17 00:00:00 2001 From: AlpNuhoglu Date: Wed, 17 Jun 2026 17:23:39 +0300 Subject: [PATCH 3/3] feat(outbox): add transactional outbox to eliminate dual-write Guarantee that a committed Postgres transaction can never lose its domain event. The player service now writes business rows and the event to outbox_events in one transaction; a dedicated outbox-relay process polls committed rows and publishes them to NATS JetStream, retrying until success (at-least-once). - outbox_events table (migration 0003 + schema.sql + GORM model) - OutboxPublisher implements events.Publisher, inserting on the business tx instead of publishing inline - Relay: bounded worker pool, FOR UPDATE SKIP LOCKED batching, graceful shutdown, PENDING->PUBLISHED lifecycle with attempt_count - New PlayerRegistered/PlayerUpdated events on the events.player stream - Trace continuity via stored W3C carrier; outbox.* spans - Prometheus metrics: pending, published_total, failures_total, publish_duration_seconds - OUTBOX_* config, docker-compose service, Prometheus scrape target - Unit tests (relay publish/retry/replay/trace) + integration tests (atomic write, rollback, RunBatch) --- tests/integration/outbox_test.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/tests/integration/outbox_test.go b/tests/integration/outbox_test.go index 994cd1a..8af267b 100644 --- a/tests/integration/outbox_test.go +++ b/tests/integration/outbox_test.go @@ -4,6 +4,7 @@ package integration import ( "context" + "encoding/json" "testing" "github.com/google/uuid" @@ -43,7 +44,12 @@ func TestOutboxRowWrittenWithBusinessRows(t *testing.T) { require.Len(t, rows, 1) assert.Equal(t, events.TopicPlayer, rows[0].Topic) assert.Equal(t, outbox.StatusPending, rows[0].Status) - assert.Equal(t, p.ID.String(), rows[0].ID.String(), "outbox id == event id (the dedup key)") + // The row id is the Event.ID (a generated UUID, the consumer dedup key) — NOT + // the player id, which travels in the payload. + assert.NotEqual(t, uuid.Nil, rows[0].ID, "outbox id is the generated event id") + var payload events.PlayerRegisteredPayload + require.NoError(t, json.Unmarshal(rows[0].Payload, &payload)) + assert.Equal(t, p.ID.String(), payload.PlayerID, "payload references the new player") } // Atomicity: when the business INSERT fails (duplicate username), the outbox row