Refactor/mqtt topics #14

Open
Ryan-A-B wants to merge 7 commits into master from refactor/mqtt-topics

@Ryan-A-B
Owner

No description provided.

Copilot AI left a comment

Pull request overview

Refactors the backend’s MQTT topic/payload contracts for connection status and signalling, and adds supporting backend state machines to keep session lifecycle consistent during the WebSocket → MQTT migration.

Changes:

  • Switch connection status topics to accounts/{account_id}/clients/{client_id}/status with simplified connected/disconnected payloads (incl. reason, at_millis).
  • Introduce MQTT-side sync handlers (baby_stations, parent_stations, webrtc_inbox) plus in-memory helpers (ConnectionHub, PendingSessionStarts) to bridge legacy clients.
  • Add a sessionstartdecider to make session start ingestion idempotent and to support restarting sessions on reused connection IDs.
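
The status topic shape from the first bullet can be illustrated with a small parser. This is a minimal sketch, not the PR's code: the regex, the `statusMessage` field names, and the JSON tags (`type`, `reason`, `at_millis`) are assumptions inferred from the summary above, and IDs are assumed never to contain `/`.

```go
package main

import (
	"encoding/json"
	"fmt"
	"regexp"
)

// statusTopicRegex matches accounts/{account_id}/clients/{client_id}/status.
// Assumption: neither ID contains a '/'.
var statusTopicRegex = regexp.MustCompile(`^accounts/([^/]+)/clients/([^/]+)/status$`)

// statusMessage is a hypothetical payload shape; the field names are guesses.
type statusMessage struct {
	Type     string `json:"type"`             // "connected" or "disconnected"
	Reason   string `json:"reason,omitempty"` // e.g. "clean"; only meaningful on disconnect
	AtMillis int64  `json:"at_millis"`
}

// parseStatus extracts both IDs from the topic and decodes the payload.
func parseStatus(topic string, payload []byte) (accountID, clientID string, status statusMessage, err error) {
	matches := statusTopicRegex.FindStringSubmatch(topic)
	if len(matches) != 3 {
		err = fmt.Errorf("invalid status topic: %q", topic)
		return
	}
	accountID, clientID = matches[1], matches[2]
	if err = json.Unmarshal(payload, &status); err != nil {
		return
	}
	if status.Type != "connected" && status.Type != "disconnected" {
		err = fmt.Errorf("unknown status type: %q", status.Type)
	}
	return
}

func main() {
	accountID, clientID, status, err := parseStatus(
		"accounts/a1/clients/c1/status",
		[]byte(`{"type":"disconnected","reason":"clean","at_millis":1700000000000}`),
	)
	fmt.Println(accountID, clientID, status.Type, status.Reason, status.AtMillis, err)
}
```

Returning an error for a malformed topic, rather than aborting, keeps one bad message from taking down the subscriber.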

Reviewed changes

Copilot reviewed 32 out of 32 changed files in this pull request and generated 6 comments.

| File | Description |
| --- | --- |
| golang/internal/sessionstartdecider/Decider.go | New decider to dedupe/sequence session.started vs session.ended based on event log history. |
| golang/internal/sessionstartdecider/Decider_test.go | Unit tests for restart/late-start behaviors around session.ended. |
| golang/internal/sessionlist/SessionList.go | Adds Get/GetByConnectionID helpers and removes request-id gating. |
| golang/internal/sessionlist/Session.go | Removes RequestID from connection state. |
| golang/internal/connectionstoresync/Sync.go | Updates status topic parsing/handling, and emits session.ended on clean disconnect when a session is active. |
| golang/internal/connectionstore/Decider.go | Updates connection identity/event payloads (removes request_id, adds at_millis + disconnect reason). |
| golang/internal/connectionstore/Decider_test.go | Updates tests and adds reconnect-after-disconnect scenario. |
| golang/internal/connections/Event.go | Updates event schemas to match the new status payload shape. |
| golang/internal/babystationlist/BabyStationList.go | Makes snapshot listing resilient to missing session mappings; adds session_id to output and replaces sessions on same connection. |
| golang/internal/babystationlist/BabyStationList_test.go | Updates tests and adds coverage for session replacement + end-while-connected behavior. |
| golang/cmd/monitor-connection-status/main.go | Updates subscribed status wildcard topic. |
| golang/cmd/backend/UsageStats.go | Treats “clean” disconnect reason as session end for duration accounting. |
| golang/cmd/backend/UsageStats_test.go | Updates disconnect test payloads to match the new schema. |
| golang/cmd/backend/sessions.go | Changes session start to publish MQTT baby-station announcements (and append session.ended on delete). |
| golang/cmd/backend/sessions_test.go | Adds tests for input parsing and delete/restart flows with announcements + event stream expectations. |
| golang/cmd/backend/server.go | Wires new components: SessionStartDecider, PendingSessionStarts, ConnectionHub, and runs MQTT sync loop. |
| golang/cmd/backend/PendingSessionStarts.go | New per-account LRU queue for session starts waiting on client-id resolution. |
| golang/cmd/backend/PendingSessionStarts_test.go | Tests for LRU eviction, take, and per-account isolation. |
| golang/cmd/backend/MQTTSync.go | New MQTT subscribers for baby_stations, parent_stations, and webrtc_inbox. |
| golang/cmd/backend/ConnectionHub.go | New in-memory map of active websocket connections by connection_id. |
| golang/cmd/backend/Connection.go | Publishes new status payloads, routes signalling via MQTT inbox topic, and adds a write mutex for websocket writes. |
| docker-compose.yml | Updates Traefik image version. |
| AGENTS.md | Adds agent ownership/scope rules and docs workspace constraints. |
| agent_docs/Working Agreement with Human.md | Captures migration constraint/decision about resolving client_id at session-create time. |
| agent_docs/structure.md | Adds repo map for signalling source-of-truth files and runtime areas. |
| agent_docs/Signalling Migration.md | Documents current migration state, direction, and risks. |
| agent_docs/Product and Privacy Model.md | Captures privacy constraints and operational realities (PWA version skew). |
| agent_docs/Open Questions.md | Tracks deprecation/compatibility questions for the migration. |
| agent_docs/index.md | Adds vault index and navigation links. |
| agent_docs/Frontend Service Architecture.md | Documents frontend service-oriented architectural conventions. |
| agent_docs/Backend Signalling Contracts.md | Documents current backend endpoints/topics/payloads for signalling and discovery. |
| agent_docs/architecture.md | Adds high-level architecture summary aligned with signalling migration. |

Comment on lines 84 to +109
     ctx := request.Context()
-    vars := mux.Vars(request)
-    sessionID := vars["session_id"]
-    var session StartSessionEventData
-    err = json.NewDecoder(request.Body).Decode(&session)
+    accountID := contextx.GetAccountID(ctx)
+    sessionID := mux.Vars(request)["session_id"]
+    requestBody, err := io.ReadAll(request.Body)
     if err != nil {
         err = merry.WithHTTPCode(err, http.StatusBadRequest)
         return
     }
-    err = session.validate()
+    input, err := parseStartSessionInput(requestBody, sessionID)
     if err != nil {
         return
     }
-    if session.ID != sessionID {
-        err = merry.Errorf("session id in path does not match session id in body").WithHTTPCode(http.StatusBadRequest)
+    if input.ClientID == "" {
+        connection, ok := handlers.ConnectionHub.Get(input.ConnectionID)
+        if ok && connection.AccountID == accountID {
+            input.ClientID = connection.ClientID
+        }
+    }
+    if input.ClientID == "" {
+        handlers.PendingSessionStarts.Put(accountID, input)
         return
     }
-    _, err = handlers.EventLog.Append(ctx, eventlog.AppendInput{
-        Type: EventTypeSessionStarted,
-        AccountID: contextx.GetAccountID(ctx),
-        Data: fatal.UnlessMarshalJSON(session),
-    })
-    if err != nil {
+    if err = handlers.publishBabyStationAnnouncement(accountID, input); err != nil {
         return
     }
     // TODO set header with logical clock of the start event
 }
Copilot AI Apr 10, 2026

StartSession no longer appends a session.started event to the event log; it only publishes an MQTT announcement and returns. If the client calls DELETE quickly (or the connection clean-disconnect path appends session.ended) before the backend observes its own MQTT publish, sessionstartdecider will treat the later announcement as a duplicate and drop it, leaving the event log with session.ended but no session.started for that session. Consider appending session.started synchronously (or via SessionStartDecider) before returning, and treat the later MQTT-delivered announcement as idempotent/duplicate.
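
The ordering suggested here can be sketched as follows: append session.started synchronously, then treat the later MQTT-delivered announcement as a duplicate. `startLog`, `AppendStartOnce`, and `AppendEnd` are hypothetical in-memory stand-ins for the event log and decider, not the PR's types, and the sketch ignores the restart-on-reused-connection case that the real sessionstartdecider handles.

```go
package main

import (
	"fmt"
	"sync"
)

// event is a simplified event log entry.
type event struct {
	Type      string
	SessionID string
}

// startLog is a hypothetical stand-in for the event log plus start decider.
type startLog struct {
	mu      sync.Mutex
	events  []event
	started map[string]bool // session IDs that already have a session.started
}

func newStartLog() *startLog {
	return &startLog{started: map[string]bool{}}
}

// AppendStartOnce appends session.started unless the session already has one.
// It reports whether an event was appended.
func (l *startLog) AppendStartOnce(sessionID string) bool {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.started[sessionID] {
		return false
	}
	l.started[sessionID] = true
	l.events = append(l.events, event{Type: "session.started", SessionID: sessionID})
	return true
}

// AppendEnd appends session.ended unconditionally.
func (l *startLog) AppendEnd(sessionID string) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.events = append(l.events, event{Type: "session.ended", SessionID: sessionID})
}

func main() {
	l := newStartLog()
	l.AppendStartOnce("s1")             // HTTP handler appends before returning
	l.AppendEnd("s1")                   // client calls DELETE right away
	appended := l.AppendStartOnce("s1") // backend later observes its own MQTT publish
	fmt.Println(appended, l.events)
}
```

With this ordering the log always holds session.started before session.ended, regardless of when the MQTT echo arrives.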

Copilot uses AI. Check for mistakes.
Comment on lines +85 to 90
+    accountID := contextx.GetAccountID(ctx)
+    sessionID := mux.Vars(request)["session_id"]
+    requestBody, err := io.ReadAll(request.Body)
     if err != nil {
         err = merry.WithHTTPCode(err, http.StatusBadRequest)
         return
Copilot AI Apr 10, 2026

io.ReadAll(request.Body) reads the entire request body without any size limit. This can allow memory exhaustion / DoS if a client sends a very large payload. Consider wrapping the body with http.MaxBytesReader (or otherwise enforcing a reasonable max size) before reading/parsing.
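
A minimal sketch of the suggested limit, using `http.MaxBytesReader` from the standard library. The 4 KiB cap, the helper names, and the `/sessions/s1` path are assumptions for illustration; `*http.MaxBytesError` requires Go 1.19 or later.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"strings"
)

// maxStartSessionBodyBytes is an assumed cap; tune it to the real payload size.
const maxStartSessionBodyBytes = 4 << 10

// readLimitedBody reads the request body but fails once it exceeds the cap.
func readLimitedBody(w http.ResponseWriter, r *http.Request) ([]byte, error) {
	r.Body = http.MaxBytesReader(w, r.Body, maxStartSessionBodyBytes)
	return io.ReadAll(r.Body)
}

func handler(w http.ResponseWriter, r *http.Request) {
	if _, err := readLimitedBody(w, r); err != nil {
		var tooLarge *http.MaxBytesError
		if errors.As(err, &tooLarge) {
			http.Error(w, "request body too large", http.StatusRequestEntityTooLarge)
			return
		}
		http.Error(w, "could not read request body", http.StatusBadRequest)
		return
	}
	w.WriteHeader(http.StatusNoContent)
}

// statusFor sends a body of n bytes to the handler in-process and returns the status code.
func statusFor(n int) int {
	body := strings.NewReader(strings.Repeat("x", n))
	request := httptest.NewRequest(http.MethodPut, "/sessions/s1", body)
	recorder := httptest.NewRecorder()
	handler(recorder, request)
	return recorder.Code
}

func main() {
	fmt.Println(statusFor(100), statusFor(maxStartSessionBodyBytes+1))
}
```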

Comment on lines +122 to +127
+    return CreateSessionInput{
+        SessionID: legacyInput.ID,
+        ClientID: "",
+        ConnectionID: legacyInput.HostConnectionID,
+        Name: legacyInput.Name,
+        StartedAtMillis: legacyInput.StartedAt.UnixMilli(),
Copilot AI Apr 10, 2026

parseStartSessionInput converts started_at to started_at_millis, but the legacy payload validation doesn’t enforce that started_at is present/non-zero. A missing started_at decodes to the zero time.Time, whose UnixMilli() is a large negative number (year 1), not 0, so an invalid timestamp would propagate into MQTT/session processing. Consider validating started_at (or StartedAtMillis > 0) before accepting the request.

Suggested change
-    return CreateSessionInput{
-        SessionID: legacyInput.ID,
-        ClientID: "",
-        ConnectionID: legacyInput.HostConnectionID,
-        Name: legacyInput.Name,
-        StartedAtMillis: legacyInput.StartedAt.UnixMilli(),
+    startedAtMillis := legacyInput.StartedAt.UnixMilli()
+    if legacyInput.StartedAt.IsZero() || startedAtMillis <= 0 {
+        return CreateSessionInput{}, merry.Errorf("started_at must be present and greater than zero").WithHTTPCode(http.StatusBadRequest)
+    }
+    return CreateSessionInput{
+        SessionID: legacyInput.ID,
+        ClientID: "",
+        ConnectionID: legacyInput.HostConnectionID,
+        Name: legacyInput.Name,
+        StartedAtMillis: startedAtMillis,

Comment on lines 47 to 71
 func handleMessage(client mqtt.Client, message mqtt.Message) {
     ctx := context.Background()
-    var frame connections.MessageFrame
-    err := json.Unmarshal(message.Payload(), &frame)
+    var status statusMessage
+    err := json.Unmarshal(message.Payload(), &status)
     if err != nil {
         logx.Warnln(err)
         return
     }
     var accountID string
-    var connectionID string
+    var clientID string
     matches := topicRegex.FindStringSubmatch(message.Topic())
     fatal.Unless(len(matches) == 3, "failed to parse topic: "+message.Topic())
-    accountID, connectionID = matches[1], matches[2]
-    handle, ok := handlers[frame.Type]
+    accountID, clientID = matches[1], matches[2]
+    handle, ok := handlers[status.Type]
     if !ok {
-        logx.Warnln("unhandled message type:", frame.Type)
+        logx.Warnln("unhandled message type:", status.Type)
         return
     }
     handle(ctx, handlerInput{
-        AccountID: accountID,
-        ConnectionID: connectionID,
-        Frame: frame,
+        AccountID: accountID,
+        ClientID: clientID,
+        Status: status,
     })
     message.Ack()
 }
Copilot AI Apr 10, 2026

handleMessage only calls message.Ack() on the success path. On JSON/topic parse errors or unknown types it returns without acking, which can cause repeated redelivery of permanently-invalid messages and increased load. Other MQTT handlers in this repo use defer message.Ack(); consider doing the same here (and log errors instead of retrying forever).
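
A minimal sketch of the `defer message.Ack()` pattern, showing that the ack runs on the error paths as well. `ackMessage` and `fakeMessage` are hypothetical local stand-ins so the example runs without a broker; the real handler receives the MQTT client library's message type, and the `type` field name is an assumption.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ackMessage is the subset of an MQTT message this sketch needs.
type ackMessage interface {
	Payload() []byte
	Ack()
}

// fakeMessage records whether Ack was called.
type fakeMessage struct {
	payload []byte
	acked   bool
}

func (m *fakeMessage) Payload() []byte { return m.payload }
func (m *fakeMessage) Ack()            { m.acked = true }

// statusMessage is a hypothetical payload shape.
type statusMessage struct {
	Type string `json:"type"`
}

// handleStatus acks on every path, so a permanently invalid message
// is logged and dropped instead of being left unacknowledged.
func handleStatus(message ackMessage) (handled bool) {
	defer message.Ack()
	var status statusMessage
	if err := json.Unmarshal(message.Payload(), &status); err != nil {
		fmt.Println("dropping invalid payload:", err)
		return false
	}
	switch status.Type {
	case "connected", "disconnected":
		return true
	default:
		fmt.Println("unhandled message type:", status.Type)
		return false
	}
}

func main() {
	bad := &fakeMessage{payload: []byte("not json")}
	fmt.Println(handleStatus(bad), bad.acked)
}
```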

Comment on lines +121 to +138
+func (handlers *Handlers) handleWebRTCInboxMessage(client mqtt.Client, message mqtt.Message) {
+    defer message.Ack()
+    if !clientWebRTCInboxTopicRegex.MatchString(message.Topic()) {
+        logx.Warnln("invalid webrtc inbox topic:", message.Topic())
+        return
+    }
+    var inbox webrtcInboxMessage
+    if err := json.Unmarshal(message.Payload(), &inbox); err != nil {
+        logx.Warnln(err)
+        return
+    }
+    if inbox.ConnectionID == "" {
+        return
+    }
+    connection, ok := handlers.ConnectionHub.Get(inbox.ConnectionID)
+    if !ok {
+        return
+    }
Copilot AI Apr 10, 2026

handleWebRTCInboxMessage routes purely by inbox.connection_id and does not verify that the MQTT topic account/client IDs match the websocket connection’s AccountID (or that the sender is authorized). If a connection_id ever collides or an attacker can publish to MQTT, this can inject signalling messages into another account’s websocket. Consider parsing {account_id, client_id} from the topic and rejecting messages where they don’t match the target connection (and/or include/validate the expected client/account in the payload).
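
A minimal sketch of the suggested check: parse the account and client IDs from the topic and reject the message unless both match the target connection. The topic layout `accounts/{account_id}/clients/{client_id}/webrtc_inbox` is an assumption (the review does not show `clientWebRTCInboxTopicRegex`), and `connection` and `authorizeInbox` are hypothetical names.

```go
package main

import (
	"fmt"
	"regexp"
)

// inboxTopicRegex assumes the inbox topic mirrors the status topic layout.
var inboxTopicRegex = regexp.MustCompile(`^accounts/([^/]+)/clients/([^/]+)/webrtc_inbox$`)

// connection holds the identity fields of a websocket connection.
type connection struct {
	AccountID string
	ClientID  string
}

// authorizeInbox returns an error unless the topic's account and client IDs
// both match the connection the message would be forwarded to.
func authorizeInbox(topic string, target connection) error {
	matches := inboxTopicRegex.FindStringSubmatch(topic)
	if len(matches) != 3 {
		return fmt.Errorf("invalid webrtc inbox topic: %q", topic)
	}
	if matches[1] != target.AccountID || matches[2] != target.ClientID {
		return fmt.Errorf("topic %q does not match target connection", topic)
	}
	return nil
}

func main() {
	target := connection{AccountID: "a1", ClientID: "c1"}
	fmt.Println(authorizeInbox("accounts/a1/clients/c1/webrtc_inbox", target))
	fmt.Println(authorizeInbox("accounts/a2/clients/c1/webrtc_inbox", target))
}
```

This check complements broker-side ACLs on who may publish to a given account's topics; it does not replace them.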

Comment on lines +365 to +369
+func (connection *Connection) WriteJSON(value any) error {
+    connection.writeMu.Lock()
+    defer connection.writeMu.Unlock()
+    return connection.conn.WriteJSON(value)
+}
Copilot AI Apr 10, 2026

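WriteJSON adds a write mutex, but sendPing (keepalive) still uses conn.WriteControl(...) without that mutex. Gorilla WebSocket requires data writes (WriteJSON, WriteMessage, NextWriter) to be serialized, and violating that causes "concurrent write to websocket connection" panics. Its documentation does, however, list WriteControl and Close as safe to call concurrently with all other methods, so the keepalive ping should not race with WriteJSON on its own. Guarding WriteControl with the same mutex (or funnelling all writes through a single writer) is still a reasonable way to keep every write on one path, but the priority is confirming that no remaining WriteMessage/WriteJSON caller bypasses the mutex.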
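
A minimal sketch of routing pings through the same mutex as WriteJSON. The `wsConn` interface, `WritePing`, `fakeConn`, and `countOverlaps` are hypothetical stand-ins so the example runs without a network; the two method signatures mirror gorilla/websocket's `*Conn`. Serializing control frames this way is a conservative choice for keeping all writes on one path.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// wsConn is the subset of the websocket connection API used here.
type wsConn interface {
	WriteJSON(v interface{}) error
	WriteControl(messageType int, data []byte, deadline time.Time) error
}

// pingMessage is the ping opcode (the value of websocket.PingMessage).
const pingMessage = 9

// Connection serializes every write through writeMu.
type Connection struct {
	writeMu sync.Mutex
	conn    wsConn
}

func (c *Connection) WriteJSON(value any) error {
	c.writeMu.Lock()
	defer c.writeMu.Unlock()
	return c.conn.WriteJSON(value)
}

// WritePing sends a keepalive ping under the same mutex as WriteJSON.
func (c *Connection) WritePing(deadline time.Time) error {
	c.writeMu.Lock()
	defer c.writeMu.Unlock()
	return c.conn.WriteControl(pingMessage, nil, deadline)
}

// fakeConn counts how often two writes were in flight at the same time.
type fakeConn struct {
	inFlight int32
	overlaps int32
}

func (f *fakeConn) write() error {
	if atomic.AddInt32(&f.inFlight, 1) > 1 {
		atomic.AddInt32(&f.overlaps, 1)
	}
	time.Sleep(time.Millisecond) // widen the window for a potential overlap
	atomic.AddInt32(&f.inFlight, -1)
	return nil
}

func (f *fakeConn) WriteJSON(v interface{}) error                 { return f.write() }
func (f *fakeConn) WriteControl(int, []byte, time.Time) error     { return f.write() }

// countOverlaps issues concurrent JSON writes and pings and reports overlaps.
func countOverlaps(writes int) int32 {
	fake := &fakeConn{}
	connection := &Connection{conn: fake}
	var wg sync.WaitGroup
	for i := 0; i < writes; i++ {
		wg.Add(2)
		go func() { defer wg.Done(); _ = connection.WriteJSON(map[string]string{"type": "signal"}) }()
		go func() { defer wg.Done(); _ = connection.WritePing(time.Now().Add(time.Second)) }()
	}
	wg.Wait()
	return atomic.LoadInt32(&fake.overlaps)
}

func main() {
	fmt.Println("overlapping writes:", countOverlaps(10))
}
```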

3 participants