From 7086f7de0f16fd27b9bfb70be60e6795970a098c Mon Sep 17 00:00:00 2001 From: rballinger Date: Tue, 10 Mar 2026 07:07:40 +1000 Subject: [PATCH 1/6] .. --- AGENTS.md | 44 +++++ agent_docs/Backend Signalling Contracts.md | 118 ++++++++++++ agent_docs/Open Questions.md | 14 ++ agent_docs/Product and Privacy Model.md | 43 +++++ agent_docs/Signalling Migration.md | 31 +++ agent_docs/architecture.md | 54 ++++++ agent_docs/index.md | 25 +++ agent_docs/structure.md | 50 +++++ golang/cmd/backend/Connection.go | 151 +++++++-------- golang/cmd/backend/ConnectionHub.go | 33 ++++ golang/cmd/backend/MQTTSync.go | 180 ++++++++++++++++++ golang/cmd/backend/UsageStats.go | 11 +- golang/cmd/backend/UsageStats_test.go | 17 +- golang/cmd/backend/server.go | 33 ++-- golang/cmd/backend/sessions.go | 71 ++++--- golang/cmd/monitor-connection-status/main.go | 2 +- .../babystationlist/BabyStationList.go | 5 +- .../babystationlist/BabyStationList_test.go | 5 - golang/internal/connections/Event.go | 7 +- golang/internal/connectionstore/Decider.go | 22 ++- .../internal/connectionstore/Decider_test.go | 4 - golang/internal/connectionstoresync/Sync.go | 66 ++++--- golang/internal/sessionlist/Session.go | 1 - golang/internal/sessionlist/SessionList.go | 6 - .../internal/sessionstartdecider/Decider.go | 102 ++++++++++ 25 files changed, 892 insertions(+), 203 deletions(-) create mode 100644 AGENTS.md create mode 100644 agent_docs/Backend Signalling Contracts.md create mode 100644 agent_docs/Open Questions.md create mode 100644 agent_docs/Product and Privacy Model.md create mode 100644 agent_docs/Signalling Migration.md create mode 100644 agent_docs/architecture.md create mode 100644 agent_docs/index.md create mode 100644 agent_docs/structure.md create mode 100644 golang/cmd/backend/ConnectionHub.go create mode 100644 golang/cmd/backend/MQTTSync.go create mode 100644 golang/internal/sessionstartdecider/Decider.go diff --git a/AGENTS.md b/AGENTS.md new file mode 100644 index 0000000..8d21179 --- /dev/null +++ b/AGENTS.md @@ -0,0 +1,44 @@ +## Ownership and scope + +- This `AGENTS.md` file is written and maintained by the human. +- You (the agent) may **read** this file but must **never modify it under any circumstances**. +- You are allowed to create and edit documentation **only** under the `agent_docs/` directory in this repository. +- Treat everything inside `agent_docs/` as your documentation workspace and Obsidian vault. + +### Your responsibilities: + +- Use existing documentation under `agent_docs/` before guessing. +- Keep documentation in `agent_docs/` in sync with the behavior of the code. +- Record meaningful domain knowledge there. + +--- + +## Obsidian markdown conventions + +You must treat `agent_docs/` as an Obsidian vault and follow these rules strictly: + +- **File names are titles** + - The file name is the note title. Do not add a level‑1 heading to restate the title. + - Use natural, human‑readable names with spaces and standard capitalisation. + - Example: `MQTT Topics and Payloads.md`, `Backend Overview.md`, `2026-03-08 Refactoring MQTT.md`. + - Never use kebab‑case or snake_case file names (e.g. `mqtt-topics-and-payloads.md` is wrong). + +- **Links between notes** + - Always use Obsidian wiki links: `[[Note Name]]`. + - The note name inside `[[...]]` must exactly match the file name (without the `.md` extension). + - Example: `[[MQTT Topics and Payloads]]`, `[[Backend Overview]]`. + - Never use standard markdown links for cross‑vault references. + +--- + +## Documentation workspace: `agent_docs/` + +- The root of your docs workspace is: `agent_docs/`. +- You **must not** create or edit docs outside `agent_docs/` unless explicitly asked. +- When you need to persist knowledge, decisions, or plans, write them into `agent_docs/`, not only into code comments (except for small, local notes). + +You are expected to maintain at least: + +- `agent_docs/index.md` - main index of all important notes. +- `agent_docs/structure.md` - where to find things in this repository +- `agent_docs/architecture.md` - the high level architecture of the project diff --git a/agent_docs/Backend Signalling Contracts.md b/agent_docs/Backend Signalling Contracts.md new file mode 100644 index 0000000..1fda579 --- /dev/null +++ b/agent_docs/Backend Signalling Contracts.md @@ -0,0 +1,118 @@ +## Purpose + +This note documents the current backend signalling contracts and where they are implemented. + +## Source-of-Truth Files + +- `golang/cmd/backend/server.go` +- `golang/cmd/backend/Connection.go` +- `golang/cmd/backend/MQTTSync.go` +- `golang/internal/connectionstoresync/Sync.go` +- `golang/internal/connections/Event.go` + +## Active Signalling Endpoints + +Registered in `golang/cmd/backend/server.go`: + +- `GET /clients/{client_id}/websocket` +- `GET /clients/{client_id}/connections/{connection_id}` +- `PUT /sessions/{session_id}` +- `DELETE /sessions/{session_id}` (no-op) + +Both routes are behind auth middleware under `/clients`. + +## Legacy WebSocket Signalling Route + +`GET /clients/{client_id}/websocket` in `server.go` + `HandleWebsocket`: + +- Legacy peer-to-peer relay via backend in-memory `ClientStore`. +- Inbound JSON: + - `to_peer_id: string` + - `data: any JSON` +- Outbound JSON: + - `from_peer_id: string` + - `data: any JSON` + +This is the legacy transport targeted for deprecation after MQTT migration. + +## Connection Route: WebSocket Transport + MQTT Relay + +`GET /clients/{client_id}/connections/{connection_id}` in `Connection.go`: + +- Upgrades to WebSocket. +- Publishes connect status to: + - `accounts/{account_id}/clients/{client_id}/status` + - payload: `{"type":"connected","connection_id":"...","at_millis":}` +- Publishes disconnect status to: + - `accounts/{account_id}/clients/{client_id}/status` + - payload: `{"type":"disconnected","connection_id":"...","disconnected":{"reason":"clean|unexpected"},"at_millis":}` +- Incoming websocket payload from client: + - `{"type":"signal","signal":{"to_connection_id":"...","data":}}` + - backend republishes to MQTT topic: + - `accounts/{account_id}/clients/{to_client_id|sender_client_id}/webrtc_inbox` + - payload: `{"from_client_id":"...","connection_id":"...","data":}` +- Keepalive: + - websocket ping/pong with 30s ping period + - pong timeout 10s +- Active websocket connections are tracked in-memory via `ConnectionHub`. + +## MQTT Topics + +Defined in `Connection.go` and `MQTTSync.go`: + +- Status topic format: + - `accounts/%s/clients/%s/status` +- WebRTC inbox: + - `accounts/%s/clients/%s/webrtc_inbox` +- Baby stations: + - `accounts/%s/baby_stations` +- Parent stations: + - `accounts/%s/parent_stations` +- Control inbox: + - `accounts/%s/clients/%s/control_inbox` + +Status sync subscriber in `connectionstoresync/Sync.go`: + +- Subscribes wildcard: + - `accounts/+/clients/+/status` +- Parses account and client IDs from topic, then uses payload `connection_id`. + +## MQTT Status Payloads + +Published from `Connection.go` on connect/disconnect: + +- Connected: + - `{"type":"connected","connection_id":"...","at_millis":}` +- Disconnected: + - `{"type":"disconnected","connection_id":"...","disconnected":{"reason":"clean|unexpected"},"at_millis":}` + +Event payloads are defined in `golang/internal/connections/Event.go`. + +## Session Ingress/Egress + +- `PUT /sessions/{session_id}` now publishes session announcements to: + - `accounts/{account_id}/baby_stations` +- Session delete endpoint is intentionally no-op; disconnect status drives lifecycle. +- `MQTTSync` subscribes `accounts/+/baby_stations` and appends `session.started` with idempotency via `sessionstartdecider`. + +## Parent Station Discovery + +- `MQTTSync` subscribes `accounts/+/parent_stations`. +- Expected payload includes `client_id` of requesting parent station. +- For each active baby station snapshot entry, backend publishes control message: + - topic: `accounts/{account_id}/clients/{client_id}/control_inbox` + - payload: + - `{"type":"baby_station_announcement","at_millis":,"baby_station_announcement":{"client_id":"...","connection_id":"...","name":"...","started_at_millis":}}` + +Client status subscription remains in `connectionstoresync`: + +- `accounts/+/clients/+/status` + +## Auth and Token Durations + +Configured in `golang/cmd/backend/server.go`: + +- Access token duration: `1 * time.Hour` +- Refresh token duration: `30 * 24 * time.Hour` + +Refresh token scope handling is in `golang/internal/accounts/Handlers.go`. diff --git a/agent_docs/Open Questions.md b/agent_docs/Open Questions.md new file mode 100644 index 0000000..8b723cf --- /dev/null +++ b/agent_docs/Open Questions.md @@ -0,0 +1,14 @@ +## Open Questions + +These are useful for tightening architecture docs and future onboarding. + +## High Priority + +- What is the endpoint removal order for legacy WebSocket routes? +- What is the intended deprecation timeline and compatibility policy for old cached PWAs? + +## Useful Clarifications + +- Which backend endpoints are considered stable public contract vs internal implementation detail? +- What anonymous analytics events are currently emitted from frontend and backend? +- Are there explicit SLOs for connection setup success/reconnect time in local-network conditions? diff --git a/agent_docs/Product and Privacy Model.md b/agent_docs/Product and Privacy Model.md new file mode 100644 index 0000000..8c1fc61 --- /dev/null +++ b/agent_docs/Product and Privacy Model.md @@ -0,0 +1,43 @@ +## Product Model + +BeddyBytes is for parents monitoring children during sleep using existing devices with modern browsers. + +Two runtime roles exist: + +- Baby Station: captures audio/video and streams. +- Parent Station: connects to and monitors one or more baby station sessions. + +## Privacy Model + +Privacy priorities: + +- Primary: privacy. +- Secondary: simplicity. + +Privacy guarantees (intended behavior): + +- No video/audio leaves the home network. +- Backend never receives media payloads. +- No server-side recording. +- Analytics are anonymous. + +Data collected by service: + +- Email address (account identity). +- Payment processing via Stripe (one-off checkout, no subscription model). + +## Recording Behavior + +- Recording is initiated on Parent Station. +- Browser `MediaRecorder` captures incoming stream. +- Recording is downloaded immediately to local device. +- Recording is never uploaded to backend. + +## Product Constraints + +- No STUN/TURN means no off-LAN fallback for media. +- Connection intentionally fails when local-network conditions are not met. + +## Operational Reality + +Because this is a PWA, users may run cached app versions for weeks before refresh/update. Any migration strategy must tolerate mixed-version clients. diff --git a/agent_docs/Signalling Migration.md b/agent_docs/Signalling Migration.md new file mode 100644 index 0000000..9eda489 --- /dev/null +++ b/agent_docs/Signalling Migration.md @@ -0,0 +1,31 @@ +## Current State + +- Legacy signalling path uses backend WebSockets. +- MQTT migration is active for backend ingress/egress topic contracts. +- Backend now: + - publishes client status to `accounts/{account_id}/clients/{client_id}/status` + - publishes WebRTC signalling to `accounts/{account_id}/clients/{client_id}/webrtc_inbox` + - subscribes `baby_stations`, `parent_stations`, `webrtc_inbox`, and status wildcard topics +- Current implementation is backend-centric; no finalized frontend MQTT path yet. + +See [[Backend Signalling Contracts]] for current endpoint/topic details. + +## Direction + +- Move clients to direct MQTT-first signalling behavior. +- Deprecate WebSocket endpoints. +- Remove WebSocket signalling paths after migration is complete and safe. + +## Risk Factors + +- PWA caching causes version skew across active clients. +- Mixed fleets (WebSocket-era and MQTT-era clients) must interoperate during rollout. +- Session continuity and reconnection behavior must remain stable while transports coexist. + +## Documentation Impact + +As migration advances, update: + +- [[architecture]] for control-plane transport changes. +- [[structure]] for source-of-truth files handling signalling. +- [[Backend Signalling Contracts]] as backend transport behavior changes. diff --git a/agent_docs/architecture.md b/agent_docs/architecture.md new file mode 100644 index 0000000..2810100 --- /dev/null +++ b/agent_docs/architecture.md @@ -0,0 +1,54 @@ +## System Architecture + +BeddyBytes is a privacy-first baby monitor built around browser-native WebRTC. + +## Runtime Components + +- Frontend PWA (`frontend/`): Baby Station and Parent Station user experience. +- Backend (`golang/`): authentication, session metadata, signalling transport, and account separation. +- Marketing site (`marketing/`): public website and commercial pages. + +## Core Roles + +- Baby Station: captures microphone audio (required) and camera video (optional), then publishes media to WebRTC peers. +- Parent Station: discovers active sessions, establishes WebRTC peer connections, and plays media. + +## Data Plane vs Control Plane + +- Data plane (media): browser-to-browser WebRTC, peer-to-peer, local network only. +- Control plane (signalling + auth): backend-mediated session/auth/signalling traffic. + +The backend handles signalling metadata and account/session concerns, but not media transport. + +## Privacy-Critical Constraints + +- No STUN/TURN servers are configured. +- If peers are not on the same local network, WebRTC connection fails. +- Audio/video never leave the home network. +- Recording is local to Parent Station browser via `MediaRecorder` and immediately downloaded. +- Backend never stores or relays media. + +## Deployment Overview + +- Local development stack: `docker-compose.yml` and `run_local_stack.sh`. +- QA + Prod backend: separate Docker containers on a single EC2 host. +- Shared infra on that host: Traefik, Grafana, InfluxDB. +- Frontend app hosting: S3 + CloudFront, with separate QA and Prod app environments. +- Marketing: production site only. + +## Authentication Model + +- Account-based access control. +- Access token + refresh token flow (OAuth-like model). +- JWT access token TTL: 1 hour. +- JWT refresh token TTL: 30 days. +- Password reset flow is implemented. + +## Payments + +- Stripe is used for one-off checkout purchases. +- No subscription billing model. + +## Architectural Center of Gravity + +The baby station session and WebRTC connection lifecycle are the center of the system. Most backend/frontend logic exists to establish, maintain, recover, and tear down those peer connections. diff --git a/agent_docs/index.md b/agent_docs/index.md new file mode 100644 index 0000000..b19814c --- /dev/null +++ b/agent_docs/index.md @@ -0,0 +1,25 @@ +## BeddyBytes Agent Docs + +This vault captures implementation-oriented documentation for BeddyBytes. + +## Core Notes + +- [[architecture]] +- [[structure]] +- [[Product and Privacy Model]] +- [[Backend Signalling Contracts]] +- [[Signalling Migration]] +- [[Open Questions]] + +## Source Material + +- `README.md` +- `HowBeddyBytesWorks.md` +- `HowBeddyBytesWorks2.md` +- Human guidance captured on March 9, 2026 + +## Current Priorities + +- Keep architecture docs aligned with WebSocket to MQTT signalling migration. +- Keep privacy claims aligned with actual data flow in frontend and backend code. +- Prioritize session lifecycle and WebRTC connection behavior in technical docs. diff --git a/agent_docs/structure.md b/agent_docs/structure.md new file mode 100644 index 0000000..b17a1a0 --- /dev/null +++ b/agent_docs/structure.md @@ -0,0 +1,50 @@ +## Repository Structure + +This note maps where key system behavior lives. + +## Product Runtime + +- `frontend/`: PWA app, Baby Station + Parent Station logic, WebRTC and client-side recording behavior. +- `golang/`: backend server, signalling/auth/session handling, MQTT and WebSocket infrastructure. +- `marketing/`: Gatsby marketing site. + +## Backend Entry Points + +- `golang/cmd/backend/`: backend server composition and handlers. +- `golang/internal/`: domain modules (accounts, sessions, connection stores, messaging, event log, stores, MQTT helpers). +- Signalling source-of-truth (current): + - `golang/cmd/backend/server.go` + - `golang/cmd/backend/Connection.go` + - `golang/internal/connectionstoresync/Sync.go` + - `golang/internal/connections/Message.go` + +## Frontend Areas + +- `frontend/src/pages/BabyStation/`: baby station UI and media capture/stream behavior. +- `frontend/src/pages/ParentStation/`: parent station UI and playback/session controls. +- `frontend/src/services/SignalService/`: signalling transport implementation. +- `frontend/src/services/BabyStation/` and `frontend/src/services/ParentStation/`: role-specific application services. + +## Infra and Ops + +- `docker-compose.yml`: local stack. +- `run_local_stack.sh`: local environment bootstrap. +- `run_integration_tests.sh`: integration test entrypoint. +- `integration_tests/`: end-to-end scenarios. +- `cloudformation/`: infrastructure templates. +- `traefik/`, `grafana/`: ingress and observability configuration. + +## Product/Domain References + +- `README.md`: concise project overview. +- `HowBeddyBytesWorks.md`: concise architecture explanation. +- `HowBeddyBytesWorks2.md`: detailed privacy and architecture narrative. + +## Documentation Map + +- [[index]] +- [[architecture]] +- [[Product and Privacy Model]] +- [[Backend Signalling Contracts]] +- [[Signalling Migration]] +- [[Open Questions]] diff --git a/golang/cmd/backend/Connection.go b/golang/cmd/backend/Connection.go index 5c2556a..00ffc2d 100644 --- a/golang/cmd/backend/Connection.go +++ b/golang/cmd/backend/Connection.go @@ -7,23 +7,20 @@ import ( "fmt" "log" "net/http" + "sync" "time" mqtt "github.com/eclipse/paho.mqtt.golang" "github.com/gorilla/mux" "github.com/gorilla/websocket" - uuid "github.com/satori/go.uuid" - "github.com/Ryan-A-B/beddybytes/golang/internal/connections" "github.com/Ryan-A-B/beddybytes/golang/internal/contextx" - "github.com/Ryan-A-B/beddybytes/golang/internal/fatal" "github.com/Ryan-A-B/beddybytes/golang/internal/logx" - "github.com/Ryan-A-B/beddybytes/golang/internal/messages" "github.com/Ryan-A-B/beddybytes/golang/internal/mqttx" ) -const inboxTopicFormat = "accounts/%s/connections/%s/inbox" -const statusTopicFormat = "accounts/%s/connections/%s/status" +const webrtcInboxTopicFormat = "accounts/%s/clients/%s/webrtc_inbox" +const statusTopicFormat = "accounts/%s/clients/%s/status" const EventTypeClientConnected = "client.connected" const EventTypeClientDisconnected = "client.disconnected" @@ -31,14 +28,16 @@ const EventTypeClientDisconnected = "client.disconnected" type ClientConnectedEventData struct { ClientID string `json:"client_id"` ConnectionID string `json:"connection_id"` - RequestID string `json:"request_id"` + AtMillis int64 `json:"at_millis"` } type ClientDisconnectedEventData struct { - ClientID string `json:"client_id"` - ConnectionID string `json:"connection_id"` - RequestID string `json:"request_id"` - WebSocketCloseCode int `json:"web_socket_close_code"` + ClientID string `json:"client_id"` + ConnectionID string `json:"connection_id"` + AtMillis int64 `json:"at_millis"` + Disconnected struct { + Reason string `json:"reason"` + } `json:"disconnected"` } type MessageType string @@ -72,6 +71,7 @@ func (message *IncomingMessage) Validate() (err error) { type IncomingSignal struct { ToConnectionID string `json:"to_connection_id"` + ToClientID string `json:"to_client_id,omitempty"` Data json.RawMessage `json:"data"` } @@ -89,15 +89,10 @@ func (signal *IncomingSignal) Validate() (err error) { type OutgoingMessage struct { Type MessageType `json:"type"` - Signal *OutgoingSignal `json:"signal,omitempty"` + Signal json.RawMessage `json:"signal,omitempty"` Event *Event `json:"event,omitempty"` } -type OutgoingSignal struct { - FromConnectionID string `json:"from_connection_id"` - Data json.RawMessage `json:"data"` -} - type ConnectionStoreKey struct { AccountID string ConnectionID string @@ -109,73 +104,48 @@ func (handlers *Handlers) HandleConnection(responseWriter http.ResponseWriter, r vars := mux.Vars(request) clientID := vars["client_id"] connectionID := vars["connection_id"] - requestID := uuid.NewV4().String() conn, err := handlers.Upgrader.Upgrade(responseWriter, request, nil) if err != nil { logx.Errorln(err) return } defer conn.Close() - inboxTopic := fmt.Sprintf(inboxTopicFormat, accountID, connectionID) - token := handlers.MQTTClient.Subscribe(inboxTopic, 1, func(client mqtt.Client, message mqtt.Message) { - signal := new(OutgoingSignal) - err := json.Unmarshal(message.Payload(), signal) - fatal.OnError(err) - - err = conn.WriteJSON(OutgoingMessage{ - Type: MessageTypeSignal, - Signal: signal, - }) - if err != nil { - logx.Errorln(err) - return - } - message.Ack() - }) - err = mqttx.Wait(token) - if err != nil { - logx.Errorln(err) - return - } - // Wait for the subscription to propagate - readyC := time.After(500 * time.Millisecond) err = handlers.sendConnectedMessage(ctx, sendConnectedMessageInput{ accountID: accountID, clientID: clientID, connectionID: connectionID, - requestID: requestID, }) if err != nil { logx.Errorln(err) return } + connection := handlers.ConnectionFactory.CreateConnection(ctx, &CreateConnectionInput{ + AccountID: accountID, + ClientID: clientID, + ConnectionID: connectionID, + conn: conn, + client: handlers.MQTTClient, + }) + handlers.ConnectionHub.Put(connection) + defer handlers.ConnectionHub.Delete(connection.ID) + disconnectReason := "clean" defer func() { err = handlers.sendDisconnectedMessage(ctx, sendDisconnectedMessageInput{ accountID: accountID, clientID: clientID, connectionID: connectionID, - requestID: requestID, + reason: disconnectReason, }) if err != nil { logx.Errorln(err) } - err = mqttx.Wait(handlers.MQTTClient.Unsubscribe(inboxTopic)) - if err != nil { - logx.Errorln(err) - } }() - connection := handlers.ConnectionFactory.CreateConnection(ctx, &CreateConnectionInput{ - AccountID: accountID, - ConnectionID: connectionID, - conn: conn, - client: handlers.MQTTClient, - }) + errC := make(chan error, 1) go func() { errC <- connection.runKeepAlive(ctx) }() go func() { - <-readyC errC <- connection.handleMessages(ctx) }() select { @@ -188,6 +158,7 @@ func (handlers *Handlers) HandleConnection(responseWriter http.ResponseWriter, r return } } + disconnectReason = "unexpected" logx.Errorln(err) return } @@ -198,20 +169,18 @@ type sendConnectedMessageInput struct { accountID string clientID string connectionID string - requestID string } func (handlers *Handlers) sendConnectedMessage(ctx context.Context, input sendConnectedMessageInput) (err error) { - topic := fmt.Sprintf(statusTopicFormat, input.accountID, input.connectionID) - payload := fatal.UnlessMarshalJSON(connections.MessageFrame{ - MessageFrameBase: messages.MessageFrameBase{ - Type: connections.MessageTypeConnected, - }, - Connected: &connections.MessageConnected{ - ClientID: input.clientID, - RequestID: input.requestID, - }, + topic := fmt.Sprintf(statusTopicFormat, input.accountID, input.clientID) + payload, err := json.Marshal(map[string]any{ + "type": "connected", + "connection_id": input.connectionID, + "at_millis": time.Now().UnixMilli(), }) + if err != nil { + return + } return mqttx.Wait(handlers.MQTTClient.Publish(topic, 1, false, payload)) } @@ -219,20 +188,25 @@ type sendDisconnectedMessageInput struct { accountID string clientID string connectionID string - requestID string + reason string } func (handlers *Handlers) sendDisconnectedMessage(ctx context.Context, input sendDisconnectedMessageInput) (err error) { - topic := fmt.Sprintf(statusTopicFormat, input.accountID, input.connectionID) - payload := fatal.UnlessMarshalJSON(connections.MessageFrame{ - MessageFrameBase: messages.MessageFrameBase{ - Type: connections.MessageTypeDisconnected, - }, - Disconnected: &connections.MessageDisconnected{ - ClientID: input.clientID, - RequestID: input.requestID, + if input.reason == "" { + input.reason = "clean" + } + topic := fmt.Sprintf(statusTopicFormat, input.accountID, input.clientID) + payload, err := json.Marshal(map[string]any{ + "type": "disconnected", + "connection_id": input.connectionID, + "disconnected": map[string]any{ + "reason": input.reason, }, + "at_millis": time.Now().UnixMilli(), }) + if err != nil { + return + } return mqttx.Wait(handlers.MQTTClient.Publish(topic, 1, false, payload)) } @@ -240,6 +214,7 @@ type ConnectionFactory struct{} type CreateConnectionInput struct { AccountID string + ClientID string ConnectionID string conn *websocket.Conn client mqtt.Client @@ -247,15 +222,18 @@ type CreateConnectionInput struct { type Connection struct { AccountID string + ClientID string ID string conn *websocket.Conn client mqtt.Client pongC chan struct{} + writeMu sync.Mutex } func (factory *ConnectionFactory) CreateConnection(ctx context.Context, input *CreateConnectionInput) (connection *Connection) { connection = &Connection{ AccountID: input.AccountID, + ClientID: input.ClientID, ID: input.ConnectionID, conn: input.conn, client: input.client, @@ -291,7 +269,7 @@ func (connection *Connection) handleNextMessage(ctx context.Context) (err error) } switch incomingMessage.Type { case MessageTypePing: - return connection.conn.WriteJSON(OutgoingMessage{ + return connection.WriteJSON(OutgoingMessage{ Type: MessageTypePong, }) case MessageTypeSignal: @@ -302,13 +280,20 @@ func (connection *Connection) handleNextMessage(ctx context.Context) (err error) } func (connection *Connection) handleSignal(ctx context.Context, incomingSignal *IncomingSignal) (err error) { - inboxTopic := fmt.Sprintf(inboxTopicFormat, connection.AccountID, incomingSignal.ToConnectionID) - data, err := json.Marshal(OutgoingSignal{ - FromConnectionID: connection.ID, - Data: incomingSignal.Data, + topicClientID := incomingSignal.ToClientID + if topicClientID == "" { + topicClientID = connection.ClientID + } + inboxTopic := fmt.Sprintf(webrtcInboxTopicFormat, connection.AccountID, topicClientID) + data, err := json.Marshal(map[string]any{ + "from_client_id": connection.ClientID, + "connection_id": incomingSignal.ToConnectionID, + "data": json.RawMessage(incomingSignal.Data), }) - fatal.OnError(err) - return mqttx.Wait(connection.client.Publish(inboxTopic, 1, false, []byte(data))) + if err != nil { + return + } + return mqttx.Wait(connection.client.Publish(inboxTopic, 1, false, data)) } func (connection *Connection) handlePong(appData string) (err error) { @@ -369,3 +354,9 @@ func (connection *Connection) waitForPong(ctx context.Context) (err error) { return } } + +func (connection *Connection) WriteJSON(value any) error { + connection.writeMu.Lock() + defer connection.writeMu.Unlock() + return connection.conn.WriteJSON(value) +} diff --git a/golang/cmd/backend/ConnectionHub.go b/golang/cmd/backend/ConnectionHub.go new file mode 100644 index 0000000..ce911da --- /dev/null +++ b/golang/cmd/backend/ConnectionHub.go @@ -0,0 +1,33 @@ +package main + +import "sync" + +type ConnectionHub struct { + mutex sync.RWMutex + connectionByID map[string]*Connection +} + +func NewConnectionHub() *ConnectionHub { + return &ConnectionHub{ + connectionByID: make(map[string]*Connection), + } +} + +func (hub *ConnectionHub) Put(connection *Connection) { + hub.mutex.Lock() + defer hub.mutex.Unlock() + hub.connectionByID[connection.ID] = connection +} + +func (hub *ConnectionHub) Delete(connectionID string) { + hub.mutex.Lock() + defer hub.mutex.Unlock() + delete(hub.connectionByID, connectionID) +} + +func (hub *ConnectionHub) Get(connectionID string) (*Connection, bool) { + hub.mutex.RLock() + defer hub.mutex.RUnlock() + connection, ok := hub.connectionByID[connectionID] + return connection, ok +} diff --git a/golang/cmd/backend/MQTTSync.go b/golang/cmd/backend/MQTTSync.go new file mode 100644 index 0000000..4897dbb --- /dev/null +++ b/golang/cmd/backend/MQTTSync.go @@ -0,0 +1,180 @@ +package main + +import ( + "context" + "encoding/json" + "fmt" + "regexp" + "time" + + "github.com/Ryan-A-B/beddybytes/golang/internal/contextx" + "github.com/Ryan-A-B/beddybytes/golang/internal/logx" + "github.com/Ryan-A-B/beddybytes/golang/internal/mqttx" + "github.com/Ryan-A-B/beddybytes/golang/internal/sessionstartdecider" + mqtt "github.com/eclipse/paho.mqtt.golang" +) + +var ( + babyStationsTopicRegex = regexp.MustCompile(`^accounts/([^/]+)/baby_stations$`) + parentStationsTopicRegex = regexp.MustCompile(`^accounts/([^/]+)/parent_stations$`) + clientWebRTCInboxTopicRegex = regexp.MustCompile(`^accounts/([^/]+)/clients/([^/]+)/webrtc_inbox$`) +) + +func (handlers *Handlers) RunMQTTSync(ctx context.Context) { + subscriptions := []struct { + topic string + handler mqtt.MessageHandler + }{ + {topic: "accounts/+/baby_stations", handler: handlers.handleBabyStationsMessage}, + {topic: "accounts/+/parent_stations", handler: handlers.handleParentStationsMessage}, + {topic: "accounts/+/clients/+/webrtc_inbox", handler: handlers.handleWebRTCInboxMessage}, + } + for _, subscription := range subscriptions { + if err := mqttx.Wait(handlers.MQTTClient.Subscribe(subscription.topic, 1, subscription.handler)); err != nil { + panic(err) + } + } + <-ctx.Done() +} + +func (handlers *Handlers) handleBabyStationsMessage(client mqtt.Client, message mqtt.Message) { + defer message.Ack() + accountID, ok := parseAccountID(message.Topic(), babyStationsTopicRegex) + if !ok { + logx.Warnln("invalid baby station topic:", message.Topic()) + return + } + var announcement babyStationAnnouncement + if err := json.Unmarshal(message.Payload(), &announcement); err != nil { + logx.Warnln(err) + return + } + if announcement.ClientID == "" || announcement.ConnectionID == "" || announcement.Name == "" { + logx.Warnln("invalid baby station announcement payload") + return + } + startedAt := time.UnixMilli(announcement.StartedAtMillis) + err := handlers.SessionStartDecider.Put(context.Background(), sessionstartdecider.Session{ + AccountID: accountID, + ID: announcement.ConnectionID, + Name: announcement.Name, + HostConnectionID: announcement.ConnectionID, + StartedAt: startedAt, + }) + if err != nil && err != sessionstartdecider.ErrDuplicate { + logx.Errorln(err) + } +} + +func (handlers *Handlers) handleParentStationsMessage(client mqtt.Client, message mqtt.Message) { + defer message.Ack() + accountID, ok := parseAccountID(message.Topic(), parentStationsTopicRegex) + if !ok { + logx.Warnln("invalid parent station topic:", message.Topic()) + return + } + var payload parentStationAnnouncementRequest + if err := json.Unmarshal(message.Payload(), &payload); err != nil { + logx.Warnln(err) + return + } + if payload.ClientID == "" { + logx.Warnln("missing client_id in parent station request") + return + } + ctx := contextx.WithAccountID(context.Background(), accountID) + snapshot, err := handlers.BabyStationList.GetSnapshot(ctx) + if err != nil { + logx.Errorln(err) + return + } + for _, babyStation := range snapshot.List() { + controlTopic := fmt.Sprintf("accounts/%s/clients/%s/control_inbox", accountID, payload.ClientID) + controlMessage := controlMessageBabyStationAnnouncement{ + ControlMessageBase: ControlMessageBase{ + Type: "baby_station_announcement", + AtMillis: time.Now().UnixMilli(), + }, + BabyStationAnnouncement: babyStationAnnouncement{ + ClientID: babyStation.ClientID, + ConnectionID: babyStation.Connection.ID, + Name: babyStation.Name, + StartedAtMillis: babyStation.StartedAt.UnixMilli(), + }, + } + encoded, err := json.Marshal(controlMessage) + if err != nil { + logx.Errorln(err) + continue + } + if err := mqttx.Wait(handlers.MQTTClient.Publish(controlTopic, 1, false, encoded)); err != nil { + logx.Errorln(err) + } + } +} + +func (handlers *Handlers) handleWebRTCInboxMessage(client mqtt.Client, message mqtt.Message) { + defer message.Ack() + if !clientWebRTCInboxTopicRegex.MatchString(message.Topic()) { + logx.Warnln("invalid webrtc inbox topic:", message.Topic()) + return + } + var inbox webrtcInboxMessage + if err := json.Unmarshal(message.Payload(), &inbox); err != nil { + logx.Warnln(err) + return + } + if inbox.ConnectionID == "" { + return + } + connection, ok := handlers.ConnectionHub.Get(inbox.ConnectionID) + if !ok { + return + } + signal, err := json.Marshal(inbox) + if err != nil { + logx.Errorln(err) + return + } + if err := connection.WriteJSON(OutgoingMessage{ + Type: MessageTypeSignal, + Signal: signal, + }); err != nil { + logx.Errorln(err) + } +} + +func parseAccountID(topic string, pattern *regexp.Regexp) (string, bool) { + matches := pattern.FindStringSubmatch(topic) + if len(matches) != 2 { + return "", false + } + return matches[1], true +} + +type babyStationAnnouncement struct { + ClientID string `json:"client_id"` + ConnectionID string `json:"connection_id"` + Name string `json:"name"` + StartedAtMillis int64 `json:"started_at_millis"` +} + +type parentStationAnnouncementRequest struct { + ClientID string `json:"client_id"` +} + +type webrtcInboxMessage struct { + FromClientID string `json:"from_client_id"` + ConnectionID string `json:"connection_id"` + Data json.RawMessage `json:"data"` +} + +type ControlMessageBase struct { + Type string `json:"type"` + AtMillis int64 `json:"at_millis"` +} + +type controlMessageBabyStationAnnouncement struct { + ControlMessageBase + BabyStationAnnouncement babyStationAnnouncement `json:"baby_station_announcement"` +} diff --git a/golang/cmd/backend/UsageStats.go b/golang/cmd/backend/UsageStats.go index 1c32a16..a7de49a 100644 --- a/golang/cmd/backend/UsageStats.go +++ b/golang/cmd/backend/UsageStats.go @@ -150,7 +150,7 @@ func applyClientDisconnectedEvent(ctx context.Context, stats *UsageStats, event return } disconnectTime := time.Unix(event.UnixTimestamp, 0) - if disconnectWasClean(clientDisconnectedData.WebSocketCloseCode) { + if disconnectWasClean(clientDisconnectedData.Disconnected.Reason) { duration := disconnectTime.Sub(sessionInfo.StartTime) stats.durationByAccountID[event.AccountID] += duration delete(stats.sessionInfoByID, sessionInfo.ID) @@ -170,11 +170,6 @@ func applyServerStartedEvent(ctx context.Context, stats *UsageStats, event *even } } -func disconnectWasClean(closeCode int) bool { - switch closeCode { - case 1000, 1001: - return true - default: - return false - } +func disconnectWasClean(reason string) bool { + return reason == "clean" } diff --git a/golang/cmd/backend/UsageStats_test.go b/golang/cmd/backend/UsageStats_test.go index 1f2b716..506136b 100644 --- a/golang/cmd/backend/UsageStats_test.go +++ b/golang/cmd/backend/UsageStats_test.go @@ -77,7 +77,6 @@ func TestUsageStats(t *testing.T) { clientConnectedData := ClientConnectedEventData{ ClientID: hostClientID, ConnectionID: hostConnectionID, - RequestID: uuid.NewV4().String(), } data, err := json.Marshal(clientConnectedData) So(err, ShouldBeNil) @@ -92,11 +91,10 @@ func TestUsageStats(t *testing.T) { Convey("host connection ends", func() { Convey("clean", func() { clientDisconnectedData := ClientDisconnectedEventData{ - ClientID: hostClientID, - ConnectionID: hostConnectionID, - RequestID: clientConnectedData.RequestID, - WebSocketCloseCode: 1000, + ClientID: hostClientID, + ConnectionID: hostConnectionID, } + clientDisconnectedData.Disconnected.Reason = "clean" data, err := json.Marshal(clientDisconnectedData) So(err, ShouldBeNil) _, err = log.Append(ctx, eventlog.AppendInput{ @@ -115,11 +113,10 @@ func TestUsageStats(t *testing.T) { }) Convey("unclean", func() { clientDisconnectedData := ClientDisconnectedEventData{ - ClientID: hostClientID, - ConnectionID: hostConnectionID, - RequestID: clientConnectedData.RequestID, - WebSocketCloseCode: 1006, + ClientID: hostClientID, + ConnectionID: hostConnectionID, } + clientDisconnectedData.Disconnected.Reason = "unexpected" data, err := json.Marshal(clientDisconnectedData) So(err, ShouldBeNil) _, err = log.Append(ctx, eventlog.AppendInput{ @@ -139,7 +136,6 @@ func TestUsageStats(t *testing.T) { clientConnectedData := ClientConnectedEventData{ ClientID: hostClientID, ConnectionID: hostConnectionID, - RequestID: uuid.NewV4().String(), } data, err := json.Marshal(clientConnectedData) So(err, ShouldBeNil) @@ -179,7 +175,6 @@ func TestUsageStats(t *testing.T) { clientConnectedData := ClientConnectedEventData{ ClientID: hostClientID, ConnectionID: hostConnectionID, - RequestID: uuid.NewV4().String(), } data, err := json.Marshal(clientConnectedData) So(err, ShouldBeNil) diff --git a/golang/cmd/backend/server.go b/golang/cmd/backend/server.go index 7739423..19078cd 100644 --- a/golang/cmd/backend/server.go +++ b/golang/cmd/backend/server.go @@ -32,6 +32,7 @@ import ( "github.com/Ryan-A-B/beddybytes/golang/internal/mqttx" "github.com/Ryan-A-B/beddybytes/golang/internal/resetpassword" "github.com/Ryan-A-B/beddybytes/golang/internal/sessionlist" + "github.com/Ryan-A-B/beddybytes/golang/internal/sessionstartdecider" "github.com/Ryan-A-B/beddybytes/golang/internal/sessionstore" "github.com/Ryan-A-B/beddybytes/golang/internal/store" ) @@ -66,15 +67,17 @@ type Client struct { } type Handlers struct { - Upgrader websocket.Upgrader - ClientStore ClientStore - ConnectionFactory ConnectionFactory - SessionProjection SessionProjection - SessionList *sessionlist.SessionList - BabyStationList *babystationlist.BabyStationList - EventLog eventlog.EventLog - MQTTClient mqtt.Client - UsageStats *UsageStats + Upgrader websocket.Upgrader + ClientStore ClientStore + ConnectionFactory ConnectionFactory + SessionProjection SessionProjection + SessionList *sessionlist.SessionList + SessionStartDecider *sessionstartdecider.Decider + BabyStationList *babystationlist.BabyStationList + ConnectionHub *ConnectionHub + EventLog eventlog.EventLog + MQTTClient mqtt.Client + UsageStats *UsageStats Key interface{} } @@ -276,11 +279,15 @@ func main() { SessionList: sessionlist.New(ctx, sessionlist.NewInput{ Log: eventLog, }), + SessionStartDecider: sessionstartdecider.NewDecider(sessionstartdecider.NewDeciderInput{ + EventLog: eventLog, + }), BabyStationList: babystationlist.New(babystationlist.NewInput{ EventLog: eventLog, }), - EventLog: eventLog, - MQTTClient: mqttClient, + ConnectionHub: NewConnectionHub(), + EventLog: eventLog, + MQTTClient: mqttClient, UsageStats: NewUsageStats(ctx, NewUsageStatsInput{ Log: eventLog, }), @@ -303,6 +310,10 @@ func main() { }) log.Fatal("connectionstoresync.Run exited") }() + go func() { + handlers.RunMQTTSync(ctx) + log.Fatal("handlers.RunMQTTSync exited") + }() router := mux.NewRouter() router.Use(internal.LoggingMiddleware) handlers.AddRoutes(router.NewRoute().Subrouter()) diff --git a/golang/cmd/backend/sessions.go b/golang/cmd/backend/sessions.go index c58827d..4a6bb4c 100644 --- a/golang/cmd/backend/sessions.go +++ b/golang/cmd/backend/sessions.go @@ -3,6 +3,7 @@ package main import ( "context" "encoding/json" + "fmt" "log" "net/http" "time" @@ -14,6 +15,7 @@ import ( "github.com/Ryan-A-B/beddybytes/golang/internal/eventlog" "github.com/Ryan-A-B/beddybytes/golang/internal/fatal" "github.com/Ryan-A-B/beddybytes/golang/internal/httpx" + "github.com/Ryan-A-B/beddybytes/golang/internal/mqttx" "github.com/Ryan-A-B/beddybytes/golang/internal/sessions" "github.com/Ryan-A-B/beddybytes/golang/internal/sessionstore" ) @@ -49,6 +51,29 @@ type Session struct { StartedAt time.Time `json:"started_at"` } +type CreateSessionInput struct { + ClientID string `json:"client_id"` + ConnectionID string `json:"connection_id"` + Name string `json:"name"` + StartedAtMillis int64 `json:"started_at_millis"` +} + +func (input *CreateSessionInput) validate() error { + if input.ClientID == "" { + return merry.New("client id is empty").WithHTTPCode(http.StatusBadRequest) + } + if input.ConnectionID == "" { + return merry.New("connection id is empty").WithHTTPCode(http.StatusBadRequest) + } + if input.Name == "" { + return merry.New("session name is empty").WithHTTPCode(http.StatusBadRequest) + } + if input.StartedAtMillis <= 0 { + return merry.New("started_at_millis must be positive").WithHTTPCode(http.StatusBadRequest) + } + return nil +} + func (handlers *Handlers) StartSession(responseWriter http.ResponseWriter, request *http.Request) { var err error defer func() { @@ -58,31 +83,24 @@ func (handlers *Handlers) StartSession(responseWriter http.ResponseWriter, reque } }() ctx := request.Context() - vars := mux.Vars(request) - sessionID := vars["session_id"] - var session StartSessionEventData - err = json.NewDecoder(request.Body).Decode(&session) - if err != nil { + accountID := contextx.GetAccountID(ctx) + _ = mux.Vars(request)["session_id"] + var input CreateSessionInput + if err = json.NewDecoder(request.Body).Decode(&input); err != nil { err = merry.WithHTTPCode(err, http.StatusBadRequest) return } - err = session.validate() - if err != nil { + if err = input.validate(); err != nil { return } - if session.ID != sessionID { - err = merry.Errorf("session id in path does not match session id in body").WithHTTPCode(http.StatusBadRequest) + topic := fmt.Sprintf("accounts/%s/baby_stations", accountID) + payload, err := json.Marshal(input) + if err != nil { return } - _, err = handlers.EventLog.Append(ctx, eventlog.AppendInput{ - Type: EventTypeSessionStarted, - AccountID: contextx.GetAccountID(ctx), - Data: fatal.UnlessMarshalJSON(session), - }) - if err != nil { + if err = mqttx.Wait(handlers.MQTTClient.Publish(topic, 1, false, payload)); err != nil { return } - // TODO set header with logical clock of the start event } type EndSessionEventData struct { @@ -90,26 +108,7 @@ type EndSessionEventData struct { } func (handlers *Handlers) EndSession(responseWriter http.ResponseWriter, request *http.Request) { - var err error - defer func() { - if err != nil { - log.Println(err) - httpx.Error(responseWriter, err) - } - }() - ctx := request.Context() - vars := mux.Vars(request) - sessionID := vars["session_id"] - // TODO check we know about the session? - // expect a header with the logical clock of the start event - _, err = handlers.EventLog.Append(ctx, eventlog.AppendInput{ - Type: EventTypeSessionEnded, - AccountID: contextx.GetAccountID(ctx), - Data: fatal.UnlessMarshalJSON(&EndSessionEventData{ID: sessionID}), - }) - if err != nil { - return - } + // Session delete is intentionally a no-op. Session lifecycle now ends on disconnect. } type SessionProjection struct { diff --git a/golang/cmd/monitor-connection-status/main.go b/golang/cmd/monitor-connection-status/main.go index 0d394b4..6e7c682 100644 --- a/golang/cmd/monitor-connection-status/main.go +++ b/golang/cmd/monitor-connection-status/main.go @@ -21,7 +21,7 @@ func main() { if err != nil { panic(err) } - const topic = "accounts/+/connections/+/status" + const topic = "accounts/+/clients/+/status" err = mqttx.Wait(client.Subscribe(topic, 1, handleMessage)) if err != nil { panic(err) diff --git a/golang/internal/babystationlist/BabyStationList.go b/golang/internal/babystationlist/BabyStationList.go index 7aa00fa..8d9da27 100644 --- a/golang/internal/babystationlist/BabyStationList.go +++ b/golang/internal/babystationlist/BabyStationList.go @@ -156,9 +156,8 @@ func (babyStationList *BabyStationList) applyConnected(event *eventlog.Event) { fatal.OnError(err) snapshot := babyStationList.getOrCreateSnapshot(event.AccountID) connection := Connection{ - ClientID: data.ClientID, - ID: data.ConnectionID, - RequestID: data.RequestID, + ClientID: data.ClientID, + ID: data.ConnectionID, } snapshot.ConnectionByID[data.ConnectionID] = &connection } diff --git a/golang/internal/babystationlist/BabyStationList_test.go b/golang/internal/babystationlist/BabyStationList_test.go index 4ebf916..44f2e91 100644 --- a/golang/internal/babystationlist/BabyStationList_test.go +++ b/golang/internal/babystationlist/BabyStationList_test.go @@ -67,7 +67,6 @@ func TestBabyStationList(t *testing.T) { Data: fatal.UnlessMarshalJSON(connections.EventConnected{ ClientID: clientID, ConnectionID: connectionID, - RequestID: uuid.NewV4().String(), }), }) So(err, ShouldBeNil) @@ -106,7 +105,6 @@ func TestBabyStationList(t *testing.T) { Data: fatal.UnlessMarshalJSON(connections.EventConnected{ ClientID: clientID, ConnectionID: connectionID, - RequestID: uuid.NewV4().String(), }), }) So(err, ShouldBeNil) @@ -144,7 +142,6 @@ func TestBabyStationList(t *testing.T) { Data: fatal.UnlessMarshalJSON(connections.EventConnected{ ClientID: clientID, ConnectionID: connectionID, - RequestID: uuid.NewV4().String(), }), }) So(err, ShouldBeNil) @@ -220,7 +217,6 @@ func TestBabyStationList(t *testing.T) { Data: fatal.UnlessMarshalJSON(connections.EventConnected{ ClientID: clientID1, ConnectionID: connectionID1, - RequestID: uuid.NewV4().String(), }), }) So(err, ShouldBeNil) @@ -231,7 +227,6 @@ func TestBabyStationList(t *testing.T) { Data: fatal.UnlessMarshalJSON(connections.EventConnected{ ClientID: clientID2, ConnectionID: connectionID2, - RequestID: uuid.NewV4().String(), }), }) So(err, ShouldBeNil) diff --git a/golang/internal/connections/Event.go b/golang/internal/connections/Event.go index a18ed27..c19b3ba 100644 --- a/golang/internal/connections/Event.go +++ b/golang/internal/connections/Event.go @@ -6,11 +6,14 @@ const EventTypeDisconnected string = "client.disconnected" type EventConnected struct { ClientID string `json:"client_id"` ConnectionID string `json:"connection_id"` - RequestID string `json:"request_id"` + AtMillis int64 `json:"at_millis"` } type EventDisconnected struct { ClientID string `json:"client_id"` ConnectionID string `json:"connection_id"` - RequestID string `json:"request_id"` + AtMillis int64 `json:"at_millis"` + Disconnected struct { + Reason string `json:"reason"` + } `json:"disconnected"` } diff --git a/golang/internal/connectionstore/Decider.go b/golang/internal/connectionstore/Decider.go index 6486929..3a4922b 100644 --- a/golang/internal/connectionstore/Decider.go +++ b/golang/internal/connectionstore/Decider.go @@ -15,10 +15,11 @@ import ( var ErrDuplicate = errors.New("duplicate") type Connection struct { - ID string - AccountID string - ClientID string - RequestID string + ID string + AccountID string + ClientID string + AtMillis int64 + DisconnectReason string } type ApplyFunc func(ctx context.Context, event *eventlog.Event) error @@ -64,7 +65,7 @@ func (decider *Decider) Put(ctx context.Context, connection Connection) error { data, err := json.Marshal(connections.EventConnected{ ClientID: connection.ClientID, ConnectionID: connection.ID, - RequestID: connection.RequestID, + AtMillis: connection.AtMillis, }) fatal.OnError(err) _, err = decider.eventLog.Append(ctx, eventlog.AppendInput{ @@ -90,7 +91,12 @@ func (decider *Decider) Delete(ctx context.Context, connection Connection) error data, err := json.Marshal(connections.EventDisconnected{ ClientID: connection.ClientID, ConnectionID: connection.ID, - RequestID: connection.RequestID, + AtMillis: connection.AtMillis, + Disconnected: struct { + Reason string `json:"reason"` + }{ + Reason: connection.DisconnectReason, + }, }) fatal.OnError(err) _, err = decider.eventLog.Append(ctx, eventlog.AppendInput{ @@ -136,7 +142,6 @@ func (decider *Decider) applyConnected(ctx context.Context, event *eventlog.Even ID: data.ConnectionID, AccountID: event.AccountID, ClientID: data.ClientID, - RequestID: data.RequestID, } key := decider.getKey(connection) decider.connectedKeySet[key] = struct{}{} @@ -151,7 +156,6 @@ func (decider *Decider) applyDisconnected(ctx context.Context, event *eventlog.E ID: data.ConnectionID, AccountID: event.AccountID, ClientID: data.ClientID, - RequestID: data.RequestID, } key := decider.getKey(connection) decider.disconnectedKeySet[key] = struct{}{} @@ -159,5 +163,5 @@ func (decider *Decider) applyDisconnected(ctx context.Context, event *eventlog.E } func (decider *Decider) getKey(connection Connection) string { - return fmt.Sprintf("accounts/%s/clients/%s/connections/%s/requests/%s", connection.AccountID, connection.ClientID, connection.ID, connection.RequestID) + return fmt.Sprintf("accounts/%s/clients/%s/connections/%s", connection.AccountID, connection.ClientID, connection.ID) } diff --git a/golang/internal/connectionstore/Decider_test.go b/golang/internal/connectionstore/Decider_test.go index 9e9bfee..8e8bc9e 100644 --- a/golang/internal/connectionstore/Decider_test.go +++ b/golang/internal/connectionstore/Decider_test.go @@ -28,7 +28,6 @@ func TestDecider(t *testing.T) { ID: uuid.NewV4().String(), AccountID: uuid.NewV4().String(), ClientID: uuid.NewV4().String(), - RequestID: uuid.NewV4().String(), } Convey("Put", func() { err = decider.Put(ctx, connection) @@ -44,7 +43,6 @@ func TestDecider(t *testing.T) { ID: uuid.NewV4().String(), AccountID: uuid.NewV4().String(), ClientID: uuid.NewV4().String(), - RequestID: uuid.NewV4().String(), } err = decider.Put(ctx, connection) So(err, ShouldBeNil) @@ -64,13 +62,11 @@ func TestDecider(t *testing.T) { ID: uuid.NewV4().String(), AccountID: uuid.NewV4().String(), ClientID: uuid.NewV4().String(), - RequestID: uuid.NewV4().String(), }, { ID: uuid.NewV4().String(), AccountID: uuid.NewV4().String(), ClientID: uuid.NewV4().String(), - RequestID: uuid.NewV4().String(), }, } for _, connection := range connections { diff --git a/golang/internal/connectionstoresync/Sync.go b/golang/internal/connectionstoresync/Sync.go index 7477c91..239dab3 100644 --- a/golang/internal/connectionstoresync/Sync.go +++ b/golang/internal/connectionstoresync/Sync.go @@ -6,21 +6,19 @@ import ( "log" "regexp" - "github.com/Ryan-A-B/beddybytes/golang/internal/connections" "github.com/Ryan-A-B/beddybytes/golang/internal/connectionstore" "github.com/Ryan-A-B/beddybytes/golang/internal/fatal" "github.com/Ryan-A-B/beddybytes/golang/internal/logx" - "github.com/Ryan-A-B/beddybytes/golang/internal/messages" "github.com/Ryan-A-B/beddybytes/golang/internal/mqttx" mqtt "github.com/eclipse/paho.mqtt.golang" ) var connectionStore *connectionstore.Decider -var handlers = map[messages.MessageType]func(ctx context.Context, input handlerInput){ - connections.MessageTypeConnected: handleConnectedMessage, - connections.MessageTypeDisconnected: handleDisconnectedMessage, +var handlers = map[string]func(ctx context.Context, input handlerInput){ + "connected": handleConnectedMessage, + "disconnected": handleDisconnectedMessage, } -var topicRegex = regexp.MustCompile(`^accounts/([^/]+)/connections/([^/]+)/status$`) +var topicRegex = regexp.MustCompile(`^accounts/([^/]+)/clients/([^/]+)/status$`) type RunInput struct { MQTTClient mqtt.Client @@ -32,63 +30,79 @@ func Run(ctx context.Context, input RunInput) { panic("already running") } connectionStore = input.ConnectionStore - err := mqttx.Wait(input.MQTTClient.Subscribe("accounts/+/connections/+/status", 1, handleMessage)) + err := mqttx.Wait(input.MQTTClient.Subscribe("accounts/+/clients/+/status", 1, handleMessage)) fatal.OnError(err) <-ctx.Done() } func handleMessage(client mqtt.Client, message mqtt.Message) { ctx := context.Background() - var frame connections.MessageFrame - err := json.Unmarshal(message.Payload(), &frame) + var status statusMessage + err := json.Unmarshal(message.Payload(), &status) if err != nil { logx.Warnln(err) return } var accountID string - var connectionID string + var clientID string matches := topicRegex.FindStringSubmatch(message.Topic()) fatal.Unless(len(matches) == 3, "failed to parse topic: "+message.Topic()) - accountID, connectionID = matches[1], matches[2] - handle, ok := handlers[frame.Type] + accountID, clientID = matches[1], matches[2] + handle, ok := handlers[status.Type] if !ok { - logx.Warnln("unhandled message type:", frame.Type) + logx.Warnln("unhandled message type:", status.Type) return } handle(ctx, handlerInput{ - AccountID: accountID, - ConnectionID: connectionID, - Frame: frame, + AccountID: accountID, + ClientID: clientID, + Status: status, }) message.Ack() } func handleConnectedMessage(ctx context.Context, input handlerInput) { err := connectionStore.Put(ctx, connectionstore.Connection{ - ID: input.ConnectionID, + ID: input.Status.ConnectionID, AccountID: input.AccountID, - ClientID: input.Frame.Connected.ClientID, - RequestID: input.Frame.Connected.RequestID, + ClientID: input.ClientID, + AtMillis: input.Status.AtMillis, }) if err != nil { + if err == connectionstore.ErrDuplicate { + return + } log.Fatal(err) } } func handleDisconnectedMessage(ctx context.Context, input handlerInput) { err := connectionStore.Delete(ctx, connectionstore.Connection{ - ID: input.ConnectionID, - AccountID: input.AccountID, - ClientID: input.Frame.Disconnected.ClientID, - RequestID: input.Frame.Disconnected.RequestID, + ID: input.Status.ConnectionID, + AccountID: input.AccountID, + ClientID: input.ClientID, + AtMillis: input.Status.AtMillis, + DisconnectReason: input.Status.Disconnected.Reason, }) if err != nil { + if err == connectionstore.ErrDuplicate { + return + } log.Fatal(err) } } type handlerInput struct { - AccountID string - ConnectionID string - Frame connections.MessageFrame + AccountID string + ClientID string + Status statusMessage +} + +type statusMessage struct { + Type string `json:"type"` + ConnectionID string `json:"connection_id"` + AtMillis int64 `json:"at_millis"` + Disconnected struct { + Reason string `json:"reason"` + } `json:"disconnected"` } diff --git a/golang/internal/sessionlist/Session.go b/golang/internal/sessionlist/Session.go index c8f6e7d..59e0060 100644 --- a/golang/internal/sessionlist/Session.go +++ b/golang/internal/sessionlist/Session.go @@ -38,7 +38,6 @@ func (state HostConnectionStateBase) GetSince() int64 { type HostConnectionStateConnected struct { HostConnectionStateBase - RequestID string `json:"request_id"` } type HostConnectionStateDisconnected struct { diff --git a/golang/internal/sessionlist/SessionList.go b/golang/internal/sessionlist/SessionList.go index e6ae993..fe719e4 100644 --- a/golang/internal/sessionlist/SessionList.go +++ b/golang/internal/sessionlist/SessionList.go @@ -171,7 +171,6 @@ func applySessionStartedEvent(ctx context.Context, sessionList *SessionList, eve State: ConnectionStateConnected, Since: event.UnixTimestamp, }, - RequestID: "TODO", // TODO }, }) } @@ -198,10 +197,6 @@ func applyClientDisconnectedEvent(ctx context.Context, sessionList *SessionList, if session.HostConnectionState.GetState() != ConnectionStateConnected { return } - hostConnectionState := session.HostConnectionState.(HostConnectionStateConnected) - if hostConnectionState.RequestID != data.RequestID { - return - } session.HostConnectionState = HostConnectionStateDisconnected{ HostConnectionStateBase: HostConnectionStateBase{ State: ConnectionStateDisconnected, @@ -223,7 +218,6 @@ func applyClientConnectedEvent(ctx context.Context, sessionList *SessionList, ev State: ConnectionStateConnected, Since: event.UnixTimestamp, }, - RequestID: data.RequestID, } } diff --git a/golang/internal/sessionstartdecider/Decider.go b/golang/internal/sessionstartdecider/Decider.go new file mode 100644 index 0000000..79a16e5 --- /dev/null +++ b/golang/internal/sessionstartdecider/Decider.go @@ -0,0 +1,102 @@ +package sessionstartdecider + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "sync" + "time" + + "github.com/Ryan-A-B/beddybytes/golang/internal/eventlog" +) + +const EventTypeSessionStarted = "session.started" + +var ErrDuplicate = errors.New("duplicate") + +type Session struct { + AccountID string + ID string + Name string + HostConnectionID string + StartedAt time.Time +} + +type Decider struct { + eventLog eventlog.EventLog + mutex sync.Mutex + cursor int64 + connectionIDSet map[string]struct{} +} + +type NewDeciderInput struct { + EventLog eventlog.EventLog +} + +func NewDecider(input NewDeciderInput) *Decider { + return &Decider{ + eventLog: input.EventLog, + connectionIDSet: make(map[string]struct{}), + } +} + +func (decider *Decider) Put(ctx context.Context, session Session) error { + decider.mutex.Lock() + defer decider.mutex.Unlock() + + if err := decider.catchUp(ctx); err != nil { + return err + } + key := decider.getKey(session.AccountID, session.HostConnectionID) + if _, ok := decider.connectionIDSet[key]; ok { + return ErrDuplicate + } + data, err := json.Marshal(sessionStartedEventData{ + ID: session.ID, + Name: session.Name, + HostConnectionID: session.HostConnectionID, + StartedAt: session.StartedAt, + }) + if err != nil { + return err + } + _, err = decider.eventLog.Append(ctx, eventlog.AppendInput{ + Type: EventTypeSessionStarted, + AccountID: session.AccountID, + Data: data, + }) + return err +} + +func (decider *Decider) catchUp(ctx context.Context) error { + iterator := decider.eventLog.GetEventIterator(ctx, eventlog.GetEventIteratorInput{ + FromCursor: decider.cursor, + }) + for iterator.Next(ctx) { + event := iterator.Event() + if event.Type != EventTypeSessionStarted { + decider.cursor = event.LogicalClock + continue + } + var data sessionStartedEventData + if err := json.Unmarshal(event.Data, &data); err != nil { + return err + } + key := decider.getKey(event.AccountID, data.HostConnectionID) + decider.connectionIDSet[key] = struct{}{} + decider.cursor = event.LogicalClock + } + return iterator.Err() +} + +func (decider *Decider) getKey(accountID string, connectionID string) string { + return fmt.Sprintf("accounts/%s/connections/%s", accountID, connectionID) +} + +type sessionStartedEventData struct { + ID string `json:"id"` + Name string `json:"name"` + HostConnectionID string `json:"host_connection_id"` + StartedAt time.Time `json:"started_at"` +} From b2380ae280f747afdfdb0693adac64300a434da9 Mon Sep 17 00:00:00 2001 From: rballinger Date: Wed, 11 Mar 2026 21:37:14 +1000 Subject: [PATCH 2/6] .. --- agent_docs/Backend Signalling Contracts.md | 6 ++ agent_docs/Signalling Migration.md | 1 + agent_docs/Working Agreement with Human.md | 13 +++ agent_docs/index.md | 1 + golang/cmd/backend/Connection.go | 6 ++ golang/cmd/backend/MQTTSync.go | 2 +- golang/cmd/backend/PendingSessionStarts.go | 91 +++++++++++++++++++ .../cmd/backend/PendingSessionStarts_test.go | 60 ++++++++++++ golang/cmd/backend/server.go | 24 ++--- golang/cmd/backend/sessions.go | 60 +++++++++--- golang/cmd/backend/sessions_test.go | 68 ++++++++++++++ 11 files changed, 309 insertions(+), 23 deletions(-) create mode 100644 agent_docs/Working Agreement with Human.md create mode 100644 golang/cmd/backend/PendingSessionStarts.go create mode 100644 golang/cmd/backend/PendingSessionStarts_test.go create mode 100644 golang/cmd/backend/sessions_test.go diff --git a/agent_docs/Backend Signalling Contracts.md b/agent_docs/Backend Signalling Contracts.md index 1fda579..ce6e065 100644 --- a/agent_docs/Backend Signalling Contracts.md +++ b/agent_docs/Backend Signalling Contracts.md @@ -92,6 +92,12 @@ Event payloads are defined in `golang/internal/connections/Event.go`. - `PUT /sessions/{session_id}` now publishes session announcements to: - `accounts/{account_id}/baby_stations` +- `PUT /sessions/{session_id}` accepts both: + - legacy payload (`id`, `name`, `host_connection_id`, `started_at`) + - migration payload (`client_id`, `connection_id`, `name`, `started_at_millis`) +- If `client_id` is missing, backend resolves it from in-memory connection map (`connection_id -> client_id`). +- If mapping is not available yet, backend queues session start in-memory and publishes after websocket connect registers that connection. +- Queue policy: per-account LRU with max 2 pending session starts. - Session delete endpoint is intentionally no-op; disconnect status drives lifecycle. - `MQTTSync` subscribes `accounts/+/baby_stations` and appends `session.started` with idempotency via `sessionstartdecider`. diff --git a/agent_docs/Signalling Migration.md b/agent_docs/Signalling Migration.md index 9eda489..403344f 100644 --- a/agent_docs/Signalling Migration.md +++ b/agent_docs/Signalling Migration.md @@ -6,6 +6,7 @@ - publishes client status to `accounts/{account_id}/clients/{client_id}/status` - publishes WebRTC signalling to `accounts/{account_id}/clients/{client_id}/webrtc_inbox` - subscribes `baby_stations`, `parent_stations`, `webrtc_inbox`, and status wildcard topics + - queues `PUT /sessions/{session_id}` session starts in-memory when `client_id` is unresolved and flushes on websocket connect - Current implementation is backend-centric; no finalized frontend MQTT path yet. See [[Backend Signalling Contracts]] for current endpoint/topic details. diff --git a/agent_docs/Working Agreement with Human.md b/agent_docs/Working Agreement with Human.md new file mode 100644 index 0000000..adb9dbb --- /dev/null +++ b/agent_docs/Working Agreement with Human.md @@ -0,0 +1,13 @@ +The human prefers immediate escalation when implementation issues arise. + +## Escalation Preference + +- If a refactor exposes missing data, ordering assumptions, or contract ambiguity, raise it immediately. +- Do not silently patch around issues. +- Present the issue and let the human choose the resolution approach. + +## Current Applied Example (2026-03-10) + +- MQTT `BabyStationAnnouncement` requires `client_id`, but legacy `PUT /sessions/{session_id}` payload does not contain it. +- Frontend Baby Station currently calls session create before opening the websocket connection, so connection-to-client mapping is not guaranteed at create time. +- Human-selected resolution: maintain an in-memory connection map (`connection_id` -> `client_id`) and handle session-create timing accordingly. diff --git a/agent_docs/index.md b/agent_docs/index.md index b19814c..972dec4 100644 --- a/agent_docs/index.md +++ b/agent_docs/index.md @@ -10,6 +10,7 @@ This vault captures implementation-oriented documentation for BeddyBytes. - [[Backend Signalling Contracts]] - [[Signalling Migration]] - [[Open Questions]] +- [[Working Agreement with Human]] ## Source Material diff --git a/golang/cmd/backend/Connection.go b/golang/cmd/backend/Connection.go index 00ffc2d..61b5f2b 100644 --- a/golang/cmd/backend/Connection.go +++ b/golang/cmd/backend/Connection.go @@ -128,6 +128,12 @@ func (handlers *Handlers) HandleConnection(responseWriter http.ResponseWriter, r }) handlers.ConnectionHub.Put(connection) defer handlers.ConnectionHub.Delete(connection.ID) + if pendingStart, ok := handlers.PendingSessionStarts.Take(accountID, connectionID); ok { + pendingStart.ClientID = clientID + if err = handlers.publishBabyStationAnnouncement(accountID, pendingStart); err != nil { + logx.Errorln(err) + } + } disconnectReason := "clean" defer func() { err = handlers.sendDisconnectedMessage(ctx, sendDisconnectedMessageInput{ diff --git a/golang/cmd/backend/MQTTSync.go b/golang/cmd/backend/MQTTSync.go index 4897dbb..3f9e846 100644 --- a/golang/cmd/backend/MQTTSync.go +++ b/golang/cmd/backend/MQTTSync.go @@ -49,7 +49,7 @@ func (handlers *Handlers) handleBabyStationsMessage(client mqtt.Client, message logx.Warnln(err) return } - if announcement.ClientID == "" || announcement.ConnectionID == "" || announcement.Name == "" { + if announcement.ConnectionID == "" || announcement.Name == "" { logx.Warnln("invalid baby station announcement payload") return } diff --git a/golang/cmd/backend/PendingSessionStarts.go b/golang/cmd/backend/PendingSessionStarts.go new file mode 100644 index 0000000..4dd7d7b --- /dev/null +++ b/golang/cmd/backend/PendingSessionStarts.go @@ -0,0 +1,91 @@ +package main + +import "sync" + +type pendingSessionStart struct { + connectionID string + input CreateSessionInput +} + +type PendingSessionStarts struct { + mutex sync.Mutex + entriesByAccount map[string][]pendingSessionStart + maxPerAccount int +} + +func NewPendingSessionStarts() *PendingSessionStarts { + return &PendingSessionStarts{ + entriesByAccount: make(map[string][]pendingSessionStart), + maxPerAccount: 2, + } +} + +func (store *PendingSessionStarts) Put(accountID string, input CreateSessionInput) { + store.mutex.Lock() + defer store.mutex.Unlock() + entries := store.entriesByAccount[accountID] + entries = removeConnectionID(entries, input.ConnectionID) + entries = append(entries, pendingSessionStart{ + connectionID: input.ConnectionID, + input: input, + }) + if len(entries) > store.maxPerAccount { + entries = entries[len(entries)-store.maxPerAccount:] + } + store.entriesByAccount[accountID] = entries +} + +func (store *PendingSessionStarts) Take(accountID string, connectionID string) (CreateSessionInput, bool) { + store.mutex.Lock() + defer store.mutex.Unlock() + entries := store.entriesByAccount[accountID] + for i, pending := range entries { + if pending.connectionID != connectionID { + continue + } + store.entriesByAccount[accountID] = append(entries[:i], entries[i+1:]...) + if len(store.entriesByAccount[accountID]) == 0 { + delete(store.entriesByAccount, accountID) + } + return pending.input, true + } + return CreateSessionInput{}, false +} + +func removeConnectionID(entries []pendingSessionStart, connectionID string) []pendingSessionStart { + for i, entry := range entries { + if entry.connectionID != connectionID { + continue + } + return append(entries[:i], entries[i+1:]...) + } + return entries +} + +func (store *PendingSessionStarts) lenForAccount(accountID string) int { + store.mutex.Lock() + defer store.mutex.Unlock() + return len(store.entriesByAccount[accountID]) +} + +func (store *PendingSessionStarts) contains(accountID string, connectionID string) bool { + store.mutex.Lock() + defer store.mutex.Unlock() + for _, entry := range store.entriesByAccount[accountID] { + if entry.connectionID == connectionID { + return true + } + } + return false +} + +func (store *PendingSessionStarts) connectionsInOrder(accountID string) []string { + store.mutex.Lock() + defer store.mutex.Unlock() + entries := store.entriesByAccount[accountID] + out := make([]string, 0, len(entries)) + for _, entry := range entries { + out = append(out, entry.connectionID) + } + return out +} diff --git a/golang/cmd/backend/PendingSessionStarts_test.go b/golang/cmd/backend/PendingSessionStarts_test.go new file mode 100644 index 0000000..2f948e4 --- /dev/null +++ b/golang/cmd/backend/PendingSessionStarts_test.go @@ -0,0 +1,60 @@ +package main + +import "testing" + +func TestPendingSessionStartsLRUPerAccount(t *testing.T) { + store := NewPendingSessionStarts() + accountID := "account-1" + + store.Put(accountID, CreateSessionInput{ConnectionID: "c1", Name: "one", StartedAtMillis: 1}) + store.Put(accountID, CreateSessionInput{ConnectionID: "c2", Name: "two", StartedAtMillis: 2}) + store.Put(accountID, CreateSessionInput{ConnectionID: "c3", Name: "three", StartedAtMillis: 3}) + + if store.contains(accountID, "c1") { + t.Fatalf("expected oldest entry c1 to be evicted") + } + if !store.contains(accountID, "c2") || !store.contains(accountID, "c3") { + t.Fatalf("expected c2 and c3 to be retained") + } + order := store.connectionsInOrder(accountID) + if len(order) != 2 || order[0] != "c2" || order[1] != "c3" { + t.Fatalf("unexpected order: %#v", order) + } +} + +func TestPendingSessionStartsTakeAndPromote(t *testing.T) { + store := NewPendingSessionStarts() + accountID := "account-1" + + store.Put(accountID, CreateSessionInput{ConnectionID: "c1", Name: "one", StartedAtMillis: 1}) + store.Put(accountID, CreateSessionInput{ConnectionID: "c2", Name: "two", StartedAtMillis: 2}) + store.Put(accountID, CreateSessionInput{ConnectionID: "c1", Name: "one-new", StartedAtMillis: 3}) + + order := store.connectionsInOrder(accountID) + if len(order) != 2 || order[0] != "c2" || order[1] != "c1" { + t.Fatalf("unexpected order after promote: %#v", order) + } + + input, ok := store.Take(accountID, "c1") + if !ok { + t.Fatalf("expected c1 to be present") + } + if input.Name != "one-new" { + t.Fatalf("unexpected input returned: %#v", input) + } + if store.lenForAccount(accountID) != 1 { + t.Fatalf("expected one entry remaining") + } +} + +func TestPendingSessionStartsIsolatedByAccount(t *testing.T) { + store := NewPendingSessionStarts() + store.Put("a1", CreateSessionInput{ConnectionID: "c1", Name: "one", StartedAtMillis: 1}) + store.Put("a2", CreateSessionInput{ConnectionID: "c1", Name: "other", StartedAtMillis: 1}) + + _, okA1 := store.Take("a1", "c1") + _, okA2 := store.Take("a2", "c1") + if !okA1 || !okA2 { + t.Fatalf("expected both accounts to have isolated entries") + } +} diff --git a/golang/cmd/backend/server.go b/golang/cmd/backend/server.go index 19078cd..17dbcad 100644 --- a/golang/cmd/backend/server.go +++ b/golang/cmd/backend/server.go @@ -67,17 +67,18 @@ type Client struct { } type Handlers struct { - Upgrader websocket.Upgrader - ClientStore ClientStore - ConnectionFactory ConnectionFactory - SessionProjection SessionProjection - SessionList *sessionlist.SessionList - SessionStartDecider *sessionstartdecider.Decider - BabyStationList *babystationlist.BabyStationList - ConnectionHub *ConnectionHub - EventLog eventlog.EventLog - MQTTClient mqtt.Client - UsageStats *UsageStats + Upgrader websocket.Upgrader + ClientStore ClientStore + ConnectionFactory ConnectionFactory + SessionProjection SessionProjection + SessionList *sessionlist.SessionList + SessionStartDecider *sessionstartdecider.Decider + PendingSessionStarts *PendingSessionStarts + BabyStationList *babystationlist.BabyStationList + ConnectionHub *ConnectionHub + EventLog eventlog.EventLog + MQTTClient mqtt.Client + UsageStats *UsageStats Key interface{} } @@ -282,6 +283,7 @@ func main() { SessionStartDecider: sessionstartdecider.NewDecider(sessionstartdecider.NewDeciderInput{ EventLog: eventLog, }), + PendingSessionStarts: NewPendingSessionStarts(), BabyStationList: babystationlist.New(babystationlist.NewInput{ EventLog: eventLog, }), diff --git a/golang/cmd/backend/sessions.go b/golang/cmd/backend/sessions.go index 4a6bb4c..9d910e9 100644 --- a/golang/cmd/backend/sessions.go +++ b/golang/cmd/backend/sessions.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "io" "log" "net/http" "time" @@ -59,9 +60,6 @@ type CreateSessionInput struct { } func (input *CreateSessionInput) validate() error { - if input.ClientID == "" { - return merry.New("client id is empty").WithHTTPCode(http.StatusBadRequest) - } if input.ConnectionID == "" { return merry.New("connection id is empty").WithHTTPCode(http.StatusBadRequest) } @@ -84,25 +82,65 @@ func (handlers *Handlers) StartSession(responseWriter http.ResponseWriter, reque }() ctx := request.Context() accountID := contextx.GetAccountID(ctx) - _ = mux.Vars(request)["session_id"] - var input CreateSessionInput - if err = json.NewDecoder(request.Body).Decode(&input); err != nil { + sessionID := mux.Vars(request)["session_id"] + requestBody, err := io.ReadAll(request.Body) + if err != nil { err = merry.WithHTTPCode(err, http.StatusBadRequest) return } - if err = input.validate(); err != nil { + input, err := parseStartSessionInput(requestBody, sessionID) + if err != nil { return } - topic := fmt.Sprintf("accounts/%s/baby_stations", accountID) - payload, err := json.Marshal(input) - if err != nil { + if input.ClientID == "" { + connection, ok := handlers.ConnectionHub.Get(input.ConnectionID) + if ok && connection.AccountID == accountID { + input.ClientID = connection.ClientID + } + } + if input.ClientID == "" { + handlers.PendingSessionStarts.Put(accountID, input) return } - if err = mqttx.Wait(handlers.MQTTClient.Publish(topic, 1, false, payload)); err != nil { + if err = handlers.publishBabyStationAnnouncement(accountID, input); err != nil { return } } +func parseStartSessionInput(data []byte, sessionID string) (CreateSessionInput, error) { + var input CreateSessionInput + if err := json.Unmarshal(data, &input); err == nil { + if err := input.validate(); err == nil { + return input, nil + } + } + var legacyInput StartSessionEventData + if err := json.Unmarshal(data, &legacyInput); err != nil { + return CreateSessionInput{}, merry.WithHTTPCode(err, http.StatusBadRequest) + } + if err := legacyInput.validate(); err != nil { + return CreateSessionInput{}, err + } + if legacyInput.ID != sessionID { + return CreateSessionInput{}, merry.Errorf("session id in path does not match session id in body").WithHTTPCode(http.StatusBadRequest) + } + return CreateSessionInput{ + ClientID: "", + ConnectionID: legacyInput.HostConnectionID, + Name: legacyInput.Name, + StartedAtMillis: legacyInput.StartedAt.UnixMilli(), + }, nil +} + +func (handlers *Handlers) publishBabyStationAnnouncement(accountID string, input CreateSessionInput) error { + topic := fmt.Sprintf("accounts/%s/baby_stations", accountID) + payload, err := json.Marshal(input) + if err != nil { + return err + } + return mqttx.Wait(handlers.MQTTClient.Publish(topic, 1, false, payload)) +} + type EndSessionEventData struct { ID string `json:"id"` } diff --git a/golang/cmd/backend/sessions_test.go b/golang/cmd/backend/sessions_test.go new file mode 100644 index 0000000..20002f3 --- /dev/null +++ b/golang/cmd/backend/sessions_test.go @@ -0,0 +1,68 @@ +package main + +import ( + "encoding/json" + "testing" + "time" +) + +func TestParseStartSessionInputNewPayload(t *testing.T) { + sessionID := "session-123" + payload := CreateSessionInput{ + ClientID: "client-123", + ConnectionID: "connection-123", + Name: "nursery", + StartedAtMillis: time.Now().UnixMilli(), + } + data, err := json.Marshal(payload) + if err != nil { + t.Fatal(err) + } + output, err := parseStartSessionInput(data, sessionID) + if err != nil { + t.Fatal(err) + } + if output.ClientID != payload.ClientID { + t.Fatalf("unexpected client id: %q", output.ClientID) + } + if output.ConnectionID != payload.ConnectionID { + t.Fatalf("unexpected connection id: %q", output.ConnectionID) + } + if output.Name != payload.Name { + t.Fatalf("unexpected name: %q", output.Name) + } + if output.StartedAtMillis != payload.StartedAtMillis { + t.Fatalf("unexpected started_at_millis: %d", output.StartedAtMillis) + } +} + +func TestParseStartSessionInputLegacyPayload(t *testing.T) { + sessionID := "session-123" + startedAt := time.Now().UTC().Truncate(time.Second) + payload := StartSessionEventData{ + ID: sessionID, + Name: "nursery", + HostConnectionID: "connection-123", + StartedAt: startedAt, + } + data, err := json.Marshal(payload) + if err != nil { + t.Fatal(err) + } + output, err := parseStartSessionInput(data, sessionID) + if err != nil { + t.Fatal(err) + } + if output.ClientID != "" { + t.Fatalf("expected empty client id for legacy payload, got %q", output.ClientID) + } + if output.ConnectionID != payload.HostConnectionID { + t.Fatalf("unexpected connection id: %q", output.ConnectionID) + } + if output.Name != payload.Name { + t.Fatalf("unexpected name: %q", output.Name) + } + if output.StartedAtMillis != payload.StartedAt.UnixMilli() { + t.Fatalf("unexpected started_at_millis: %d", output.StartedAtMillis) + } +} From 8a1903cb316de3f408ac7bffb21595831e4bba38 Mon Sep 17 00:00:00 2001 From: ryan Date: Fri, 3 Apr 2026 14:16:12 +1000 Subject: [PATCH 3/6] .. --- agent_docs/Frontend Service Architecture.md | 32 +++++++++++++++++++++ agent_docs/architecture.md | 10 +++++++ agent_docs/index.md | 1 + agent_docs/structure.md | 1 + golang/cmd/backend/Connection.go | 7 +++-- golang/cmd/backend/MQTTSync.go | 16 ++++++++--- 6 files changed, 60 insertions(+), 7 deletions(-) create mode 100644 agent_docs/Frontend Service Architecture.md diff --git a/agent_docs/Frontend Service Architecture.md b/agent_docs/Frontend Service Architecture.md new file mode 100644 index 0000000..b24c853 --- /dev/null +++ b/agent_docs/Frontend Service Architecture.md @@ -0,0 +1,32 @@ +## Frontend Service Architecture + +The frontend codebase generally works best when domain behavior lives in services and React stays focused on rendering. + +## Preferred Split + +- Services own domain state and transitions. +- Services own HTTP calls and long-running async workflows. +- Services own timers, retries, event subscriptions, and derived domain status. +- React components render state and invoke service commands. +- Hooks are thin adapters that subscribe React to service state. + +## What Belongs in React + +Keep state in React only when it is truly view-local: + +- modal open or closed +- active tab +- local draft input before submit +- focus, hover, or animation state + +If the state affects workflow, survives across screens, or mirrors backend/application behavior, it should usually move into a service. + +## Existing Repository Patterns + +- `frontend/src/services/Service.ts` provides a common base for stateful frontend services. +- `frontend/src/hooks/useServiceState.ts` is the standard hook shape for subscribing React to service state. +- `frontend/src/services/ParentStation/CountUpTimer.ts` is a concrete example where timer behavior is outside React. + +## Practical Rule + +If a React component starts accumulating fetch calls, retry logic, state-machine transitions, or several interdependent domain `useState` values, stop and extract a service. diff --git a/agent_docs/architecture.md b/agent_docs/architecture.md index 2810100..f1d7070 100644 --- a/agent_docs/architecture.md +++ b/agent_docs/architecture.md @@ -8,6 +8,16 @@ BeddyBytes is a privacy-first baby monitor built around browser-native WebRTC. - Backend (`golang/`): authentication, session metadata, signalling transport, and account separation. - Marketing site (`marketing/`): public website and commercial pages. +## Frontend Architecture Pattern + +The frontend generally follows a service-oriented model: + +- application and workflow state live in services under `frontend/src/services/` +- React components under `frontend/src/pages/` and `frontend/src/components/` render that state +- hooks such as `frontend/src/hooks/useServiceState.ts` adapt service events into React updates + +This keeps business logic, timers, and network workflows out of JSX-heavy components. + ## Core Roles - Baby Station: captures microphone audio (required) and camera video (optional), then publishes media to WebRTC peers. diff --git a/agent_docs/index.md b/agent_docs/index.md index 972dec4..4403156 100644 --- a/agent_docs/index.md +++ b/agent_docs/index.md @@ -6,6 +6,7 @@ This vault captures implementation-oriented documentation for BeddyBytes. - [[architecture]] - [[structure]] +- [[Frontend Service Architecture]] - [[Product and Privacy Model]] - [[Backend Signalling Contracts]] - [[Signalling Migration]] diff --git a/agent_docs/structure.md b/agent_docs/structure.md index b17a1a0..c669eb2 100644 --- a/agent_docs/structure.md +++ b/agent_docs/structure.md @@ -44,6 +44,7 @@ This note maps where key system behavior lives. - [[index]] - [[architecture]] +- [[Frontend Service Architecture]] - [[Product and Privacy Model]] - [[Backend Signalling Contracts]] - [[Signalling Migration]] diff --git a/golang/cmd/backend/Connection.go b/golang/cmd/backend/Connection.go index 61b5f2b..8ac0ee3 100644 --- a/golang/cmd/backend/Connection.go +++ b/golang/cmd/backend/Connection.go @@ -292,9 +292,10 @@ func (connection *Connection) handleSignal(ctx context.Context, incomingSignal * } inboxTopic := fmt.Sprintf(webrtcInboxTopicFormat, connection.AccountID, topicClientID) data, err := json.Marshal(map[string]any{ - "from_client_id": connection.ClientID, - "connection_id": incomingSignal.ToConnectionID, - "data": json.RawMessage(incomingSignal.Data), + "from_client_id": connection.ClientID, + "from_connection_id": connection.ID, + "connection_id": incomingSignal.ToConnectionID, + "data": json.RawMessage(incomingSignal.Data), }) if err != nil { return diff --git a/golang/cmd/backend/MQTTSync.go b/golang/cmd/backend/MQTTSync.go index 3f9e846..41a7516 100644 --- a/golang/cmd/backend/MQTTSync.go +++ b/golang/cmd/backend/MQTTSync.go @@ -131,7 +131,14 @@ func (handlers *Handlers) handleWebRTCInboxMessage(client mqtt.Client, message m if !ok { return } - signal, err := json.Marshal(inbox) + if inbox.FromConnectionID == "" { + // Frontend websocket signalling still routes by from_connection_id. + return + } + signal, err := json.Marshal(map[string]any{ + "from_connection_id": inbox.FromConnectionID, + "data": inbox.Data, + }) if err != nil { logx.Errorln(err) return @@ -164,9 +171,10 @@ type parentStationAnnouncementRequest struct { } type webrtcInboxMessage struct { - FromClientID string `json:"from_client_id"` - ConnectionID string `json:"connection_id"` - Data json.RawMessage `json:"data"` + FromClientID string `json:"from_client_id"` + FromConnectionID string `json:"from_connection_id"` + ConnectionID string `json:"connection_id"` + Data json.RawMessage `json:"data"` } type ControlMessageBase struct { From 42a8c1ae64b86cd6560c56a721851fe588813701 Mon Sep 17 00:00:00 2001 From: ryan Date: Fri, 3 Apr 2026 17:06:13 +1000 Subject: [PATCH 4/6] .. --- agent_docs/Backend Signalling Contracts.md | 40 ++- agent_docs/Signalling Migration.md | 5 + golang/cmd/backend/MQTTSync.go | 8 +- golang/cmd/backend/server.go | 2 + golang/cmd/backend/sessions.go | 30 +- golang/cmd/backend/sessions_test.go | 292 ++++++++++++++++-- .../babystationlist/BabyStationList.go | 13 +- .../babystationlist/BabyStationList_test.go | 86 ++++++ golang/internal/connectionstore/Decider.go | 2 + .../internal/connectionstore/Decider_test.go | 6 + golang/internal/connectionstoresync/Sync.go | 31 ++ golang/internal/sessionlist/SessionList.go | 20 ++ .../internal/sessionstartdecider/Decider.go | 62 +++- .../sessionstartdecider/Decider_test.go | 132 ++++++++ 14 files changed, 676 insertions(+), 53 deletions(-) create mode 100644 golang/internal/sessionstartdecider/Decider_test.go diff --git a/agent_docs/Backend Signalling Contracts.md b/agent_docs/Backend Signalling Contracts.md index ce6e065..64e36fb 100644 --- a/agent_docs/Backend Signalling Contracts.md +++ b/agent_docs/Backend Signalling Contracts.md @@ -48,9 +48,9 @@ This is the legacy transport targeted for deprecation after MQTT migration. - payload: `{"type":"disconnected","connection_id":"...","disconnected":{"reason":"clean|unexpected"},"at_millis":}` - Incoming websocket payload from client: - `{"type":"signal","signal":{"to_connection_id":"...","data":}}` - - backend republishes to MQTT topic: +- backend republishes to MQTT topic: - `accounts/{account_id}/clients/{to_client_id|sender_client_id}/webrtc_inbox` - - payload: `{"from_client_id":"...","connection_id":"...","data":}` + - payload: `{"from_client_id":"...","from_connection_id":"...","connection_id":"...","data":}` - Keepalive: - websocket ping/pong with 30s ping period - pong timeout 10s @@ -92,14 +92,40 @@ Event payloads are defined in `golang/internal/connections/Event.go`. - `PUT /sessions/{session_id}` now publishes session announcements to: - `accounts/{account_id}/baby_stations` -- `PUT /sessions/{session_id}` accepts both: - - legacy payload (`id`, `name`, `host_connection_id`, `started_at`) - - migration payload (`client_id`, `connection_id`, `name`, `started_at_millis`) -- If `client_id` is missing, backend resolves it from in-memory connection map (`connection_id -> client_id`). +- `PUT /sessions/{session_id}` continues to accept the legacy payload only: + - `id`, `name`, `host_connection_id`, `started_at` +- Backend derives MQTT announcement fields from the legacy request body: + - `session_id` comes from path/body `id` + - `connection_id` comes from `host_connection_id` + - `started_at_millis` comes from `started_at` +- Backend resolves `client_id` from the in-memory connection map (`connection_id -> client_id`). - If mapping is not available yet, backend queues session start in-memory and publishes after websocket connect registers that connection. - Queue policy: per-account LRU with max 2 pending session starts. -- Session delete endpoint is intentionally no-op; disconnect status drives lifecycle. +- Old-client compatibility detail: + - after publishing the MQTT announcement, backend also appends `session.started` locally before returning from `PUT /sessions/{session_id}` + - backend still subscribes its own `baby_stations` topic; that later delivery is treated as a duplicate by `sessionstartdecider` + - this avoids a race where old clients could call stop before the backend had observed its own published start announcement +- `DELETE /sessions/{session_id}` appends `session.ended` using the real session id if the session is still active. - `MQTTSync` subscribes `accounts/+/baby_stations` and appends `session.started` with idempotency via `sessionstartdecider`. +- Clean disconnect compatibility: + - `connectionstoresync` still appends `client.disconnected` + - if disconnect reason is `clean` and the connection owns an active session, backend also appends `session.ended` + - this preserves existing frontend session teardown behavior without changing frontend code +- `sessionstartdecider` releases a connection id for reuse after `session.ended`, so the same long-lived frontend connection id can start a later session again + +## Identity Model + +- `account_id`: + - stable account scope +- `client_id`: + - stable device/browser identity +- `session_id`: + - explicit baby-station session lifecycle id + - preserved for compatibility and event history +- `connection_id`: + - runtime identity for legacy websocket transport + - survives reconnects of the same runtime + - still needed as a bridge while websocket clients coexist with MQTT-native clients ## Parent Station Discovery diff --git a/agent_docs/Signalling Migration.md b/agent_docs/Signalling Migration.md index 403344f..c0ba051 100644 --- a/agent_docs/Signalling Migration.md +++ b/agent_docs/Signalling Migration.md @@ -6,7 +6,12 @@ - publishes client status to `accounts/{account_id}/clients/{client_id}/status` - publishes WebRTC signalling to `accounts/{account_id}/clients/{client_id}/webrtc_inbox` - subscribes `baby_stations`, `parent_stations`, `webrtc_inbox`, and status wildcard topics + - keeps `PUT /sessions/{session_id}` on the legacy request payload and derives the MQTT announcement fields server-side, including `session_id` - queues `PUT /sessions/{session_id}` session starts in-memory when `client_id` is unresolved and flushes on websocket connect + - appends `session.ended` on explicit delete for old clients and on clean disconnect for compatibility with future clients that end sessions by disconnecting +- Current direction: + - `session_id` stays + - `connection_id` is being reduced to compatibility/runtime identity rather than long-term primary identity - Current implementation is backend-centric; no finalized frontend MQTT path yet. See [[Backend Signalling Contracts]] for current endpoint/topic details. diff --git a/golang/cmd/backend/MQTTSync.go b/golang/cmd/backend/MQTTSync.go index 41a7516..b1bf150 100644 --- a/golang/cmd/backend/MQTTSync.go +++ b/golang/cmd/backend/MQTTSync.go @@ -53,10 +53,14 @@ func (handlers *Handlers) handleBabyStationsMessage(client mqtt.Client, message logx.Warnln("invalid baby station announcement payload") return } + if announcement.SessionID == "" { + logx.Warnln("invalid baby station announcement payload: missing session_id") + return + } startedAt := time.UnixMilli(announcement.StartedAtMillis) err := handlers.SessionStartDecider.Put(context.Background(), sessionstartdecider.Session{ AccountID: accountID, - ID: announcement.ConnectionID, + ID: announcement.SessionID, Name: announcement.Name, HostConnectionID: announcement.ConnectionID, StartedAt: startedAt, @@ -96,6 +100,7 @@ func (handlers *Handlers) handleParentStationsMessage(client mqtt.Client, messag AtMillis: time.Now().UnixMilli(), }, BabyStationAnnouncement: babyStationAnnouncement{ + SessionID: babyStation.SessionID, ClientID: babyStation.ClientID, ConnectionID: babyStation.Connection.ID, Name: babyStation.Name, @@ -160,6 +165,7 @@ func parseAccountID(topic string, pattern *regexp.Regexp) (string, bool) { } type babyStationAnnouncement struct { + SessionID string `json:"session_id"` ClientID string `json:"client_id"` ConnectionID string `json:"connection_id"` Name string `json:"name"` diff --git a/golang/cmd/backend/server.go b/golang/cmd/backend/server.go index 17dbcad..2bcbd23 100644 --- a/golang/cmd/backend/server.go +++ b/golang/cmd/backend/server.go @@ -309,6 +309,8 @@ func main() { ConnectionStore: connectionstore.NewDecider(connectionstore.NewDeciderInput{ EventLog: eventLog, }), + SessionList: handlers.SessionList, + EventLog: eventLog, }) log.Fatal("connectionstoresync.Run exited") }() diff --git a/golang/cmd/backend/sessions.go b/golang/cmd/backend/sessions.go index 9d910e9..bc64c97 100644 --- a/golang/cmd/backend/sessions.go +++ b/golang/cmd/backend/sessions.go @@ -53,6 +53,7 @@ type Session struct { } type CreateSessionInput struct { + SessionID string `json:"session_id"` ClientID string `json:"client_id"` ConnectionID string `json:"connection_id"` Name string `json:"name"` @@ -108,12 +109,6 @@ func (handlers *Handlers) StartSession(responseWriter http.ResponseWriter, reque } func parseStartSessionInput(data []byte, sessionID string) (CreateSessionInput, error) { - var input CreateSessionInput - if err := json.Unmarshal(data, &input); err == nil { - if err := input.validate(); err == nil { - return input, nil - } - } var legacyInput StartSessionEventData if err := json.Unmarshal(data, &legacyInput); err != nil { return CreateSessionInput{}, merry.WithHTTPCode(err, http.StatusBadRequest) @@ -125,6 +120,7 @@ func parseStartSessionInput(data []byte, sessionID string) (CreateSessionInput, return CreateSessionInput{}, merry.Errorf("session id in path does not match session id in body").WithHTTPCode(http.StatusBadRequest) } return CreateSessionInput{ + SessionID: legacyInput.ID, ClientID: "", ConnectionID: legacyInput.HostConnectionID, Name: legacyInput.Name, @@ -146,7 +142,27 @@ type EndSessionEventData struct { } func (handlers *Handlers) EndSession(responseWriter http.ResponseWriter, request *http.Request) { - // Session delete is intentionally a no-op. Session lifecycle now ends on disconnect. + var err error + defer func() { + if err != nil { + log.Println(err) + httpx.Error(responseWriter, err) + } + }() + ctx := request.Context() + sessionID := mux.Vars(request)["session_id"] + accountID := contextx.GetAccountID(ctx) + data, err := json.Marshal(EndSessionEventData{ + ID: sessionID, + }) + if err != nil { + return + } + _, err = handlers.EventLog.Append(ctx, eventlog.AppendInput{ + Type: EventTypeSessionEnded, + AccountID: accountID, + Data: data, + }) } type SessionProjection struct { diff --git a/golang/cmd/backend/sessions_test.go b/golang/cmd/backend/sessions_test.go index 20002f3..d135cd6 100644 --- a/golang/cmd/backend/sessions_test.go +++ b/golang/cmd/backend/sessions_test.go @@ -1,18 +1,33 @@ package main import ( + "context" "encoding/json" + "net/http" + "net/http/httptest" + "os" + "strings" "testing" "time" + + mqtt "github.com/eclipse/paho.mqtt.golang" + "github.com/gorilla/mux" + + "github.com/Ryan-A-B/beddybytes/golang/internal/babystationlist" + "github.com/Ryan-A-B/beddybytes/golang/internal/connections" + "github.com/Ryan-A-B/beddybytes/golang/internal/contextx" + "github.com/Ryan-A-B/beddybytes/golang/internal/eventlog" + "github.com/Ryan-A-B/beddybytes/golang/internal/sessionstartdecider" ) -func TestParseStartSessionInputNewPayload(t *testing.T) { +func TestParseStartSessionInputLegacyPayload(t *testing.T) { sessionID := "session-123" - payload := CreateSessionInput{ - ClientID: "client-123", - ConnectionID: "connection-123", - Name: "nursery", - StartedAtMillis: time.Now().UnixMilli(), + startedAt := time.Now().UTC().Truncate(time.Second) + payload := StartSessionEventData{ + ID: sessionID, + Name: "nursery", + HostConnectionID: "connection-123", + StartedAt: startedAt, } data, err := json.Marshal(payload) if err != nil { @@ -22,47 +37,272 @@ func TestParseStartSessionInputNewPayload(t *testing.T) { if err != nil { t.Fatal(err) } - if output.ClientID != payload.ClientID { - t.Fatalf("unexpected client id: %q", output.ClientID) + if output.ClientID != "" { + t.Fatalf("expected empty client id for legacy payload, got %q", output.ClientID) + } + if output.SessionID != payload.ID { + t.Fatalf("unexpected session id: %q", output.SessionID) } - if output.ConnectionID != payload.ConnectionID { + if output.ConnectionID != payload.HostConnectionID { t.Fatalf("unexpected connection id: %q", output.ConnectionID) } if output.Name != payload.Name { t.Fatalf("unexpected name: %q", output.Name) } - if output.StartedAtMillis != payload.StartedAtMillis { + if output.StartedAtMillis != payload.StartedAt.UnixMilli() { t.Fatalf("unexpected started_at_millis: %d", output.StartedAtMillis) } } -func TestParseStartSessionInputLegacyPayload(t *testing.T) { +func TestParseStartSessionInputRejectsRefactorPayload(t *testing.T) { sessionID := "session-123" - startedAt := time.Now().UTC().Truncate(time.Second) - payload := StartSessionEventData{ - ID: sessionID, - Name: "nursery", - HostConnectionID: "connection-123", - StartedAt: startedAt, + payload := CreateSessionInput{ + SessionID: sessionID, + ClientID: "client-123", + ConnectionID: "connection-123", + Name: "nursery", + StartedAtMillis: time.Now().UnixMilli(), } data, err := json.Marshal(payload) if err != nil { t.Fatal(err) } - output, err := parseStartSessionInput(data, sessionID) + if _, err := parseStartSessionInput(data, sessionID); err == nil { + t.Fatal("expected refactor payload to be rejected") + } +} + +func TestEndSessionAllowsRestartOnSameConnection(t *testing.T) { + ctx := context.Background() + folderPath, err := os.MkdirTemp("", "TestEndSessionAllowsRestartOnSameConnection-*") if err != nil { t.Fatal(err) } - if output.ClientID != "" { - t.Fatalf("expected empty client id for legacy payload, got %q", output.ClientID) + eventLog := eventlog.NewFileEventLog(&eventlog.NewFileEventLogInput{ + FolderPath: folderPath, + }) + decider := sessionstartdecider.NewDecider(sessionstartdecider.NewDeciderInput{ + EventLog: eventLog, + }) + handlers := Handlers{ + EventLog: eventLog, } - if output.ConnectionID != payload.HostConnectionID { - t.Fatalf("unexpected connection id: %q", output.ConnectionID) + accountID := "account-1" + first := sessionstartdecider.Session{ + AccountID: accountID, + ID: "session-1", + Name: "nursery", + HostConnectionID: "connection-1", + StartedAt: time.Unix(1, 0).UTC(), } - if output.Name != payload.Name { - t.Fatalf("unexpected name: %q", output.Name) + if err := decider.Put(ctx, first); err != nil { + t.Fatal(err) } - if output.StartedAtMillis != payload.StartedAt.UnixMilli() { - t.Fatalf("unexpected started_at_millis: %d", output.StartedAtMillis) + + request := httptest.NewRequest(http.MethodDelete, "/sessions/session-1", nil) + request = request.WithContext(contextx.WithAccountID(request.Context(), accountID)) + request = mux.SetURLVars(request, map[string]string{ + "session_id": first.ID, + }) + responseWriter := httptest.NewRecorder() + handlers.EndSession(responseWriter, request) + if responseWriter.Code != http.StatusOK { + t.Fatalf("unexpected status code: %d", responseWriter.Code) + } + + second := first + second.ID = "session-2" + second.StartedAt = second.StartedAt.Add(time.Minute) + if err := decider.Put(ctx, second); err != nil { + t.Fatalf("expected restart after delete to succeed, got %v", err) + } +} + +func TestSecondBabyStationAnnouncementShowsInSnapshotAfterLegacyDelete(t *testing.T) { + ctx := context.Background() + folderPath, err := os.MkdirTemp("", "TestSecondBabyStationAnnouncementShowsInSnapshotAfterLegacyDelete-*") + if err != nil { + t.Fatal(err) + } + eventLog := eventlog.NewFileEventLog(&eventlog.NewFileEventLogInput{ + FolderPath: folderPath, + }) + accountID := "account-1" + clientID := "client-1" + connectionID := "connection-1" + handlers := Handlers{ + EventLog: eventLog, + SessionStartDecider: sessionstartdecider.NewDecider(sessionstartdecider.NewDeciderInput{ + EventLog: eventLog, + }), + } + if _, err := eventLog.Append(contextx.WithAccountID(ctx, accountID), eventlog.AppendInput{ + Type: connections.EventTypeConnected, + AccountID: accountID, + Data: mustJSON(t, connections.EventConnected{ + ClientID: clientID, + ConnectionID: connectionID, + }), + }); err != nil { + t.Fatal(err) + } + + handlers.handleBabyStationsMessage(nil, testMQTTMessage{ + topic: "accounts/account-1/baby_stations", + payload: mustJSON(t, babyStationAnnouncement{ + SessionID: "session-1", + ClientID: clientID, + ConnectionID: connectionID, + Name: "first", + StartedAtMillis: time.Unix(1, 0).UnixMilli(), + }), + }) + + request := httptest.NewRequest(http.MethodDelete, "/sessions/session-1", nil) + request = request.WithContext(contextx.WithAccountID(request.Context(), accountID)) + request = mux.SetURLVars(request, map[string]string{"session_id": "session-1"}) + responseWriter := httptest.NewRecorder() + handlers.EndSession(responseWriter, request) + if responseWriter.Code != http.StatusOK { + t.Fatalf("unexpected delete status code: %d", responseWriter.Code) + } + + handlers.handleBabyStationsMessage(nil, testMQTTMessage{ + topic: "accounts/account-1/baby_stations", + payload: mustJSON(t, babyStationAnnouncement{ + SessionID: "session-2", + ClientID: clientID, + ConnectionID: connectionID, + Name: "second", + StartedAtMillis: time.Unix(2, 0).UnixMilli(), + }), + }) + + list := babystationlist.New(babystationlist.NewInput{ + EventLog: eventLog, + }) + output, err := list.GetSnapshot(contextx.WithAccountID(ctx, accountID)) + if err != nil { + t.Fatal(err) + } + if got := len(output.Snapshot.List()); got != 1 { + t.Fatalf("expected one baby station, got %d", got) + } + if got := output.Snapshot.List()[0].SessionID; got != "session-2" { + t.Fatalf("expected session-2, got %q", got) + } + if got := output.Snapshot.List()[0].Name; got != "second" { + t.Fatalf("expected second session name, got %q", got) + } +} + +func TestEventsStreamIncludesDeleteThenRestartOnSameConnection(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + folderPath, err := os.MkdirTemp("", "TestEventsStreamIncludesDeleteThenRestartOnSameConnection-*") + if err != nil { + t.Fatal(err) + } + eventLog := eventlog.NewFileEventLog(&eventlog.NewFileEventLogInput{ + FolderPath: folderPath, + }) + accountID := "account-1" + clientID := "client-1" + connectionID := "connection-1" + handlers := Handlers{ + EventLog: eventLog, + SessionStartDecider: sessionstartdecider.NewDecider(sessionstartdecider.NewDeciderInput{ + EventLog: eventLog, + }), + } + accountCtx := contextx.WithAccountID(ctx, accountID) + if _, err := eventLog.Append(accountCtx, eventlog.AppendInput{ + Type: connections.EventTypeConnected, + AccountID: accountID, + Data: mustJSON(t, connections.EventConnected{ + ClientID: clientID, + ConnectionID: connectionID, + }), + }); err != nil { + t.Fatal(err) + } + handlers.handleBabyStationsMessage(nil, testMQTTMessage{ + topic: "accounts/account-1/baby_stations", + payload: mustJSON(t, babyStationAnnouncement{ + SessionID: "session-1", + ClientID: clientID, + ConnectionID: connectionID, + Name: "first", + StartedAtMillis: time.Unix(1, 0).UnixMilli(), + }), + }) + + request := httptest.NewRequest(http.MethodGet, "/events?from_cursor=2", nil) + request = request.WithContext(accountCtx) + responseWriter := httptest.NewRecorder() + done := make(chan struct{}) + go func() { + defer close(done) + handlers.GetEvents(responseWriter, request) + }() + + deleteRequest := httptest.NewRequest(http.MethodDelete, "/sessions/session-1", nil) + deleteRequest = deleteRequest.WithContext(contextx.WithAccountID(context.Background(), accountID)) + deleteRequest = mux.SetURLVars(deleteRequest, map[string]string{"session_id": "session-1"}) + deleteResponseWriter := httptest.NewRecorder() + handlers.EndSession(deleteResponseWriter, deleteRequest) + if deleteResponseWriter.Code != http.StatusOK { + t.Fatalf("unexpected delete status code: %d", deleteResponseWriter.Code) + } + handlers.handleBabyStationsMessage(nil, testMQTTMessage{ + topic: "accounts/account-1/baby_stations", + payload: mustJSON(t, babyStationAnnouncement{ + SessionID: "session-2", + ClientID: clientID, + ConnectionID: connectionID, + Name: "second", + StartedAtMillis: time.Unix(2, 0).UnixMilli(), + }), + }) + + time.Sleep(50 * time.Millisecond) + cancel() + <-done + + body := responseWriter.Body.String() + if !strings.Contains(body, `"type":"session.ended"`) { + t.Fatalf("expected session.ended in event stream, got %q", body) + } + if !strings.Contains(body, `"type":"session.started"`) { + t.Fatalf("expected session.started in event stream, got %q", body) + } + if !strings.Contains(body, `"id":"session-2"`) { + t.Fatalf("expected restarted session payload in event stream, got %q", body) + } +} + +type testMQTTMessage struct { + topic string + payload []byte +} + +func (message testMQTTMessage) Duplicate() bool { return false } +func (message testMQTTMessage) Qos() byte { return 1 } +func (message testMQTTMessage) Retained() bool { return false } +func (message testMQTTMessage) Topic() string { return message.topic } +func (message testMQTTMessage) MessageID() uint16 { + return 0 +} +func (message testMQTTMessage) Payload() []byte { return message.payload } +func (message testMQTTMessage) Ack() {} + +var _ mqtt.Message = testMQTTMessage{} + +func mustJSON(t *testing.T, value any) []byte { + t.Helper() + data, err := json.Marshal(value) + if err != nil { + t.Fatal(err) } + return data } diff --git a/golang/internal/babystationlist/BabyStationList.go b/golang/internal/babystationlist/BabyStationList.go index 8d9da27..5270e5a 100644 --- a/golang/internal/babystationlist/BabyStationList.go +++ b/golang/internal/babystationlist/BabyStationList.go @@ -21,10 +21,15 @@ func (snapshot *Snapshot) List() []BabyStation { babyStations := make([]BabyStation, 0, len(snapshot.ConnectionByID)) for connectionID, connection := range snapshot.ConnectionByID { sessionID, ok := snapshot.SessionIDByConnectionID[connectionID] - fatal.Unless(ok, "session not found: "+sessionID) + if !ok { + continue + } session, ok := snapshot.SessionByID[sessionID] - fatal.Unless(ok, "session not found: "+sessionID) + if !ok { + continue + } babyStation := BabyStation{ + SessionID: session.ID, Name: session.Name, ClientID: connection.ClientID, Connection: BabyStationConnection{ @@ -39,6 +44,7 @@ func (snapshot *Snapshot) List() []BabyStation { } type BabyStation struct { + SessionID string `json:"session_id"` Name string `json:"name"` ClientID string `json:"client_id"` Connection BabyStationConnection `json:"connection"` @@ -133,6 +139,9 @@ func (babyStationList *BabyStationList) applySessionStarted(event *eventlog.Even StartedAt: data.StartedAt, } snapshot := babyStationList.getOrCreateSnapshot(event.AccountID) + if previousSessionID, ok := snapshot.SessionIDByConnectionID[data.HostConnectionID]; ok && previousSessionID != data.ID { + delete(snapshot.SessionByID, previousSessionID) + } snapshot.SessionByID[data.ID] = &session snapshot.SessionIDByConnectionID[data.HostConnectionID] = data.ID } diff --git a/golang/internal/babystationlist/BabyStationList_test.go b/golang/internal/babystationlist/BabyStationList_test.go index 44f2e91..eb989e7 100644 --- a/golang/internal/babystationlist/BabyStationList_test.go +++ b/golang/internal/babystationlist/BabyStationList_test.go @@ -261,6 +261,92 @@ func TestBabyStationList(t *testing.T) { So(babyStation.StartedAt, ShouldHappenWithin, time.Millisecond, sessionStartedAt1) } }) + Convey("Starting a new session on the same connection replaces the old one", func() { + clientID := uuid.NewV4().String() + connectionID := uuid.NewV4().String() + first := babystationlist.StartSessionEventData{ + ID: uuid.NewV4().String(), + Name: "First", + HostConnectionID: connectionID, + StartedAt: time.Now(), + } + _, err := eventLog.Append(ctx, eventlog.AppendInput{ + Type: babystationlist.EventTypeSessionStarted, + AccountID: accountID, + Data: fatal.UnlessMarshalJSON(first), + }) + So(err, ShouldBeNil) + _, err = eventLog.Append(ctx, eventlog.AppendInput{ + Type: connections.EventTypeConnected, + AccountID: accountID, + Data: fatal.UnlessMarshalJSON(connections.EventConnected{ + ClientID: clientID, + ConnectionID: connectionID, + }), + }) + So(err, ShouldBeNil) + + second := babystationlist.StartSessionEventData{ + ID: uuid.NewV4().String(), + Name: "Second", + HostConnectionID: connectionID, + StartedAt: time.Now().Add(time.Minute), + } + _, err = eventLog.Append(ctx, eventlog.AppendInput{ + Type: babystationlist.EventTypeSessionStarted, + AccountID: accountID, + Data: fatal.UnlessMarshalJSON(second), + }) + So(err, ShouldBeNil) + + output, err := babyStationList.GetSnapshot(ctx) + So(err, ShouldBeNil) + So(output.Snapshot.SessionByID, ShouldHaveLength, 1) + So(output.Snapshot.SessionIDByConnectionID, ShouldHaveLength, 1) + So(output.Snapshot.ConnectionByID, ShouldHaveLength, 1) + So(output.Snapshot.List(), ShouldHaveLength, 1) + babyStation := output.Snapshot.List()[0] + So(babyStation.SessionID, ShouldEqual, second.ID) + So(babyStation.Name, ShouldEqual, second.Name) + }) + Convey("Ending a session while the client stays connected should leave an empty list", func() { + clientID := uuid.NewV4().String() + connectionID := uuid.NewV4().String() + session := babystationlist.StartSessionEventData{ + ID: uuid.NewV4().String(), + Name: "First", + HostConnectionID: connectionID, + StartedAt: time.Now(), + } + _, err := eventLog.Append(ctx, eventlog.AppendInput{ + Type: babystationlist.EventTypeSessionStarted, + AccountID: accountID, + Data: fatal.UnlessMarshalJSON(session), + }) + So(err, ShouldBeNil) + _, err = eventLog.Append(ctx, eventlog.AppendInput{ + Type: connections.EventTypeConnected, + AccountID: accountID, + Data: fatal.UnlessMarshalJSON(connections.EventConnected{ + ClientID: clientID, + ConnectionID: connectionID, + }), + }) + So(err, ShouldBeNil) + _, err = eventLog.Append(ctx, eventlog.AppendInput{ + Type: babystationlist.EventTypeSessionEnded, + AccountID: accountID, + Data: fatal.UnlessMarshalJSON(babystationlist.EndSessionEventData{ + ID: session.ID, + }), + }) + So(err, ShouldBeNil) + + output, err := babyStationList.GetSnapshot(ctx) + So(err, ShouldBeNil) + So(func() { _ = output.Snapshot.List() }, ShouldNotPanic) + So(output.Snapshot.List(), ShouldHaveLength, 0) + }) }) } diff --git a/golang/internal/connectionstore/Decider.go b/golang/internal/connectionstore/Decider.go index 3a4922b..953a5c1 100644 --- a/golang/internal/connectionstore/Decider.go +++ b/golang/internal/connectionstore/Decider.go @@ -144,6 +144,7 @@ func (decider *Decider) applyConnected(ctx context.Context, event *eventlog.Even ClientID: data.ClientID, } key := decider.getKey(connection) + delete(decider.disconnectedKeySet, key) decider.connectedKeySet[key] = struct{}{} return nil } @@ -158,6 +159,7 @@ func (decider *Decider) applyDisconnected(ctx context.Context, event *eventlog.E ClientID: data.ClientID, } key := decider.getKey(connection) + delete(decider.connectedKeySet, key) decider.disconnectedKeySet[key] = struct{}{} return nil } diff --git a/golang/internal/connectionstore/Decider_test.go b/golang/internal/connectionstore/Decider_test.go index 8e8bc9e..c3f97b5 100644 --- a/golang/internal/connectionstore/Decider_test.go +++ b/golang/internal/connectionstore/Decider_test.go @@ -54,6 +54,12 @@ func TestDecider(t *testing.T) { So(err, ShouldEqual, connectionstore.ErrDuplicate) }) }) + Convey("Reconnect after disconnect", func() { + err = decider.Delete(ctx, connection) + So(err, ShouldBeNil) + err = decider.Put(ctx, connection) + So(err, ShouldBeNil) + }) }) Convey("Multiple Connections", func() { Convey("Multiple Accounts", func() { diff --git a/golang/internal/connectionstoresync/Sync.go b/golang/internal/connectionstoresync/Sync.go index 239dab3..b449f92 100644 --- a/golang/internal/connectionstoresync/Sync.go +++ b/golang/internal/connectionstoresync/Sync.go @@ -7,13 +7,18 @@ import ( "regexp" "github.com/Ryan-A-B/beddybytes/golang/internal/connectionstore" + "github.com/Ryan-A-B/beddybytes/golang/internal/contextx" + "github.com/Ryan-A-B/beddybytes/golang/internal/eventlog" "github.com/Ryan-A-B/beddybytes/golang/internal/fatal" "github.com/Ryan-A-B/beddybytes/golang/internal/logx" "github.com/Ryan-A-B/beddybytes/golang/internal/mqttx" + "github.com/Ryan-A-B/beddybytes/golang/internal/sessionlist" mqtt "github.com/eclipse/paho.mqtt.golang" ) var connectionStore *connectionstore.Decider +var sessionListStore *sessionlist.SessionList +var eventLogStore eventlog.EventLog var handlers = map[string]func(ctx context.Context, input handlerInput){ "connected": handleConnectedMessage, "disconnected": handleDisconnectedMessage, @@ -23,6 +28,8 @@ var topicRegex = regexp.MustCompile(`^accounts/([^/]+)/clients/([^/]+)/status$`) type RunInput struct { MQTTClient mqtt.Client ConnectionStore *connectionstore.Decider + SessionList *sessionlist.SessionList + EventLog eventlog.EventLog } func Run(ctx context.Context, input RunInput) { @@ -30,6 +37,8 @@ func Run(ctx context.Context, input RunInput) { panic("already running") } connectionStore = input.ConnectionStore + sessionListStore = input.SessionList + eventLogStore = input.EventLog err := mqttx.Wait(input.MQTTClient.Subscribe("accounts/+/clients/+/status", 1, handleMessage)) fatal.OnError(err) <-ctx.Done() @@ -90,6 +99,28 @@ func handleDisconnectedMessage(ctx context.Context, input handlerInput) { } log.Fatal(err) } + if input.Status.Disconnected.Reason != "clean" { + return + } + sessionCtx := contextx.WithAccountID(ctx, input.AccountID) + session, ok := sessionListStore.GetByConnectionID(sessionCtx, input.Status.ConnectionID) + if !ok { + return + } + data, err := json.Marshal(struct { + ID string `json:"id"` + }{ + ID: session.ID, + }) + fatal.OnError(err) + _, err = eventLogStore.Append(sessionCtx, eventlog.AppendInput{ + Type: "session.ended", + AccountID: input.AccountID, + Data: data, + }) + if err != nil { + log.Fatal(err) + } } type handlerInput struct { diff --git a/golang/internal/sessionlist/SessionList.go b/golang/internal/sessionlist/SessionList.go index fe719e4..45c1cf0 100644 --- a/golang/internal/sessionlist/SessionList.go +++ b/golang/internal/sessionlist/SessionList.go @@ -53,6 +53,26 @@ func (sessionList *SessionList) List(ctx context.Context) (output ListOutput) { return } +func (sessionList *SessionList) GetByConnectionID(ctx context.Context, connectionID string) (session *Session, ok bool) { + sessionList.catchUp(ctx) + accountID := contextx.GetAccountID(ctx) + return sessionList.getSessionByConnectionID(accountID, connectionID) +} + +func (sessionList *SessionList) Get(ctx context.Context, sessionID string) (session *Session, ok bool) { + sessionList.catchUp(ctx) + accountID := contextx.GetAccountID(ctx) + index := sessionList.search(accountID, sessionID) + if index == len(sessionList.sessions) { + return nil, false + } + session = sessionList.sessions[index] + if session.AccountID != accountID || session.ID != sessionID { + return nil, false + } + return session, true +} + func (sessionList *SessionList) catchUp(ctx context.Context) { sessionList.mutex.Lock() defer sessionList.mutex.Unlock() diff --git a/golang/internal/sessionstartdecider/Decider.go b/golang/internal/sessionstartdecider/Decider.go index 79a16e5..4af950a 100644 --- a/golang/internal/sessionstartdecider/Decider.go +++ b/golang/internal/sessionstartdecider/Decider.go @@ -12,6 +12,7 @@ import ( ) const EventTypeSessionStarted = "session.started" +const EventTypeSessionEnded = "session.ended" var ErrDuplicate = errors.New("duplicate") @@ -24,10 +25,12 @@ type Session struct { } type Decider struct { - eventLog eventlog.EventLog - mutex sync.Mutex - cursor int64 - connectionIDSet map[string]struct{} + eventLog eventlog.EventLog + mutex sync.Mutex + cursor int64 + sessionKeyByConnectionKey map[string]string + connectionKeyBySessionKey map[string]string + endedSessionKeySet map[string]struct{} } type NewDeciderInput struct { @@ -36,8 +39,10 @@ type NewDeciderInput struct { func NewDecider(input NewDeciderInput) *Decider { return &Decider{ - eventLog: input.EventLog, - connectionIDSet: make(map[string]struct{}), + eventLog: input.EventLog, + sessionKeyByConnectionKey: make(map[string]string), + connectionKeyBySessionKey: make(map[string]string), + endedSessionKeySet: make(map[string]struct{}), } } @@ -48,8 +53,12 @@ func (decider *Decider) Put(ctx context.Context, session Session) error { if err := decider.catchUp(ctx); err != nil { return err } - key := decider.getKey(session.AccountID, session.HostConnectionID) - if _, ok := decider.connectionIDSet[key]; ok { + connectionKey := decider.getKey(session.AccountID, session.HostConnectionID) + sessionKey := decider.getSessionKey(session.AccountID, session.ID) + if _, ok := decider.endedSessionKeySet[sessionKey]; ok { + return ErrDuplicate + } + if existingSessionKey, ok := decider.sessionKeyByConnectionKey[connectionKey]; ok && existingSessionKey == sessionKey { return ErrDuplicate } data, err := json.Marshal(sessionStartedEventData{ @@ -76,6 +85,19 @@ func (decider *Decider) catchUp(ctx context.Context) error { for iterator.Next(ctx) { event := iterator.Event() if event.Type != EventTypeSessionStarted { + if event.Type == EventTypeSessionEnded { + var data sessionEndedEventData + if err := json.Unmarshal(event.Data, &data); err != nil { + return err + } + sessionKey := decider.getSessionKey(event.AccountID, data.ID) + connectionKey, ok := decider.connectionKeyBySessionKey[sessionKey] + if ok { + delete(decider.sessionKeyByConnectionKey, connectionKey) + delete(decider.connectionKeyBySessionKey, sessionKey) + } + decider.endedSessionKeySet[sessionKey] = struct{}{} + } decider.cursor = event.LogicalClock continue } @@ -83,8 +105,20 @@ func (decider *Decider) catchUp(ctx context.Context) error { if err := json.Unmarshal(event.Data, &data); err != nil { return err } - key := decider.getKey(event.AccountID, data.HostConnectionID) - decider.connectionIDSet[key] = struct{}{} + connectionKey := decider.getKey(event.AccountID, data.HostConnectionID) + sessionKey := decider.getSessionKey(event.AccountID, data.ID) + if _, ended := decider.endedSessionKeySet[sessionKey]; ended { + decider.cursor = event.LogicalClock + continue + } + if replacedSessionKey, ok := decider.sessionKeyByConnectionKey[connectionKey]; ok && replacedSessionKey != sessionKey { + delete(decider.connectionKeyBySessionKey, replacedSessionKey) + } + if replacedConnectionKey, ok := decider.connectionKeyBySessionKey[sessionKey]; ok && replacedConnectionKey != connectionKey { + delete(decider.sessionKeyByConnectionKey, replacedConnectionKey) + } + decider.sessionKeyByConnectionKey[connectionKey] = sessionKey + decider.connectionKeyBySessionKey[sessionKey] = connectionKey decider.cursor = event.LogicalClock } return iterator.Err() @@ -94,9 +128,17 @@ func (decider *Decider) getKey(accountID string, connectionID string) string { return fmt.Sprintf("accounts/%s/connections/%s", accountID, connectionID) } +func (decider *Decider) getSessionKey(accountID string, sessionID string) string { + return fmt.Sprintf("accounts/%s/sessions/%s", accountID, sessionID) +} + type sessionStartedEventData struct { ID string `json:"id"` Name string `json:"name"` HostConnectionID string `json:"host_connection_id"` StartedAt time.Time `json:"started_at"` } + +type sessionEndedEventData struct { + ID string `json:"id"` +} diff --git a/golang/internal/sessionstartdecider/Decider_test.go b/golang/internal/sessionstartdecider/Decider_test.go new file mode 100644 index 0000000..bb3b9d1 --- /dev/null +++ b/golang/internal/sessionstartdecider/Decider_test.go @@ -0,0 +1,132 @@ +package sessionstartdecider_test + +import ( + "context" + "encoding/json" + "os" + "testing" + "time" + + "github.com/Ryan-A-B/beddybytes/golang/internal/eventlog" + "github.com/Ryan-A-B/beddybytes/golang/internal/sessionstartdecider" +) + +func TestDeciderAllowsRestartAfterSessionEnded(t *testing.T) { + ctx := context.Background() + folderPath, err := os.MkdirTemp("", "TestDeciderAllowsRestartAfterSessionEnded-*") + if err != nil { + t.Fatal(err) + } + eventLog := eventlog.NewFileEventLog(&eventlog.NewFileEventLogInput{ + FolderPath: folderPath, + }) + decider := sessionstartdecider.NewDecider(sessionstartdecider.NewDeciderInput{ + EventLog: eventLog, + }) + session := sessionstartdecider.Session{ + AccountID: "account-1", + ID: "session-1", + Name: "nursery", + HostConnectionID: "connection-1", + StartedAt: time.Unix(1, 0).UTC(), + } + if err := decider.Put(ctx, session); err != nil { + t.Fatal(err) + } + data, err := json.Marshal(struct { + ID string `json:"id"` + }{ + ID: session.ID, + }) + if err != nil { + t.Fatal(err) + } + if _, err := eventLog.Append(ctx, eventlog.AppendInput{ + Type: "session.ended", + AccountID: session.AccountID, + Data: data, + }); err != nil { + t.Fatal(err) + } + restarted := session + restarted.ID = "session-2" + restarted.StartedAt = restarted.StartedAt.Add(time.Minute) + if err := decider.Put(ctx, restarted); err != nil { + t.Fatalf("expected session start after clean end to succeed, got %v", err) + } +} + +func TestDeciderAllowsRestartWithoutSessionEnded(t *testing.T) { + ctx := context.Background() + folderPath, err := os.MkdirTemp("", "TestDeciderAllowsRestartWithoutSessionEnded-*") + if err != nil { + t.Fatal(err) + } + eventLog := eventlog.NewFileEventLog(&eventlog.NewFileEventLogInput{ + FolderPath: folderPath, + }) + decider := sessionstartdecider.NewDecider(sessionstartdecider.NewDeciderInput{ + EventLog: eventLog, + }) + first := sessionstartdecider.Session{ + AccountID: "account-1", + ID: "session-1", + Name: "nursery", + HostConnectionID: "connection-1", + StartedAt: time.Unix(1, 0).UTC(), + } + if err := decider.Put(ctx, first); err != nil { + t.Fatal(err) + } + second := first + second.ID = "session-2" + second.StartedAt = second.StartedAt.Add(time.Minute) + if err := decider.Put(ctx, second); err != nil { + t.Fatalf("expected replacement start without session.ended, got %v", err) + } +} + +func TestDeciderRejectsLateStartForEndedSession(t *testing.T) { + ctx := context.Background() + folderPath, err := os.MkdirTemp("", "TestDeciderRejectsLateStartForEndedSession-*") + if err != nil { + t.Fatal(err) + } + eventLog := eventlog.NewFileEventLog(&eventlog.NewFileEventLogInput{ + FolderPath: folderPath, + }) + decider := sessionstartdecider.NewDecider(sessionstartdecider.NewDeciderInput{ + EventLog: eventLog, + }) + session := sessionstartdecider.Session{ + AccountID: "account-1", + ID: "session-1", + Name: "nursery", + HostConnectionID: "connection-1", + StartedAt: time.Unix(1, 0).UTC(), + } + data, err := json.Marshal(struct { + ID string `json:"id"` + }{ + ID: session.ID, + }) + if err != nil { + t.Fatal(err) + } + if _, err := eventLog.Append(ctx, eventlog.AppendInput{ + Type: "session.ended", + AccountID: session.AccountID, + Data: data, + }); err != nil { + t.Fatal(err) + } + if err := decider.Put(ctx, session); err != sessionstartdecider.ErrDuplicate { + t.Fatalf("expected late start for ended session to be rejected, got %v", err) + } + restarted := session + restarted.ID = "session-2" + restarted.StartedAt = restarted.StartedAt.Add(time.Minute) + if err := decider.Put(ctx, restarted); err != nil { + t.Fatalf("expected new session after stale late start to succeed, got %v", err) + } +} From 242c34bc9886a80466edaa8161332513d6bfd568 Mon Sep 17 00:00:00 2001 From: ryan <2224076+Ryan-A-B@users.noreply.github.com> Date: Fri, 10 Apr 2026 14:35:46 +1000 Subject: [PATCH 5/6] .. --- docker-compose.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index f561061..5d281ca 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,6 +1,6 @@ services: traefik: - image: traefik:v2.10 + image: traefik:v3.6.7 restart: always ports: - "80:80" @@ -137,4 +137,4 @@ secrets: external: true include: - - selenium.docker-compose.yml \ No newline at end of file + - selenium.docker-compose.yml From 7cbdc7701a58cfc261bd273955292236f78773f7 Mon Sep 17 00:00:00 2001 From: ryan <2224076+Ryan-A-B@users.noreply.github.com> Date: Fri, 10 Apr 2026 15:31:58 +1000 Subject: [PATCH 6/6] fixes issue where RequestID wasn't correctly reinstated --- golang/cmd/backend/Connection.go | 9 ++++++--- golang/cmd/backend/sessions_test.go | 5 ++--- golang/internal/connections/Event.go | 2 ++ golang/internal/sessionlist/Session.go | 1 + golang/internal/sessionlist/SessionList.go | 5 +++++ 5 files changed, 16 insertions(+), 6 deletions(-) diff --git a/golang/cmd/backend/Connection.go b/golang/cmd/backend/Connection.go index 8ac0ee3..aa36973 100644 --- a/golang/cmd/backend/Connection.go +++ b/golang/cmd/backend/Connection.go @@ -28,13 +28,16 @@ const EventTypeClientDisconnected = "client.disconnected" type ClientConnectedEventData struct { ClientID string `json:"client_id"` ConnectionID string `json:"connection_id"` + RequestID string `json:"request_id"` AtMillis int64 `json:"at_millis"` } type ClientDisconnectedEventData struct { - ClientID string `json:"client_id"` - ConnectionID string `json:"connection_id"` - AtMillis int64 `json:"at_millis"` + ClientID string `json:"client_id"` + ConnectionID string `json:"connection_id"` + RequestID string `json:"request_id"` + WebSocketCloseCode int `json:"web_socket_close_code"` + AtMillis int64 `json:"at_millis"` Disconnected struct { Reason string `json:"reason"` } `json:"disconnected"` diff --git a/golang/cmd/backend/sessions_test.go b/golang/cmd/backend/sessions_test.go index a51591c..5fbae8d 100644 --- a/golang/cmd/backend/sessions_test.go +++ b/golang/cmd/backend/sessions_test.go @@ -2,16 +2,15 @@ package main import ( "context" -<<<<<<< HEAD "encoding/json" "net/http" "net/http/httptest" "os" + "path/filepath" + "runtime" "strings" "testing" "time" - "path/filepath" - "runtime" mqtt "github.com/eclipse/paho.mqtt.golang" "github.com/gorilla/mux" diff --git a/golang/internal/connections/Event.go b/golang/internal/connections/Event.go index c19b3ba..b70178a 100644 --- a/golang/internal/connections/Event.go +++ b/golang/internal/connections/Event.go @@ -6,12 +6,14 @@ const EventTypeDisconnected string = "client.disconnected" type EventConnected struct { ClientID string `json:"client_id"` ConnectionID string `json:"connection_id"` + RequestID string `json:"request_id"` AtMillis int64 `json:"at_millis"` } type EventDisconnected struct { ClientID string `json:"client_id"` ConnectionID string `json:"connection_id"` + RequestID string `json:"request_id"` AtMillis int64 `json:"at_millis"` Disconnected struct { Reason string `json:"reason"` diff --git a/golang/internal/sessionlist/Session.go b/golang/internal/sessionlist/Session.go index 59e0060..c8f6e7d 100644 --- a/golang/internal/sessionlist/Session.go +++ b/golang/internal/sessionlist/Session.go @@ -38,6 +38,7 @@ func (state HostConnectionStateBase) GetSince() int64 { type HostConnectionStateConnected struct { HostConnectionStateBase + RequestID string `json:"request_id"` } type HostConnectionStateDisconnected struct { diff --git a/golang/internal/sessionlist/SessionList.go b/golang/internal/sessionlist/SessionList.go index 2d8b72e..3b97fd3 100644 --- a/golang/internal/sessionlist/SessionList.go +++ b/golang/internal/sessionlist/SessionList.go @@ -225,6 +225,10 @@ func applyClientDisconnectedEvent(ctx context.Context, sessionList *SessionList, if session.HostConnectionState.GetState() != ConnectionStateConnected { return } + hostConnectionState := session.HostConnectionState.(HostConnectionStateConnected) + if hostConnectionState.RequestID != data.RequestID { + return + } session.HostConnectionState = HostConnectionStateDisconnected{ HostConnectionStateBase: HostConnectionStateBase{ State: ConnectionStateDisconnected, @@ -251,6 +255,7 @@ func applyClientConnectedEvent(ctx context.Context, sessionList *SessionList, ev State: ConnectionStateConnected, Since: event.UnixTimestamp, }, + RequestID: data.RequestID, } }