Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 24 additions & 11 deletions internal/matchmaking/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,14 @@ func NewQueue(rdb *redis.Client) *Queue {

// Enqueue adds (or re-ranks) a player. Idempotent: re-joining just refreshes
// rank and join time.
//
// Cluster-ready: uses the non-transactional Pipeline() (not TxPipeline). ZADD
// (queueKey) and HSET (joinedKey) hash to different slots under Redis Cluster,
// where a TxPipeline would fail cross-slot; Pipeline() lets a ClusterClient
// split them. The two writes are independent, so dropping atomicity is safe — a
// re-join overwrites both anyway.
func (q *Queue) Enqueue(ctx context.Context, playerID string, rank int) error {
pipe := q.rdb.TxPipeline()
pipe := q.rdb.Pipeline()
pipe.ZAdd(ctx, queueKey, redis.Z{Score: float64(rank), Member: playerID})
pipe.HSet(ctx, joinedKey, playerID, time.Now().Unix())
_, err := pipe.Exec(ctx)
Expand All @@ -66,20 +72,27 @@ func (q *Queue) Remove(ctx context.Context, playerID string) error {
return nil
}

// RemoveBatch deletes multiple players from the queue (used after a match tick).
//
// NOTE(review): this span is a rendered diff with its +/- markers stripped, so
// both sides of the change appear in the body below and the text is not
// compilable as-is. The members/fields construction and the TxPipeline() call
// with its two commands are the removed side; the Pipeline() call and the two
// commands that take playerIDs directly are the added side.
//
// An empty playerIDs slice returns nil without queuing any command. Otherwise a
// ZREM on queueKey and an HDEL on joinedKey are queued on one pipeline and sent
// with a single Exec, whose error is returned unchanged.
//
// Added side, atomicity: Pipeline() is non-transactional, so the ZREM and the
// HDEL are not applied as a unit. The stated motivation is Redis Cluster:
// queueKey and joinedKey are two different keys, so presumably they hash to
// different slots and a MULTI/EXEC over both would be rejected as cross-slot —
// confirm against the key definitions (no shared hash tag is visible here).
//
// NOTE(review): the "half-removed player is re-paired on the next tick"
// reasoning only holds when the ZREM is the command that failed. If the ZREM is
// applied and the HDEL fails, the player has already left the sorted set, and
// the caller createRooms returns on any RemoveBatch error before publishing
// MatchFound — so that player would be neither re-paired nor notified. Confirm
// how per-command pipeline errors surface from Exec before relying on this.
//
// Added side, argument passing: ZRem is variadic over interface values;
// playerIDs is passed as one []string argument, which go-redis is expected to
// flatten into individual members (appendArgs -> appendArg), so this function
// no longer builds an intermediate []any. The strings are still converted to
// interface values inside go-redis, so this is fewer allocations in this
// function rather than none — TODO confirm against the go-redis version in
// use. HDel takes ...string, so playerIDs is spread into it directly.
func (q *Queue) RemoveBatch(ctx context.Context, playerIDs []string) error {
if len(playerIDs) == 0 {
return nil
}
// Removed side: copy the ids into a []any for ZRem and a []string for HDel,
// then queue both commands on a transactional pipeline.
members := make([]any, len(playerIDs))
fields := make([]string, len(playerIDs))
for i, id := range playerIDs {
members[i] = id
fields[i] = id
}
pipe := q.rdb.TxPipeline()
pipe.ZRem(ctx, queueKey, members...)
pipe.HDel(ctx, joinedKey, fields...)
// Added side: queue the same two commands on a non-transactional pipeline,
// passing playerIDs straight through without the intermediate copies.
pipe := q.rdb.Pipeline()
pipe.ZRem(ctx, queueKey, playerIDs)
pipe.HDel(ctx, joinedKey, playerIDs...)
// Shared by both sides: send the queued commands and return Exec's error.
_, err := pipe.Exec(ctx)
return err
}
Expand Down
24 changes: 24 additions & 0 deletions internal/matchmaking/rooms.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,30 @@ func (s *RoomStore) Save(ctx context.Context, room *Room) error {
return s.rdb.Set(ctx, roomKey(room.ID), raw, s.ttl).Err()
}

// SaveMany persists every room in rooms through one pipelined batch, so a match
// tick pays for a single Exec instead of one SET call per room.
//
// The batch is built with Pipeline() rather than TxPipeline(). Each queued
// command is a single-key SET on roomKey(room.ID) using the store's ttl, so no
// command touches more than one key; the intent is that a cluster-aware client
// can route each SET independently.
//
// An empty or nil slice is a no-op. Rooms are marshalled one by one while the
// batch is being built; if any of them fails to marshal, that error is returned
// straight away and Exec is never called. Otherwise the result of Exec is
// returned.
func (s *RoomStore) SaveMany(ctx context.Context, rooms []*Room) error {
	if len(rooms) == 0 {
		return nil
	}

	batch := s.rdb.Pipeline()
	for i := range rooms {
		r := rooms[i]
		payload, marshalErr := json.Marshal(r)
		if marshalErr != nil {
			// Bail out before Exec: nothing queued so far is sent.
			return marshalErr
		}
		batch.Set(ctx, roomKey(r.ID), payload, s.ttl)
	}

	if _, execErr := batch.Exec(ctx); execErr != nil {
		return execErr
	}
	return nil
}

// Get fetches a room by ID.
func (s *RoomStore) Get(ctx context.Context, id string) (*Room, error) {
raw, err := s.rdb.Get(ctx, roomKey(id)).Result()
Expand Down
107 changes: 65 additions & 42 deletions internal/matchmaking/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,11 @@ func (s *Service) MatchOnce(ctx context.Context) (int, error) {
)
pairSpan.End()

for _, pair := range pairs {
s.createRoom(ctx, pair)
if len(pairs) > 0 {
if err := s.createRooms(ctx, pairs); err != nil {
tracing.RecordError(span, err)
s.log.Error("batch room creation failed", zap.Error(err))
}
}

span.SetAttributes(attribute.Int("matchmaking.matches", len(pairs)))
Expand All @@ -139,49 +142,69 @@ func (s *Service) MatchOnce(ctx context.Context) (int, error) {
return len(pairs), nil
}

// NOTE(review): this span is a rendered diff with its +/- markers stripped, so
// the removed createRoom (one room per call) and the added createRooms (one
// batch per tick) are interleaved below and the text is not compilable as-is.
// Removed side: the createRoom signature and its "matchmaking.create_room"
// span, the single-room literal saved with s.rooms.Save, the
// RemoveBatch(ctx, room.Players) call, MatchesCreated.Inc(), the single
// events.New/Publish sequence that records errors on the span, and the
// "match created" log. Everything else belongs to createRooms.
//
// createRoom (removed) persisted a room for one matched pair, dequeued its two
// players and published MatchFound. Save and dequeue failures were logged,
// recorded on the span and ended the call for that pair only; a publish
// failure was logged as a warning and recorded on the span.
func (s *Service) createRoom(ctx context.Context, pair [2]Ticket) {
ctx, span := tracing.Tracer().Start(ctx, "matchmaking.create_room")
// createRooms (added) builds one Room per pair, persists them all with a
// single SaveMany call, dequeues every matched player with a single
// RemoveBatch call, adds len(rooms) to the MatchesCreated metric and then
// publishes one MatchFound event per room, all under one
// "matchmaking.create_rooms" span.
//
// It returns the SaveMany or RemoveBatch error unchanged, or nil. Failures to
// build or publish an individual event are logged as warnings and do not
// affect the return value.
//
// NOTE(review): both error returns happen before the publish loop, so one
// failed pipeline suppresses MatchFound for every pair in the tick, whereas
// the removed code skipped only the affected pair.
//
// NOTE(review): the added side of RemoveBatch uses a non-transactional
// pipeline. If its ZREM is applied but its HDEL fails, it presumably still
// reports an error; this function then returns without publishing although
// the players have already left the queue, so they would be neither re-paired
// nor notified. Confirm how pipeline errors surface before relying on the
// "re-paired next tick" assumption.
func (s *Service) createRooms(ctx context.Context, pairs [][2]Ticket) error {
ctx, span := tracing.Tracer().Start(ctx, "matchmaking.create_rooms")
defer span.End()

room := &Room{
ID: uuid.NewString(),
Players: []string{pair[0].PlayerID, pair[1].PlayerID},
Ranks: map[string]int{
pair[0].PlayerID: pair[0].Rank,
pair[1].PlayerID: pair[1].Rank,
},
Status: "waiting",
CreatedAt: time.Now().UTC(),
}
if err := s.rooms.Save(ctx, room); err != nil {
tracing.RecordError(span, err)
s.log.Error("failed to save room", zap.Error(err), zap.String("room_id", room.ID))
return
span.SetAttributes(attribute.Int("matchmaking.pairs", len(pairs)))

// Size both slices up front: rooms holds one entry per pair and dequeue holds
// two player ids per pair. createdAt is read once, so every room in the batch
// carries the same timestamp.
rooms := make([]*Room, len(pairs))
dequeue := make([]string, 0, len(pairs)*2)
createdAt := time.Now().UTC()

for i, pair := range pairs {
room := &Room{
ID: uuid.NewString(),
Players: []string{pair[0].PlayerID, pair[1].PlayerID},
Ranks: map[string]int{
pair[0].PlayerID: pair[0].Rank,
pair[1].PlayerID: pair[1].Rank,
},
Status: "waiting",
CreatedAt: createdAt,
}
rooms[i] = room
dequeue = append(dequeue, room.Players[0], room.Players[1])
}
if err := s.queue.RemoveBatch(ctx, room.Players); err != nil {
tracing.RecordError(span, err)
s.log.Error("failed to dequeue matched players", zap.Error(err))
return

// 1) Persist all rooms with a single SaveMany call. The error is returned to
// the caller, which records it on its own span and logs it.
if err := s.rooms.SaveMany(ctx, rooms); err != nil {
return err
}
s.m.MatchesCreated.Inc()

e, err := events.New(events.TypeMatchFound, events.MatchFoundPayload{
RoomID: room.ID,
Players: room.Players,
})
if err == nil {
err = s.pub.Publish(ctx, events.TopicMatchmaking, e)
// 2) Dequeue every matched player with a single RemoveBatch call. On failure
// the rooms written in step 1 are left behind as ghost rooms; they are expected
// to expire via the store's TTL (assumes a non-zero ttl — confirm), and players
// that are still queued can be paired again on a later tick.
if err := s.queue.RemoveBatch(ctx, dequeue); err != nil {
return err
}
if err != nil {
tracing.RecordError(span, err)
s.log.Warn("failed to publish MatchFound", zap.Error(err), zap.String("room_id", room.ID))

// 3) Metrics + events. The counter is bumped once for the whole batch, before
// any event is published, so it counts rooms created rather than events
// delivered. Each room then gets its own MatchFound event.
// NOTE(review): unlike the removed code, a build/publish failure here is only
// logged; it is not recorded on the span.
s.m.MatchesCreated.Add(float64(len(rooms)))
for _, room := range rooms {
e, err := events.New(events.TypeMatchFound, events.MatchFoundPayload{
RoomID: room.ID,
Players: room.Players,
})
if err != nil {
s.log.Warn("failed to build MatchFound", zap.Error(err), zap.String("room_id", room.ID))
continue
}
if err := s.pub.Publish(ctx, events.TopicMatchmaking, e); err != nil {
s.log.Warn("failed to publish MatchFound", zap.Error(err), zap.String("room_id", room.ID))
}
}
s.log.Info("match created",
append([]zap.Field{
zap.String("room_id", room.ID),
zap.Strings("players", room.Players),
}, tracing.LogFields(ctx)...)...)
return nil
}
38 changes: 38 additions & 0 deletions internal/matchmaking/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,44 @@ func TestMatchOnceCreatesRoomAndPublishes(t *testing.T) {
assert.ElementsMatch(t, []string{"alice", "bob"}, room.Players)
}

// TestMatchOnceBatchesMultipleRooms covers the batched room-creation path: when
// one tick produces several pairs, every pair must end up with a stored room,
// its players must leave the queue, and each pair must get its own MatchFound
// event.
func TestMatchOnceBatchesMultipleRooms(t *testing.T) {
	svc, pub := newTestService(t)
	ctx := context.Background()

	// Six players with ranks one point apart, expected to pair up into three
	// rooms within a single tick.
	players := []string{"p1", "p2", "p3", "p4", "p5", "p6"}
	for offset, playerID := range players {
		require.NoError(t, svc.JoinQueue(ctx, playerID, 1000+offset))
	}

	matched, err := svc.MatchOnce(ctx)
	require.NoError(t, err)
	assert.Equal(t, 3, matched)

	// Nobody should be left in the queue afterwards.
	_, queued, err := svc.QueueStatus(ctx, "p1")
	require.NoError(t, err)
	assert.Equal(t, int64(0), queued)

	// One MatchFound per room, each pointing at a room that can be fetched,
	// and no player appearing in more than one event.
	require.Len(t, pub.events, 3)
	matchedPlayers := make(map[string]bool)
	for _, evt := range pub.events {
		assert.Equal(t, events.TypeMatchFound, evt.Type)

		var found events.MatchFoundPayload
		require.NoError(t, json.Unmarshal(evt.Payload, &found))

		stored, err := svc.GetRoom(ctx, found.RoomID)
		require.NoError(t, err)
		assert.Equal(t, "waiting", stored.Status)

		for _, playerID := range found.Players {
			assert.Falsef(t, matchedPlayers[playerID], "player %s matched twice", playerID)
			matchedPlayers[playerID] = true
		}
	}
	assert.Len(t, matchedPlayers, 6)
}

func TestMatchOnceRespectsRankWindow(t *testing.T) {
svc, pub := newTestService(t)
ctx := context.Background()
Expand Down
23 changes: 23 additions & 0 deletions internal/presence/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ func (h *Handler) RegisterRoutes(r *gin.Engine) {
r.POST("/presence/connect", h.connect)
r.POST("/presence/disconnect", h.disconnect)
r.POST("/presence/heartbeat", h.heartbeat)
r.POST("/presence/heartbeat/bulk", h.heartbeatBulk)
}

type playerRequest struct {
Expand All @@ -49,6 +50,12 @@ type friendsRequest struct {
IDs []string `json:"ids" binding:"required"`
}

// bulkHeartbeatRequest is the JSON body bound by the bulk heartbeat handler
// (POST /presence/heartbeat/bulk). The ids travel in the request body rather
// than the URL so that a large batch is not constrained by URL length limits.
type bulkHeartbeatRequest struct {
// IDs lists the players a WS replica currently holds connections for. The
// field is marked required for binding; presumably a body without "ids" is
// rejected at bind time — confirm whether an empty list is accepted.
IDs []string `json:"ids" binding:"required"`
}

func (h *Handler) get(c *gin.Context) {
id := c.Param("id")
if id == "" {
Expand Down Expand Up @@ -99,6 +106,22 @@ func (h *Handler) connect(c *gin.Context) { h.lifecycle(c, h.svc.Connect) }
// disconnect serves POST /presence/disconnect by running the shared lifecycle
// flow with the service's Disconnect operation.
func (h *Handler) disconnect(c *gin.Context) {
	h.lifecycle(c, h.svc.Disconnect)
}
// heartbeat serves POST /presence/heartbeat by running the shared lifecycle
// flow with the service's Heartbeat operation.
func (h *Handler) heartbeat(c *gin.Context) {
	h.lifecycle(c, h.svc.Heartbeat)
}

// heartbeatBulk serves POST /presence/heartbeat/bulk: it refreshes presence for
// every player id in the request body with a single service call, so a WS
// replica can heartbeat all of its players in one request.
//
// Responses:
//   - 400 when the JSON body cannot be bound (the message includes the bind
//     error);
//   - 500 with a fixed message when the service call fails (the underlying
//     error is neither returned to the client nor logged here);
//   - otherwise an OK payload {"refreshed": n}, where n is the number of ids
//     received in the request.
func (h *Handler) heartbeatBulk(c *gin.Context) {
	var body bulkHeartbeatRequest
	bindErr := c.ShouldBindJSON(&body)
	if bindErr != nil {
		httpx.Error(c, http.StatusBadRequest, "invalid request: "+bindErr.Error())
		return
	}

	ctx := c.Request.Context()
	if refreshErr := h.svc.HeartbeatMany(ctx, body.IDs); refreshErr != nil {
		httpx.Error(c, http.StatusInternalServerError, "failed to refresh presence")
		return
	}

	httpx.OK(c, gin.H{"refreshed": len(body.IDs)})
}

// lifecycle is the shared body for connect/disconnect/heartbeat: bind the
// player id, run the op, return the resulting record. All three ops share the
// same (ctx, playerID) -> (Record, error) shape.
Expand Down
24 changes: 24 additions & 0 deletions internal/presence/notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ type Notifier interface {
Connect(ctx context.Context, playerID string) error
Disconnect(ctx context.Context, playerID string) error
Heartbeat(ctx context.Context, playerID string) error
// HeartbeatMany refreshes presence for many players in one call — the bulk
// fast-path for the WS replica's heartbeat ticker (one request, not N).
HeartbeatMany(ctx context.Context, playerIDs []string) error
}

// HTTPNotifier calls the Presence Service over HTTP. It propagates the W3C trace
Expand Down Expand Up @@ -58,11 +61,29 @@ func (n *HTTPNotifier) Heartbeat(ctx context.Context, playerID string) error {
return n.post(ctx, "/presence/heartbeat", playerID)
}

// HeartbeatMany asks the Presence Service to refresh every player in playerIDs
// with one POST to /presence/heartbeat/bulk. An empty slice returns nil without
// making a request, and a JSON encoding failure is returned before anything is
// sent.
func (n *HTTPNotifier) HeartbeatMany(ctx context.Context, playerIDs []string) error {
	if len(playerIDs) == 0 {
		return nil
	}

	request := bulkHeartbeatRequest{IDs: playerIDs}
	payload, encodeErr := json.Marshal(request)
	if encodeErr != nil {
		return encodeErr
	}
	return n.postBody(ctx, "/presence/heartbeat/bulk", payload)
}

// post sends a single-player request: it wraps playerID in a playerRequest,
// encodes it as JSON and hands the bytes to postBody for delivery to path. An
// encoding failure is returned without calling postBody.
func (n *HTTPNotifier) post(ctx context.Context, path, playerID string) error {
	payload, encodeErr := json.Marshal(playerRequest{PlayerID: playerID})
	if encodeErr == nil {
		return n.postBody(ctx, path, payload)
	}
	return encodeErr
}

// postBody POSTs a raw JSON body with trace propagation. Shared by the single
// and bulk notifier calls so the request/trace plumbing lives in one place.
func (n *HTTPNotifier) postBody(ctx context.Context, path string, body []byte) error {
req, err := http.NewRequestWithContext(ctx, http.MethodPost, n.baseURL+path, bytes.NewReader(body))
if err != nil {
return err
Expand Down Expand Up @@ -98,3 +119,6 @@ func (NoopNotifier) Disconnect(context.Context, string) error { return nil }

// Heartbeat is a no-op: it ignores both arguments and always returns nil.
func (NoopNotifier) Heartbeat(_ context.Context, _ string) error {
	return nil
}

// HeartbeatMany is a no-op: it ignores both arguments and always returns nil.
func (NoopNotifier) HeartbeatMany(_ context.Context, _ []string) error {
	return nil
}
Loading
Loading