From 0d9afbc2958ec301725f0eccae80f407b8443178 Mon Sep 17 00:00:00 2001 From: AlpNuhoglu Date: Sun, 21 Jun 2026 16:50:59 +0300 Subject: [PATCH 1/2] feat(presence): add distributed presence service for the social layer MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Introduce a standalone Presence Service as the foundation for friends, parties, invites, reconnect and notifications. Presence is modeled as a distributed system, not an online/offline boolean. Design: - Redis is the source of truth: presence:{id} JSON (state, last_seen, connection_count) with a TTL. WS replicas heartbeat every 15s to refresh a 45s TTL, so presence self-heals after crashes/partitions with no explicit disconnect — a missing key is OFFLINE. - All mutations are Lua scripts (atomic read-modify-write), so any WS replica can update any player with no sticky sessions and no leader election — horizontally scalable. - Multi-device: connection_count keeps a player ONLINE until the last connection closes. - States OFFLINE/ONLINE/IN_QUEUE/IN_MATCH/AWAY with a deliberately permissive transition rule (only OFFLINE->IN_QUEUE/IN_MATCH rejected) so future reconnect/rematch/party flows need no state-machine changes. - Publishes PresenceOnline/Offline/StateChanged to NATS (events.presence, 15m retention) via the shared events.Publisher with trace propagation; consumers are optional. Reads are side-effect free. - Bulk friend lookup (POST /presence/friends) via a single Redis pipeline: O(N) commands, one round trip, no N+1. WS gateway feeds presence over HTTP through an injected, optional notifier (nil = no-op) — the only coupling, no WS internals touched. Adds metrics, tracing spans, k8s manifest (2 replicas), Dockerfile, compose, and docs/presence.md. matchmaking is unchanged. 
--- .gitignore | 3 + Makefile | 2 +- cmd/presence/main.go | 68 ++++++ cmd/websocket/main.go | 10 +- config/prometheus/prometheus.yml | 3 + deployments/docker/Dockerfile.presence | 14 ++ deployments/k8s/02-configmap.yaml | 3 + deployments/k8s/16-presence.yaml | 59 ++++++ docker-compose.yml | 29 +++ docs/presence.md | 279 +++++++++++++++++++++++++ internal/presence/handler.go | 127 +++++++++++ internal/presence/handler_test.go | 86 ++++++++ internal/presence/model.go | 91 ++++++++ internal/presence/model_test.go | 48 +++++ internal/presence/notifier.go | 92 ++++++++ internal/presence/repository.go | 229 ++++++++++++++++++++ internal/presence/service.go | 194 +++++++++++++++++ internal/presence/service_test.go | 272 ++++++++++++++++++++++++ internal/wsgateway/handler_test.go | 2 +- internal/wsgateway/hub.go | 80 ++++++- internal/wsgateway/hub_test.go | 2 +- pkg/config/config.go | 14 ++ pkg/events/events.go | 23 ++ pkg/events/nats.go | 5 + pkg/metrics/metrics.go | 30 +++ tests/integration/presence_test.go | 121 +++++++++++ 26 files changed, 1879 insertions(+), 7 deletions(-) create mode 100644 cmd/presence/main.go create mode 100644 deployments/docker/Dockerfile.presence create mode 100644 deployments/k8s/16-presence.yaml create mode 100644 docs/presence.md create mode 100644 internal/presence/handler.go create mode 100644 internal/presence/handler_test.go create mode 100644 internal/presence/model.go create mode 100644 internal/presence/model_test.go create mode 100644 internal/presence/notifier.go create mode 100644 internal/presence/repository.go create mode 100644 internal/presence/service.go create mode 100644 internal/presence/service_test.go create mode 100644 tests/integration/presence_test.go diff --git a/.gitignore b/.gitignore index 7c11233..04ea4aa 100644 --- a/.gitignore +++ b/.gitignore @@ -20,3 +20,6 @@ coverage.html # k6 reports scripts/k6/reports/ + +# Graphify knowledge-graph output — local-only, never expose publicly +graphify-out/ diff --git 
a/Makefile b/Makefile index 7ad8e6d..68b5c9d 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,6 @@ .PHONY: help build test test-integration lint cover up down k8s-build k8s-apply k8s-delete k6-leaderboard k6-matchmaking k6-websocket -SERVICES := gateway player matchmaking leaderboard websocket +SERVICES := gateway player matchmaking leaderboard websocket presence help: ## Show available targets @grep -E '^[a-zA-Z_-]+:.*?## ' $(MAKEFILE_LIST) | awk 'BEGIN {FS = ":.*?## "}; {printf " \033[36m%-20s\033[0m %s\n", $$1, $$2}' diff --git a/cmd/presence/main.go b/cmd/presence/main.go new file mode 100644 index 0000000..0fb1465 --- /dev/null +++ b/cmd/presence/main.go @@ -0,0 +1,68 @@ +// The presence service tracks where every player is (OFFLINE, ONLINE, +// IN_QUEUE, IN_MATCH, AWAY) in Redis, refreshed by WS-gateway heartbeats with a +// TTL so presence self-heals after crashes, and publishes presence transitions +// as events for the social layer (friends, parties, invites, notifications). +package main + +import ( + "context" + + "github.com/redis/go-redis/v9" + "go.uber.org/zap" + + "github.com/alpnuhoglu/gamemesh/internal/presence" + "github.com/alpnuhoglu/gamemesh/pkg/config" + "github.com/alpnuhoglu/gamemesh/pkg/events" + "github.com/alpnuhoglu/gamemesh/pkg/logger" + "github.com/alpnuhoglu/gamemesh/pkg/metrics" + "github.com/alpnuhoglu/gamemesh/pkg/server" + "github.com/alpnuhoglu/gamemesh/pkg/tracing" +) + +func main() { + cfg := config.Load("presence") + log := logger.Must(cfg.ServiceName, cfg.Env) + defer func() { _ = log.Sync() }() + + shutdownTracing := tracing.MustInit(context.Background(), tracing.Config{ + Enabled: cfg.OTelEnabled, + ServiceName: cfg.OTelServiceName, + Endpoint: cfg.OTelEndpoint, + Env: cfg.Env, + Version: cfg.ServiceVersion, + Sampler: cfg.OTelSampler, + SamplerRatio: cfg.OTelSamplerRatio, + }, log) + defer func() { _ = shutdownTracing(context.Background()) }() + + rdb := redis.NewClient(&redis.Options{ + Addr: cfg.RedisAddr, + Password: 
cfg.RedisPassword, + DB: cfg.RedisDB, + }) + if err := tracing.InstrumentRedis(rdb); err != nil { + log.Fatal("failed to instrument redis", zap.Error(err)) + } + + m := metrics.New(cfg.ServiceName) + bus, err := events.NewBus(events.Config{ + Transport: cfg.EventBus, + DurableName: cfg.ServiceName, + Workers: cfg.EventWorkers, + }, rdb, cfg.NATSURL, m, log) + if err != nil { + log.Fatal("failed to init event bus", zap.Error(err)) + } + defer func() { _ = bus.Close() }() + + repo := presence.NewRepository(rdb, cfg.PresenceTTL) + svc := presence.NewService(repo, bus, m, log) + handler := presence.NewHandler(svc) + + engine := server.NewEngine(cfg, log, m) + handler.RegisterRoutes(engine) + + if err := server.Run(engine, cfg.HTTPPort, log); err != nil { + log.Fatal("server exited", zap.Error(err)) + } +} diff --git a/cmd/websocket/main.go b/cmd/websocket/main.go index e3cebbc..0b8654a 100644 --- a/cmd/websocket/main.go +++ b/cmd/websocket/main.go @@ -9,6 +9,7 @@ import ( "github.com/redis/go-redis/v9" "go.uber.org/zap" + "github.com/alpnuhoglu/gamemesh/internal/presence" "github.com/alpnuhoglu/gamemesh/internal/wsgateway" "github.com/alpnuhoglu/gamemesh/pkg/auth" "github.com/alpnuhoglu/gamemesh/pkg/config" @@ -45,12 +46,19 @@ func main() { } m := metrics.New(cfg.ServiceName) - hub := wsgateway.NewHub(log, m) + // Feed presence over HTTP. The notifier is the only coupling between the WS + // gateway and the Presence Service; if the Presence Service is down, calls + // fail softly and presence self-heals from later heartbeats / TTL. + notifier := presence.NewHTTPNotifier(cfg.PresenceServiceURL, cfg.PresenceHeartbeatInterval) + hub := wsgateway.NewHub(log, m, notifier) tokens := auth.NewTokenManager(cfg.JWTSecret, cfg.JWTExpiry, cfg.JWTIssuer) handler := wsgateway.NewHandler(hub, tokens, cfg.AllowedOrigins, log) ctx, stop := server.ShutdownContext() defer stop() + + // Refresh presence TTL for every locally-connected player on a ticker. 
+ go hub.RunHeartbeat(ctx, cfg.PresenceHeartbeatInterval) bus, err := events.NewBus(events.Config{ Transport: cfg.EventBus, DurableName: cfg.ServiceName, diff --git a/config/prometheus/prometheus.yml b/config/prometheus/prometheus.yml index 9ee5cfb..d9c3c11 100644 --- a/config/prometheus/prometheus.yml +++ b/config/prometheus/prometheus.yml @@ -28,3 +28,6 @@ scrape_configs: - job_name: outbox-relay static_configs: - targets: ["outbox-relay:8085"] + - job_name: presence + static_configs: + - targets: ["presence:8086"] diff --git a/deployments/docker/Dockerfile.presence b/deployments/docker/Dockerfile.presence new file mode 100644 index 0000000..5df0233 --- /dev/null +++ b/deployments/docker/Dockerfile.presence @@ -0,0 +1,14 @@ +# syntax=docker/dockerfile:1 +FROM golang:1.25-alpine AS build +WORKDIR /src +COPY go.mod go.sum ./ +RUN go mod download +COPY . . +RUN CGO_ENABLED=0 go build -trimpath -ldflags="-s -w" -o /out/service ./cmd/presence + +FROM alpine:3.20 +RUN adduser -D -u 10001 app +USER app +COPY --from=build /out/service /service +EXPOSE 8086 +ENTRYPOINT ["/service"] diff --git a/deployments/k8s/02-configmap.yaml b/deployments/k8s/02-configmap.yaml index ccd12c7..7813a54 100644 --- a/deployments/k8s/02-configmap.yaml +++ b/deployments/k8s/02-configmap.yaml @@ -13,6 +13,9 @@ data: MATCHMAKING_SERVICE_URL: http://matchmaking:8082 LEADERBOARD_SERVICE_URL: http://leaderboard:8083 WEBSOCKET_SERVICE_URL: http://websocket:8084 + PRESENCE_SERVICE_URL: http://presence:8086 + PRESENCE_TTL: 45s + PRESENCE_HEARTBEAT_INTERVAL: 15s RATE_LIMIT_RPS: "50" RATE_LIMIT_BURST: "100" MATCH_INTERVAL_SECONDS: "5" diff --git a/deployments/k8s/16-presence.yaml b/deployments/k8s/16-presence.yaml new file mode 100644 index 0000000..6a79d2a --- /dev/null +++ b/deployments/k8s/16-presence.yaml @@ -0,0 +1,59 @@ +# The Presence Service is stateless: all presence lives in Redis keyed by +# playerID, so any replica can serve any read or update. 
No sticky sessions and +# no leader election — scale horizontally by raising replicas. +apiVersion: apps/v1 +kind: Deployment +metadata: + name: presence + namespace: gamemesh +spec: + replicas: 2 + selector: + matchLabels: + app: presence + template: + metadata: + labels: + app: presence + annotations: + prometheus.io/scrape: "true" + prometheus.io/port: "8086" + prometheus.io/path: /metrics + spec: + containers: + - name: presence + image: gamemesh/presence:latest + imagePullPolicy: IfNotPresent + ports: + - containerPort: 8086 + env: + - name: HTTP_PORT + value: "8086" + envFrom: + - configMapRef: + name: gamemesh-config + - secretRef: + name: gamemesh-secrets + readinessProbe: + httpGet: { path: /healthz, port: 8086 } + initialDelaySeconds: 3 + periodSeconds: 5 + livenessProbe: + httpGet: { path: /healthz, port: 8086 } + initialDelaySeconds: 10 + periodSeconds: 10 + resources: + requests: { cpu: 100m, memory: 64Mi } + limits: { cpu: 500m, memory: 128Mi } +--- +apiVersion: v1 +kind: Service +metadata: + name: presence + namespace: gamemesh +spec: + selector: + app: presence + ports: + - port: 8086 + targetPort: 8086 diff --git a/docker-compose.yml b/docker-compose.yml index 2756cb8..7eba64f 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -172,6 +172,10 @@ services: environment: <<: *service-env HTTP_PORT: "8084" + # Feed presence over HTTP; the heartbeat interval also paces the WS replica's + # presence-refresh ticker. + PRESENCE_SERVICE_URL: http://presence:8086 + PRESENCE_HEARTBEAT_INTERVAL: ${PRESENCE_HEARTBEAT_INTERVAL:-15s} ports: - "8084:8084" depends_on: @@ -185,6 +189,31 @@ services: <<: *svc-healthcheck test: ["CMD", "wget", "-qO-", "http://localhost:8084/healthz"] + # Presence Service: tracks player presence (online/in-queue/in-match/away) in + # Redis with TTL-based self-healing and publishes presence transitions to NATS. + # Stateless and horizontally scalable — all state is in Redis keyed by playerID. + presence: + build: + context: . 
+ dockerfile: deployments/docker/Dockerfile.presence + environment: + <<: *service-env + HTTP_PORT: "8086" + PRESENCE_TTL: ${PRESENCE_TTL:-45s} + PRESENCE_HEARTBEAT_INTERVAL: ${PRESENCE_HEARTBEAT_INTERVAL:-15s} + ports: + - "8086:8086" + depends_on: + redis: + condition: service_healthy + nats: + condition: service_healthy + otel-collector: + condition: service_started + healthcheck: + <<: *svc-healthcheck + test: ["CMD", "wget", "-qO-", "http://localhost:8086/healthz"] + # Transactional outbox relay: the publish side of the outbox pattern. It polls # outbox_events (written by the player service in the same Postgres transaction # as the business state) and publishes committed rows to NATS JetStream. A diff --git a/docs/presence.md b/docs/presence.md new file mode 100644 index 0000000..9b7424d --- /dev/null +++ b/docs/presence.md @@ -0,0 +1,279 @@ +# Presence Service + +The Presence Service is the foundation of GameMesh's social layer. It maintains a +distributed, self-healing view of **where every player is** — not just a boolean +online flag — and publishes presence transitions as events that friends, parties, +invites, reconnect and notifications build on. + +It is a separate microservice (port `8086`). The WebSocket gateway feeds it +connection lifecycle over HTTP; presence ownership lives here, not in the WS +gateway — the two stay decoupled. 
+ +- Code: [`internal/presence`](../internal/presence), entrypoint + [`cmd/presence/main.go`](../cmd/presence/main.go) +- Event contract: [`pkg/events/events.go`](../pkg/events/events.go) + (`TopicPresence`, `PresenceChangedPayload`) + +--- + +## State machine + +``` +OFFLINE – no live connection (a missing presence key IS offline) +ONLINE – at least one connection, not queued or in a match +IN_QUEUE – waiting in matchmaking +IN_MATCH – in an active match +AWAY – idle / explicitly away +``` + +Typical flow: + +``` +connect WS OFFLINE -> ONLINE (implicit, on first connection) +join queue ONLINE -> IN_QUEUE +match found IN_QUEUE -> IN_MATCH +match ends IN_MATCH -> ONLINE +disconnect * -> OFFLINE (only when the LAST connection closes) +``` + +### Transition rule (deliberately permissive) + +The validator rejects **only** the two transitions that require a connection an +offline player does not have: + +| Rejected | +|---------------------| +| `OFFLINE -> IN_QUEUE` | +| `OFFLINE -> IN_MATCH` | + +**Every other transition is allowed** — `ONLINE->IN_QUEUE`, `IN_QUEUE->IN_MATCH`, +`IN_MATCH->ONLINE`, `IN_MATCH->AWAY`, `AWAY->IN_QUEUE`, `IN_QUEUE->ONLINE`, etc. +Rejected transitions return `400` and increment +`gamemesh_presence_invalid_transitions_total`. + +Why permissive? Future reconnect, rematch and party flows should never require +editing the state machine. The rule lives in `CanTransition` +([`model.go`](../internal/presence/model.go)). + +`OFFLINE -> ONLINE` is not done via the API; it happens implicitly on the first +`connect`. + +--- + +## Redis data model (source of truth — no SQL) + +One key per player: + +``` +presence:{playerID} -> { + "state": "ONLINE", + "last_seen": 1718900000, // unix seconds + "connection_count": 2, + "updated_at": 1718900000 +} +``` + +- **TTL** (default `45s`) on the key. Heartbeats refresh it; if they stop the key + expires and the player is OFFLINE. +- A **missing key is OFFLINE** — there is no separate "offline" record to write. 
+ +### Why Redis and no SQL + +Presence is ephemeral, high-churn and self-correcting. Durability buys nothing: a +lost record is re-derived from the next heartbeat. Redis gives O(1) reads, a +single pipelined bulk read for friend lists, and TTL as a free crash-recovery +mechanism. So presence intentionally does **not** use Postgres or the +transactional outbox (unlike player identity events). + +### Atomic, replica-safe writes + +Every mutating operation (`connect`, `disconnect`, `heartbeat`, `setState`) is a +**Lua script** ([`repository.go`](../internal/presence/repository.go)) that does +read → modify → write+EXPIRE atomically on the Redis server. Two WS replicas +connecting/disconnecting the same player concurrently can never lose an update or +clobber each other. (Verified by `TestConcurrentConnectsCountCorrectly`.) + +--- + +## Heartbeat strategy + +- WS replicas heartbeat every **15s** (`PRESENCE_HEARTBEAT_INTERVAL`), per + locally-connected player, via the WS hub's heartbeat loop + ([`hub.go`](../internal/wsgateway/hub.go) `RunHeartbeat`). +- Each heartbeat refreshes `last_seen`/`updated_at` and resets the **45s** TTL + (`PRESENCE_TTL`) — a 3× margin, so two missed beats still leave the player + online. +- If the key had already expired, a heartbeat **re-creates** it as ONLINE, so + presence self-heals after a crash without an explicit reconnect. + +--- + +## Multi-device support + +A player may be on phone, tablet and browser at once. `connection_count` tracks +live connections: + +- each `connect` increments it (ONLINE on the first); +- each `disconnect` decrements it; +- the player flips to **OFFLINE only when the count reaches 0** — at which point + the key is deleted. + +So closing one of three devices keeps the player ONLINE. 
+ +> **Future enhancement (not implemented):** replace the scalar `connection_count` +> with a set of connection IDs: +> ``` +> connections:{ws-id-1, ws-id-2, ws-id-3} +> ``` +> That enables reconnect, per-device metadata, browser/mobile distinction, +> connection ownership and session recovery. The current model is forward +> compatible — events already carry `connection_count`. + +--- + +## Connection ownership & horizontal scaling + +- All state is in Redis keyed by `playerID`; **any** WS replica can update **any** + player. The atomic Lua writes make concurrent updates safe. +- **No sticky sessions, no leader election, no per-replica ownership.** Scale the + Presence Service and the WS gateway horizontally by raising replicas + ([`deployments/k8s/16-presence.yaml`](../deployments/k8s/16-presence.yaml), + `replicas: 2`). + +--- + +## Events + +Published to NATS topic `events.presence` (subjects `events.presence.<type>`): + +| Type | When | +|------------------------|---------------------------------------------| +| `PresenceOnline` | OFFLINE → any non-offline state | +| `PresenceOffline` | any state → OFFLINE (last disconnect) | +| `PresenceStateChanged` | any other state change (e.g. ONLINE→IN_QUEUE) | + +Payload — [`events.PresenceChangedPayload`](../pkg/events/events.go): + +```json +{ "player_id", "state", "previous_state", "connection_count", "last_seen" } +``` + +- Published via the shared `events.Publisher`, so trace context is propagated + automatically and the transport (NATS/Redis) is config-selected. +- The `PRESENCE` JetStream stream has a short **15m** `MaxAge` — presence events + are disposable; JetStream is not meant to replay long presence history (Redis + is the source of truth). +- **Consumers are optional.** No service is required to subscribe; the flow is + `WS → Presence → NATS → future consumers`, with no tight coupling. +- A publish failure is logged but never fails the operation — presence is + re-derived from the next heartbeat. 
+ +--- + +## API (internal only) + +These endpoints trust the cluster network and the supplied player IDs, like the +rest of GameMesh's internal services. No auth redesign. + +| Method | Path | Purpose | +|--------|------------------------|------------------------------------------| +| `GET` | `/presence/:id` | One player's presence (OFFLINE if absent). Read-only. | +| `POST` | `/presence/friends` | Bulk lookup; body `{"ids":[...]}`. POST so large lists aren't URL-limited. | +| `PUT` | `/presence/state` | Explicit transition; body `{"player_id","state"}`. | +| `POST` | `/presence/connect` | Register a connection (WS notifier). | +| `POST` | `/presence/disconnect` | Drop a connection (WS notifier). | +| `POST` | `/presence/heartbeat` | Refresh TTL (WS notifier). | + +`IN_QUEUE` / `IN_MATCH` are driven through `PUT /presence/state` — matchmaking and +the WS gateway can call it; matchmaking itself is unchanged in this milestone. + +### Friend lookup complexity + +`GET many` issues N `GET`s in a **single pipeline** — **O(N) commands, one network +round trip, no N+1**. Missing keys map to OFFLINE. + +--- + +## Failure & recovery scenarios + +Redis TTL is the **sole** expiration mechanism. When heartbeats stop, the key +expires and the player is OFFLINE. + +| Scenario | Behaviour | +|-------------------|-----------------------------------------------------------------| +| **WS crash** | Heartbeats stop → key TTL lapses → player OFFLINE. No explicit disconnect needed. | +| **Pod restart** | Any replica resumes heartbeats; if within TTL the key survives, else the next heartbeat re-creates it ONLINE. | +| **Network partition** | Key expires during the partition; presence self-heals on reconnect. | +| **Presence Service down** | WS notifier calls fail softly (logged, async); presence catches up from later heartbeats. | + +There is **no** reconciler, cron, SCAN worker, keyspace-notification subscriber or +lazy offline emission in this milestone. 
**Reads are side-effect free** — `GET` +and the friends lookup never publish events. + +> **Future work:** emit explicit `PresenceOffline` on TTL expiry via Redis +> keyspace notifications or a lightweight reconciler (with a +> `presence_expired_total` metric). Not built now to keep the milestone narrowly +> scoped and avoid extra infrastructure. + +--- + +## Observability + +Tracing spans (tracer `github.com/alpnuhoglu/gamemesh`): +`presence.connect`, `presence.heartbeat`, `presence.transition`. (No +`presence.expire` — TTL expiry is passive Redis behaviour with no code path.) + +Prometheus metrics ([`pkg/metrics`](../pkg/metrics/metrics.go), `gamemesh_` prefix, +`service` label): + +| Metric | Type | +|----------------------------------------------|-------------| +| `gamemesh_presence_online_players` | Gauge | +| `gamemesh_presence_state_transitions_total` | Counter (`from`,`to`) | +| `gamemesh_presence_heartbeat_total` | Counter | +| `gamemesh_presence_invalid_transitions_total`| Counter | + +--- + +## Foundation for the social layer + +The `playerID`-keyed model and `PresenceChanged` events (carrying +`previous_state` and `connection_count`) give the next milestones what they need +without schema changes: + +- **Friend Service** — `POST /presence/friends` for friend-list presence. +- **Party Service** — subscribe to `events.presence.*` for party member status. +- **Invites** — gate invites on a target's presence. +- **Reconnect** — `connection_count` (and the future connection-ID set) underpins + session recovery. +- **Notifications / Push** — react to `PresenceOnline`/`PresenceOffline`. +- **Spectators** — presence of match participants. + +The `events.presence.>` subject wildcard lets future consumers filter by type +without new streams. 
+ +--- + +## Configuration + +| Env var | Default | Meaning | +|-------------------------------|---------|----------------------------------| +| `HTTP_PORT` | `8086` | Presence Service port | +| `PRESENCE_TTL` | `45s` | Presence key TTL | +| `PRESENCE_HEARTBEAT_INTERVAL` | `15s` | WS heartbeat cadence | +| `PRESENCE_SERVICE_URL` | `http://localhost:8086` | WS gateway → Presence base URL | + +--- + +## Testing + +- Unit ([`internal/presence`](../internal/presence)): transitions, multi-device, + heartbeat TTL refresh, expiry-reports-OFFLINE-without-events, reconnect, + bulk friends, concurrency, handler routes. Backed by miniredis (Lua + TTL via + `FastForward`). +- Integration ([`tests/integration/presence_test.go`](../tests/integration/presence_test.go), + `-tags integration`): real Redis container exercising true TTL expiry, + heartbeat keep-alive, multi-device and bulk friends. + +Run: `go test ./internal/presence/...` and +`go test -tags integration ./tests/integration/...`. diff --git a/internal/presence/handler.go b/internal/presence/handler.go new file mode 100644 index 0000000..5d2bf22 --- /dev/null +++ b/internal/presence/handler.go @@ -0,0 +1,127 @@ +package presence + +import ( + "context" + "errors" + "net/http" + + "github.com/gin-gonic/gin" + + "github.com/alpnuhoglu/gamemesh/pkg/httpx" +) + +// Handler exposes the internal presence HTTP API. These endpoints are called by +// other cluster services (the WS gateway for connection lifecycle, matchmaking +// for queue/match transitions, friend/social services for lookups) — they are +// not public, so they trust the cluster network and the player IDs supplied in +// the request, matching the rest of GameMesh's internal-service model. +type Handler struct { + svc *Service +} + +// NewHandler constructs the handler. +func NewHandler(svc *Service) *Handler { return &Handler{svc: svc} } + +// RegisterRoutes mounts the presence endpoints. 
+func (h *Handler) RegisterRoutes(r *gin.Engine) { + // Public-ish reads (still internal). + r.GET("/presence/:id", h.get) + r.POST("/presence/friends", h.friends) + r.PUT("/presence/state", h.setState) + // Connection lifecycle, driven by the WS gateway notifier. + r.POST("/presence/connect", h.connect) + r.POST("/presence/disconnect", h.disconnect) + r.POST("/presence/heartbeat", h.heartbeat) +} + +type playerRequest struct { + PlayerID string `json:"player_id" binding:"required"` +} + +type stateRequest struct { + PlayerID string `json:"player_id" binding:"required"` + State State `json:"state" binding:"required"` +} + +type friendsRequest struct { + // IDs is the friend list to look up. POST (not GET) so large lists are not + // constrained by URL length limits. + IDs []string `json:"ids" binding:"required"` +} + +func (h *Handler) get(c *gin.Context) { + id := c.Param("id") + if id == "" { + httpx.Error(c, http.StatusBadRequest, "missing player id") + return + } + rec, err := h.svc.Get(c.Request.Context(), id) + if err != nil { + httpx.Error(c, http.StatusInternalServerError, "failed to fetch presence") + return + } + httpx.OK(c, presenceResponse(id, rec)) +} + +func (h *Handler) friends(c *gin.Context) { + var req friendsRequest + if err := c.ShouldBindJSON(&req); err != nil { + httpx.Error(c, http.StatusBadRequest, "invalid request: "+err.Error()) + return + } + friends, err := h.svc.Friends(c.Request.Context(), req.IDs) + if err != nil { + httpx.Error(c, http.StatusInternalServerError, "failed to fetch friend presence") + return + } + httpx.OK(c, gin.H{"friends": friends}) +} + +func (h *Handler) setState(c *gin.Context) { + var req stateRequest + if err := c.ShouldBindJSON(&req); err != nil { + httpx.Error(c, http.StatusBadRequest, "invalid request: "+err.Error()) + return + } + rec, err := h.svc.SetState(c.Request.Context(), req.PlayerID, req.State) + if err != nil { + if errors.Is(err, ErrInvalidTransition) { + httpx.Error(c, http.StatusBadRequest, 
"invalid presence transition to "+string(req.State)) + return + } + httpx.Error(c, http.StatusInternalServerError, "failed to set presence state") + return + } + httpx.OK(c, presenceResponse(req.PlayerID, rec)) +} + +func (h *Handler) connect(c *gin.Context) { h.lifecycle(c, h.svc.Connect) } +func (h *Handler) disconnect(c *gin.Context) { h.lifecycle(c, h.svc.Disconnect) } +func (h *Handler) heartbeat(c *gin.Context) { h.lifecycle(c, h.svc.Heartbeat) } + +// lifecycle is the shared body for connect/disconnect/heartbeat: bind the +// player id, run the op, return the resulting record. All three ops share the +// same (ctx, playerID) -> (Record, error) shape. +func (h *Handler) lifecycle(c *gin.Context, op func(ctx context.Context, playerID string) (Record, error)) { + var req playerRequest + if err := c.ShouldBindJSON(&req); err != nil { + httpx.Error(c, http.StatusBadRequest, "invalid request: "+err.Error()) + return + } + rec, err := op(c.Request.Context(), req.PlayerID) + if err != nil { + httpx.Error(c, http.StatusInternalServerError, "failed to update presence") + return + } + httpx.OK(c, presenceResponse(req.PlayerID, rec)) +} + +func presenceResponse(playerID string, rec Record) gin.H { + return gin.H{ + "player_id": playerID, + "state": rec.State, + "last_seen": rec.LastSeen, + "connection_count": rec.ConnectionCount, + "updated_at": rec.UpdatedAt, + } +} diff --git a/internal/presence/handler_test.go b/internal/presence/handler_test.go new file mode 100644 index 0000000..8102cd1 --- /dev/null +++ b/internal/presence/handler_test.go @@ -0,0 +1,86 @@ +package presence + +import ( + "encoding/json" + "net/http" + "net/http/httptest" + "strings" + "testing" + + "github.com/alicebob/miniredis/v2" + "github.com/gin-gonic/gin" + "github.com/redis/go-redis/v9" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap" +) + +func newHandlerRouter(t *testing.T) *gin.Engine { + t.Helper() + mr := miniredis.RunT(t) + rdb := 
redis.NewClient(&redis.Options{Addr: mr.Addr()}) + t.Cleanup(func() { _ = rdb.Close() }) + + svc := NewService(NewRepository(rdb, testTTL), &capturingPublisher{}, nil, zap.NewNop()) + gin.SetMode(gin.TestMode) + r := gin.New() + NewHandler(svc).RegisterRoutes(r) + return r +} + +func do(r *gin.Engine, method, path, body string) *httptest.ResponseRecorder { + w := httptest.NewRecorder() + req := httptest.NewRequest(method, path, strings.NewReader(body)) + req.Header.Set("Content-Type", "application/json") + r.ServeHTTP(w, req) + return w +} + +func TestHandlerConnectThenGet(t *testing.T) { + r := newHandlerRouter(t) + + w := do(r, http.MethodPost, "/presence/connect", `{"player_id":"alice"}`) + require.Equal(t, http.StatusOK, w.Code) + + w = do(r, http.MethodGet, "/presence/alice", "") + require.Equal(t, http.StatusOK, w.Code) + var body map[string]any + require.NoError(t, json.Unmarshal(w.Body.Bytes(), &body)) + assert.Equal(t, "ONLINE", body["state"]) +} + +func TestHandlerGetUnknownIsOffline(t *testing.T) { + r := newHandlerRouter(t) + w := do(r, http.MethodGet, "/presence/nobody", "") + require.Equal(t, http.StatusOK, w.Code) + var body map[string]any + require.NoError(t, json.Unmarshal(w.Body.Bytes(), &body)) + assert.Equal(t, "OFFLINE", body["state"]) +} + +func TestHandlerSetStateInvalidTransitionIs400(t *testing.T) { + r := newHandlerRouter(t) + w := do(r, http.MethodPut, "/presence/state", `{"player_id":"ghost","state":"IN_MATCH"}`) + assert.Equal(t, http.StatusBadRequest, w.Code) +} + +func TestHandlerFriendsBulk(t *testing.T) { + r := newHandlerRouter(t) + require.Equal(t, http.StatusOK, do(r, http.MethodPost, "/presence/connect", `{"player_id":"a"}`).Code) + + w := do(r, http.MethodPost, "/presence/friends", `{"ids":["a","b"]}`) + require.Equal(t, http.StatusOK, w.Code) + var body struct { + Friends []Friend `json:"friends"` + } + require.NoError(t, json.Unmarshal(w.Body.Bytes(), &body)) + require.Len(t, body.Friends, 2) + assert.Equal(t, StateOnline, 
body.Friends[0].State) + assert.Equal(t, StateOffline, body.Friends[1].State) +} + +func TestHandlerFriendsRejectsMissingIDs(t *testing.T) { + r := newHandlerRouter(t) + w := do(r, http.MethodPost, "/presence/friends", `{}`) + assert.Equal(t, http.StatusBadRequest, w.Code) +} diff --git a/internal/presence/model.go b/internal/presence/model.go new file mode 100644 index 0000000..3bfc653 --- /dev/null +++ b/internal/presence/model.go @@ -0,0 +1,91 @@ +// Package presence tracks where every player is — OFFLINE, ONLINE, IN_QUEUE, +// IN_MATCH or AWAY — as a distributed, self-healing view backed by Redis. +// +// Presence is deliberately NOT a boolean. Redis is the source of truth: each +// player has a presence:{playerID} key carrying state, last_seen and a +// connection_count, with a TTL that heartbeats refresh. If heartbeats stop (a +// WS crash, a pod restart, a network partition) the key expires and the player +// is OFFLINE — no explicit disconnect is required and the system self-heals. +// Any WS replica can update any player, so no sticky sessions or leader +// election are needed. +package presence + +import ( + "encoding/json" + "time" +) + +// State is a player's presence state. Values are stable strings shared on the +// wire (Redis JSON, presence events, HTTP) so consumers can compare them +// directly. +type State string + +const ( + StateOffline State = "OFFLINE" + StateOnline State = "ONLINE" + StateInQueue State = "IN_QUEUE" + StateInMatch State = "IN_MATCH" + StateAway State = "AWAY" +) + +// Valid reports whether s is a recognised state. +func (s State) Valid() bool { + switch s { + case StateOffline, StateOnline, StateInQueue, StateInMatch, StateAway: + return true + default: + return false + } +} + +// CanTransition reports whether moving from `s` to `to` is allowed. +// +// The rule is deliberately permissive so future reconnect, rematch and party +// flows never require changing the state machine. 
Only two transitions are +// rejected — both require an active connection that an offline player does not +// have: +// +// OFFLINE -> IN_QUEUE +// OFFLINE -> IN_MATCH +// +// Everything else (ONLINE->IN_QUEUE, IN_MATCH->ONLINE, IN_QUEUE->IN_MATCH, +// AWAY->IN_QUEUE, …) is allowed. OFFLINE->ONLINE happens implicitly on connect. +func CanTransition(from, to State) bool { + if !to.Valid() { + return false + } + if from == StateOffline && (to == StateInQueue || to == StateInMatch) { + return false + } + return true +} + +// Record is the value stored at presence:{playerID}. It is marshalled to JSON +// for Redis; last_seen / updated_at are Unix seconds for compact, stable wire +// representation. +type Record struct { + State State `json:"state"` + LastSeen int64 `json:"last_seen"` + ConnectionCount int `json:"connection_count"` + UpdatedAt int64 `json:"updated_at"` +} + +// MarshalBinary lets the Record be written directly with redis SET. go-redis +// calls this for any encoding.BinaryMarshaler argument. +func (r Record) MarshalBinary() ([]byte, error) { return json.Marshal(r) } + +// offlineRecord is the zero-presence value returned for any player with no live +// key — a missing key IS OFFLINE. +func offlineRecord() Record { + return Record{State: StateOffline, ConnectionCount: 0} +} + +// Friend is one entry in a bulk friend-presence lookup. +type Friend struct { + PlayerID string `json:"player_id"` + State State `json:"state"` + LastSeen int64 `json:"last_seen"` +} + +// now is indirected so tests can pin time. Production uses time.Now. 
+var now = func() time.Time { return time.Now() } diff --git a/internal/presence/model_test.go b/internal/presence/model_test.go new file mode 100644 index 0000000..ac6c890 --- /dev/null +++ b/internal/presence/model_test.go @@ -0,0 +1,48 @@ +package presence + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestCanTransition_RejectsOnlyOfflineToActive(t *testing.T) { + // The only rejected transitions: an offline player (no connection) cannot + // jump straight into queue or match. + assert.False(t, CanTransition(StateOffline, StateInQueue)) + assert.False(t, CanTransition(StateOffline, StateInMatch)) + + // Everything else is permitted so future reconnect/rematch/party flows do + // not need to touch the state machine. + allowed := [][2]State{ + {StateOffline, StateOnline}, + {StateOffline, StateAway}, + {StateOnline, StateInQueue}, + {StateOnline, StateInMatch}, + {StateOnline, StateAway}, + {StateOnline, StateOffline}, + {StateInQueue, StateInMatch}, + {StateInQueue, StateOnline}, + {StateInQueue, StateAway}, + {StateInMatch, StateOnline}, + {StateInMatch, StateAway}, + {StateInMatch, StateOffline}, + {StateAway, StateInQueue}, + {StateAway, StateOnline}, + } + for _, tc := range allowed { + assert.Truef(t, CanTransition(tc[0], tc[1]), "expected %s->%s allowed", tc[0], tc[1]) + } +} + +func TestCanTransition_RejectsUnknownTargetState(t *testing.T) { + assert.False(t, CanTransition(StateOnline, State("BOGUS"))) +} + +func TestStateValid(t *testing.T) { + for _, s := range []State{StateOffline, StateOnline, StateInQueue, StateInMatch, StateAway} { + assert.True(t, s.Valid()) + } + assert.False(t, State("NOPE").Valid()) + assert.False(t, State("").Valid()) +} diff --git a/internal/presence/notifier.go b/internal/presence/notifier.go new file mode 100644 index 0000000..067e86b --- /dev/null +++ b/internal/presence/notifier.go @@ -0,0 +1,92 @@ +package presence + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "net/http" + 
"time" + + "github.com/alpnuhoglu/gamemesh/pkg/tracing" +) + +// Notifier is the small contract the WebSocket gateway depends on to feed the +// Presence Service connection lifecycle. Keeping it an interface (rather than +// importing the concrete client) means the WS hub couples to three method +// signatures, not to presence internals — and a nil Notifier is a valid no-op, +// so the gateway still runs standalone without a Presence Service. +type Notifier interface { + Connect(ctx context.Context, playerID string) error + Disconnect(ctx context.Context, playerID string) error + Heartbeat(ctx context.Context, playerID string) error +} + +// HTTPNotifier calls the Presence Service over HTTP. It propagates the W3C trace +// context on every call so a WS connect/disconnect shows up in the same trace as +// the resulting presence transition. +type HTTPNotifier struct { + baseURL string + client *http.Client +} + +// NewHTTPNotifier builds a notifier targeting the Presence Service base URL +// (e.g. http://presence:8086). The timeout keeps a slow/absent presence service +// from blocking WS connection handling. 
+func NewHTTPNotifier(baseURL string, timeout time.Duration) *HTTPNotifier { + if timeout <= 0 { + timeout = 2 * time.Second + } + return &HTTPNotifier{ + baseURL: baseURL, + client: &http.Client{Timeout: timeout}, + } +} + +func (n *HTTPNotifier) Connect(ctx context.Context, playerID string) error { + return n.post(ctx, "/presence/connect", playerID) +} + +func (n *HTTPNotifier) Disconnect(ctx context.Context, playerID string) error { + return n.post(ctx, "/presence/disconnect", playerID) +} + +func (n *HTTPNotifier) Heartbeat(ctx context.Context, playerID string) error { + return n.post(ctx, "/presence/heartbeat", playerID) +} + +func (n *HTTPNotifier) post(ctx context.Context, path, playerID string) error { + body, err := json.Marshal(playerRequest{PlayerID: playerID}) + if err != nil { + return err + } + req, err := http.NewRequestWithContext(ctx, http.MethodPost, n.baseURL+path, bytes.NewReader(body)) + if err != nil { + return err + } + req.Header.Set("Content-Type", "application/json") + + // Propagate trace context so the WS->presence hop joins the same trace. + carrier := tracing.InjectCarrier(ctx, map[string]string{}) + for k, v := range carrier { + req.Header.Set(k, v) + } + + resp, err := n.client.Do(req) + if err != nil { + return err + } + defer func() { _ = resp.Body.Close() }() + if resp.StatusCode >= 300 { + return fmt.Errorf("presence %s: status %d", path, resp.StatusCode) + } + return nil +} + +// NoopNotifier satisfies Notifier and does nothing. It is the default when no +// Presence Service is configured, so the WS gateway runs unchanged. 
+type NoopNotifier struct{} + +func (NoopNotifier) Connect(context.Context, string) error { return nil } +func (NoopNotifier) Disconnect(context.Context, string) error { return nil } +func (NoopNotifier) Heartbeat(context.Context, string) error { return nil } diff --git a/internal/presence/repository.go b/internal/presence/repository.go new file mode 100644 index 0000000..6a5b92c --- /dev/null +++ b/internal/presence/repository.go @@ -0,0 +1,229 @@ +package presence + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "time" + + "github.com/redis/go-redis/v9" +) + +// keyPrefix namespaces every presence key. A player's record lives at +// presence:{playerID}. +const keyPrefix = "presence:" + +func key(playerID string) string { return keyPrefix + playerID } + +// ErrInvalidTransition is returned by SetState when the requested transition is +// rejected by the (permissive) state machine. +var ErrInvalidTransition = errors.New("invalid presence transition") + +// Repository is the Redis-backed presence store. All mutating operations are +// implemented as Lua scripts so a read-modify-write (e.g. incrementing the +// connection count and possibly changing state) is atomic on the server: two WS +// replicas connecting/disconnecting the same player concurrently can never lose +// an update or clobber each other. +type Repository struct { + rdb *redis.Client + ttl time.Duration +} + +// NewRepository wraps an existing Redis client. ttl is the presence key +// lifetime; heartbeats refresh it, and once it lapses the player is OFFLINE. +func NewRepository(rdb *redis.Client, ttl time.Duration) *Repository { + return &Repository{rdb: rdb, ttl: ttl} +} + +// mutation is the JSON result every mutating Lua script returns: the record as +// it was before the call (or an OFFLINE zero value if the key was absent) and +// as it is after. The service uses both to decide which event to emit. 
type mutation struct {
	// Previous is the record as it was read before the script changed it.
	Previous Record
	// Current is the record as the script left it.
	Current Record
}

