Conversation
Pull request overview
This PR introduces a new MQTT-based synchronization layer in the Go backend (client status, baby/parent station announcements, WebRTC inbox routing) and adds Python integration tests/utilities to validate the MQTT + HTTP/WebSocket flows end-to-end.
Changes:
- Add `backendmqtt` package to define MQTT topics/payloads and run background sync loops that translate MQTT messages into eventlog updates and control inbox publications.
- Refactor backend connection/session flows to publish/consume the new MQTT topics and keep in-memory registries for connection/session coordination.
- Add Python integration tests + helpers for HTTP/WebSocket flows and MQTT publish/subscribe testing.
Reviewed changes
Copilot reviewed 14 out of 14 changed files in this pull request and generated 11 comments.
Summary per file:
| File | Description |
|---|---|
| integration_tests/src/tests/test_backend_mqtt.py | Adds integration tests for MQTT-driven client status, station announcements, and WebRTC inbox delivery. |
| integration_tests/src/tests/test_backend_http_api.py | Adds integration tests for session start/end and websocket disconnect semantics via the HTTP API. |
| integration_tests/src/settings.py | Adds env-configurable API base URLs for HTTP and WebSocket clients used by integration tests. |
| integration_tests/src/mqtt_test_utils.py | Adds MQTT publish/subscribe helpers for integration tests. |
| integration_tests/src/backend_api_utils.py | Adds a Python client + Trio websocket helpers used by integration tests. |
| integration_tests/requirements.txt | Adds paho-mqtt dependency for MQTT-based integration tests. |
| golang/internal/sessionlist/SessionList.go | Enhances session projection to track active connections (request_id/since) for session host connection state. |
| golang/internal/backendmqtt/Topics.go | Defines centralized topic format helpers for backend MQTT messaging. |
| golang/internal/backendmqtt/Sync.go | Adds MQTT sync loops: client status sync, baby station announcement -> eventlog, parent station announcement -> control inbox messages. |
| golang/internal/backendmqtt/State.go | Adds in-memory registries for active connections and pending session starts. |
| golang/internal/backendmqtt/Payloads.go | Defines MQTT payload schemas and helper constructors for client status, announcements, and WebRTC inbox messages. |
| golang/cmd/backend/sessions.go | Refactors StartSession to coordinate session announcements via MQTT/pending state rather than directly appending session.started. |
| golang/cmd/backend/server.go | Wires up the new backendmqtt sync goroutines and shared registries in the backend server startup. |
| golang/cmd/backend/Connection.go | Refactors connection handling to publish client status via MQTT, route WebRTC signals through per-client inbox topics, and consult the connection registry. |
```python
pythonpath_parts = ["/tmp/bb-it-pydeps", "/home/ryan/workspace-2/repositories/beddybytes/integration_tests/src"]
if env.get("PYTHONPATH"):
    pythonpath_parts.append(env["PYTHONPATH"])
env["PYTHONPATH"] = ":".join(pythonpath_parts)
```
`spawn_connection_websocket_process` hard-codes a developer-specific path in `PYTHONPATH` (`/home/ryan/workspace-2/...`). This will break for other developers and in CI. Prefer computing the path relative to this file/repo (e.g., via `__file__`), or relying on the existing environment/packaging instead of embedding absolute paths.
Suggested change:
```diff
-pythonpath_parts = ["/tmp/bb-it-pydeps", "/home/ryan/workspace-2/repositories/beddybytes/integration_tests/src"]
+integration_tests_src_path = os.path.dirname(os.path.abspath(__file__))
+pythonpath_parts = ["/tmp/bb-it-pydeps", integration_tests_src_path]
 if env.get("PYTHONPATH"):
     pythonpath_parts.append(env["PYTHONPATH"])
-env["PYTHONPATH"] = ":".join(pythonpath_parts)
+env["PYTHONPATH"] = os.pathsep.join(pythonpath_parts)
```
```python
def __enter__(self):
    self.client.connect("mosquitto.beddybytes.local", 443, 60)
    self.client.loop_start()
```
MQTT broker host/port are hard-coded (`mosquitto.beddybytes.local:443`). This makes the integration tests/environment brittle and inconsistent with the new `API_BASE_URL`/`API_WEBSOCKET_BASE_URL` configuration pattern. Consider reading the broker host/port (and websocket path, if applicable) from `settings.py`/env vars so tests can run in different environments.
```go
	AnnouncementType = "announcment"
)
```
The MQTT payload/schema uses the misspelling `"announcment"` (and related Go identifiers like `Announcment`). Since this becomes part of the on-the-wire contract, it's hard to change later and easy for clients to misuse. Consider correcting to `"announcement"` now, or supporting both spellings for backward compatibility and documenting the canonical name.
Suggested change:
```diff
-	AnnouncementType = "announcment"
-)
+	AnnouncementType = "announcement"
+	// LegacyAnnouncementType is kept for backward compatibility with older clients
+	// that still use the misspelled on-the-wire value.
+	LegacyAnnouncementType = "announcment"
+)
+
+func IsAnnouncementType(value string) bool {
+	return value == AnnouncementType || value == LegacyAnnouncementType
+}
```
```python
events = client.wait_for_matching_events(
    access_token,
    count=2,
    predicate=lambda event: event["type"] in {"session.started", "client.connected"},
    from_cursor=0,
```
This Trio task calls the synchronous `client.wait_for_matching_events(...)` (requests / streaming) directly. Blocking I/O inside Trio async code can stall the event loop and make the websocket nursery task flaky (e.g., missed pings/timeouts). Consider running the blocking call via `trio.to_thread.run_sync(...)`, or providing an async event-wait helper.
```go
connection, ok := handlers.ConnectionRegistry.GetByConnectionID(accountID, session.HostConnectionID)
if ok {
	pending, ok := handlers.PendingSessionStarts.Take(accountID, session.HostConnectionID)
	if ok {
		err = backendmqtt.PublishBabyStationAnnouncement(handlers.MQTTClient, accountID, backendmqtt.BabyStationsPayload{
```
`PendingSessionStarts.Take()` removes the pending item before the MQTT publish happens. If `PublishBabyStationAnnouncement` fails, the pending session start is lost and the session may never be announced/started later. Consider only deleting after a successful publish, or re-inserting the pending item on publish failure.
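A minimal sketch of the delete-after-publish ordering the comment suggests, using an illustrative in-memory store (the types and keys are stand-ins, not the PR's actual API): read the pending entry with a non-destructive `Get`, publish, and only remove the entry once the publish succeeded, so a failed publish leaves it available for retry.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// PendingSessionStart and PendingStore are hypothetical stubs that
// mirror the shape of the PR's PendingSessionStarts registry.
type PendingSessionStart struct {
	SessionID    string
	ConnectionID string
}

type PendingStore struct {
	mu      sync.Mutex
	entries map[string]PendingSessionStart
}

func NewPendingStore() *PendingStore {
	return &PendingStore{entries: map[string]PendingSessionStart{}}
}

func (s *PendingStore) Put(key string, p PendingSessionStart) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.entries[key] = p
}

func (s *PendingStore) Get(key string) (PendingSessionStart, bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	p, ok := s.entries[key]
	return p, ok
}

func (s *PendingStore) Delete(key string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.entries, key)
}

// announce stands in for PublishBabyStationAnnouncement.
func announce(fail bool) error {
	if fail {
		return errors.New("broker unavailable")
	}
	return nil
}

// publishPending reads the pending entry, publishes, and only deletes
// it once the publish succeeded; a failed publish leaves the entry in
// place for a later retry instead of silently dropping it.
func publishPending(store *PendingStore, key string, failPublish bool) error {
	if _, ok := store.Get(key); !ok {
		return nil
	}
	if err := announce(failPublish); err != nil {
		return err // entry stays in the store
	}
	store.Delete(key)
	return nil
}

func main() {
	store := NewPendingStore()
	store.Put("conn-1", PendingSessionStart{SessionID: "s1", ConnectionID: "conn-1"})

	_ = publishPending(store, "conn-1", true) // publish fails
	_, stillThere := store.Get("conn-1")
	fmt.Println("retained after failure:", stillThere) // retained after failure: true

	_ = publishPending(store, "conn-1", false) // publish succeeds
	_, stillThere = store.Get("conn-1")
	fmt.Println("retained after success:", stillThere) // retained after success: false
}
```

The same effect could be had by re-inserting on failure after a destructive `Take`, but delete-after-success avoids a window where the entry is missing entirely.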
```go
HostConnectionStateBase: HostConnectionStateBase{
	State: ConnectionStateConnected,
	Since: event.UnixTimestamp,
},
RequestID: "TODO",
```
`RequestID: "TODO"` is used as a fallback when the active connection info isn't available. This placeholder can leak into the `/sessions` response (`host_connection_state.request_id`) and can break disconnect correlation (which relies on matching request IDs). Prefer storing the `request_id` in the `session.started` event, or representing the state as disconnected/unknown until a `client.connected` event arrives.
```python
def wait_for_messages(self, count, predicate=None, timeout=5):
    messages = []
    while len(messages) < count:
        message = self.messages.get(timeout=timeout)
```
`wait_for_messages` may raise `queue.Empty` directly on timeout, which makes failures harder to read than an assertion. Consider catching `queue.Empty` and raising an `AssertionError` that reports how many messages were received and what topics/predicate were expected.
Suggested change:
```diff
-        message = self.messages.get(timeout=timeout)
+        try:
+            message = self.messages.get(timeout=timeout)
+        except queue.Empty as exc:
+            predicate_description = repr(predicate) if predicate is not None else "None"
+            raise AssertionError(
+                f"timed out waiting for {count} matching MQTT messages; "
+                f"received {len(messages)} matching messages; "
+                f"subscribed topics={self.topics}; "
+                f"predicate={predicate_description}"
+            ) from exc
```
```python
announcement_payload = message_by_topic[baby_stations_topic]
self.assertEqual(announcement_payload["type"], "announcment")
self.assertIn("at_millis", announcement_payload)
self.assertEqual(announcement_payload["announcment"]["client_id"], client_id)
self.assertEqual(announcement_payload["announcment"]["connection_id"], connection_id)
```
Test asserts/publishes the misspelled MQTT field/value `"announcment"`. If the intent is actually `"announcement"`, it's better to fix the wire format now (and update tests accordingly) or explicitly test backward-compat behavior if both spellings are supported.
```python
messages = subscription.wait_for_messages(
    2,
    predicate=lambda message: message["topic"] in {status_topic, baby_stations_topic},
)

release_event.set()
return messages
```
`subscription.wait_for_messages(...)` is a blocking call (`queue.get`) executed inside a Trio async function. This can block the Trio scheduler and make the websocket task unreliable. Consider moving the blocking wait into `trio.to_thread.run_sync(...)` or adding an async wrapper around message waits.
Suggested change:
```diff
-    messages = subscription.wait_for_messages(
-        2,
-        predicate=lambda message: message["topic"] in {status_topic, baby_stations_topic},
-    )
-    release_event.set()
-    return messages
+    try:
+        return await trio.to_thread.run_sync(
+            lambda: subscription.wait_for_messages(
+                2,
+                predicate=lambda message: message["topic"] in {status_topic, baby_stations_topic},
+            )
+        )
+    finally:
+        release_event.set()
```
```python
setup_events = client.wait_for_matching_events(
    access_token,
    count=2,
    predicate=lambda event: event["type"] in {"client.connected", "session.started"},
    from_cursor=0,
)
```
This Trio task calls the synchronous `client.wait_for_matching_events(...)` directly. Blocking HTTP/SSE reads inside Trio async code can stall the event loop and cause timing-related flakes. Consider running it in a worker thread (`trio.to_thread.run_sync`) or refactoring the client to provide async event streaming.
```go
type BabyStationAnnouncment struct {
	ClientID     string `json:"client_id"`
	ConnectionID string `json:"connection_id"`
	SessionID    string `json:"session_id"`
	Name         string `json:"name"`
}
```
Use the same payload as `ControlInboxBabyStationAnnouncement`. Carry `StartedAtMillis`. The announcement is actually announcing the session.
```go
Description json.RawMessage `json:"description,omitempty"`
Candidate   json.RawMessage `json:"candidate,omitempty"`
```
`omitempty` doesn't work for slices. `json.RawMessage` is a `[]byte`.
```go
input.ConnectionRegistry.Put(accountID, ConnectionInfo{
	ClientID:     clientID,
	ConnectionID: payload.ConnectionID,
	RequestID:    payload.RequestID,
})
if err := input.ConnectionStore.Put(context.Background(), connection); err != nil && err != connectionstore.ErrDuplicate {
	log.Fatal(err)
}
```
Two puts. Should these combine?
```go
if err := input.ConnectionStore.Delete(context.Background(), connection); err != nil && err != connectionstore.ErrDuplicate {
	log.Fatal(err)
}
```
Why is delete concerned with `ErrDuplicate`?
```go
data := fatal.UnlessMarshalJSON(struct {
	ID               string    `json:"id"`
	Name             string    `json:"name"`
	HostConnectionID string    `json:"host_connection_id"`
	StartedAt        time.Time `json:"started_at"`
}{
```
```go
log                   eventlog.EventLog
cursor                int64
sessions              []*Session
activeConnectionByKey map[string]activeConnectionInfo
```
Why do we need this map? What was missing?
```go
if ok {
	pending, ok := handlers.PendingSessionStarts.Take(accountID, session.HostConnectionID)
	if ok {
		err = backendmqtt.PublishBabyStationAnnouncement(handlers.MQTTClient, accountID, backendmqtt.BabyStationsPayload{
			Type:     backendmqtt.AnnouncementType,
			AtMillis: pending.StartedAt.UnixMilli(),
			Announcment: backendmqtt.BabyStationAnnouncment{
				ClientID:     connection.ClientID,
				ConnectionID: pending.ConnectionID,
				SessionID:    pending.SessionID,
				Name:         pending.Name,
			},
		})
		if err != nil {
			return
		}
	}
}
```
Keep the happy path on the left
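A guard-clause version of the nested lookups above, sketched with stub types standing in for the PR's registries: each missing precondition returns early, so the publish call (the happy path) stays at the left margin instead of two `if ok` levels deep.

```go
package main

import "fmt"

// registry and pendingStore are illustrative stubs for the PR's
// ConnectionRegistry and PendingSessionStarts, keyed by connection id.
type registry map[string]string     // connectionID -> clientID
type pendingStore map[string]string // connectionID -> sessionID

// announceIfReady keeps the happy path unindented: every precondition
// failure returns early, and the publish is the last, left-aligned step.
func announceIfReady(reg registry, pendings pendingStore, connID string, publish func(clientID, sessionID string) error) error {
	clientID, ok := reg[connID]
	if !ok {
		return nil // host not connected yet; nothing to announce
	}
	sessionID, ok := pendings[connID]
	if !ok {
		return nil // no pending session start for this connection
	}
	return publish(clientID, sessionID)
}

func main() {
	reg := registry{"conn-1": "client-1"}
	pendings := pendingStore{"conn-1": "session-1"}
	err := announceIfReady(reg, pendings, "conn-1", func(clientID, sessionID string) error {
		fmt.Println("publish", clientID, sessionID)
		return nil
	})
	if err != nil {
		panic(err)
	}
}
```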
```go
sender, ok := handlers.ConnectionRegistry.GetByClientID(accountID, payload.FromClientID)
if !ok {
	logx.Warnln("sender not found for client:", payload.FromClientID)
	message.Ack()
	return
}
```
We don't need to check for sender.
Damn, we do. We need a connection id to client id mapping.
Pull request overview
Copilot reviewed 14 out of 14 changed files in this pull request and generated 2 comments.
Comments suppressed due to low confidence (1)
golang/cmd/backend/Connection.go:136
- In the MQTT inbox subscription callback, if `conn.WriteJSON` fails, you return without acknowledging the MQTT message. With QoS > 0 this can cause redelivery/backlog even though the websocket is already broken. Consider acknowledging (or unsubscribing/closing) on write failure to avoid repeated delivery loops and resource pressure.
```go
if err != nil {
	logx.Errorln(err)
	return
}
```
```python
self.client.connect("mosquitto.beddybytes.local", 443, 60)
self.client.loop_start()
if not self.connected.wait(timeout=5):
    raise AssertionError("timed out waiting for MQTT connection")
if not self.subscribed.wait(timeout=5):
    raise AssertionError("timed out waiting for MQTT subscription")
```
MQTT test utilities hard-code the broker host/port (`mosquitto.beddybytes.local:443`) and always disable TLS verification. This makes the integration tests environment-specific and harder to run in CI/locally with different broker endpoints. Consider pulling the broker URL/host/port and TLS verification behavior from `settings.py`/environment variables, similar to the API base URLs.
```go
accountID := contextx.GetAccountID(ctx)
pending := backendmqtt.PendingSessionStart{
	SessionID:    session.ID,
	Name:         session.Name,
	ConnectionID: session.HostConnectionID,
	StartedAt:    session.StartedAt,
}
if err = pending.Validate(); err != nil {
	err = merry.WithHTTPCode(err, http.StatusBadRequest)
	return
}
handlers.PendingSessionStarts.Put(accountID, pending)
connection, ok := handlers.ConnectionRegistry.GetByConnectionID(accountID, session.HostConnectionID)
if !ok {
	return
}
pending, ok = handlers.PendingSessionStarts.Get(accountID, session.HostConnectionID)
if !ok {
	return
}
err = backendmqtt.PublishBabyStationAnnouncement(handlers.MQTTClient, accountID, backendmqtt.BabyStationsPayload{
	Type:     backendmqtt.AnnouncementType,
	AtMillis: pending.StartedAt.UnixMilli(),
	Announcement: backendmqtt.SessionAnnouncement{
		ClientID:        connection.ClientID,
		ConnectionID:    pending.ConnectionID,
		SessionID:       pending.SessionID,
		Name:            pending.Name,
		StartedAtMillis: pending.StartedAt.UnixMilli(),
	},
})
if err != nil {
	return
}
handlers.PendingSessionStarts.Delete(accountID, session.HostConnectionID)
```
`StartSession` stores a pending session start and then returns early when the host connection isn't currently in `ConnectionRegistry`. Because `EndSession` doesn't clear `PendingSessionStarts`, a session can be ended and later still be started when the client connects (via `RunClientStatusSync`), and pending entries can accumulate indefinitely for connections that never come online.
Also, if the host is connected but `PublishBabyStationAnnouncement` fails, the pending entry remains but will never be retried (since the connected-status message already happened). Consider clearing/cancelling pending on `EndSession`, adding TTL/cleanup, and adding a retry path (or an immediate event-log append) for the publish-failure case.
```go
handlers.PendingSessionStarts.Put(accountID, pending)
connection, ok := handlers.ConnectionRegistry.GetByConnectionID(accountID, session.HostConnectionID)
if !ok {
	return
}
pending, ok = handlers.PendingSessionStarts.Get(accountID, session.HostConnectionID)
if !ok {
	return
}
err = backendmqtt.PublishBabyStationAnnouncement(handlers.MQTTClient, accountID, backendmqtt.BabyStationsPayload{
	Type:     backendmqtt.AnnouncementType,
	AtMillis: pending.StartedAt.UnixMilli(),
	Announcement: backendmqtt.SessionAnnouncement{
		ClientID:        connection.ClientID,
		ConnectionID:    pending.ConnectionID,
		SessionID:       pending.SessionID,
		Name:            pending.Name,
		StartedAtMillis: pending.StartedAt.UnixMilli(),
	},
})
if err != nil {
	return
}
handlers.PendingSessionStarts.Delete(accountID, session.HostConnectionID)
```
Reorder: only call `PendingSessionStarts.Put` if the connection isn't found.
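The suggested reorder, sketched with hypothetical stubs: publish the announcement immediately when the host connection is already registered, and only park a pending entry (for the client-status sync to pick up on connect) when it isn't, so the common case never creates an entry that must be cleaned up afterwards.

```go
package main

import "fmt"

// startSession sketches the reordered flow. The callbacks stand in for
// PendingSessionStarts.Put and PublishBabyStationAnnouncement; they are
// illustrative, not the PR's actual signatures.
func startSession(
	hostConnected bool,
	putPending func(),
	publish func() error,
) error {
	if !hostConnected {
		putPending() // announce later, when the host's client.connected arrives
		return nil
	}
	return publish() // happy path: no pending entry to delete afterwards
}

func main() {
	pendingPuts := 0
	published := 0

	// Host already connected: publish directly, nothing parked.
	_ = startSession(true, func() { pendingPuts++ }, func() error { published++; return nil })

	// Host offline: park a pending entry instead of publishing.
	_ = startSession(false, func() { pendingPuts++ }, func() error { published++; return nil })

	fmt.Println("published:", published, "pending:", pendingPuts) // published: 1 pending: 1
}
```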