Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,19 @@ NATS_URL=nats://nats:4222
# Bounded worker pool size for the JetStream consumers (per service).
EVENT_WORKERS=8

# --- Transactional outbox (outbox-relay service) ---
# The player service writes domain events to the outbox_events table in the same
# Postgres transaction as the business write; the outbox-relay process polls and
# publishes them to NATS, eliminating the dual-write problem.
# OUTBOX_ENABLED gates the relay loop (the player service always writes events).
OUTBOX_ENABLED=true
# Rows fetched per poll.
OUTBOX_BATCH_SIZE=100
# Delay between polls when the backlog is drained.
OUTBOX_POLL_INTERVAL=1s
# Bounded publish-worker pool per batch.
OUTBOX_WORKERS=4

# --- Service URLs (as seen from the API gateway) ---
PLAYER_SERVICE_URL=http://player:8081
MATCHMAKING_SERVICE_URL=http://matchmaking:8082
Expand Down
92 changes: 92 additions & 0 deletions cmd/outbox-relay/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// The outbox-relay service is the publish side of the transactional outbox. It
// polls outbox_events (written by the player service in the same transaction as
// the business state) and publishes committed rows to NATS JetStream. Running it
// as a dedicated process keeps the reliability layer decoupled from the business
// services: it scales, restarts and fails independently, and the player service
// never needs a NATS connection.
package main

import (
"context"

"go.uber.org/zap"
"gorm.io/driver/postgres"
"gorm.io/gorm"
gormlogger "gorm.io/gorm/logger"

"github.com/alpnuhoglu/gamemesh/internal/outbox"
"github.com/alpnuhoglu/gamemesh/pkg/config"
"github.com/alpnuhoglu/gamemesh/pkg/events"
"github.com/alpnuhoglu/gamemesh/pkg/logger"
"github.com/alpnuhoglu/gamemesh/pkg/metrics"
"github.com/alpnuhoglu/gamemesh/pkg/server"
"github.com/alpnuhoglu/gamemesh/pkg/tracing"
)

func main() {
cfg := config.Load("outbox-relay")
log := logger.Must(cfg.ServiceName, cfg.Env)
defer func() { _ = log.Sync() }()

shutdownTracing := tracing.MustInit(context.Background(), tracing.Config{
Enabled: cfg.OTelEnabled,
ServiceName: cfg.OTelServiceName,
Endpoint: cfg.OTelEndpoint,
Env: cfg.Env,
Version: cfg.ServiceVersion,
Sampler: cfg.OTelSampler,
SamplerRatio: cfg.OTelSamplerRatio,
}, log)
defer func() { _ = shutdownTracing(context.Background()) }()

db, err := gorm.Open(postgres.Open(cfg.PostgresDSN), &gorm.Config{
Logger: gormlogger.Default.LogMode(gormlogger.Silent),
})
if err != nil {
log.Fatal("failed to connect to postgres", zap.Error(err))
}
if err := tracing.InstrumentGorm(db); err != nil {
log.Fatal("failed to instrument gorm", zap.Error(err))
}

m := metrics.New(cfg.ServiceName)

// The relay publishes to NATS; it never falls back to Redis Pub/Sub because
// outbox durability would be pointless behind a fire-and-forget transport.
bus, err := events.NewBus(events.Config{
Transport: "nats",
DurableName: cfg.ServiceName,
Workers: cfg.EventWorkers,
}, nil, cfg.NATSURL, m, log)
if err != nil {
log.Fatal("failed to init event bus", zap.Error(err))
}
defer func() { _ = bus.Close() }()

store := outbox.NewStore(db)
relay := outbox.NewRelay(store, bus, outbox.RelayConfig{
BatchSize: cfg.OutboxBatchSize,
PollInterval: cfg.OutboxPollInterval,
Workers: cfg.OutboxWorkers,
}, m, log)

ctx, stop := server.ShutdownContext()
defer stop()

if !cfg.OutboxEnabled {
log.Warn("OUTBOX_ENABLED=false; relay idle, serving metrics only")
} else {
go func() {
if err := relay.Run(ctx); err != nil {
log.Fatal("outbox relay failed", zap.Error(err))
}
}()
}

// Serve /metrics (and /healthz via the shared engine) so Prometheus can scrape
// the relay and Compose/K8s health checks work like every other service.
engine := server.NewEngine(cfg, log, m)
if err := server.Run(engine, cfg.HTTPPort, log); err != nil {
log.Fatal("server exited", zap.Error(err))
}
}
9 changes: 7 additions & 2 deletions cmd/player/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/redis/go-redis/v9"

