Pull request overview
Refactors the backend’s MQTT topic/payload contracts for connection status and signalling, and adds supporting backend state machines to keep session lifecycle consistent during the WebSocket → MQTT migration.
Changes:
- Switch connection status topics to `accounts/{account_id}/clients/{client_id}/status` with simplified `connected`/`disconnected` payloads (incl. `reason`, `at_millis`).
- Introduce MQTT-side sync handlers (`baby_stations`, `parent_stations`, `webrtc_inbox`) plus in-memory helpers (`ConnectionHub`, `PendingSessionStarts`) to bridge legacy clients.
- Add a `sessionstartdecider` to make session start ingestion idempotent and to support restarting sessions on reused connection IDs.
Reviewed changes
Copilot reviewed 32 out of 32 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| golang/internal/sessionstartdecider/Decider.go | New decider to dedupe/sequence session.started vs session.ended based on event log history. |
| golang/internal/sessionstartdecider/Decider_test.go | Unit tests for restart/late-start behaviors around session.ended. |
| golang/internal/sessionlist/SessionList.go | Adds Get/GetByConnectionID helpers and removes request-id gating. |
| golang/internal/sessionlist/Session.go | Removes RequestID from connection state. |
| golang/internal/connectionstoresync/Sync.go | Updates status topic parsing/handling, and emits session.ended on clean disconnect when a session is active. |
| golang/internal/connectionstore/Decider.go | Updates connection identity/event payloads (removes request_id, adds at_millis + disconnect reason). |
| golang/internal/connectionstore/Decider_test.go | Updates tests and adds reconnect-after-disconnect scenario. |
| golang/internal/connections/Event.go | Updates event schemas to match the new status payload shape. |
| golang/internal/babystationlist/BabyStationList.go | Makes snapshot listing resilient to missing session mappings; adds session_id to output and replaces sessions on same connection. |
| golang/internal/babystationlist/BabyStationList_test.go | Updates tests and adds coverage for session replacement + end-while-connected behavior. |
| golang/cmd/monitor-connection-status/main.go | Updates subscribed status wildcard topic. |
| golang/cmd/backend/UsageStats.go | Treats “clean” disconnect reason as session end for duration accounting. |
| golang/cmd/backend/UsageStats_test.go | Updates disconnect test payloads to match the new schema. |
| golang/cmd/backend/sessions.go | Changes session start to publish MQTT baby-station announcements (and append session.ended on delete). |
| golang/cmd/backend/sessions_test.go | Adds tests for input parsing and delete/restart flows with announcements + event stream expectations. |
| golang/cmd/backend/server.go | Wires new components: SessionStartDecider, PendingSessionStarts, ConnectionHub, and runs MQTT sync loop. |
| golang/cmd/backend/PendingSessionStarts.go | New per-account LRU queue for session starts waiting on client-id resolution. |
| golang/cmd/backend/PendingSessionStarts_test.go | Tests for LRU eviction, take, and per-account isolation. |
| golang/cmd/backend/MQTTSync.go | New MQTT subscribers for baby_stations, parent_stations, and webrtc_inbox. |
| golang/cmd/backend/ConnectionHub.go | New in-memory map of active websocket connections by connection_id. |
| golang/cmd/backend/Connection.go | Publishes new status payloads, routes signalling via MQTT inbox topic, and adds a write mutex for websocket writes. |
| docker-compose.yml | Updates Traefik image version. |
| AGENTS.md | Adds agent ownership/scope rules and docs workspace constraints. |
| agent_docs/Working Agreement with Human.md | Captures migration constraint/decision about resolving client_id at session-create time. |
| agent_docs/structure.md | Adds repo map for signalling source-of-truth files and runtime areas. |
| agent_docs/Signalling Migration.md | Documents current migration state, direction, and risks. |
| agent_docs/Product and Privacy Model.md | Captures privacy constraints and operational realities (PWA version skew). |
| agent_docs/Open Questions.md | Tracks deprecation/compatibility questions for the migration. |
| agent_docs/index.md | Adds vault index and navigation links. |
| agent_docs/Frontend Service Architecture.md | Documents frontend service-oriented architectural conventions. |
| agent_docs/Backend Signalling Contracts.md | Documents current backend endpoints/topics/payloads for signalling and discovery. |
| agent_docs/architecture.md | Adds high-level architecture summary aligned with signalling migration. |
```diff
 	ctx := request.Context()
-	vars := mux.Vars(request)
-	sessionID := vars["session_id"]
-	var session StartSessionEventData
-	err = json.NewDecoder(request.Body).Decode(&session)
+	accountID := contextx.GetAccountID(ctx)
+	sessionID := mux.Vars(request)["session_id"]
+	requestBody, err := io.ReadAll(request.Body)
 	if err != nil {
 		err = merry.WithHTTPCode(err, http.StatusBadRequest)
 		return
 	}
-	err = session.validate()
+	input, err := parseStartSessionInput(requestBody, sessionID)
 	if err != nil {
 		return
 	}
-	if session.ID != sessionID {
-		err = merry.Errorf("session id in path does not match session id in body").WithHTTPCode(http.StatusBadRequest)
+	if input.ClientID == "" {
+		connection, ok := handlers.ConnectionHub.Get(input.ConnectionID)
+		if ok && connection.AccountID == accountID {
+			input.ClientID = connection.ClientID
+		}
+	}
+	if input.ClientID == "" {
+		handlers.PendingSessionStarts.Put(accountID, input)
+		return
 	}
-	_, err = handlers.EventLog.Append(ctx, eventlog.AppendInput{
-		Type:      EventTypeSessionStarted,
-		AccountID: contextx.GetAccountID(ctx),
-		Data:      fatal.UnlessMarshalJSON(session),
-	})
-	if err != nil {
+	if err = handlers.publishBabyStationAnnouncement(accountID, input); err != nil {
 		return
 	}
 	// TODO set header with logical clock of the start event
 }
```
StartSession no longer appends a session.started event to the event log; it only publishes an MQTT announcement and returns. If the client calls DELETE quickly (or the connection clean-disconnect path appends session.ended) before the backend observes its own MQTT publish, sessionstartdecider will treat the later announcement as a duplicate and drop it, leaving the event log with session.ended but no session.started for that session. Consider appending session.started synchronously (or via SessionStartDecider) before returning, and treat the later MQTT-delivered announcement as idempotent/duplicate.
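A minimal, self-contained sketch of the idempotency the comment asks for: the HTTP handler records `session.started` synchronously, and the later MQTT-delivered announcement for the same session is recognized as a duplicate and dropped. The `decider` type and `ShouldStart` name here are illustrative only, not the repo's `sessionstartdecider` API.

```go
package main

import "fmt"

// decider tracks which sessions have already been started, so the
// MQTT-delivered copy of an announcement can be dropped as a duplicate.
type decider struct{ started map[string]bool }

// ShouldStart returns true only the first time a session ID is seen;
// later deliveries of the same announcement are treated as duplicates.
func (d *decider) ShouldStart(sessionID string) bool {
	if d.started[sessionID] {
		return false // duplicate announcement: drop
	}
	d.started[sessionID] = true
	return true
}

func main() {
	d := &decider{started: map[string]bool{}}
	fmt.Println(d.ShouldStart("s1")) // HTTP path appends session.started: true
	fmt.Println(d.ShouldStart("s1")) // MQTT redelivery is a duplicate: false
}
```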
```go
	accountID := contextx.GetAccountID(ctx)
	sessionID := mux.Vars(request)["session_id"]
	requestBody, err := io.ReadAll(request.Body)
	if err != nil {
		err = merry.WithHTTPCode(err, http.StatusBadRequest)
		return
```
io.ReadAll(request.Body) reads the entire request body without any size limit. This can allow memory exhaustion / DoS if a client sends a very large payload. Consider wrapping the body with http.MaxBytesReader (or otherwise enforcing a reasonable max size) before reading/parsing.
```go
	return CreateSessionInput{
		SessionID:       legacyInput.ID,
		ClientID:        "",
		ConnectionID:    legacyInput.HostConnectionID,
		Name:            legacyInput.Name,
		StartedAtMillis: legacyInput.StartedAt.UnixMilli(),
```
parseStartSessionInput converts started_at to started_at_millis, but the legacy payload validation doesn’t enforce that started_at is present/non-zero. A zero started_at would produce started_at_millis == 0 and propagate an invalid timestamp into MQTT/session processing. Consider validating started_at (or StartedAtMillis > 0) before accepting the request.
Suggested change:

```diff
-	return CreateSessionInput{
-		SessionID:       legacyInput.ID,
-		ClientID:        "",
-		ConnectionID:    legacyInput.HostConnectionID,
-		Name:            legacyInput.Name,
-		StartedAtMillis: legacyInput.StartedAt.UnixMilli(),
+	startedAtMillis := legacyInput.StartedAt.UnixMilli()
+	if legacyInput.StartedAt.IsZero() || startedAtMillis <= 0 {
+		return CreateSessionInput{}, merry.Errorf("started_at must be present and greater than zero").WithHTTPCode(http.StatusBadRequest)
+	}
+	return CreateSessionInput{
+		SessionID:       legacyInput.ID,
+		ClientID:        "",
+		ConnectionID:    legacyInput.HostConnectionID,
+		Name:            legacyInput.Name,
+		StartedAtMillis: startedAtMillis,
```
```diff
 func handleMessage(client mqtt.Client, message mqtt.Message) {
 	ctx := context.Background()
-	var frame connections.MessageFrame
-	err := json.Unmarshal(message.Payload(), &frame)
+	var status statusMessage
+	err := json.Unmarshal(message.Payload(), &status)
 	if err != nil {
 		logx.Warnln(err)
 		return
 	}
 	var accountID string
-	var connectionID string
+	var clientID string
 	matches := topicRegex.FindStringSubmatch(message.Topic())
 	fatal.Unless(len(matches) == 3, "failed to parse topic: "+message.Topic())
-	accountID, connectionID = matches[1], matches[2]
-	handle, ok := handlers[frame.Type]
+	accountID, clientID = matches[1], matches[2]
+	handle, ok := handlers[status.Type]
 	if !ok {
-		logx.Warnln("unhandled message type:", frame.Type)
+		logx.Warnln("unhandled message type:", status.Type)
 		return
 	}
 	handle(ctx, handlerInput{
-		AccountID:    accountID,
-		ConnectionID: connectionID,
-		Frame:        frame,
+		AccountID: accountID,
+		ClientID:  clientID,
+		Status:    status,
 	})
 	message.Ack()
 }
```
handleMessage only calls message.Ack() on the success path. On JSON/topic parse errors or unknown types it returns without acking, which can cause repeated redelivery of permanently-invalid messages and increased load. Other MQTT handlers in this repo use defer message.Ack(); consider doing the same here (and log errors instead of retrying forever).
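The `defer message.Ack()` pattern can be illustrated with a self-contained stand-in for `mqtt.Message` (the real handler takes a paho MQTT message; this fake only records whether `Ack` was called):

```go
package main

import "fmt"

// message is a fake stand-in for mqtt.Message that records Ack calls.
type message struct{ acked bool }

func (m *message) Ack() { m.acked = true }

// handle acks unconditionally via defer, so permanently-invalid messages
// are logged once instead of being redelivered forever.
func handle(m *message, payload string) {
	defer m.Ack()
	if payload == "" {
		fmt.Println("warn: invalid payload, dropping")
		return // still acked by the deferred call
	}
	fmt.Println("handled:", payload)
}

func main() {
	bad := &message{}
	handle(bad, "")
	fmt.Println("bad message acked:", bad.acked) // true
}
```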
```go
func (handlers *Handlers) handleWebRTCInboxMessage(client mqtt.Client, message mqtt.Message) {
	defer message.Ack()
	if !clientWebRTCInboxTopicRegex.MatchString(message.Topic()) {
		logx.Warnln("invalid webrtc inbox topic:", message.Topic())
		return
	}
	var inbox webrtcInboxMessage
	if err := json.Unmarshal(message.Payload(), &inbox); err != nil {
		logx.Warnln(err)
		return
	}
	if inbox.ConnectionID == "" {
		return
	}
	connection, ok := handlers.ConnectionHub.Get(inbox.ConnectionID)
	if !ok {
		return
	}
```
handleWebRTCInboxMessage routes purely by inbox.connection_id and does not verify that the MQTT topic account/client IDs match the websocket connection’s AccountID (or that the sender is authorized). If a connection_id ever collides or an attacker can publish to MQTT, this can inject signalling messages into another account’s websocket. Consider parsing {account_id, client_id} from the topic and rejecting messages where they don’t match the target connection (and/or include/validate the expected client/account in the payload).
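A runnable sketch of the suggested topic check. The topic shape is assumed to mirror the PR's status topic (`accounts/{account_id}/clients/{client_id}/...`), and `authorized` is a hypothetical helper, not the repo's API:

```go
package main

import (
	"fmt"
	"regexp"
)

// Assumed inbox topic shape, mirroring the status topic in this PR.
var inboxTopicRegex = regexp.MustCompile(
	`^accounts/([^/]+)/clients/([^/]+)/webrtc_inbox$`)

// connection is a minimal stand-in for the hub's connection entry.
type connection struct{ AccountID string }

// authorized reports whether a message published on topic may be routed
// to target: the topic must parse and its account must match the
// websocket connection's account.
func authorized(topic string, target connection) bool {
	matches := inboxTopicRegex.FindStringSubmatch(topic)
	if len(matches) != 3 {
		return false // unparseable topic: reject
	}
	return matches[1] == target.AccountID
}

func main() {
	target := connection{AccountID: "acct-1"}
	fmt.Println(authorized("accounts/acct-1/clients/c9/webrtc_inbox", target)) // true
	fmt.Println(authorized("accounts/acct-2/clients/c9/webrtc_inbox", target)) // false
}
```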
```go
func (connection *Connection) WriteJSON(value any) error {
	connection.writeMu.Lock()
	defer connection.writeMu.Unlock()
	return connection.conn.WriteJSON(value)
}
```
WriteJSON adds a write mutex, but sendPing (keepalive) still uses conn.WriteControl(...) without that mutex. Gorilla WebSocket requires all writes (including control frames) to be serialized; as-is, keepalive pings can still race with WriteJSON and cause concurrent write to websocket connection panics. Consider also guarding WriteControl with the same mutex (or funnel all writes through a single writer).
No description provided.