From 5161487af46ca97280e60ed08dd7a6a26e5d06f4 Mon Sep 17 00:00:00 2001
From: AlpNuhoglu
Date: Mon, 22 Jun 2026 17:16:48 +0300
Subject: [PATCH] perf(core): batch matchmaking I/O and implement bulk presence
 heartbeat

This refactor eliminates the two most critical bottlenecks in the hot path:
the N+1 Redis network round-trips in matchmaking and the goroutine/TCP
explosion in presence heartbeats.

Key changes:
- Matchmaking (#1): Replaced sequential per-pair `createRoom` calls with a
  single tick-level pipeline (`SaveMany` and `RemoveBatch`). Collapsed
  thousands of spans into a single batch span.
- Presence (#2): Replaced per-player heartbeat goroutines with a single
  `/presence/heartbeat/bulk` POST request. Implemented a 2-pass pipelined
  approach (EXPIRE + heal EVAL) for steady-state O(1) RTT.
- Cluster Readiness: Replaced all `TxPipeline` occurrences with standard
  `Pipeline()` and removed cross-slot assumptions, ensuring seamless future
  migration to Redis Cluster.
- Memory: Removed the intermediate `[]any`/`[]string` copies previously built
  for `ZRem`/`HDel` in `RemoveBatch` (go-redis expands a single `[]string`
  argument itself) and pre-allocated the batch slices to their known sizes to
  reduce GC pressure.
- Fix: Resolved pipeline `NOSCRIPT` error by replacing `EvalSha` fallback with
  explicit `Eval` in the heal pass.
--- internal/matchmaking/queue.go | 35 ++++++--- internal/matchmaking/rooms.go | 24 ++++++ internal/matchmaking/service.go | 107 ++++++++++++++++----------- internal/matchmaking/service_test.go | 38 ++++++++++ internal/presence/handler.go | 23 ++++++ internal/presence/notifier.go | 24 ++++++ internal/presence/repository.go | 79 ++++++++++++++++++++ internal/presence/service.go | 34 +++++++++ internal/presence/service_test.go | 67 +++++++++++++++++ internal/wsgateway/hub.go | 19 ++++- 10 files changed, 395 insertions(+), 55 deletions(-) diff --git a/internal/matchmaking/queue.go b/internal/matchmaking/queue.go index e510106..28366ac 100644 --- a/internal/matchmaking/queue.go +++ b/internal/matchmaking/queue.go @@ -45,8 +45,14 @@ func NewQueue(rdb *redis.Client) *Queue { // Enqueue adds (or re-ranks) a player. Idempotent: re-joining just refreshes // rank and join time. +// +// Cluster-ready: uses the non-transactional Pipeline() (not TxPipeline). ZADD +// (queueKey) and HSET (joinedKey) hash to different slots under Redis Cluster, +// where a TxPipeline would fail cross-slot; Pipeline() lets a ClusterClient +// split them. The two writes are independent, so dropping atomicity is safe — a +// re-join overwrites both anyway. func (q *Queue) Enqueue(ctx context.Context, playerID string, rank int) error { - pipe := q.rdb.TxPipeline() + pipe := q.rdb.Pipeline() pipe.ZAdd(ctx, queueKey, redis.Z{Score: float64(rank), Member: playerID}) pipe.HSet(ctx, joinedKey, playerID, time.Now().Unix()) _, err := pipe.Exec(ctx) @@ -66,20 +72,27 @@ func (q *Queue) Remove(ctx context.Context, playerID string) error { return nil } -// RemoveBatch deletes multiple players atomically (used after a match tick). +// RemoveBatch deletes multiple players from the queue (used after a match tick). +// +// Cluster-ready: it uses the non-transactional Pipeline() (not TxPipeline). 
The +// ZREM (queueKey) and HDEL (joinedKey) hash to different slots under Redis +// Cluster, so a TxPipeline would error as a cross-slot transaction; Pipeline() +// lets a ClusterClient split them safely. Atomicity is intentionally traded +// away — a player left half-removed is simply re-paired on the next tick +// (self-correcting), and an orphaned room is reclaimed by its TTL. +// +// Allocation-free argument passing: ZRem is variadic (...interface{}) but +// go-redis special-cases a single []string argument (appendArgs -> appendArg), +// boxing the strings itself — so passing playerIDs as one []string avoids +// building an intermediate []any and the per-element heap escape. HDel already +// takes ...string, so playerIDs flows through with no boxing at all. func (q *Queue) RemoveBatch(ctx context.Context, playerIDs []string) error { if len(playerIDs) == 0 { return nil } - members := make([]any, len(playerIDs)) - fields := make([]string, len(playerIDs)) - for i, id := range playerIDs { - members[i] = id - fields[i] = id - } - pipe := q.rdb.TxPipeline() - pipe.ZRem(ctx, queueKey, members...) - pipe.HDel(ctx, joinedKey, fields...) + pipe := q.rdb.Pipeline() + pipe.ZRem(ctx, queueKey, playerIDs) + pipe.HDel(ctx, joinedKey, playerIDs...) _, err := pipe.Exec(ctx) return err } diff --git a/internal/matchmaking/rooms.go b/internal/matchmaking/rooms.go index 52e2108..791bc84 100644 --- a/internal/matchmaking/rooms.go +++ b/internal/matchmaking/rooms.go @@ -44,6 +44,30 @@ func (s *RoomStore) Save(ctx context.Context, room *Room) error { return s.rdb.Set(ctx, roomKey(room.ID), raw, s.ttl).Err() } +// SaveMany writes many rooms in a single pipelined round trip — the batch +// fast-path for a match tick, replacing one SET round trip per room. +// +// Cluster-ready: it uses the non-transactional Pipeline() (not TxPipeline), so a +// go-redis ClusterClient can split the per-room SETs across the slots their keys +// hash to. 
Every SET is single-key, so there is no cross-slot assumption and no +// hash tag is needed. +func (s *RoomStore) SaveMany(ctx context.Context, rooms []*Room) error { + if len(rooms) == 0 { + return nil + } + pipe := s.rdb.Pipeline() + // A JSON error returns before Exec, so nothing is sent for a bad batch. + for _, room := range rooms { + raw, err := json.Marshal(room) + if err != nil { + return err + } + pipe.Set(ctx, roomKey(room.ID), raw, s.ttl) + } + _, err := pipe.Exec(ctx) + return err +} + // Get fetches a room by ID. func (s *RoomStore) Get(ctx context.Context, id string) (*Room, error) { raw, err := s.rdb.Get(ctx, roomKey(id)).Result() diff --git a/internal/matchmaking/service.go b/internal/matchmaking/service.go index bb9fc0b..bd6c95c 100644 --- a/internal/matchmaking/service.go +++ b/internal/matchmaking/service.go @@ -127,8 +127,11 @@ func (s *Service) MatchOnce(ctx context.Context) (int, error) { ) pairSpan.End() - for _, pair := range pairs { - s.createRoom(ctx, pair) + if len(pairs) > 0 { + if err := s.createRooms(ctx, pairs); err != nil { + tracing.RecordError(span, err) + s.log.Error("batch room creation failed", zap.Error(err)) + } } span.SetAttributes(attribute.Int("matchmaking.matches", len(pairs))) @@ -139,49 +142,69 @@ func (s *Service) MatchOnce(ctx context.Context) (int, error) { return len(pairs), nil } -// createRoom persists a room for one matched pair, dequeues the players and -// publishes MatchFound. Errors are logged and the room is skipped (the next -// tick re-pairs the still-queued players). -func (s *Service) createRoom(ctx context.Context, pair [2]Ticket) { - ctx, span := tracing.Tracer().Start(ctx, "matchmaking.create_room") +// createRooms persists all matched rooms, dequeues all matched players and +// publishes MatchFound for each — in a fixed, small number of round trips per +// tick instead of two per pair. 
This is the hot path's core optimization: +// O(pairs) sequential round trips collapse into O(1) pipelined batches, and the +// per-pair span is replaced by one span for the whole batch. +// +// Partial-failure is tolerated by design (Cluster-ready, non-atomic): if the +// dequeue fails after rooms are saved, the still-queued players are simply +// re-paired on the next tick and the orphaned rooms are reclaimed by their TTL. +// A "ghost room" is far cheaper than a failed match. +func (s *Service) createRooms(ctx context.Context, pairs [][2]Ticket) error { + ctx, span := tracing.Tracer().Start(ctx, "matchmaking.create_rooms") defer span.End() - - room := &Room{ - ID: uuid.NewString(), - Players: []string{pair[0].PlayerID, pair[1].PlayerID}, - Ranks: map[string]int{ - pair[0].PlayerID: pair[0].Rank, - pair[1].PlayerID: pair[1].Rank, - }, - Status: "waiting", - CreatedAt: time.Now().UTC(), - } - if err := s.rooms.Save(ctx, room); err != nil { - tracing.RecordError(span, err) - s.log.Error("failed to save room", zap.Error(err), zap.String("room_id", room.ID)) - return + span.SetAttributes(attribute.Int("matchmaking.pairs", len(pairs))) + + // Pre-allocate to exact final sizes (no growth/realloc inside the loop): + // rooms is one per pair, dequeue is two players per pair. + rooms := make([]*Room, len(pairs)) + dequeue := make([]string, 0, len(pairs)*2) + createdAt := time.Now().UTC() + + for i, pair := range pairs { + room := &Room{ + ID: uuid.NewString(), + Players: []string{pair[0].PlayerID, pair[1].PlayerID}, + Ranks: map[string]int{ + pair[0].PlayerID: pair[0].Rank, + pair[1].PlayerID: pair[1].Rank, + }, + Status: "waiting", + CreatedAt: createdAt, + } + rooms[i] = room + dequeue = append(dequeue, room.Players[0], room.Players[1]) } - if err := s.queue.RemoveBatch(ctx, room.Players); err != nil { - tracing.RecordError(span, err) - s.log.Error("failed to dequeue matched players", zap.Error(err)) - return + + // 1) Persist all rooms in one pipeline. 
+ if err := s.rooms.SaveMany(ctx, rooms); err != nil { + return err } - s.m.MatchesCreated.Inc() - - e, err := events.New(events.TypeMatchFound, events.MatchFoundPayload{ - RoomID: room.ID, - Players: room.Players, - }) - if err == nil { - err = s.pub.Publish(ctx, events.TopicMatchmaking, e) + // 2) Dequeue all matched players in one pipeline. If this fails the rooms are + // already saved (ghost rooms, reclaimed by TTL); the still-queued players are + // re-paired next tick — idempotent, matching the old per-room behaviour. + if err := s.queue.RemoveBatch(ctx, dequeue); err != nil { + return err } - if err != nil { - tracing.RecordError(span, err) - s.log.Warn("failed to publish MatchFound", zap.Error(err), zap.String("room_id", room.ID)) + + // 3) Metrics + events. Count once for the whole batch (one atomic Add instead + // of N Inc), then publish per room since each match is a distinct downstream + // notification. + s.m.MatchesCreated.Add(float64(len(rooms))) + for _, room := range rooms { + e, err := events.New(events.TypeMatchFound, events.MatchFoundPayload{ + RoomID: room.ID, + Players: room.Players, + }) + if err != nil { + s.log.Warn("failed to build MatchFound", zap.Error(err), zap.String("room_id", room.ID)) + continue + } + if err := s.pub.Publish(ctx, events.TopicMatchmaking, e); err != nil { + s.log.Warn("failed to publish MatchFound", zap.Error(err), zap.String("room_id", room.ID)) + } } - s.log.Info("match created", - append([]zap.Field{ - zap.String("room_id", room.ID), - zap.Strings("players", room.Players), - }, tracing.LogFields(ctx)...)...) 
+ return nil } diff --git a/internal/matchmaking/service_test.go b/internal/matchmaking/service_test.go index a70f98e..ba05811 100644 --- a/internal/matchmaking/service_test.go +++ b/internal/matchmaking/service_test.go @@ -80,6 +80,44 @@ func TestMatchOnceCreatesRoomAndPublishes(t *testing.T) { assert.ElementsMatch(t, []string{"alice", "bob"}, room.Players) } +func TestMatchOnceBatchesMultipleRooms(t *testing.T) { + // Exercises the batched createRooms path: several pairs matched in one tick + // must all be saved, dequeued and published. + svc, pub := newTestService(t) + ctx := context.Background() + + // Six players, tightly ranked → three pairs in one tick. + for i, id := range []string{"p1", "p2", "p3", "p4", "p5", "p6"} { + require.NoError(t, svc.JoinQueue(ctx, id, 1000+i)) + } + + matches, err := svc.MatchOnce(ctx) + require.NoError(t, err) + assert.Equal(t, 3, matches) + + // All six players dequeued. + _, size, err := svc.QueueStatus(ctx, "p1") + require.NoError(t, err) + assert.Equal(t, int64(0), size) + + // Exactly three MatchFound events, each with a fetchable room. 
+ require.Len(t, pub.events, 3) + seen := map[string]bool{} + for _, e := range pub.events { + assert.Equal(t, events.TypeMatchFound, e.Type) + var payload events.MatchFoundPayload + require.NoError(t, json.Unmarshal(e.Payload, &payload)) + room, err := svc.GetRoom(ctx, payload.RoomID) + require.NoError(t, err) + assert.Equal(t, "waiting", room.Status) + for _, p := range payload.Players { + assert.Falsef(t, seen[p], "player %s matched twice", p) + seen[p] = true + } + } + assert.Len(t, seen, 6) +} + func TestMatchOnceRespectsRankWindow(t *testing.T) { svc, pub := newTestService(t) ctx := context.Background() diff --git a/internal/presence/handler.go b/internal/presence/handler.go index 5d2bf22..98a90fc 100644 --- a/internal/presence/handler.go +++ b/internal/presence/handler.go @@ -32,6 +32,7 @@ func (h *Handler) RegisterRoutes(r *gin.Engine) { r.POST("/presence/connect", h.connect) r.POST("/presence/disconnect", h.disconnect) r.POST("/presence/heartbeat", h.heartbeat) + r.POST("/presence/heartbeat/bulk", h.heartbeatBulk) } type playerRequest struct { @@ -49,6 +50,12 @@ type friendsRequest struct { IDs []string `json:"ids" binding:"required"` } +type bulkHeartbeatRequest struct { + // IDs are the players a WS replica currently holds connections for. POST so a + // large batch is not constrained by URL length limits. + IDs []string `json:"ids" binding:"required"` +} + func (h *Handler) get(c *gin.Context) { id := c.Param("id") if id == "" { @@ -99,6 +106,22 @@ func (h *Handler) connect(c *gin.Context) { h.lifecycle(c, h.svc.Connect) } func (h *Handler) disconnect(c *gin.Context) { h.lifecycle(c, h.svc.Disconnect) } func (h *Handler) heartbeat(c *gin.Context) { h.lifecycle(c, h.svc.Heartbeat) } +// heartbeatBulk refreshes presence for a whole batch of players in one call — +// the bulk fast-path that lets a WS replica heartbeat all its players with a +// single request instead of one per player. 
+func (h *Handler) heartbeatBulk(c *gin.Context) { + var req bulkHeartbeatRequest + if err := c.ShouldBindJSON(&req); err != nil { + httpx.Error(c, http.StatusBadRequest, "invalid request: "+err.Error()) + return + } + if err := h.svc.HeartbeatMany(c.Request.Context(), req.IDs); err != nil { + httpx.Error(c, http.StatusInternalServerError, "failed to refresh presence") + return + } + httpx.OK(c, gin.H{"refreshed": len(req.IDs)}) +} + // lifecycle is the shared body for connect/disconnect/heartbeat: bind the // player id, run the op, return the resulting record. All three ops share the // same (ctx, playerID) -> (Record, error) shape. diff --git a/internal/presence/notifier.go b/internal/presence/notifier.go index 155c913..4586a84 100644 --- a/internal/presence/notifier.go +++ b/internal/presence/notifier.go @@ -20,6 +20,9 @@ type Notifier interface { Connect(ctx context.Context, playerID string) error Disconnect(ctx context.Context, playerID string) error Heartbeat(ctx context.Context, playerID string) error + // HeartbeatMany refreshes presence for many players in one call — the bulk + // fast-path for the WS replica's heartbeat ticker (one request, not N). + HeartbeatMany(ctx context.Context, playerIDs []string) error } // HTTPNotifier calls the Presence Service over HTTP. It propagates the W3C trace @@ -58,11 +61,29 @@ func (n *HTTPNotifier) Heartbeat(ctx context.Context, playerID string) error { return n.post(ctx, "/presence/heartbeat", playerID) } +// HeartbeatMany refreshes presence for a batch of players in a single HTTP call. 
+func (n *HTTPNotifier) HeartbeatMany(ctx context.Context, playerIDs []string) error { + if len(playerIDs) == 0 { + return nil + } + body, err := json.Marshal(bulkHeartbeatRequest{IDs: playerIDs}) + if err != nil { + return err + } + return n.postBody(ctx, "/presence/heartbeat/bulk", body) +} + func (n *HTTPNotifier) post(ctx context.Context, path, playerID string) error { body, err := json.Marshal(playerRequest{PlayerID: playerID}) if err != nil { return err } + return n.postBody(ctx, path, body) +} + +// postBody POSTs a raw JSON body with trace propagation. Shared by the single +// and bulk notifier calls so the request/trace plumbing lives in one place. +func (n *HTTPNotifier) postBody(ctx context.Context, path string, body []byte) error { req, err := http.NewRequestWithContext(ctx, http.MethodPost, n.baseURL+path, bytes.NewReader(body)) if err != nil { return err @@ -98,3 +119,6 @@ func (NoopNotifier) Disconnect(context.Context, string) error { return nil } // Heartbeat does nothing and returns nil. func (NoopNotifier) Heartbeat(context.Context, string) error { return nil } + +// HeartbeatMany does nothing and returns nil. +func (NoopNotifier) HeartbeatMany(context.Context, []string) error { return nil } diff --git a/internal/presence/repository.go b/internal/presence/repository.go index a66a9d1..9a463da 100644 --- a/internal/presence/repository.go +++ b/internal/presence/repository.go @@ -178,6 +178,85 @@ func (r *Repository) setState(ctx context.Context, playerID string, to State) (m return r.runMutation(ctx, scriptSetState, playerID, string(to)) } +// HeartbeatMany refreshes the TTL for a batch of players in a single pipelined +// round trip — the bulk fast-path behind the WS replica's heartbeat ticker. +// Instead of one Lua EVAL per player, the common case (the key still exists) is +// one EXPIRE per player, all flushed in ONE Pipeline(). 
+// +// Cluster-ready: uses the non-transactional Pipeline() (not TxPipeline), so a +// go-redis ClusterClient can transparently split the per-key EXPIREs across the +// slots they hash to. Every command is single-key, so there is no cross-slot +// assumption and no hash tag. +// +// Self-heal: EXPIRE on a missing key is a no-op (returns false) — it cannot +// resurrect a record that expired between beats (e.g. after a crash). Those +// misses are collected and re-created as ONLINE in a second, still-pipelined +// pass that runs the same single-key heartbeat Lua script. In steady state the +// second pass is empty, so the hot path stays at one round trip. +// +// Returns the player IDs that were re-created (OFFLINE->ONLINE) so the caller +// can publish PresenceOnline for them; refreshed-but-unchanged players produce +// no event. +func (r *Repository) HeartbeatMany(ctx context.Context, playerIDs []string) ([]string, error) { + if len(playerIDs) == 0 { + return nil, nil + } + + // Pass 1: one EXPIRE per player, all in a single non-transactional pipeline. + pipe := r.rdb.Pipeline() + cmds := make([]*redis.BoolCmd, len(playerIDs)) + for i, id := range playerIDs { + cmds[i] = pipe.Expire(ctx, key(id), r.ttl) + } + if _, err := pipe.Exec(ctx); err != nil { + return nil, err + } + + // Collect misses (EXPIRE false → key absent → needs re-create). Zero-length + // with no capacity: in steady state there are no misses, so this never + // allocates a backing array on the hot path. + missed := make([]string, 0) + for i, id := range playerIDs { + ok, err := cmds[i].Result() + if err != nil { + return nil, err + } + if !ok { + missed = append(missed, id) + } + } + if len(missed) == 0 { + return nil, nil + } + + // Pass 2: re-create the expired records as ONLINE. Still pipelined and still + // single-key Lua EVALs (Cluster-safe); runs only for the rare crash-recovery + // minority. 
+ healPipe := r.rdb.Pipeline() + healCmds := make([]*redis.Cmd, len(missed)) + nowSec := now().Unix() + ttlSec := r.ttlSeconds() + for i, id := range missed { + // Use Eval (full EVAL with source), not Run: Run's EVALSHA->EVAL fallback + // on NOSCRIPT cannot work inside a pipeline (the error is not seen until + // Exec), and a fresh Cluster connection may not have the script cached. + // This pass is the rare crash-recovery minority, so the extra bytes are + // off the hot path. + healCmds[i] = scriptHeartbeat.Eval(ctx, healPipe, []string{key(id)}, nowSec, ttlSec) + } + if _, err := healPipe.Exec(ctx); err != nil { + return nil, err + } + healed := make([]string, 0, len(missed)) + for i, id := range missed { + if err := healCmds[i].Err(); err != nil { + return nil, err + } + healed = append(healed, id) + } + return healed, nil +} + // Get returns the player's current record. A missing key reads as OFFLINE. This // is a pure read with no side effects — no events are emitted here. func (r *Repository) Get(ctx context.Context, playerID string) (Record, error) { diff --git a/internal/presence/service.go b/internal/presence/service.go index 7e154b2..92915e4 100644 --- a/internal/presence/service.go +++ b/internal/presence/service.go @@ -86,6 +86,40 @@ func (s *Service) Heartbeat(ctx context.Context, playerID string) (Record, error return mut.Current, nil } +// HeartbeatMany refreshes presence for a batch of players — the bulk fast-path +// for the WS replica's heartbeat ticker. It counts the heartbeats once for the +// whole batch and publishes PresenceOnline only for players whose record had +// expired and was re-created; the steady-state majority refresh silently. 
+func (s *Service) HeartbeatMany(ctx context.Context, playerIDs []string) error { + if len(playerIDs) == 0 { + return nil + } + ctx, span := tracing.Tracer().Start(ctx, "presence.heartbeat") + defer span.End() + span.SetAttributes(attribute.Int("presence.batch_size", len(playerIDs))) + + healed, err := s.repo.HeartbeatMany(ctx, playerIDs) + if err != nil { + tracing.RecordError(span, err) + return err + } + + if s.m != nil { + // One Add for the whole batch instead of N Inc. + s.m.PresenceHeartbeatTotal.Add(float64(len(playerIDs))) + } + + // Only re-created (OFFLINE->ONLINE) players changed state → publish those. + nowSec := now().Unix() + for _, id := range healed { + s.publishTransition(ctx, id, mutation{ + Previous: Record{State: StateOffline}, + Current: Record{State: StateOnline, ConnectionCount: 1, LastSeen: nowSec}, + }) + } + return nil +} + // SetState applies an explicit transition (e.g. ONLINE->IN_QUEUE, // IN_QUEUE->IN_MATCH). The rule is permissive (see CanTransition); only // OFFLINE->IN_QUEUE / OFFLINE->IN_MATCH are rejected. diff --git a/internal/presence/service_test.go b/internal/presence/service_test.go index 25fb91c..a9cacef 100644 --- a/internal/presence/service_test.go +++ b/internal/presence/service_test.go @@ -33,6 +33,14 @@ func (p *capturingPublisher) Publish(_ context.Context, _ string, e events.Event func (p *capturingPublisher) Close() error { return nil } +// reset clears recorded events so a test can assert only on events that occur +// after setup (e.g. after the initial connects). +func (p *capturingPublisher) reset() { + p.mu.Lock() + defer p.mu.Unlock() + p.events = nil +} + func (p *capturingPublisher) types() []string { p.mu.Lock() defer p.mu.Unlock() @@ -53,6 +61,17 @@ func newTestService(t *testing.T) (*Service, *capturingPublisher, *miniredis.Min return svc, pub, mr } +// mustConnect connects each player once and returns the first error to the caller. 
+func mustConnect(ctx context.Context, t *testing.T, svc *Service, ids ...string) error { + t.Helper() + for _, id := range ids { + if _, err := svc.Connect(ctx, id); err != nil { + return err + } + } + return nil +} + func TestConnectGoesOnlineAndEmitsOnline(t *testing.T) { svc, pub, _ := newTestService(t) ctx := context.Background() @@ -148,6 +167,54 @@ func TestHeartbeatRecreatesExpiredRecord(t *testing.T) { assert.Equal(t, 1, rec.ConnectionCount) } +func TestHeartbeatManyRefreshesTTLWithoutEvents(t *testing.T) { + svc, pub, mr := newTestService(t) + ctx := context.Background() + + require.NoError(t, mustConnect(ctx, t, svc, "a", "b", "c")) + pub.reset() + + // Age all keys partway, then bulk-heartbeat and confirm TTLs are reset and + // no events fire (steady-state refresh of existing records). + mr.FastForward(30 * time.Second) + require.NoError(t, svc.HeartbeatMany(ctx, []string{"a", "b", "c"})) + + for _, id := range []string{"a", "b", "c"} { + assert.InDelta(t, testTTL.Seconds(), mr.TTL(key(id)).Seconds(), 1) + } + assert.Empty(t, pub.types(), "refreshing live records must not publish") +} + +func TestHeartbeatManyHealsExpiredRecords(t *testing.T) { + svc, pub, mr := newTestService(t) + ctx := context.Background() + + require.NoError(t, mustConnect(ctx, t, svc, "alive", "crashed")) + pub.reset() + + // Fast-forward past the TTL so both records expire; despite its name, "alive" + // is not refreshed in between and must be healed as well. + mr.FastForward(testTTL + time.Second) // both keys gone now + + // Bulk heartbeat must re-create BOTH as ONLINE (heal pass) and publish a + // PresenceOnline for each, since both had expired. + require.NoError(t, svc.HeartbeatMany(ctx, []string{"alive", "crashed"})) + + for _, id := range []string{"alive", "crashed"} { + rec, err := svc.Get(ctx, id) + require.NoError(t, err) + assert.Equal(t, StateOnline, rec.State) + } + // Two heals → two PresenceOnline events. 
+ assert.Equal(t, []string{events.TypePresenceOnline, events.TypePresenceOnline}, pub.types()) +} + +func TestHeartbeatManyEmptyIsNoop(t *testing.T) { + svc, pub, _ := newTestService(t) + require.NoError(t, svc.HeartbeatMany(context.Background(), nil)) + assert.Empty(t, pub.types()) +} + func TestSetStateAllowsQueueAndMatchTransitions(t *testing.T) { svc, pub, _ := newTestService(t) ctx := context.Background() diff --git a/internal/wsgateway/hub.go b/internal/wsgateway/hub.go index 633deb8..306daef 100644 --- a/internal/wsgateway/hub.go +++ b/internal/wsgateway/hub.go @@ -193,9 +193,24 @@ func (h *Hub) RunHeartbeat(ctx context.Context, interval time.Duration) { case <-ctx.Done(): return case <-ticker.C: - for _, playerID := range h.connectedPlayers() { - h.notifyPresence("heartbeat", playerID, h.presence.Heartbeat) + ids := h.connectedPlayers() + if len(ids) == 0 { + continue } + // One bulk call for the whole replica instead of a goroutine + HTTP + // request per player. Fire async with a bounded timeout derived from + // the loop ctx, so a slow Presence Service cannot stall the ticker yet + // in-flight beats are cancelled on shutdown. Failures are logged, not + // fatal — presence self-heals from the next beat / TTL. The closure + // captures the batch once per tick, not once per player. + go func(batch []string) { + cctx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + if err := h.presence.HeartbeatMany(cctx, batch); err != nil { + h.log.Warn("bulk presence heartbeat failed", + zap.Int("count", len(batch)), zap.Error(err)) + } + }(ids) } } }