"github.com/alpnuhoglu/gamemesh/internal/outbox"
"github.com/alpnuhoglu/gamemesh/internal/player"
"github.com/alpnuhoglu/gamemesh/pkg/auth"
"github.com/alpnuhoglu/gamemesh/pkg/config"
Expand Down Expand Up @@ -49,7 +50,7 @@ func main() {
// SQL migrations under /migrations are the source of truth; AutoMigrate
// is a dev convenience that keeps `docker compose up` zero-step.
if cfg.AutoMigrate {
if err := db.AutoMigrate(&player.Player{}, &player.Stats{}); err != nil {
if err := db.AutoMigrate(&player.Player{}, &player.Stats{}, &outbox.Event{}); err != nil {
log.Fatal("auto-migration failed", zap.Error(err))
}
}
Expand All @@ -64,7 +65,11 @@ func main() {
}

tokens := auth.NewTokenManager(cfg.JWTSecret, cfg.JWTExpiry, cfg.JWTIssuer)
repo := player.NewRepository(db)
// The player service writes domain events to the outbox (NOT to NATS): the
// dedicated outbox-relay process publishes them. So the player service has no
// NATS dependency — the OutboxPublisher writes rows on the business tx.
outboxPub := outbox.NewPublisher(outbox.NewStore(db))
repo := player.NewRepository(db, outboxPub)
sessions := player.NewSessionStore(rdb)
svc := player.NewService(repo, sessions, tokens, log)
handler := player.NewHandler(svc)
Expand Down
3 changes: 3 additions & 0 deletions config/prometheus/prometheus.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,6 @@ scrape_configs:
- job_name: websocket
static_configs:
- targets: ["websocket:8084"]
- job_name: outbox-relay
static_configs:
- targets: ["outbox-relay:8085"]
14 changes: 14 additions & 0 deletions deployments/docker/Dockerfile.outbox-relay
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# syntax=docker/dockerfile:1
FROM golang:1.25-alpine AS build
WORKDIR /src
COPY go.mod go.sum ./
RUN go mod download
COPY . .
RUN CGO_ENABLED=0 go build -trimpath -ldflags="-s -w" -o /out/service ./cmd/outbox-relay

FROM alpine:3.20
RUN adduser -D -u 10001 app
USER app
COPY --from=build /out/service /service
EXPOSE 8085
ENTRYPOINT ["/service"]
28 changes: 28 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,34 @@ services:
<<: *svc-healthcheck
test: ["CMD", "wget", "-qO-", "http://localhost:8084/healthz"]

# Transactional outbox relay: the publish side of the outbox pattern. It polls
# outbox_events (written by the player service in the same Postgres transaction
# as the business state) and publishes committed rows to NATS JetStream. A
# dedicated process so it scales/restarts independently of the player API and
# keeps the player service free of any NATS dependency.
outbox-relay:
build:
context: .
dockerfile: deployments/docker/Dockerfile.outbox-relay
environment:
<<: *service-env
HTTP_PORT: "8085"
POSTGRES_DSN: host=postgres user=${POSTGRES_USER:-gamemesh} password=${POSTGRES_PASSWORD:-gamemesh-dev-password} dbname=${POSTGRES_DB:-gamemesh} port=5432 sslmode=disable
OUTBOX_ENABLED: ${OUTBOX_ENABLED:-true}
OUTBOX_BATCH_SIZE: ${OUTBOX_BATCH_SIZE:-100}
OUTBOX_POLL_INTERVAL: ${OUTBOX_POLL_INTERVAL:-1s}
OUTBOX_WORKERS: ${OUTBOX_WORKERS:-4}
depends_on:
postgres:
condition: service_healthy
nats:
condition: service_healthy
otel-collector:
condition: service_started
healthcheck:
<<: *svc-healthcheck
test: ["CMD", "wget", "-qO-", "http://localhost:8085/healthz"]

# Jaeger all-in-one: receives OTLP directly (v1.35+) and serves the trace UI
# on :16686. In-memory storage — fine for local dev; swap for Badger/ES in prod.
jaeger:
Expand Down
Loading
Loading