// --- Lua scripts -----------------------------------------------------------
//
// Each script reads the current record (cjson), mutates it, writes it back with
// the TTL via SET ... EX, and returns {prev, cur} as a JSON array so Go can
// unmarshal both halves in one round trip. Absent keys decode to an OFFLINE
// zero value. A connection_count that reaches 0 deletes the key entirely so the
// player reverts to "missing == OFFLINE" with no lingering record.

// luaPreamble is prepended to every mutation script. It loads the record at
// KEYS[1] into prev (an OFFLINE zero record when the key is absent), copies it
// field by field into cur so later edits to cur leave prev untouched, and reads
// the two arguments common to all scripts: ARGV[1] as nowSec (the caller passes
// the current Unix time in seconds) and ARGV[2] as ttl (the key lifetime in
// seconds). raw stays in scope so scripts can tell "key absent" from "key
// present".
const luaPreamble = `
local raw = redis.call('GET', KEYS[1])
local prev
if raw then
  prev = cjson.decode(raw)
else
  prev = {state='OFFLINE', last_seen=0, connection_count=0, updated_at=0}
end
local cur = {state=prev.state, last_seen=prev.last_seen, connection_count=prev.connection_count, updated_at=prev.updated_at}
local nowSec = tonumber(ARGV[1])
local ttl = tonumber(ARGV[2])
`

