Refactor/mqtt 2 #16

Merged
Ryan-A-B merged 6 commits into master from refactor/mqtt-2
Apr 22, 2026

@Ryan-A-B
Owner

No description provided.

Copilot AI left a comment

Pull request overview

This PR introduces a new MQTT-based synchronization layer in the Go backend (client status, baby/parent station announcements, WebRTC inbox routing) and adds Python integration tests/utilities to validate the MQTT + HTTP/WebSocket flows end-to-end.

Changes:

  • Add backendmqtt package to define MQTT topics/payloads and run background sync loops that translate MQTT messages into eventlog updates and control inbox publications.
  • Refactor backend connection/session flows to publish/consume the new MQTT topics and keep in-memory registries for connection/session coordination.
  • Add Python integration tests + helpers for HTTP/WebSocket flows and MQTT publish/subscribe testing.

Reviewed changes

Copilot reviewed 14 out of 14 changed files in this pull request and generated 11 comments.

| File | Description |
| --- | --- |
| integration_tests/src/tests/test_backend_mqtt.py | Adds integration tests for MQTT-driven client status, station announcements, and WebRTC inbox delivery. |
| integration_tests/src/tests/test_backend_http_api.py | Adds integration tests for session start/end and websocket disconnect semantics via the HTTP API. |
| integration_tests/src/settings.py | Adds env-configurable API base URLs for HTTP and WebSocket clients used by integration tests. |
| integration_tests/src/mqtt_test_utils.py | Adds MQTT publish/subscribe helpers for integration tests. |
| integration_tests/src/backend_api_utils.py | Adds a Python client + Trio websocket helpers used by integration tests. |
| integration_tests/requirements.txt | Adds paho-mqtt dependency for MQTT-based integration tests. |
| golang/internal/sessionlist/SessionList.go | Enhances session projection to track active connections (request_id/since) for session host connection state. |
| golang/internal/backendmqtt/Topics.go | Defines centralized topic format helpers for backend MQTT messaging. |
| golang/internal/backendmqtt/Sync.go | Adds MQTT sync loops: client status sync, baby station announcement -> eventlog, parent station announcement -> control inbox messages. |
| golang/internal/backendmqtt/State.go | Adds in-memory registries for active connections and pending session starts. |
| golang/internal/backendmqtt/Payloads.go | Defines MQTT payload schemas and helper constructors for client status, announcements, and WebRTC inbox messages. |
| golang/cmd/backend/sessions.go | Refactors StartSession to coordinate session announcements via MQTT/pending state rather than directly appending session.started. |
| golang/cmd/backend/server.go | Wires up the new backendmqtt sync goroutines and shared registries in the backend server startup. |
| golang/cmd/backend/Connection.go | Refactors connection handling to publish client status via MQTT, route WebRTC signals through per-client inbox topics, and consult the connection registry. |

Comment on lines +315 to +318
pythonpath_parts = ["/tmp/bb-it-pydeps", "/home/ryan/workspace-2/repositories/beddybytes/integration_tests/src"]
if env.get("PYTHONPATH"):
    pythonpath_parts.append(env["PYTHONPATH"])
env["PYTHONPATH"] = ":".join(pythonpath_parts)

Copilot AI Apr 17, 2026

spawn_connection_websocket_process hard-codes a developer-specific path in PYTHONPATH (/home/ryan/workspace-2/...). This will break for other developers and in CI. Prefer computing the path relative to this file/repo (e.g., via __file__), or rely on the existing environment/packaging instead of embedding absolute paths.

Suggested change
- pythonpath_parts = ["/tmp/bb-it-pydeps", "/home/ryan/workspace-2/repositories/beddybytes/integration_tests/src"]
- if env.get("PYTHONPATH"):
-     pythonpath_parts.append(env["PYTHONPATH"])
- env["PYTHONPATH"] = ":".join(pythonpath_parts)
+ integration_tests_src_path = os.path.dirname(os.path.abspath(__file__))
+ pythonpath_parts = ["/tmp/bb-it-pydeps", integration_tests_src_path]
+ if env.get("PYTHONPATH"):
+     pythonpath_parts.append(env["PYTHONPATH"])
+ env["PYTHONPATH"] = os.pathsep.join(pythonpath_parts)

Copilot uses AI. Check for mistakes.
Comment on lines +22 to +24
def __enter__(self):
    self.client.connect("mosquitto.beddybytes.local", 443, 60)
    self.client.loop_start()

Copilot AI Apr 17, 2026

MQTT broker host/port are hard-coded (mosquitto.beddybytes.local:443). This makes the integration tests/environment brittle and inconsistent with the new API_BASE_URL/API_WEBSOCKET_BASE_URL configuration pattern. Consider reading broker host/port (and websocket path, if applicable) from settings.py / env vars so tests can run in different environments.
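
A minimal sketch of what reading the broker endpoint from the environment might look like, mirroring the API_BASE_URL pattern. The variable names MQTT_BROKER_HOST, MQTT_BROKER_PORT and MQTT_KEEPALIVE_SECONDS, and the helper itself, are hypothetical and not taken from this repository; the defaults simply reproduce the values that are hard-coded today.

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class MQTTBrokerSettings:
    host: str
    port: int
    keepalive_seconds: int


def load_mqtt_broker_settings(environ=None):
    """Read broker settings from env vars (names are illustrative assumptions).

    Falls back to the values currently hard-coded in the test utilities.
    """
    if environ is None:
        environ = os.environ
    return MQTTBrokerSettings(
        host=environ.get("MQTT_BROKER_HOST", "mosquitto.beddybytes.local"),
        port=int(environ.get("MQTT_BROKER_PORT", "443")),
        keepalive_seconds=int(environ.get("MQTT_KEEPALIVE_SECONDS", "60")),
    )
```

With something like this in settings.py, `__enter__` could call `self.client.connect(settings.host, settings.port, settings.keepalive_seconds)` instead of passing literals.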

Comment thread golang/internal/backendmqtt/Payloads.go Outdated
Comment on lines +12 to +14
AnnouncementType = "announcment"
)


Copilot AI Apr 17, 2026

The MQTT payload/schema uses the misspelling "announcment" (and related Go identifiers like Announcment). Since this becomes part of the on-the-wire contract, it’s hard to change later and easy for clients to misuse. Consider correcting to "announcement" now, or supporting both spellings for backward compatibility and documenting the canonical name.

Suggested change
- 	AnnouncementType = "announcment"
- )
+ 	AnnouncementType = "announcement"
+ 	// LegacyAnnouncementType is kept for backward compatibility with older clients
+ 	// that still use the misspelled on-the-wire value.
+ 	LegacyAnnouncementType = "announcment"
+ )
+ func IsAnnouncementType(value string) bool {
+ 	return value == AnnouncementType || value == LegacyAnnouncementType
+ }

Comment on lines +45 to +49
events = client.wait_for_matching_events(
    access_token,
    count=2,
    predicate=lambda event: event["type"] in {"session.started", "client.connected"},
    from_cursor=0,

Copilot AI Apr 17, 2026

This Trio task calls the synchronous client.wait_for_matching_events(...) (requests / streaming) directly. Blocking I/O inside Trio async code can stall the event loop and make the websocket nursery task flaky (e.g., missed pings/timeouts). Consider running the blocking call via trio.to_thread.run_sync(...), or providing an async event-wait helper.
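
To illustrate why the blocking call matters, here is a small self-contained sketch. It uses the standard library's asyncio.to_thread as a stand-in for trio.to_thread.run_sync so that it runs without Trio installed, and `blocking_wait` is a hypothetical placeholder for a synchronous call such as wait_for_matching_events. The blocking function can only return after another task on the event loop has run, so calling it directly from the async function would stall until the timeout, whereas offloading it to a worker thread lets both make progress.

```python
import asyncio
import threading


def blocking_wait(ready: threading.Event) -> str:
    """Hypothetical stand-in for a synchronous, blocking wait."""
    if not ready.wait(timeout=5):
        raise AssertionError("timed out waiting for the event")
    return "events received"


async def producer(ready: threading.Event) -> None:
    """Runs on the event loop, so it only makes progress if the loop is free."""
    await asyncio.sleep(0.05)
    ready.set()


async def main() -> str:
    ready = threading.Event()
    producer_task = asyncio.create_task(producer(ready))
    # Offload the blocking call so the loop stays free to run producer().
    result = await asyncio.to_thread(blocking_wait, ready)
    await producer_task
    return result


if __name__ == "__main__":
    print(asyncio.run(main()))
```

In the Trio tests the equivalent offload would be `await trio.to_thread.run_sync(...)` around the synchronous client call.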

Comment thread golang/cmd/backend/sessions.go Outdated
Comment on lines +90 to +94
connection, ok := handlers.ConnectionRegistry.GetByConnectionID(accountID, session.HostConnectionID)
if ok {
	pending, ok := handlers.PendingSessionStarts.Take(accountID, session.HostConnectionID)
	if ok {
		err = backendmqtt.PublishBabyStationAnnouncement(handlers.MQTTClient, accountID, backendmqtt.BabyStationsPayload{

Copilot AI Apr 17, 2026

PendingSessionStarts.Take() removes the pending item before the MQTT publish happens. If PublishBabyStationAnnouncement fails, the pending session start is lost and the session may never be announced/started later. Consider only deleting after a successful publish, or re-inserting the pending item on publish failure.
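
The "only delete after a successful publish" option can be sketched independently of the Go code. It is written in Python here for brevity, and `PendingStarts`, `announce_pending` and the `publish` callable are hypothetical stand-ins rather than the PR's actual types. The entry is read without being removed, published, and deleted only if the publish did not raise.

```python
import threading


class PendingStarts:
    """Hypothetical in-memory registry keyed by (account_id, connection_id)."""

    def __init__(self):
        self._lock = threading.Lock()
        self._items = {}

    def put(self, account_id, connection_id, pending):
        with self._lock:
            self._items[(account_id, connection_id)] = pending

    def get(self, account_id, connection_id):
        with self._lock:
            return self._items.get((account_id, connection_id))

    def delete(self, account_id, connection_id):
        with self._lock:
            self._items.pop((account_id, connection_id), None)


def announce_pending(registry, publish, account_id, connection_id):
    """Publish a pending start; the entry stays in the registry if publish raises."""
    pending = registry.get(account_id, connection_id)
    if pending is None:
        return False
    publish(account_id, pending)  # may raise; nothing has been deleted yet
    registry.delete(account_id, connection_id)
    return True
```

One trade-off of this shape is that get followed by delete is not atomic, so two concurrent callers could both publish the same entry. That favors a duplicate announcement over a lost one.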

Comment on lines +178 to +182
HostConnectionStateBase: HostConnectionStateBase{
	State: ConnectionStateConnected,
	Since: event.UnixTimestamp,
},
RequestID: "TODO",

Copilot AI Apr 17, 2026

RequestID: "TODO" is used as a fallback when the active connection info isn’t available. This placeholder can leak into the /sessions response (host_connection_state.request_id) and can break disconnect correlation (which relies on matching request IDs). Prefer storing the request_id in the session.started event, or representing the state as disconnected/unknown until a client.connected event arrives.

def wait_for_messages(self, count, predicate=None, timeout=5):
    messages = []
    while len(messages) < count:
        message = self.messages.get(timeout=timeout)

Copilot AI Apr 17, 2026

wait_for_messages may raise queue.Empty directly on timeout, which makes failures harder to read than an assertion. Consider catching queue.Empty and raising an AssertionError that reports how many messages were received and what topics/predicate were expected.

Suggested change
- message = self.messages.get(timeout=timeout)
+ try:
+     message = self.messages.get(timeout=timeout)
+ except queue.Empty as exc:
+     predicate_description = repr(predicate) if predicate is not None else "None"
+     raise AssertionError(
+         f"timed out waiting for {count} matching MQTT messages; "
+         f"received {len(messages)} matching messages; "
+         f"subscribed topics={self.topics}; "
+         f"predicate={predicate_description}"
+     ) from exc

Comment on lines +74 to +78
announcement_payload = message_by_topic[baby_stations_topic]
self.assertEqual(announcement_payload["type"], "announcment")
self.assertIn("at_millis", announcement_payload)
self.assertEqual(announcement_payload["announcment"]["client_id"], client_id)
self.assertEqual(announcement_payload["announcment"]["connection_id"], connection_id)

Copilot AI Apr 17, 2026

Test asserts/publishes the misspelled MQTT field/value "announcment". If the intent is actually "announcement", it’s better to fix the wire-format now (and update tests accordingly) or explicitly test backward-compat behavior if both spellings are supported.

Comment on lines +50 to +57
messages = subscription.wait_for_messages(
    2,
    predicate=lambda message: message["topic"] in {status_topic, baby_stations_topic},
)

release_event.set()
return messages

Copilot AI Apr 17, 2026

subscription.wait_for_messages(...) is a blocking call (queue.get) executed inside a Trio async function. This can block the Trio scheduler and make the websocket task unreliable. Consider moving the blocking wait into trio.to_thread.run_sync(...) or adding an async wrapper around message waits.

Suggested change
- messages = subscription.wait_for_messages(
-     2,
-     predicate=lambda message: message["topic"] in {status_topic, baby_stations_topic},
- )
- release_event.set()
- return messages
+ try:
+     return await trio.to_thread.run_sync(
+         lambda: subscription.wait_for_messages(
+             2,
+             predicate=lambda message: message["topic"] in {status_topic, baby_stations_topic},
+         )
+     )
+ finally:
+     release_event.set()

Comment on lines +184 to +189
setup_events = client.wait_for_matching_events(
    access_token,
    count=2,
    predicate=lambda event: event["type"] in {"client.connected", "session.started"},
    from_cursor=0,
)

Copilot AI Apr 17, 2026

This Trio task calls the synchronous client.wait_for_matching_events(...) directly. Blocking HTTP/SSE reads inside Trio async code can stall the event loop and cause timing-related flakes. Consider running it in a worker thread (trio.to_thread.run_sync) or refactoring the client to provide async event streaming.

Comment thread golang/internal/backendmqtt/Payloads.go Outdated
Comment on lines +76 to +81
type BabyStationAnnouncment struct {
	ClientID string `json:"client_id"`
	ConnectionID string `json:"connection_id"`
	SessionID string `json:"session_id"`
	Name string `json:"name"`
}

Owner Author

Use the same payload as ControlInboxBabyStationAnnouncement. Carry StartedAtMillis. The announcement is actually announcing the session.

Comment on lines +36 to +37
Description json.RawMessage `json:"description,omitempty"`
Candidate json.RawMessage `json:"candidate,omitempty"`

Owner Author

omit empty doesn't work for slices. json.RawMessage is []byte

Comment thread golang/internal/backendmqtt/State.go
Comment thread golang/internal/backendmqtt/State.go
Comment on lines +61 to +68
input.ConnectionRegistry.Put(accountID, ConnectionInfo{
	ClientID: clientID,
	ConnectionID: payload.ConnectionID,
	RequestID: payload.RequestID,
})
if err := input.ConnectionStore.Put(context.Background(), connection); err != nil && err != connectionstore.ErrDuplicate {
	log.Fatal(err)
}

Owner Author

Two puts. Should these combine?

Comment on lines +92 to +94
if err := input.ConnectionStore.Delete(context.Background(), connection); err != nil && err != connectionstore.ErrDuplicate {
	log.Fatal(err)
}

Owner Author

Why is delete concerned with ErrDuplicate?

Comment thread golang/internal/backendmqtt/Sync.go Outdated
Comment on lines +136 to +141
data := fatal.UnlessMarshalJSON(struct {
	ID string `json:"id"`
	Name string `json:"name"`
	HostConnectionID string `json:"host_connection_id"`
	StartedAt time.Time `json:"started_at"`
}{

Owner Author

Unnamed struct

log eventlog.EventLog
cursor int64
sessions []*Session
activeConnectionByKey map[string]activeConnectionInfo

Owner Author

Why do we need this map? What was missing?

Comment thread golang/cmd/backend/sessions.go Outdated
Comment on lines +91 to +108
if ok {
	pending, ok := handlers.PendingSessionStarts.Take(accountID, session.HostConnectionID)
	if ok {
		err = backendmqtt.PublishBabyStationAnnouncement(handlers.MQTTClient, accountID, backendmqtt.BabyStationsPayload{
			Type: backendmqtt.AnnouncementType,
			AtMillis: pending.StartedAt.UnixMilli(),
			Announcment: backendmqtt.BabyStationAnnouncment{
				ClientID: connection.ClientID,
				ConnectionID: pending.ConnectionID,
				SessionID: pending.SessionID,
				Name: pending.Name,
			},
		})
		if err != nil {
			return
		}
	}
}

Owner Author

Keep the happy path on the left

Comment on lines +119 to +124
sender, ok := handlers.ConnectionRegistry.GetByClientID(accountID, payload.FromClientID)
if !ok {
	logx.Warnln("sender not found for client:", payload.FromClientID)
	message.Ack()
	return
}

Owner Author

We don't need to check for sender.

Owner Author

Damn, we do. We need the connection ID to client ID mapping.

Copilot AI left a comment

Pull request overview

Copilot reviewed 14 out of 14 changed files in this pull request and generated 2 comments.

Comments suppressed due to low confidence (1)

golang/cmd/backend/Connection.go:136

  • In the MQTT inbox subscription callback, if conn.WriteJSON fails you return without acknowledging the MQTT message. With QoS>0 this can cause redelivery/backlog even though the websocket is already broken. Consider acknowledging (or unsubscribing/closing) on write failure to avoid repeated delivery loops and resource pressure.
		if err != nil {
			logx.Errorln(err)
			return
		}

Comment on lines +24 to +29
self.client.connect("mosquitto.beddybytes.local", 443, 60)
self.client.loop_start()
if not self.connected.wait(timeout=5):
    raise AssertionError("timed out waiting for MQTT connection")
if not self.subscribed.wait(timeout=5):
    raise AssertionError("timed out waiting for MQTT subscription")

Copilot AI Apr 20, 2026

MQTT test utilities hard-code the broker host/port ("mosquitto.beddybytes.local":443) and always disable TLS verification. This makes the integration tests environment-specific and harder to run in CI/locally with different broker endpoints. Consider pulling broker URL/host/port and TLS verification behavior from settings.py/environment variables, similar to the API base URLs.

Comment thread golang/cmd/backend/sessions.go Outdated
Comment on lines +78 to +112
accountID := contextx.GetAccountID(ctx)
pending := backendmqtt.PendingSessionStart{
	SessionID: session.ID,
	Name: session.Name,
	ConnectionID: session.HostConnectionID,
	StartedAt: session.StartedAt,
}
if err = pending.Validate(); err != nil {
	err = merry.WithHTTPCode(err, http.StatusBadRequest)
	return
}
handlers.PendingSessionStarts.Put(accountID, pending)
connection, ok := handlers.ConnectionRegistry.GetByConnectionID(accountID, session.HostConnectionID)
if !ok {
	return
}
pending, ok = handlers.PendingSessionStarts.Get(accountID, session.HostConnectionID)
if !ok {
	return
}
err = backendmqtt.PublishBabyStationAnnouncement(handlers.MQTTClient, accountID, backendmqtt.BabyStationsPayload{
	Type: backendmqtt.AnnouncementType,
	AtMillis: pending.StartedAt.UnixMilli(),
	Announcement: backendmqtt.SessionAnnouncement{
		ClientID: connection.ClientID,
		ConnectionID: pending.ConnectionID,
		SessionID: pending.SessionID,
		Name: pending.Name,
		StartedAtMillis: pending.StartedAt.UnixMilli(),
	},
})
if err != nil {
	return
}
handlers.PendingSessionStarts.Delete(accountID, session.HostConnectionID)

Copilot AI Apr 20, 2026

StartSession stores a pending session start and then returns early when the host connection isn't currently in ConnectionRegistry. Because EndSession doesn't clear PendingSessionStarts, a session can be ended and later still be started when the client connects (via RunClientStatusSync), and pending entries can accumulate indefinitely for connections that never come online.

Also, if the host is connected but PublishBabyStationAnnouncement fails, the pending entry remains but will never be retried (since the connected-status message already happened). Consider clearing/cancelling pending on EndSession, adding TTL/cleanup, and adding a retry path (or immediate event-log append) for the publish-failure case.
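
The cleanup ideas above (cancel on EndSession, TTL-based expiry) can be sketched as follows. This is written in Python for brevity rather than Go, and every name in it (`ExpiringPendingStarts`, `cancel_session`, `purge_expired`, the `session_id` key) is hypothetical. Locking is omitted to keep the sketch short, and the clock is injectable so expiry can be exercised without sleeping.

```python
import time


class ExpiringPendingStarts:
    """Hypothetical pending-start registry with a time-to-live per entry."""

    def __init__(self, ttl_seconds, clock=time.monotonic):
        self._ttl_seconds = ttl_seconds
        self._clock = clock
        self._items = {}  # (account_id, connection_id) -> (expires_at, pending)

    def put(self, account_id, connection_id, pending):
        expires_at = self._clock() + self._ttl_seconds
        self._items[(account_id, connection_id)] = (expires_at, pending)

    def get(self, account_id, connection_id):
        entry = self._items.get((account_id, connection_id))
        if entry is None:
            return None
        expires_at, pending = entry
        if self._clock() >= expires_at:
            # Expired: drop it so it can never start a stale session.
            del self._items[(account_id, connection_id)]
            return None
        return pending

    def cancel_session(self, account_id, session_id):
        """Remove pending starts for a session, e.g. when the session is ended."""
        stale = [
            key for key, (_, pending) in self._items.items()
            if key[0] == account_id and pending["session_id"] == session_id
        ]
        for key in stale:
            del self._items[key]
        return len(stale)

    def purge_expired(self):
        """Drop every expired entry; meant to be called periodically."""
        now = self._clock()
        expired = [key for key, (expires_at, _) in self._items.items() if now >= expires_at]
        for key in expired:
            del self._items[key]
        return len(expired)
```

Expiry alone bounds memory for connections that never come online, while the explicit cancel covers the case where a session is ended before its host connects.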

Comment thread golang/cmd/backend/sessions.go Outdated
Comment on lines +89 to +112
handlers.PendingSessionStarts.Put(accountID, pending)
connection, ok := handlers.ConnectionRegistry.GetByConnectionID(accountID, session.HostConnectionID)
if !ok {
	return
}
pending, ok = handlers.PendingSessionStarts.Get(accountID, session.HostConnectionID)
if !ok {
	return
}
err = backendmqtt.PublishBabyStationAnnouncement(handlers.MQTTClient, accountID, backendmqtt.BabyStationsPayload{
	Type: backendmqtt.AnnouncementType,
	AtMillis: pending.StartedAt.UnixMilli(),
	Announcement: backendmqtt.SessionAnnouncement{
		ClientID: connection.ClientID,
		ConnectionID: pending.ConnectionID,
		SessionID: pending.SessionID,
		Name: pending.Name,
		StartedAtMillis: pending.StartedAt.UnixMilli(),
	},
})
if err != nil {
	return
}
handlers.PendingSessionStarts.Delete(accountID, session.HostConnectionID)

Owner Author

Reorder: only call PendingSessionStarts.Put if the connection isn't found.

@Ryan-A-B Ryan-A-B merged commit 1c8a7fa into master Apr 22, 2026

2 participants