// connect: bump connection_count; if the player was OFFLINE (no live session)
// promote to ONLINE. An already-elevated state (IN_QUEUE/IN_MATCH/AWAY) is
// preserved so opening a second device does not knock a player out of queue.
// Both timestamps are set to nowSec and the key is rewritten with a fresh TTL.
const luaConnect = luaPreamble + `
cur.connection_count = cur.connection_count + 1
if cur.state == 'OFFLINE' then
  cur.state = 'ONLINE'
end
cur.last_seen = nowSec
cur.updated_at = nowSec
redis.call('SET', KEYS[1], cjson.encode(cur), 'EX', ttl)
return cjson.encode({prev, cur})
`

// disconnect: drop connection_count; at 0 the player is fully gone — delete the
// key so it reads as OFFLINE. Never goes negative.
//
// A disconnect for a key that is already absent takes the same zero path: the
// count is clamped back to 0, the DEL is a no-op, and both halves of the result
// report state OFFLINE. While connections remain, the state is left as it was
// and the key is rewritten with a fresh TTL.
const luaDisconnect = luaPreamble + `
cur.connection_count = cur.connection_count - 1
if cur.connection_count <= 0 then
  cur.connection_count = 0
  cur.state = 'OFFLINE'
  cur.last_seen = nowSec
  cur.updated_at = nowSec
  redis.call('DEL', KEYS[1])
  return cjson.encode({prev, cur})
end
cur.last_seen = nowSec
cur.updated_at = nowSec
redis.call('SET', KEYS[1], cjson.encode(cur), 'EX', ttl)
return cjson.encode({prev, cur})
`

// heartbeat: refresh last_seen/updated_at and the TTL. If the key is absent
// (expired between beats, or first beat for a session WS already owns) treat it
// as an implicit connect so presence self-heals without an explicit connect.
// When the key exists, state and connection_count are left untouched.
//
// NOTE(review): a re-created record always starts at connection_count = 1, no
// matter how many connections the player really has open. A later disconnect
// from one of several devices would then delete the key until the next
// heartbeat re-creates it — confirm this is acceptable.
const luaHeartbeat = luaPreamble + `
if not raw then
  cur.connection_count = 1
  cur.state = 'ONLINE'
end
cur.last_seen = nowSec
cur.updated_at = nowSec
redis.call('SET', KEYS[1], cjson.encode(cur), 'EX', ttl)
return cjson.encode({prev, cur})
`

// setState: write the requested state (ARGV[3]) and refresh the TTL. The
// permissive transition rule is enforced in Go before this runs; the script
// also refuses to resurrect a missing key into IN_QUEUE/IN_MATCH (those need an
// active connection) so a stale API call cannot fabricate presence.
+const luaSetState = luaPreamble + ` +local target = ARGV[3] +if not raw and (target == 'IN_QUEUE' or target == 'IN_MATCH') then + return cjson.encode({prev, prev}) +end +if not raw then + cur.connection_count = 0 +end +cur.state = target +cur.updated_at = nowSec +if cur.last_seen == 0 then cur.last_seen = nowSec end +redis.call('SET', KEYS[1], cjson.encode(cur), 'EX', ttl) +return cjson.encode({prev, cur}) +` + +var ( + scriptConnect = redis.NewScript(luaConnect) + scriptDisconnect = redis.NewScript(luaDisconnect) + scriptHeartbeat = redis.NewScript(luaHeartbeat) + scriptSetState = redis.NewScript(luaSetState) +) + +func (r *Repository) ttlSeconds() int { return int(r.ttl / time.Second) } + +func (r *Repository) runMutation(ctx context.Context, s *redis.Script, playerID string, extra ...any) (mutation, error) { + args := append([]any{now().Unix(), r.ttlSeconds()}, extra...) + res, err := s.Run(ctx, r.rdb, []string{key(playerID)}, args...).Result() + if err != nil { + return mutation{}, err + } + str, ok := res.(string) + if !ok { + return mutation{}, fmt.Errorf("presence: unexpected script result type %T", res) + } + var pair [2]Record + if err := json.Unmarshal([]byte(str), &pair); err != nil { + return mutation{}, fmt.Errorf("presence: decode script result: %w", err) + } + return mutation{Previous: pair[0], Current: pair[1]}, nil +} + +// Connect registers a new connection for the player (multi-device safe). +func (r *Repository) Connect(ctx context.Context, playerID string) (mutation, error) { + return r.runMutation(ctx, scriptConnect, playerID) +} + +// Disconnect drops one connection; the player goes OFFLINE only at zero. +func (r *Repository) Disconnect(ctx context.Context, playerID string) (mutation, error) { + return r.runMutation(ctx, scriptDisconnect, playerID) +} + +// Heartbeat refreshes the TTL (and re-creates an expired record). 
+func (r *Repository) Heartbeat(ctx context.Context, playerID string) (mutation, error) { + return r.runMutation(ctx, scriptHeartbeat, playerID) +} + +// SetState writes an explicit state for the player and refreshes the TTL. +func (r *Repository) SetState(ctx context.Context, playerID string, to State) (mutation, error) { + return r.runMutation(ctx, scriptSetState, playerID, string(to)) +} + +// Get returns the player's current record. A missing key reads as OFFLINE. This +// is a pure read with no side effects — no events are emitted here. +func (r *Repository) Get(ctx context.Context, playerID string) (Record, error) { + raw, err := r.rdb.Get(ctx, key(playerID)).Result() + if errors.Is(err, redis.Nil) { + return offlineRecord(), nil + } + if err != nil { + return Record{}, err + } + var rec Record + if err := json.Unmarshal([]byte(raw), &rec); err != nil { + return Record{}, fmt.Errorf("presence: decode record: %w", err) + } + return rec, nil +} + +// GetMany returns one record per playerID in a single pipelined round trip — +// O(N) GETs, one network round trip, no N+1. Missing keys map to OFFLINE. +func (r *Repository) GetMany(ctx context.Context, playerIDs []string) (map[string]Record, error) { + out := make(map[string]Record, len(playerIDs)) + if len(playerIDs) == 0 { + return out, nil + } + pipe := r.rdb.Pipeline() + cmds := make([]*redis.StringCmd, len(playerIDs)) + for i, id := range playerIDs { + cmds[i] = pipe.Get(ctx, key(id)) + } + // Exec returns redis.Nil if ANY command missed; that is expected (offline + // friends), so we inspect each command rather than failing the batch. 
+ if _, err := pipe.Exec(ctx); err != nil && !errors.Is(err, redis.Nil) { + return nil, err + } + for i, id := range playerIDs { + raw, err := cmds[i].Result() + if errors.Is(err, redis.Nil) { + out[id] = offlineRecord() + continue + } + if err != nil { + return nil, err + } + var rec Record + if err := json.Unmarshal([]byte(raw), &rec); err != nil { + return nil, fmt.Errorf("presence: decode record for %s: %w", id, err) + } + out[id] = rec + } + return out, nil +} diff --git a/internal/presence/service.go b/internal/presence/service.go new file mode 100644 index 0000000..a287481 --- /dev/null +++ b/internal/presence/service.go @@ -0,0 +1,194 @@ +package presence + +import ( + "context" + + "github.com/alpnuhoglu/gamemesh/pkg/events" + "github.com/alpnuhoglu/gamemesh/pkg/metrics" + "github.com/alpnuhoglu/gamemesh/pkg/tracing" + "go.opentelemetry.io/otel/attribute" + "go.uber.org/zap" +) + +// Service is the presence domain logic. It turns connection lifecycle calls +// (connect/disconnect/heartbeat) and explicit state changes into atomic Redis +// mutations, then publishes the resulting transitions as events. Reads are +// pure: Get/GetMany never mutate or publish. +type Service struct { + repo *Repository + bus events.Publisher + m *metrics.Metrics + log *zap.Logger +} + +// NewService wires the repository, event publisher and metrics together. bus +// may publish to NATS or Redis (the events.Publisher abstraction); consumers +// are optional, so a publish failure is logged but never fails the operation — +// presence is re-derived from the next heartbeat. +func NewService(repo *Repository, bus events.Publisher, m *metrics.Metrics, log *zap.Logger) *Service { + return &Service{repo: repo, bus: bus, m: m, log: log} +} + +// Connect registers a new connection for the player and returns the resulting +// record. The player becomes ONLINE on the first connection; additional +// connections only bump the count. 
+func (s *Service) Connect(ctx context.Context, playerID string) (Record, error) { + ctx, span := tracing.Tracer().Start(ctx, "presence.connect") + defer span.End() + span.SetAttributes(attribute.String("presence.player_id", playerID)) + + mut, err := s.repo.Connect(ctx, playerID) + if err != nil { + tracing.RecordError(span, err) + return Record{}, err + } + s.publishTransition(ctx, playerID, mut) + return mut.Current, nil +} + +// Disconnect drops one of the player's connections. The player goes OFFLINE +// only when the last connection closes (connection_count reaches 0). +func (s *Service) Disconnect(ctx context.Context, playerID string) (Record, error) { + ctx, span := tracing.Tracer().Start(ctx, "presence.transition") + defer span.End() + span.SetAttributes( + attribute.String("presence.player_id", playerID), + attribute.String("presence.op", "disconnect"), + ) + + mut, err := s.repo.Disconnect(ctx, playerID) + if err != nil { + tracing.RecordError(span, err) + return Record{}, err + } + s.publishTransition(ctx, playerID, mut) + return mut.Current, nil +} + +// Heartbeat refreshes the player's presence TTL. If the record had expired it +// is re-created (ONLINE), so presence self-heals after a crash without an +// explicit reconnect. +func (s *Service) Heartbeat(ctx context.Context, playerID string) (Record, error) { + ctx, span := tracing.Tracer().Start(ctx, "presence.heartbeat") + defer span.End() + span.SetAttributes(attribute.String("presence.player_id", playerID)) + + mut, err := s.repo.Heartbeat(ctx, playerID) + if err != nil { + tracing.RecordError(span, err) + return Record{}, err + } + if s.m != nil { + s.m.PresenceHeartbeatTotal.Inc() + } + // A heartbeat that re-created an expired record is a real ONLINE transition. + s.publishTransition(ctx, playerID, mut) + return mut.Current, nil +} + +// SetState applies an explicit transition (e.g. ONLINE->IN_QUEUE, +// IN_QUEUE->IN_MATCH). 
The rule is permissive (see CanTransition); only +// OFFLINE->IN_QUEUE / OFFLINE->IN_MATCH are rejected. +func (s *Service) SetState(ctx context.Context, playerID string, to State) (Record, error) { + ctx, span := tracing.Tracer().Start(ctx, "presence.transition") + defer span.End() + span.SetAttributes( + attribute.String("presence.player_id", playerID), + attribute.String("presence.to", string(to)), + ) + + if !to.Valid() { + if s.m != nil { + s.m.PresenceInvalidTransitions.Inc() + } + return Record{}, ErrInvalidTransition + } + + current, err := s.repo.Get(ctx, playerID) + if err != nil { + tracing.RecordError(span, err) + return Record{}, err + } + if !CanTransition(current.State, to) { + if s.m != nil { + s.m.PresenceInvalidTransitions.Inc() + } + span.SetAttributes(attribute.String("presence.from", string(current.State))) + return Record{}, ErrInvalidTransition + } + + mut, err := s.repo.SetState(ctx, playerID, to) + if err != nil { + tracing.RecordError(span, err) + return Record{}, err + } + s.publishTransition(ctx, playerID, mut) + return mut.Current, nil +} + +// Get returns a single player's current presence (OFFLINE if absent). Pure read. +func (s *Service) Get(ctx context.Context, playerID string) (Record, error) { + return s.repo.Get(ctx, playerID) +} + +// Friends returns presence for a batch of players in one pipelined round trip. +// Optimised for friend lists: O(N) commands, one round trip, no N+1. Pure read. +func (s *Service) Friends(ctx context.Context, playerIDs []string) ([]Friend, error) { + recs, err := s.repo.GetMany(ctx, playerIDs) + if err != nil { + return nil, err + } + out := make([]Friend, 0, len(playerIDs)) + for _, id := range playerIDs { + rec := recs[id] + out = append(out, Friend{PlayerID: id, State: rec.State, LastSeen: rec.LastSeen}) + } + return out, nil +} + +// publishTransition records metrics and emits the appropriate presence event +// when a mutation actually changed the player's state. A no-op mutation (e.g. 
a +// second device connecting to an already-ONLINE player) records the heartbeat +// effect but emits no state-change event. +func (s *Service) publishTransition(ctx context.Context, playerID string, mut mutation) { + prev, cur := mut.Previous.State, mut.Current.State + if prev == cur { + return + } + + if s.m != nil { + s.m.PresenceStateTransitionsTotal.WithLabelValues(string(prev), string(cur)).Inc() + switch { + case prev == StateOffline && cur != StateOffline: + s.m.PresenceOnlinePlayers.Inc() + case prev != StateOffline && cur == StateOffline: + s.m.PresenceOnlinePlayers.Dec() + } + } + + eventType := events.TypePresenceStateChanged + switch { + case prev == StateOffline && cur != StateOffline: + eventType = events.TypePresenceOnline + case cur == StateOffline: + eventType = events.TypePresenceOffline + } + + e, err := events.New(eventType, events.PresenceChangedPayload{ + PlayerID: playerID, + State: string(cur), + PreviousState: string(prev), + ConnectionCount: mut.Current.ConnectionCount, + LastSeen: mut.Current.LastSeen, + }) + if err != nil { + s.log.Warn("presence: build event failed", zap.Error(err), zap.String("player_id", playerID)) + return + } + if err := s.bus.Publish(ctx, events.TopicPresence, e); err != nil { + // Non-fatal: presence is re-derived from the next heartbeat, so a dropped + // event never corrupts the source of truth (Redis). 
+ s.log.Warn("presence: publish event failed", zap.Error(err), + zap.String("player_id", playerID), zap.String("type", eventType)) + } +} diff --git a/internal/presence/service_test.go b/internal/presence/service_test.go new file mode 100644 index 0000000..25fb91c --- /dev/null +++ b/internal/presence/service_test.go @@ -0,0 +1,272 @@ +package presence + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/alicebob/miniredis/v2" + "github.com/redis/go-redis/v9" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + + "github.com/alpnuhoglu/gamemesh/pkg/events" +) + +const testTTL = 45 * time.Second + +// capturingPublisher records every published event so tests can assert on the +// presence event stream. Safe for concurrent use. +type capturingPublisher struct { + mu sync.Mutex + events []events.Event +} + +func (p *capturingPublisher) Publish(_ context.Context, _ string, e events.Event) error { + p.mu.Lock() + defer p.mu.Unlock() + p.events = append(p.events, e) + return nil +} + +func (p *capturingPublisher) Close() error { return nil } + +func (p *capturingPublisher) types() []string { + p.mu.Lock() + defer p.mu.Unlock() + out := make([]string, len(p.events)) + for i, e := range p.events { + out[i] = e.Type + } + return out +} + +func newTestService(t *testing.T) (*Service, *capturingPublisher, *miniredis.Miniredis) { + t.Helper() + mr := miniredis.RunT(t) + rdb := redis.NewClient(&redis.Options{Addr: mr.Addr()}) + t.Cleanup(func() { _ = rdb.Close() }) + pub := &capturingPublisher{} + svc := NewService(NewRepository(rdb, testTTL), pub, nil, zap.NewNop()) + return svc, pub, mr +} + +func TestConnectGoesOnlineAndEmitsOnline(t *testing.T) { + svc, pub, _ := newTestService(t) + ctx := context.Background() + + rec, err := svc.Connect(ctx, "alice") + require.NoError(t, err) + assert.Equal(t, StateOnline, rec.State) + assert.Equal(t, 1, rec.ConnectionCount) + assert.Equal(t, 
[]string{events.TypePresenceOnline}, pub.types()) +} + +func TestMultiDeviceStaysOnlineUntilLastDisconnect(t *testing.T) { + svc, pub, _ := newTestService(t) + ctx := context.Background() + + // Three devices connect. + for i := 0; i < 3; i++ { + _, err := svc.Connect(ctx, "alice") + require.NoError(t, err) + } + rec, err := svc.Get(ctx, "alice") + require.NoError(t, err) + assert.Equal(t, 3, rec.ConnectionCount) + assert.Equal(t, StateOnline, rec.State) + + // First two disconnects keep the player ONLINE. + for i := 0; i < 2; i++ { + rec, err = svc.Disconnect(ctx, "alice") + require.NoError(t, err) + assert.Equal(t, StateOnline, rec.State) + } + + // Last disconnect → OFFLINE. + rec, err = svc.Disconnect(ctx, "alice") + require.NoError(t, err) + assert.Equal(t, StateOffline, rec.State) + assert.Equal(t, 0, rec.ConnectionCount) + + // Exactly one PresenceOnline (first connect) and one PresenceOffline (last + // disconnect) — the middle connects/disconnects do not change state, so no + // extra events. + assert.Equal(t, []string{events.TypePresenceOnline, events.TypePresenceOffline}, pub.types()) +} + +func TestHeartbeatRefreshesTTL(t *testing.T) { + svc, _, mr := newTestService(t) + ctx := context.Background() + + _, err := svc.Connect(ctx, "alice") + require.NoError(t, err) + assert.InDelta(t, testTTL.Seconds(), mr.TTL(key("alice")).Seconds(), 1) + + // Age the key partway, then heartbeat and confirm the TTL is reset. + mr.FastForward(30 * time.Second) + assert.Less(t, mr.TTL(key("alice")).Seconds(), testTTL.Seconds()) + + _, err = svc.Heartbeat(ctx, "alice") + require.NoError(t, err) + assert.InDelta(t, testTTL.Seconds(), mr.TTL(key("alice")).Seconds(), 1) +} + +func TestTTLExpiryReportsOfflineWithoutEvents(t *testing.T) { + svc, pub, mr := newTestService(t) + ctx := context.Background() + + _, err := svc.Connect(ctx, "alice") + require.NoError(t, err) + + // Simulate stopped heartbeats: let the key expire. 
+ mr.FastForward(testTTL + time.Second) + + rec, err := svc.Get(ctx, "alice") + require.NoError(t, err) + assert.Equal(t, StateOffline, rec.State) + + // Reads are side-effect free: no PresenceOffline emitted on expiry/read. + assert.Equal(t, []string{events.TypePresenceOnline}, pub.types(), + "only the connect event should exist; reads must not publish") +} + +func TestHeartbeatRecreatesExpiredRecord(t *testing.T) { + svc, _, mr := newTestService(t) + ctx := context.Background() + + _, err := svc.Connect(ctx, "alice") + require.NoError(t, err) + mr.FastForward(testTTL + time.Second) // crash: key gone + + // A resumed heartbeat self-heals presence back to ONLINE. + rec, err := svc.Heartbeat(ctx, "alice") + require.NoError(t, err) + assert.Equal(t, StateOnline, rec.State) + assert.Equal(t, 1, rec.ConnectionCount) +} + +func TestSetStateAllowsQueueAndMatchTransitions(t *testing.T) { + svc, pub, _ := newTestService(t) + ctx := context.Background() + + _, err := svc.Connect(ctx, "alice") + require.NoError(t, err) + + rec, err := svc.SetState(ctx, "alice", StateInQueue) + require.NoError(t, err) + assert.Equal(t, StateInQueue, rec.State) + + rec, err = svc.SetState(ctx, "alice", StateInMatch) + require.NoError(t, err) + assert.Equal(t, StateInMatch, rec.State) + + rec, err = svc.SetState(ctx, "alice", StateOnline) + require.NoError(t, err) + assert.Equal(t, StateOnline, rec.State) + + // online, in_queue, in_match, online → 3 state-changed style events after + // the initial PresenceOnline. 
+ assert.Equal(t, []string{ + events.TypePresenceOnline, + events.TypePresenceStateChanged, + events.TypePresenceStateChanged, + events.TypePresenceStateChanged, + }, pub.types()) +} + +func TestSetStateRejectsOfflineToActive(t *testing.T) { + svc, pub, _ := newTestService(t) + ctx := context.Background() + + _, err := svc.SetState(ctx, "ghost", StateInQueue) + require.ErrorIs(t, err, ErrInvalidTransition) + + _, err = svc.SetState(ctx, "ghost", StateInMatch) + require.ErrorIs(t, err, ErrInvalidTransition) + + // Nothing was published for rejected transitions, and the player stays OFFLINE. + assert.Empty(t, pub.types()) + rec, err := svc.Get(ctx, "ghost") + require.NoError(t, err) + assert.Equal(t, StateOffline, rec.State) +} + +func TestSetStateRejectsUnknownState(t *testing.T) { + svc, _, _ := newTestService(t) + _, err := svc.SetState(context.Background(), "alice", State("FLYING")) + require.ErrorIs(t, err, ErrInvalidTransition) +} + +func TestFriendsBulkLookup(t *testing.T) { + svc, _, _ := newTestService(t) + ctx := context.Background() + + _, err := svc.Connect(ctx, "online1") + require.NoError(t, err) + _, err = svc.Connect(ctx, "online2") + require.NoError(t, err) + _, err = svc.SetState(ctx, "online2", StateInMatch) + require.NoError(t, err) + + friends, err := svc.Friends(ctx, []string{"online1", "missing", "online2"}) + require.NoError(t, err) + require.Len(t, friends, 3) + + byID := map[string]Friend{} + for _, f := range friends { + byID[f.PlayerID] = f + } + assert.Equal(t, StateOnline, byID["online1"].State) + assert.Equal(t, StateInMatch, byID["online2"].State) + // A friend with no live key reads as OFFLINE (no N+1, no error). 
+ assert.Equal(t, StateOffline, byID["missing"].State) +} + +func TestFriendsEmptyInput(t *testing.T) { + svc, _, _ := newTestService(t) + friends, err := svc.Friends(context.Background(), nil) + require.NoError(t, err) + assert.Empty(t, friends) +} + +func TestReconnectRestoresOnline(t *testing.T) { + svc, _, _ := newTestService(t) + ctx := context.Background() + + _, err := svc.Connect(ctx, "alice") + require.NoError(t, err) + rec, err := svc.Disconnect(ctx, "alice") + require.NoError(t, err) + require.Equal(t, StateOffline, rec.State) + + rec, err = svc.Connect(ctx, "alice") + require.NoError(t, err) + assert.Equal(t, StateOnline, rec.State) + assert.Equal(t, 1, rec.ConnectionCount) +} + +func TestConcurrentConnectsCountCorrectly(t *testing.T) { + svc, _, _ := newTestService(t) + ctx := context.Background() + + const n = 20 + var wg sync.WaitGroup + wg.Add(n) + for i := 0; i < n; i++ { + go func() { + defer wg.Done() + _, _ = svc.Connect(ctx, "alice") + }() + } + wg.Wait() + + // The Lua-script read-modify-write must not lose any increment under + // concurrency: all N connections counted. 
+ rec, err := svc.Get(ctx, "alice") + require.NoError(t, err) + assert.Equal(t, n, rec.ConnectionCount) + assert.Equal(t, StateOnline, rec.State) +} diff --git a/internal/wsgateway/handler_test.go b/internal/wsgateway/handler_test.go index 9c1bc3e..1849d36 100644 --- a/internal/wsgateway/handler_test.go +++ b/internal/wsgateway/handler_test.go @@ -22,7 +22,7 @@ import ( func newWSServer(t *testing.T) (*httptest.Server, *auth.TokenManager, *Hub) { t.Helper() tokens := auth.NewTokenManager("ws-test-secret", time.Hour, "gamemesh") - hub := NewHub(zap.NewNop(), metrics.New("ws-handler-test")) + hub := NewHub(zap.NewNop(), metrics.New("ws-handler-test"), nil) handler := NewHandler(hub, tokens, []string{"*"}, zap.NewNop()) gin.SetMode(gin.TestMode) diff --git a/internal/wsgateway/hub.go b/internal/wsgateway/hub.go index e3661b4..633deb8 100644 --- a/internal/wsgateway/hub.go +++ b/internal/wsgateway/hub.go @@ -3,11 +3,14 @@ package wsgateway import ( + "context" "encoding/json" "sync" + "time" "go.uber.org/zap" + "github.com/alpnuhoglu/gamemesh/internal/presence" "github.com/alpnuhoglu/gamemesh/pkg/metrics" ) @@ -33,28 +36,43 @@ type Hub struct { log *zap.Logger m *metrics.Metrics + + // presence feeds player presence to the Presence Service. It is an injected + // interface and never nil (NoopNotifier when no Presence Service is wired), + // so the gateway stays decoupled from presence internals and runs standalone. + presence presence.Notifier } -// NewHub constructs an empty hub. -func NewHub(log *zap.Logger, m *metrics.Metrics) *Hub { +// NewHub constructs an empty hub. A nil notifier becomes a no-op, so existing +// callers (and tests) that do not care about presence keep working unchanged. 
+func NewHub(log *zap.Logger, m *metrics.Metrics, notifier presence.Notifier) *Hub { + if notifier == nil { + notifier = presence.NoopNotifier{} + } return &Hub{ clients: make(map[*Client]struct{}), byPlayer: make(map[string]map[*Client]struct{}), rooms: make(map[string]map[*Client]struct{}), log: log, m: m, + presence: notifier, } } func (h *Hub) register(c *Client) { h.mu.Lock() - defer h.mu.Unlock() h.clients[c] = struct{}{} if h.byPlayer[c.playerID] == nil { h.byPlayer[c.playerID] = make(map[*Client]struct{}) } h.byPlayer[c.playerID][c] = struct{}{} h.m.WSConnections.Set(float64(len(h.clients))) + h.mu.Unlock() + + // Each connection is one presence connection (the Presence Service does the + // multi-device counting). Fire async with a fresh context so a slow/absent + // Presence Service never blocks accepting the WebSocket. + h.notifyPresence("connect", c.playerID, h.presence.Connect) } func (h *Hub) unregister(c *Client) { @@ -79,6 +97,11 @@ func (h *Hub) unregister(c *Client) { h.m.WSConnections.Set(float64(len(h.clients))) h.mu.Unlock() + // Drop one presence connection. The Presence Service only flips the player to + // OFFLINE when their last connection closes, so multi-device players stay + // online here too. + h.notifyPresence("disconnect", c.playerID, h.presence.Disconnect) + // Notify remaining occupants outside the lock. for _, room := range left { h.BroadcastToRoom(room, marshal(Message{Type: "PlayerLeft", Room: room, Data: playerData(c.playerID)})) @@ -138,6 +161,57 @@ func (h *Hub) SendToPlayer(playerID string, msg []byte) { } } +// notifyPresence runs a presence lifecycle call asynchronously with its own +// short-lived context. Async + fresh context means the closing connection's +// cancelled context cannot cancel the call, and a slow Presence Service cannot +// block WS connect/disconnect handling. Failures are logged, never fatal — +// presence is re-derived from the next heartbeat. 
+func (h *Hub) notifyPresence(op, playerID string, fn func(context.Context, string) error) { + go func() { + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + if err := fn(ctx, playerID); err != nil { + h.log.Warn("presence notify failed", + zap.String("op", op), zap.String("player_id", playerID), zap.Error(err)) + } + }() +} + +// RunHeartbeat refreshes presence for every locally-connected player every +// interval, keeping their presence:{id} TTL alive. It runs until ctx is +// cancelled. One ticker for the whole replica (rather than per-connection) +// keeps the load proportional to distinct players, not connections. Any WS +// replica can heartbeat any of its players — no sticky sessions required. +func (h *Hub) RunHeartbeat(ctx context.Context, interval time.Duration) { + if interval <= 0 { + interval = 15 * time.Second + } + ticker := time.NewTicker(interval) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + for _, playerID := range h.connectedPlayers() { + h.notifyPresence("heartbeat", playerID, h.presence.Heartbeat) + } + } + } +} + +// connectedPlayers snapshots the distinct player IDs with at least one live +// connection on this replica. 
+func (h *Hub) connectedPlayers() []string { + h.mu.RLock() + defer h.mu.RUnlock() + ids := make([]string, 0, len(h.byPlayer)) + for id := range h.byPlayer { + ids = append(ids, id) + } + return ids +} + func playerData(playerID string) json.RawMessage { raw, _ := json.Marshal(map[string]string{"player_id": playerID}) return raw diff --git a/internal/wsgateway/hub_test.go b/internal/wsgateway/hub_test.go index 88f973b..166a67d 100644 --- a/internal/wsgateway/hub_test.go +++ b/internal/wsgateway/hub_test.go @@ -20,7 +20,7 @@ func newHubClient(hub *Hub, playerID string) *Client { } func newTestHub() *Hub { - return NewHub(zap.NewNop(), metrics.New("ws-hub-test")) + return NewHub(zap.NewNop(), metrics.New("ws-hub-test"), nil) } func drain(c *Client) []Message { diff --git a/pkg/config/config.go b/pkg/config/config.go index af99c41..5f16ede 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -63,6 +63,14 @@ type Config struct { MatchMaxQueueAge time.Duration RoomTTL time.Duration + // Presence tuning. PresenceTTL is the expiry on presence:{id}; if heartbeats + // stop for longer, the key vanishes and the player is OFFLINE. The heartbeat + // interval is advisory for WS replicas (the source of heartbeats) and kept at + // ~1/3 of the TTL: one missed beat is safe; a second leaves no slack before expiry. 
+ PresenceServiceURL string + PresenceTTL time.Duration + PresenceHeartbeatInterval time.Duration + AutoMigrate bool ServiceVersion string @@ -108,6 +116,7 @@ func Load(serviceName string) *Config { MatchmakingServiceURL: getEnv("MATCHMAKING_SERVICE_URL", "http://localhost:8082"), LeaderboardServiceURL: getEnv("LEADERBOARD_SERVICE_URL", "http://localhost:8083"), WebsocketServiceURL: getEnv("WEBSOCKET_SERVICE_URL", "http://localhost:8084"), + PresenceServiceURL: getEnv("PRESENCE_SERVICE_URL", "http://localhost:8086"), RateLimitRPS: getEnvFloat("RATE_LIMIT_RPS", 50), RateLimitBurst: getEnvInt("RATE_LIMIT_BURST", 100), @@ -120,6 +129,9 @@ func Load(serviceName string) *Config { MatchMaxQueueAge: getEnvDuration("MATCH_MAX_QUEUE_AGE", 5*time.Minute), RoomTTL: getEnvDuration("ROOM_TTL", time.Hour), + PresenceTTL: getEnvDuration("PRESENCE_TTL", 45*time.Second), + PresenceHeartbeatInterval: getEnvDuration("PRESENCE_HEARTBEAT_INTERVAL", 15*time.Second), + AutoMigrate: getEnvBool("AUTO_MIGRATE", true), ServiceVersion: getEnv("SERVICE_VERSION", "dev"), @@ -150,6 +162,8 @@ func defaultPort(serviceName string) string { return "8084" case "outbox-relay": return "8085" + case "presence": + return "8086" default: return "8080" } diff --git a/pkg/events/events.go b/pkg/events/events.go index 2834a6a..1d5ac38 100644 --- a/pkg/events/events.go +++ b/pkg/events/events.go @@ -24,6 +24,14 @@ const ( // be lost once the registration/update commits (no dual-write). TypePlayerRegistered = "PlayerRegistered" TypePlayerUpdated = "PlayerUpdated" + // Presence events. Emitted by the Presence Service on connection-driven + // transitions. Published directly to NATS (not via the outbox): presence is + // high-churn and self-correcting — a dropped event is re-derived on the next + // heartbeat, so durability is unnecessary and the latency of the outbox poll + // loop is undesirable. 
+ TypePresenceOnline = "PresenceOnline" + TypePresenceOffline = "PresenceOffline" + TypePresenceStateChanged = "PresenceStateChanged" ) // Topics (channels) events are published on. @@ -32,6 +40,9 @@ const ( TopicLeaderboard = "events.leaderboard" // TopicPlayer carries identity lifecycle events emitted via the outbox. TopicPlayer = "events.player" + // TopicPresence carries player presence transitions (online/offline/state + // changes). High-churn and short-lived — see the PRESENCE stream's MaxAge. + TopicPresence = "events.presence" ) // Event is the wire format for all inter-service messages. @@ -91,6 +102,18 @@ type PlayerUpdatedPayload struct { Email string `json:"email"` } +// PresenceChangedPayload is emitted on every presence transition. It carries +// both the new and previous state plus the live connection count so downstream +// consumers (friends, parties, notifications) have full context without an extra +// lookup. PreviousState is empty when the player had no prior presence record. +type PresenceChangedPayload struct { + PlayerID string `json:"player_id"` + State string `json:"state"` + PreviousState string `json:"previous_state,omitempty"` + ConnectionCount int `json:"connection_count"` + LastSeen int64 `json:"last_seen"` // Unix seconds +} + // Publisher sends events to a topic. Implementations must be safe for // concurrent use. 
type Publisher interface { diff --git a/pkg/events/nats.go b/pkg/events/nats.go index fb7f71f..9ff5855 100644 --- a/pkg/events/nats.go +++ b/pkg/events/nats.go @@ -79,6 +79,11 @@ var streamFor = map[string]stream{ subjects: []string{TopicPlayer, TopicPlayer + ".>"}, maxAge: 72 * time.Hour, // identity events are low-volume, long replay window }, + TopicPresence: { + name: "PRESENCE", + subjects: []string{TopicPresence, TopicPresence + ".>"}, + maxAge: 15 * time.Minute, // high-churn, disposable; Redis is source of truth, not JetStream replay + }, } // NewNATSBus connects to NATS, ensures the streams exist and returns a bus. diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go index 27334b6..408ff7d 100644 --- a/pkg/metrics/metrics.go +++ b/pkg/metrics/metrics.go @@ -35,6 +35,15 @@ type Metrics struct { EventsFailedTotal *prometheus.CounterVec EventProcessingDuration *prometheus.HistogramVec + // Presence instruments. OnlinePlayers is a gauge of players whose presence + // record is non-offline; the rest track the presence write path. (An expired + // counter is intentionally absent: TTL expiry is passive Redis behaviour with + // no code path to observe in this milestone.) + PresenceOnlinePlayers prometheus.Gauge + PresenceStateTransitionsTotal *prometheus.CounterVec + PresenceHeartbeatTotal prometheus.Counter + PresenceInvalidTransitions prometheus.Counter + // Transactional outbox relay instruments. Pending is a gauge of the current // backlog (PENDING rows); the rest track the relay's publish path. 
OutboxEventsPending prometheus.Gauge @@ -114,6 +123,26 @@ func New(service string) *Metrics { ConstLabels: constLabels, Buckets: []float64{.001, .005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5}, }, []string{"topic", "type"}), + PresenceOnlinePlayers: prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "gamemesh_presence_online_players", + Help: "Players currently tracked as non-offline by this presence replica.", + ConstLabels: constLabels, + }), + PresenceStateTransitionsTotal: prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "gamemesh_presence_state_transitions_total", + Help: "Total presence state transitions, labelled by from/to state.", + ConstLabels: constLabels, + }, []string{"from", "to"}), + PresenceHeartbeatTotal: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "gamemesh_presence_heartbeat_total", + Help: "Total presence heartbeats processed.", + ConstLabels: constLabels, + }), + PresenceInvalidTransitions: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "gamemesh_presence_invalid_transitions_total", + Help: "Total presence transitions rejected as invalid (e.g. 
OFFLINE->IN_MATCH).", + ConstLabels: constLabels, + }), OutboxEventsPending: prometheus.NewGauge(prometheus.GaugeOpts{ Name: "gamemesh_outbox_events_pending", Help: "Outbox rows awaiting publication (the relay backlog).", @@ -146,6 +175,7 @@ func New(service string) *Metrics { m.RequestsTotal, m.ErrorsTotal, m.RequestDuration, m.WSConnections, m.MatchmakingQueueSize, m.MatchesCreated, m.LeaderboardUpdates, m.EventsPublishedTotal, m.EventsConsumedTotal, m.EventsFailedTotal, m.EventProcessingDuration, + m.PresenceOnlinePlayers, m.PresenceStateTransitionsTotal, m.PresenceHeartbeatTotal, m.PresenceInvalidTransitions, m.OutboxEventsPending, m.OutboxEventsPublishedTotal, m.OutboxPublishFailuresTotal, m.OutboxEventsDeadLetteredTotal, m.OutboxPublishDuration, ) diff --git a/tests/integration/presence_test.go b/tests/integration/presence_test.go new file mode 100644 index 0000000..8854ceb --- /dev/null +++ b/tests/integration/presence_test.go @@ -0,0 +1,121 @@ +//go:build integration + +package integration + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + + "github.com/alpnuhoglu/gamemesh/internal/presence" + "github.com/alpnuhoglu/gamemesh/pkg/events" +) + +// noopPublisher satisfies events.Publisher without recording — these tests +// assert on Redis state, not the event stream (that is covered by unit tests). +type noopPublisher struct{} + +func (noopPublisher) Publish(context.Context, string, events.Event) error { return nil } +func (noopPublisher) Close() error { return nil } + +// newPresenceService wires a presence Service against the real Redis container +// with a short TTL so real TTL expiry can be observed without long waits. 
+func newPresenceService(t *testing.T, ttl time.Duration) *presence.Service { + t.Helper() + rdb := startRedis(t) + repo := presence.NewRepository(rdb, ttl) + return presence.NewService(repo, noopPublisher{}, nil, zap.NewNop()) +} + +func TestPresenceMultiDeviceAgainstRealRedis(t *testing.T) { + svc := newPresenceService(t, 45*time.Second) + ctx := context.Background() + + for i := 0; i < 3; i++ { + _, err := svc.Connect(ctx, "alice") + require.NoError(t, err) + } + rec, err := svc.Get(ctx, "alice") + require.NoError(t, err) + assert.Equal(t, 3, rec.ConnectionCount) + assert.Equal(t, presence.StateOnline, rec.State) + + for i := 0; i < 2; i++ { + _, err = svc.Disconnect(ctx, "alice") + require.NoError(t, err) + } + rec, err = svc.Get(ctx, "alice") + require.NoError(t, err) + assert.Equal(t, presence.StateOnline, rec.State) + + rec, err = svc.Disconnect(ctx, "alice") + require.NoError(t, err) + assert.Equal(t, presence.StateOffline, rec.State) +} + +func TestPresenceTTLExpiryAgainstRealRedis(t *testing.T) { + // 1s TTL: after heartbeats stop, the key really expires in Redis and the + // player is reported OFFLINE — crash recovery with no explicit disconnect. + svc := newPresenceService(t, 1*time.Second) + ctx := context.Background() + + _, err := svc.Connect(ctx, "alice") + require.NoError(t, err) + + rec, err := svc.Get(ctx, "alice") + require.NoError(t, err) + require.Equal(t, presence.StateOnline, rec.State) + + require.Eventually(t, func() bool { + r, err := svc.Get(ctx, "alice") + return err == nil && r.State == presence.StateOffline + }, 5*time.Second, 100*time.Millisecond, "presence should expire to OFFLINE after TTL") +} + +func TestPresenceHeartbeatKeepsAliveAgainstRealRedis(t *testing.T) { + svc := newPresenceService(t, 2*time.Second) + ctx := context.Background() + + _, err := svc.Connect(ctx, "alice") + require.NoError(t, err) + + // Beat every 500ms for ~3s; the player must stay ONLINE the whole time even + // though the TTL is only 2s. 
+ deadline := time.Now().Add(3 * time.Second) + for time.Now().Before(deadline) { + _, err := svc.Heartbeat(ctx, "alice") + require.NoError(t, err) + rec, err := svc.Get(ctx, "alice") + require.NoError(t, err) + require.Equal(t, presence.StateOnline, rec.State) + time.Sleep(500 * time.Millisecond) + } +} + +func TestPresenceFriendsBatchAgainstRealRedis(t *testing.T) { + svc := newPresenceService(t, 45*time.Second) + ctx := context.Background() + + _, err := svc.Connect(ctx, "f1") + require.NoError(t, err) + _, err = svc.Connect(ctx, "f2") + require.NoError(t, err) + _, err = svc.SetState(ctx, "f2", presence.StateInQueue) + require.NoError(t, err) + + friends, err := svc.Friends(ctx, []string{"f1", "f2", "f3"}) + require.NoError(t, err) + require.Len(t, friends, 3) + + byID := map[string]presence.Friend{} + for _, f := range friends { + byID[f.PlayerID] = f + } + assert.Equal(t, presence.StateOnline, byID["f1"].State) + assert.Equal(t, presence.StateInQueue, byID["f2"].State) + assert.Equal(t, presence.StateOffline, byID["f3"].State) +} From 43f0ad799d94d665a386cc073241873bba4a24ce Mon Sep 17 00:00:00 2001 From: AlpNuhoglu Date: Sun, 21 Jun 2026 17:07:10 +0300 Subject: [PATCH 2/2] fix(presence): satisfy revive lint (doc comments, unexported returns) Unexport the Repository mutating methods (connect/disconnect/heartbeat/ setState) so they no longer return the package-private mutation type through an exported API; only Service calls them in-package. Add doc comments to the State const block and the HTTPNotifier/NoopNotifier methods. --- internal/presence/model.go | 1 + internal/presence/notifier.go | 12 ++++++++++-- internal/presence/repository.go | 18 ++++++++++-------- internal/presence/service.go | 8 ++++---- 4 files changed, 25 insertions(+), 14 deletions(-) diff --git a/internal/presence/model.go b/internal/presence/model.go index 3bfc653..8d2d69a 100644 --- a/internal/presence/model.go +++ b/internal/presence/model.go @@ -20,6 +20,7 @@ import ( // directly. 
type State string +// The presence states. Values are stable on the wire (Redis, events, HTTP). const ( StateOffline State = "OFFLINE" StateOnline State = "ONLINE" diff --git a/internal/presence/notifier.go b/internal/presence/notifier.go index 067e86b..155c913 100644 --- a/internal/presence/notifier.go +++ b/internal/presence/notifier.go @@ -43,14 +43,17 @@ func NewHTTPNotifier(baseURL string, timeout time.Duration) *HTTPNotifier { } } +// Connect reports a new connection for the player to the Presence Service. func (n *HTTPNotifier) Connect(ctx context.Context, playerID string) error { return n.post(ctx, "/presence/connect", playerID) } +// Disconnect reports that one of the player's connections has closed. func (n *HTTPNotifier) Disconnect(ctx context.Context, playerID string) error { return n.post(ctx, "/presence/disconnect", playerID) } +// Heartbeat refreshes the player's presence TTL on the Presence Service. func (n *HTTPNotifier) Heartbeat(ctx context.Context, playerID string) error { return n.post(ctx, "/presence/heartbeat", playerID) } @@ -87,6 +90,11 @@ func (n *HTTPNotifier) post(ctx context.Context, path, playerID string) error { // Presence Service is configured, so the WS gateway runs unchanged. type NoopNotifier struct{} -func (NoopNotifier) Connect(context.Context, string) error { return nil } +// Connect does nothing and returns nil. +func (NoopNotifier) Connect(context.Context, string) error { return nil } + +// Disconnect does nothing and returns nil. func (NoopNotifier) Disconnect(context.Context, string) error { return nil } -func (NoopNotifier) Heartbeat(context.Context, string) error { return nil } + +// Heartbeat does nothing and returns nil. 
+func (NoopNotifier) Heartbeat(context.Context, string) error { return nil } diff --git a/internal/presence/repository.go b/internal/presence/repository.go index 6a5b92c..a66a9d1 100644 --- a/internal/presence/repository.go +++ b/internal/presence/repository.go @@ -156,23 +156,25 @@ func (r *Repository) runMutation(ctx context.Context, s *redis.Script, playerID return mutation{Previous: pair[0], Current: pair[1]}, nil } -// Connect registers a new connection for the player (multi-device safe). -func (r *Repository) Connect(ctx context.Context, playerID string) (mutation, error) { +// connect registers a new connection for the player (multi-device safe). +// Unexported because it returns the internal mutation plumbing and is only +// driven by Service in this package. +func (r *Repository) connect(ctx context.Context, playerID string) (mutation, error) { return r.runMutation(ctx, scriptConnect, playerID) } -// Disconnect drops one connection; the player goes OFFLINE only at zero. -func (r *Repository) Disconnect(ctx context.Context, playerID string) (mutation, error) { +// disconnect drops one connection; the player goes OFFLINE only at zero. +func (r *Repository) disconnect(ctx context.Context, playerID string) (mutation, error) { return r.runMutation(ctx, scriptDisconnect, playerID) } -// Heartbeat refreshes the TTL (and re-creates an expired record). -func (r *Repository) Heartbeat(ctx context.Context, playerID string) (mutation, error) { +// heartbeat refreshes the TTL (and re-creates an expired record). +func (r *Repository) heartbeat(ctx context.Context, playerID string) (mutation, error) { return r.runMutation(ctx, scriptHeartbeat, playerID) } -// SetState writes an explicit state for the player and refreshes the TTL. -func (r *Repository) SetState(ctx context.Context, playerID string, to State) (mutation, error) { +// setState writes an explicit state for the player and refreshes the TTL. 
+func (r *Repository) setState(ctx context.Context, playerID string, to State) (mutation, error) { return r.runMutation(ctx, scriptSetState, playerID, string(to)) } diff --git a/internal/presence/service.go b/internal/presence/service.go index a287481..7e154b2 100644 --- a/internal/presence/service.go +++ b/internal/presence/service.go @@ -37,7 +37,7 @@ func (s *Service) Connect(ctx context.Context, playerID string) (Record, error) defer span.End() span.SetAttributes(attribute.String("presence.player_id", playerID)) - mut, err := s.repo.Connect(ctx, playerID) + mut, err := s.repo.connect(ctx, playerID) if err != nil { tracing.RecordError(span, err) return Record{}, err @@ -56,7 +56,7 @@ func (s *Service) Disconnect(ctx context.Context, playerID string) (Record, erro attribute.String("presence.op", "disconnect"), ) - mut, err := s.repo.Disconnect(ctx, playerID) + mut, err := s.repo.disconnect(ctx, playerID) if err != nil { tracing.RecordError(span, err) return Record{}, err @@ -73,7 +73,7 @@ func (s *Service) Heartbeat(ctx context.Context, playerID string) (Record, error defer span.End() span.SetAttributes(attribute.String("presence.player_id", playerID)) - mut, err := s.repo.Heartbeat(ctx, playerID) + mut, err := s.repo.heartbeat(ctx, playerID) if err != nil { tracing.RecordError(span, err) return Record{}, err @@ -117,7 +117,7 @@ func (s *Service) SetState(ctx context.Context, playerID string, to State) (Reco return Record{}, ErrInvalidTransition } - mut, err := s.repo.SetState(ctx, playerID, to) + mut, err := s.repo.setState(ctx, playerID, to) if err != nil { tracing.RecordError(span, err) return Record{}, err