diff --git a/AGENTS.md b/AGENTS.md new file mode 100644 index 0000000..8d21179 --- /dev/null +++ b/AGENTS.md @@ -0,0 +1,44 @@ +## Ownership and scope + +- This `AGENTS.md` file is written and maintained by the human. +- You (the agent) may **read** this file but must **never modify it under any circumstances**. +- You are allowed to create and edit documentation **only** under the `agent_docs/` directory in this repository. +- Treat everything inside `agent_docs/` as your documentation workspace and Obsidian vault. + +### Your responsibilities: + +- Use existing documentation under `agent_docs/` before guessing. +- Keep documentation in `agent_docs/` in sync with the behavior of the code. +- Record meaningful domain knowledge there. + +--- + +## Obsidian markdown conventions + +You must treat `agent_docs/` as an Obsidian vault and follow these rules strictly: + +- **File names are titles** + - The file name is the note title. Do not add a level‑1 heading to restate the title. + - Use natural, human‑readable names with spaces and standard capitalisation. + - Example: `MQTT Topics and Payloads.md`, `Backend Overview.md`, `2026-03-08 Refactoring MQTT.md`. + - Never use kebab‑case or snake_case file names (e.g. `mqtt-topics-and-payloads.md` is wrong). + +- **Links between notes** + - Always use Obsidian wiki links: `[[Note Name]]`. + - The note name inside `[[...]]` must exactly match the file name (without the `.md` extension). + - Example: `[[MQTT Topics and Payloads]]`, `[[Backend Overview]]`. + - Never use standard markdown links for cross‑vault references. + +--- + +## Documentation workspace: `agent_docs/` + +- The root of your docs workspace is: `agent_docs/`. +- You **must not** create or edit docs outside `agent_docs/` unless explicitly asked. +- When you need to persist knowledge, decisions, or plans, write them into `agent_docs/`, not only into code comments (except for small, local notes). 
+ +You are expected to maintain at least: + +- `agent_docs/index.md` - main index of all important notes. +- `agent_docs/structure.md` - where to find things in this repository +- `agent_docs/architecture.md` - the high level architecture of the project diff --git a/agent_docs/Backend Signalling Contracts.md b/agent_docs/Backend Signalling Contracts.md new file mode 100644 index 0000000..64e36fb --- /dev/null +++ b/agent_docs/Backend Signalling Contracts.md @@ -0,0 +1,150 @@ +## Purpose + +This note documents the current backend signalling contracts and where they are implemented. + +## Source-of-Truth Files + +- `golang/cmd/backend/server.go` +- `golang/cmd/backend/Connection.go` +- `golang/cmd/backend/MQTTSync.go` +- `golang/internal/connectionstoresync/Sync.go` +- `golang/internal/connections/Event.go` + +## Active Signalling Endpoints + +Registered in `golang/cmd/backend/server.go`: + +- `GET /clients/{client_id}/websocket` +- `GET /clients/{client_id}/connections/{connection_id}` +- `PUT /sessions/{session_id}` +- `DELETE /sessions/{session_id}` (appends `session.ended` if the session is still active; see Session Ingress/Egress) + +The two `/clients/...` routes are behind the auth middleware mounted under `/clients`. + +## Legacy WebSocket Signalling Route + +`GET /clients/{client_id}/websocket` in `server.go` + `HandleWebsocket`: + +- Legacy peer-to-peer relay via backend in-memory `ClientStore`. +- Inbound JSON: + - `to_peer_id: string` + - `data: any JSON` +- Outbound JSON: + - `from_peer_id: string` + - `data: any JSON` + +This is the legacy transport targeted for deprecation after MQTT migration. + +## Connection Route: WebSocket Transport + MQTT Relay + +`GET /clients/{client_id}/connections/{connection_id}` in `Connection.go`: + +- Upgrades to WebSocket. 
+- Publishes connect status to: + - `accounts/{account_id}/clients/{client_id}/status` + - payload: `{"type":"connected","connection_id":"...","at_millis":1773100800000}` +- Publishes disconnect status to: + - `accounts/{account_id}/clients/{client_id}/status` + - payload: `{"type":"disconnected","connection_id":"...","disconnected":{"reason":"clean|unexpected"},"at_millis":1773100800000}` +- Incoming websocket payload from client: + - `{"type":"signal","signal":{"to_connection_id":"...","data":{...}}}` +- Backend republishes to MQTT topic: + - `accounts/{account_id}/clients/{to_client_id|sender_client_id}/webrtc_inbox` + - payload: `{"from_client_id":"...","from_connection_id":"...","connection_id":"...","data":{...}}` +- Keepalive: + - websocket ping/pong with 30s ping period + - pong timeout 10s +- Active websocket connections are tracked in-memory via `ConnectionHub`. + +## MQTT Topics + +Defined in `Connection.go` and `MQTTSync.go`: + +- Status topic format: + - `accounts/%s/clients/%s/status` +- WebRTC inbox: + - `accounts/%s/clients/%s/webrtc_inbox` +- Baby stations: + - `accounts/%s/baby_stations` +- Parent stations: + - `accounts/%s/parent_stations` +- Control inbox: + - `accounts/%s/clients/%s/control_inbox` + +Status sync subscriber in `connectionstoresync/Sync.go`: + +- Subscribes wildcard: + - `accounts/+/clients/+/status` +- Parses account and client IDs from topic, then uses payload `connection_id`. + +## MQTT Status Payloads + +Published from `Connection.go` on connect/disconnect: + +- Connected: + - `{"type":"connected","connection_id":"...","at_millis":1773100800000}` +- Disconnected: + - `{"type":"disconnected","connection_id":"...","disconnected":{"reason":"clean|unexpected"},"at_millis":1773100800000}` + +Event payloads are defined in `golang/internal/connections/Event.go`. 
+ +## Session Ingress/Egress + +- `PUT /sessions/{session_id}` now publishes session announcements to: + - `accounts/{account_id}/baby_stations` +- `PUT /sessions/{session_id}` continues to accept the legacy payload only: + - `id`, `name`, `host_connection_id`, `started_at` +- Backend derives MQTT announcement fields from the legacy request body: + - `session_id` comes from path/body `id` + - `connection_id` comes from `host_connection_id` + - `started_at_millis` comes from `started_at` +- Backend resolves `client_id` from the in-memory connection map (`connection_id -> client_id`). +- If mapping is not available yet, backend queues session start in-memory and publishes after websocket connect registers that connection. +- Queue policy: per-account LRU with max 2 pending session starts. +- Old-client compatibility detail: + - after publishing the MQTT announcement, backend also appends `session.started` locally before returning from `PUT /sessions/{session_id}` + - backend still subscribes its own `baby_stations` topic; that later delivery is treated as a duplicate by `sessionstartdecider` + - this avoids a race where old clients could call stop before the backend had observed its own published start announcement +- `DELETE /sessions/{session_id}` appends `session.ended` using the real session id if the session is still active. +- `MQTTSync` subscribes `accounts/+/baby_stations` and appends `session.started` with idempotency via `sessionstartdecider`. 
+- Clean disconnect compatibility: + - `connectionstoresync` still appends `client.disconnected` + - if disconnect reason is `clean` and the connection owns an active session, backend also appends `session.ended` + - this preserves existing frontend session teardown behavior without changing frontend code +- `sessionstartdecider` releases a connection id for reuse after `session.ended`, so the same long-lived frontend connection id can start a later session again + +## Identity Model + +- `account_id`: + - stable account scope +- `client_id`: + - stable device/browser identity +- `session_id`: + - explicit baby-station session lifecycle id + - preserved for compatibility and event history +- `connection_id`: + - runtime identity for legacy websocket transport + - survives reconnects of the same runtime + - still needed as a bridge while websocket clients coexist with MQTT-native clients + +## Parent Station Discovery + +- `MQTTSync` subscribes `accounts/+/parent_stations`. +- Expected payload includes `client_id` of requesting parent station. +- For each active baby station snapshot entry, backend publishes control message: + - topic: `accounts/{account_id}/clients/{client_id}/control_inbox` + - payload: + - `{"type":"baby_station_announcement","at_millis":1773100800000,"baby_station_announcement":{"session_id":"...","client_id":"...","connection_id":"...","name":"...","started_at_millis":1773100800000}}` + +Client status subscription remains in `connectionstoresync`: + +- `accounts/+/clients/+/status` + +## Auth and Token Durations + +Configured in `golang/cmd/backend/server.go`: + +- Access token duration: `1 * time.Hour` +- Refresh token duration: `30 * 24 * time.Hour` + +Refresh token scope handling is in `golang/internal/accounts/Handlers.go`. 
diff --git a/agent_docs/Frontend Service Architecture.md b/agent_docs/Frontend Service Architecture.md new file mode 100644 index 0000000..b24c853 --- /dev/null +++ b/agent_docs/Frontend Service Architecture.md @@ -0,0 +1,32 @@ +## Frontend Service Architecture + +The frontend codebase generally works best when domain behavior lives in services and React stays focused on rendering. + +## Preferred Split + +- Services own domain state and transitions. +- Services own HTTP calls and long-running async workflows. +- Services own timers, retries, event subscriptions, and derived domain status. +- React components render state and invoke service commands. +- Hooks are thin adapters that subscribe React to service state. + +## What Belongs in React + +Keep state in React only when it is truly view-local: + +- modal open or closed +- active tab +- local draft input before submit +- focus, hover, or animation state + +If the state affects workflow, survives across screens, or mirrors backend/application behavior, it should usually move into a service. + +## Existing Repository Patterns + +- `frontend/src/services/Service.ts` provides a common base for stateful frontend services. +- `frontend/src/hooks/useServiceState.ts` is the standard hook shape for subscribing React to service state. +- `frontend/src/services/ParentStation/CountUpTimer.ts` is a concrete example where timer behavior is outside React. + +## Practical Rule + +If a React component starts accumulating fetch calls, retry logic, state-machine transitions, or several interdependent domain `useState` values, stop and extract a service. diff --git a/agent_docs/Open Questions.md b/agent_docs/Open Questions.md new file mode 100644 index 0000000..8b723cf --- /dev/null +++ b/agent_docs/Open Questions.md @@ -0,0 +1,14 @@ +## Open Questions + +These are useful for tightening architecture docs and future onboarding. + +## High Priority + +- What is the endpoint removal order for legacy WebSocket routes? 
+- What is the intended deprecation timeline and compatibility policy for old cached PWAs? + +## Useful Clarifications + +- Which backend endpoints are considered stable public contract vs internal implementation detail? +- What anonymous analytics events are currently emitted from frontend and backend? +- Are there explicit SLOs for connection setup success/reconnect time in local-network conditions? diff --git a/agent_docs/Product and Privacy Model.md b/agent_docs/Product and Privacy Model.md new file mode 100644 index 0000000..8c1fc61 --- /dev/null +++ b/agent_docs/Product and Privacy Model.md @@ -0,0 +1,43 @@ +## Product Model + +BeddyBytes is for parents monitoring children during sleep using existing devices with modern browsers. + +Two runtime roles exist: + +- Baby Station: captures audio/video and streams. +- Parent Station: connects to and monitors one or more baby station sessions. + +## Privacy Model + +Privacy priorities: + +- Primary: privacy. +- Secondary: simplicity. + +Privacy guarantees (intended behavior): + +- No video/audio leaves the home network. +- Backend never receives media payloads. +- No server-side recording. +- Analytics are anonymous. + +Data collected by service: + +- Email address (account identity). +- Payment processing via Stripe (one-off checkout, no subscription model). + +## Recording Behavior + +- Recording is initiated on Parent Station. +- Browser `MediaRecorder` captures incoming stream. +- Recording is downloaded immediately to local device. +- Recording is never uploaded to backend. + +## Product Constraints + +- No STUN/TURN means no off-LAN fallback for media. +- Connection intentionally fails when local-network conditions are not met. + +## Operational Reality + +Because this is a PWA, users may run cached app versions for weeks before refresh/update. Any migration strategy must tolerate mixed-version clients. 
diff --git a/agent_docs/Signalling Migration.md b/agent_docs/Signalling Migration.md new file mode 100644 index 0000000..c0ba051 --- /dev/null +++ b/agent_docs/Signalling Migration.md @@ -0,0 +1,37 @@ +## Current State + +- Legacy signalling path uses backend WebSockets. +- MQTT migration is active for backend ingress/egress topic contracts. +- Backend now: + - publishes client status to `accounts/{account_id}/clients/{client_id}/status` + - publishes WebRTC signalling to `accounts/{account_id}/clients/{client_id}/webrtc_inbox` + - subscribes `baby_stations`, `parent_stations`, `webrtc_inbox`, and status wildcard topics + - keeps `PUT /sessions/{session_id}` on the legacy request payload and derives the MQTT announcement fields server-side, including `session_id` + - queues `PUT /sessions/{session_id}` session starts in-memory when `client_id` is unresolved and flushes on websocket connect + - appends `session.ended` on explicit delete for old clients and on clean disconnect for compatibility with future clients that end sessions by disconnecting +- Current direction: + - `session_id` stays + - `connection_id` is being reduced to compatibility/runtime identity rather than long-term primary identity +- Current implementation is backend-centric; no finalized frontend MQTT path yet. + +See [[Backend Signalling Contracts]] for current endpoint/topic details. + +## Direction + +- Move clients to direct MQTT-first signalling behavior. +- Deprecate WebSocket endpoints. +- Remove WebSocket signalling paths after migration is complete and safe. + +## Risk Factors + +- PWA caching causes version skew across active clients. +- Mixed fleets (WebSocket-era and MQTT-era clients) must interoperate during rollout. +- Session continuity and reconnection behavior must remain stable while transports coexist. + +## Documentation Impact + +As migration advances, update: + +- [[architecture]] for control-plane transport changes. 
+- [[structure]] for source-of-truth files handling signalling. +- [[Backend Signalling Contracts]] as backend transport behavior changes. diff --git a/agent_docs/Working Agreement with Human.md b/agent_docs/Working Agreement with Human.md new file mode 100644 index 0000000..adb9dbb --- /dev/null +++ b/agent_docs/Working Agreement with Human.md @@ -0,0 +1,13 @@ +The human prefers immediate escalation when implementation issues arise. + +## Escalation Preference + +- If a refactor exposes missing data, ordering assumptions, or contract ambiguity, raise it immediately. +- Do not silently patch around issues. +- Present the issue and let the human choose the resolution approach. + +## Current Applied Example (2026-03-10) + +- MQTT `BabyStationAnnouncement` requires `client_id`, but legacy `PUT /sessions/{session_id}` payload does not contain it. +- Frontend Baby Station currently calls session create before opening the websocket connection, so connection-to-client mapping is not guaranteed at create time. +- Human-selected resolution: maintain an in-memory connection map (`connection_id` -> `client_id`) and handle session-create timing accordingly. diff --git a/agent_docs/architecture.md b/agent_docs/architecture.md new file mode 100644 index 0000000..f1d7070 --- /dev/null +++ b/agent_docs/architecture.md @@ -0,0 +1,64 @@ +## System Architecture + +BeddyBytes is a privacy-first baby monitor built around browser-native WebRTC. + +## Runtime Components + +- Frontend PWA (`frontend/`): Baby Station and Parent Station user experience. +- Backend (`golang/`): authentication, session metadata, signalling transport, and account separation. +- Marketing site (`marketing/`): public website and commercial pages. 
+ +## Frontend Architecture Pattern + +The frontend generally follows a service-oriented model: + +- application and workflow state live in services under `frontend/src/services/` +- React components under `frontend/src/pages/` and `frontend/src/components/` render that state +- hooks such as `frontend/src/hooks/useServiceState.ts` adapt service events into React updates + +This keeps business logic, timers, and network workflows out of JSX-heavy components. + +## Core Roles + +- Baby Station: captures microphone audio (required) and camera video (optional), then publishes media to WebRTC peers. +- Parent Station: discovers active sessions, establishes WebRTC peer connections, and plays media. + +## Data Plane vs Control Plane + +- Data plane (media): browser-to-browser WebRTC, peer-to-peer, local network only. +- Control plane (signalling + auth): backend-mediated session/auth/signalling traffic. + +The backend handles signalling metadata and account/session concerns, but not media transport. + +## Privacy-Critical Constraints + +- No STUN/TURN servers are configured. +- If peers are not on the same local network, WebRTC connection fails. +- Audio/video never leave the home network. +- Recording is local to Parent Station browser via `MediaRecorder` and immediately downloaded. +- Backend never stores or relays media. + +## Deployment Overview + +- Local development stack: `docker-compose.yml` and `run_local_stack.sh`. +- QA + Prod backend: separate Docker containers on a single EC2 host. +- Shared infra on that host: Traefik, Grafana, InfluxDB. +- Frontend app hosting: S3 + CloudFront, with separate QA and Prod app environments. +- Marketing: production site only. + +## Authentication Model + +- Account-based access control. +- Access token + refresh token flow (OAuth-like model). +- JWT access token TTL: 1 hour. +- JWT refresh token TTL: 30 days. +- Password reset flow is implemented. + +## Payments + +- Stripe is used for one-off checkout purchases. 
+- No subscription billing model. + +## Architectural Center of Gravity + +The baby station session and WebRTC connection lifecycle are the center of the system. Most backend/frontend logic exists to establish, maintain, recover, and tear down those peer connections. diff --git a/agent_docs/index.md b/agent_docs/index.md new file mode 100644 index 0000000..4403156 --- /dev/null +++ b/agent_docs/index.md @@ -0,0 +1,27 @@ +## BeddyBytes Agent Docs + +This vault captures implementation-oriented documentation for BeddyBytes. + +## Core Notes + +- [[architecture]] +- [[structure]] +- [[Frontend Service Architecture]] +- [[Product and Privacy Model]] +- [[Backend Signalling Contracts]] +- [[Signalling Migration]] +- [[Open Questions]] +- [[Working Agreement with Human]] + +## Source Material + +- `README.md` +- `HowBeddyBytesWorks.md` +- `HowBeddyBytesWorks2.md` +- Human guidance captured on March 9, 2026 + +## Current Priorities + +- Keep architecture docs aligned with WebSocket to MQTT signalling migration. +- Keep privacy claims aligned with actual data flow in frontend and backend code. +- Prioritize session lifecycle and WebRTC connection behavior in technical docs. diff --git a/agent_docs/structure.md b/agent_docs/structure.md new file mode 100644 index 0000000..c669eb2 --- /dev/null +++ b/agent_docs/structure.md @@ -0,0 +1,51 @@ +## Repository Structure + +This note maps where key system behavior lives. + +## Product Runtime + +- `frontend/`: PWA app, Baby Station + Parent Station logic, WebRTC and client-side recording behavior. +- `golang/`: backend server, signalling/auth/session handling, MQTT and WebSocket infrastructure. +- `marketing/`: Gatsby marketing site. + +## Backend Entry Points + +- `golang/cmd/backend/`: backend server composition and handlers. +- `golang/internal/`: domain modules (accounts, sessions, connection stores, messaging, event log, stores, MQTT helpers). 
+- Signalling source-of-truth (current): + - `golang/cmd/backend/server.go` + - `golang/cmd/backend/Connection.go` + - `golang/internal/connectionstoresync/Sync.go` + - `golang/internal/connections/Message.go` + +## Frontend Areas + +- `frontend/src/pages/BabyStation/`: baby station UI and media capture/stream behavior. +- `frontend/src/pages/ParentStation/`: parent station UI and playback/session controls. +- `frontend/src/services/SignalService/`: signalling transport implementation. +- `frontend/src/services/BabyStation/` and `frontend/src/services/ParentStation/`: role-specific application services. + +## Infra and Ops + +- `docker-compose.yml`: local stack. +- `run_local_stack.sh`: local environment bootstrap. +- `run_integration_tests.sh`: integration test entrypoint. +- `integration_tests/`: end-to-end scenarios. +- `cloudformation/`: infrastructure templates. +- `traefik/`, `grafana/`: ingress and observability configuration. + +## Product/Domain References + +- `README.md`: concise project overview. +- `HowBeddyBytesWorks.md`: concise architecture explanation. +- `HowBeddyBytesWorks2.md`: detailed privacy and architecture narrative. 
+ +## Documentation Map + +- [[index]] +- [[architecture]] +- [[Frontend Service Architecture]] +- [[Product and Privacy Model]] +- [[Backend Signalling Contracts]] +- [[Signalling Migration]] +- [[Open Questions]] diff --git a/docker-compose.yml b/docker-compose.yml index 368c213..4939d68 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,6 +1,6 @@ services: traefik: - image: traefik:v2.10 + image: traefik:v3.6.7 restart: always ports: - "80:80" diff --git a/golang/cmd/backend/Connection.go b/golang/cmd/backend/Connection.go index 5c2556a..aa36973 100644 --- a/golang/cmd/backend/Connection.go +++ b/golang/cmd/backend/Connection.go @@ -7,23 +7,20 @@ import ( "fmt" "log" "net/http" + "sync" "time" mqtt "github.com/eclipse/paho.mqtt.golang" "github.com/gorilla/mux" "github.com/gorilla/websocket" - uuid "github.com/satori/go.uuid" - "github.com/Ryan-A-B/beddybytes/golang/internal/connections" "github.com/Ryan-A-B/beddybytes/golang/internal/contextx" - "github.com/Ryan-A-B/beddybytes/golang/internal/fatal" "github.com/Ryan-A-B/beddybytes/golang/internal/logx" - "github.com/Ryan-A-B/beddybytes/golang/internal/messages" "github.com/Ryan-A-B/beddybytes/golang/internal/mqttx" ) -const inboxTopicFormat = "accounts/%s/connections/%s/inbox" -const statusTopicFormat = "accounts/%s/connections/%s/status" +const webrtcInboxTopicFormat = "accounts/%s/clients/%s/webrtc_inbox" +const statusTopicFormat = "accounts/%s/clients/%s/status" const EventTypeClientConnected = "client.connected" const EventTypeClientDisconnected = "client.disconnected" @@ -32,6 +29,7 @@ type ClientConnectedEventData struct { ClientID string `json:"client_id"` ConnectionID string `json:"connection_id"` RequestID string `json:"request_id"` + AtMillis int64 `json:"at_millis"` } type ClientDisconnectedEventData struct { @@ -39,6 +37,10 @@ type ClientDisconnectedEventData struct { ConnectionID string `json:"connection_id"` RequestID string `json:"request_id"` WebSocketCloseCode int 
`json:"web_socket_close_code"` + AtMillis int64 `json:"at_millis"` + Disconnected struct { + Reason string `json:"reason"` + } `json:"disconnected"` } type MessageType string @@ -72,6 +74,7 @@ func (message *IncomingMessage) Validate() (err error) { type IncomingSignal struct { ToConnectionID string `json:"to_connection_id"` + ToClientID string `json:"to_client_id,omitempty"` Data json.RawMessage `json:"data"` } @@ -89,15 +92,10 @@ func (signal *IncomingSignal) Validate() (err error) { type OutgoingMessage struct { Type MessageType `json:"type"` - Signal *OutgoingSignal `json:"signal,omitempty"` + Signal json.RawMessage `json:"signal,omitempty"` Event *Event `json:"event,omitempty"` } -type OutgoingSignal struct { - FromConnectionID string `json:"from_connection_id"` - Data json.RawMessage `json:"data"` -} - type ConnectionStoreKey struct { AccountID string ConnectionID string @@ -109,73 +107,54 @@ func (handlers *Handlers) HandleConnection(responseWriter http.ResponseWriter, r vars := mux.Vars(request) clientID := vars["client_id"] connectionID := vars["connection_id"] - requestID := uuid.NewV4().String() conn, err := handlers.Upgrader.Upgrade(responseWriter, request, nil) if err != nil { logx.Errorln(err) return } defer conn.Close() - inboxTopic := fmt.Sprintf(inboxTopicFormat, accountID, connectionID) - token := handlers.MQTTClient.Subscribe(inboxTopic, 1, func(client mqtt.Client, message mqtt.Message) { - signal := new(OutgoingSignal) - err := json.Unmarshal(message.Payload(), signal) - fatal.OnError(err) - - err = conn.WriteJSON(OutgoingMessage{ - Type: MessageTypeSignal, - Signal: signal, - }) - if err != nil { - logx.Errorln(err) - return - } - message.Ack() - }) - err = mqttx.Wait(token) - if err != nil { - logx.Errorln(err) - return - } - // Wait for the subscription to propagate - readyC := time.After(500 * time.Millisecond) err = handlers.sendConnectedMessage(ctx, sendConnectedMessageInput{ accountID: accountID, clientID: clientID, connectionID: 
connectionID, - requestID: requestID, }) if err != nil { logx.Errorln(err) return } + connection := handlers.ConnectionFactory.CreateConnection(ctx, &CreateConnectionInput{ + AccountID: accountID, + ClientID: clientID, + ConnectionID: connectionID, + conn: conn, + client: handlers.MQTTClient, + }) + handlers.ConnectionHub.Put(connection) + defer handlers.ConnectionHub.Delete(connection.ID) + if pendingStart, ok := handlers.PendingSessionStarts.Take(accountID, connectionID); ok { + pendingStart.ClientID = clientID + if err = handlers.publishBabyStationAnnouncement(accountID, pendingStart); err != nil { + logx.Errorln(err) + } + } + disconnectReason := "clean" defer func() { err = handlers.sendDisconnectedMessage(ctx, sendDisconnectedMessageInput{ accountID: accountID, clientID: clientID, connectionID: connectionID, - requestID: requestID, + reason: disconnectReason, }) if err != nil { logx.Errorln(err) } - err = mqttx.Wait(handlers.MQTTClient.Unsubscribe(inboxTopic)) - if err != nil { - logx.Errorln(err) - } }() - connection := handlers.ConnectionFactory.CreateConnection(ctx, &CreateConnectionInput{ - AccountID: accountID, - ConnectionID: connectionID, - conn: conn, - client: handlers.MQTTClient, - }) + errC := make(chan error, 1) go func() { errC <- connection.runKeepAlive(ctx) }() go func() { - <-readyC errC <- connection.handleMessages(ctx) }() select { @@ -188,6 +167,7 @@ func (handlers *Handlers) HandleConnection(responseWriter http.ResponseWriter, r return } } + disconnectReason = "unexpected" logx.Errorln(err) return } @@ -198,20 +178,18 @@ type sendConnectedMessageInput struct { accountID string clientID string connectionID string - requestID string } func (handlers *Handlers) sendConnectedMessage(ctx context.Context, input sendConnectedMessageInput) (err error) { - topic := fmt.Sprintf(statusTopicFormat, input.accountID, input.connectionID) - payload := fatal.UnlessMarshalJSON(connections.MessageFrame{ - MessageFrameBase: messages.MessageFrameBase{ - Type: 
connections.MessageTypeConnected, - }, - Connected: &connections.MessageConnected{ - ClientID: input.clientID, - RequestID: input.requestID, - }, + topic := fmt.Sprintf(statusTopicFormat, input.accountID, input.clientID) + payload, err := json.Marshal(map[string]any{ + "type": "connected", + "connection_id": input.connectionID, + "at_millis": time.Now().UnixMilli(), }) + if err != nil { + return + } return mqttx.Wait(handlers.MQTTClient.Publish(topic, 1, false, payload)) } @@ -219,20 +197,25 @@ type sendDisconnectedMessageInput struct { accountID string clientID string connectionID string - requestID string + reason string } func (handlers *Handlers) sendDisconnectedMessage(ctx context.Context, input sendDisconnectedMessageInput) (err error) { - topic := fmt.Sprintf(statusTopicFormat, input.accountID, input.connectionID) - payload := fatal.UnlessMarshalJSON(connections.MessageFrame{ - MessageFrameBase: messages.MessageFrameBase{ - Type: connections.MessageTypeDisconnected, - }, - Disconnected: &connections.MessageDisconnected{ - ClientID: input.clientID, - RequestID: input.requestID, + if input.reason == "" { + input.reason = "clean" + } + topic := fmt.Sprintf(statusTopicFormat, input.accountID, input.clientID) + payload, err := json.Marshal(map[string]any{ + "type": "disconnected", + "connection_id": input.connectionID, + "disconnected": map[string]any{ + "reason": input.reason, }, + "at_millis": time.Now().UnixMilli(), }) + if err != nil { + return + } return mqttx.Wait(handlers.MQTTClient.Publish(topic, 1, false, payload)) } @@ -240,6 +223,7 @@ type ConnectionFactory struct{} type CreateConnectionInput struct { AccountID string + ClientID string ConnectionID string conn *websocket.Conn client mqtt.Client @@ -247,15 +231,18 @@ type CreateConnectionInput struct { type Connection struct { AccountID string + ClientID string ID string conn *websocket.Conn client mqtt.Client pongC chan struct{} + writeMu sync.Mutex } func (factory *ConnectionFactory) 
CreateConnection(ctx context.Context, input *CreateConnectionInput) (connection *Connection) { connection = &Connection{ AccountID: input.AccountID, + ClientID: input.ClientID, ID: input.ConnectionID, conn: input.conn, client: input.client, @@ -291,7 +278,7 @@ func (connection *Connection) handleNextMessage(ctx context.Context) (err error) } switch incomingMessage.Type { case MessageTypePing: - return connection.conn.WriteJSON(OutgoingMessage{ + return connection.WriteJSON(OutgoingMessage{ Type: MessageTypePong, }) case MessageTypeSignal: @@ -302,13 +289,21 @@ func (connection *Connection) handleNextMessage(ctx context.Context) (err error) } func (connection *Connection) handleSignal(ctx context.Context, incomingSignal *IncomingSignal) (err error) { - inboxTopic := fmt.Sprintf(inboxTopicFormat, connection.AccountID, incomingSignal.ToConnectionID) - data, err := json.Marshal(OutgoingSignal{ - FromConnectionID: connection.ID, - Data: incomingSignal.Data, + topicClientID := incomingSignal.ToClientID + if topicClientID == "" { + topicClientID = connection.ClientID + } + inboxTopic := fmt.Sprintf(webrtcInboxTopicFormat, connection.AccountID, topicClientID) + data, err := json.Marshal(map[string]any{ + "from_client_id": connection.ClientID, + "from_connection_id": connection.ID, + "connection_id": incomingSignal.ToConnectionID, + "data": json.RawMessage(incomingSignal.Data), }) - fatal.OnError(err) - return mqttx.Wait(connection.client.Publish(inboxTopic, 1, false, []byte(data))) + if err != nil { + return + } + return mqttx.Wait(connection.client.Publish(inboxTopic, 1, false, data)) } func (connection *Connection) handlePong(appData string) (err error) { @@ -369,3 +364,9 @@ func (connection *Connection) waitForPong(ctx context.Context) (err error) { return } } + +func (connection *Connection) WriteJSON(value any) error { + connection.writeMu.Lock() + defer connection.writeMu.Unlock() + return connection.conn.WriteJSON(value) +} diff --git 
a/golang/cmd/backend/ConnectionHub.go b/golang/cmd/backend/ConnectionHub.go new file mode 100644 index 0000000..ce911da --- /dev/null +++ b/golang/cmd/backend/ConnectionHub.go @@ -0,0 +1,33 @@ +package main + +import "sync" + +type ConnectionHub struct { + mutex sync.RWMutex + connectionByID map[string]*Connection +} + +func NewConnectionHub() *ConnectionHub { + return &ConnectionHub{ + connectionByID: make(map[string]*Connection), + } +} + +func (hub *ConnectionHub) Put(connection *Connection) { + hub.mutex.Lock() + defer hub.mutex.Unlock() + hub.connectionByID[connection.ID] = connection +} + +func (hub *ConnectionHub) Delete(connectionID string) { + hub.mutex.Lock() + defer hub.mutex.Unlock() + delete(hub.connectionByID, connectionID) +} + +func (hub *ConnectionHub) Get(connectionID string) (*Connection, bool) { + hub.mutex.RLock() + defer hub.mutex.RUnlock() + connection, ok := hub.connectionByID[connectionID] + return connection, ok +} diff --git a/golang/cmd/backend/MQTTSync.go b/golang/cmd/backend/MQTTSync.go new file mode 100644 index 0000000..b1bf150 --- /dev/null +++ b/golang/cmd/backend/MQTTSync.go @@ -0,0 +1,194 @@ +package main + +import ( + "context" + "encoding/json" + "fmt" + "regexp" + "time" + + "github.com/Ryan-A-B/beddybytes/golang/internal/contextx" + "github.com/Ryan-A-B/beddybytes/golang/internal/logx" + "github.com/Ryan-A-B/beddybytes/golang/internal/mqttx" + "github.com/Ryan-A-B/beddybytes/golang/internal/sessionstartdecider" + mqtt "github.com/eclipse/paho.mqtt.golang" +) + +var ( + babyStationsTopicRegex = regexp.MustCompile(`^accounts/([^/]+)/baby_stations$`) + parentStationsTopicRegex = regexp.MustCompile(`^accounts/([^/]+)/parent_stations$`) + clientWebRTCInboxTopicRegex = regexp.MustCompile(`^accounts/([^/]+)/clients/([^/]+)/webrtc_inbox$`) +) + +func (handlers *Handlers) RunMQTTSync(ctx context.Context) { + subscriptions := []struct { + topic string + handler mqtt.MessageHandler + }{ + {topic: "accounts/+/baby_stations", handler: 
handlers.handleBabyStationsMessage}, + {topic: "accounts/+/parent_stations", handler: handlers.handleParentStationsMessage}, + {topic: "accounts/+/clients/+/webrtc_inbox", handler: handlers.handleWebRTCInboxMessage}, + } + for _, subscription := range subscriptions { + if err := mqttx.Wait(handlers.MQTTClient.Subscribe(subscription.topic, 1, subscription.handler)); err != nil { + panic(err) + } + } + <-ctx.Done() +} + +func (handlers *Handlers) handleBabyStationsMessage(client mqtt.Client, message mqtt.Message) { + defer message.Ack() + accountID, ok := parseAccountID(message.Topic(), babyStationsTopicRegex) + if !ok { + logx.Warnln("invalid baby station topic:", message.Topic()) + return + } + var announcement babyStationAnnouncement + if err := json.Unmarshal(message.Payload(), &announcement); err != nil { + logx.Warnln(err) + return + } + if announcement.ConnectionID == "" || announcement.Name == "" { + logx.Warnln("invalid baby station announcement payload") + return + } + if announcement.SessionID == "" { + logx.Warnln("invalid baby station announcement payload: missing session_id") + return + } + startedAt := time.UnixMilli(announcement.StartedAtMillis) + err := handlers.SessionStartDecider.Put(context.Background(), sessionstartdecider.Session{ + AccountID: accountID, + ID: announcement.SessionID, + Name: announcement.Name, + HostConnectionID: announcement.ConnectionID, + StartedAt: startedAt, + }) + if err != nil && err != sessionstartdecider.ErrDuplicate { + logx.Errorln(err) + } +} + +func (handlers *Handlers) handleParentStationsMessage(client mqtt.Client, message mqtt.Message) { + defer message.Ack() + accountID, ok := parseAccountID(message.Topic(), parentStationsTopicRegex) + if !ok { + logx.Warnln("invalid parent station topic:", message.Topic()) + return + } + var payload parentStationAnnouncementRequest + if err := json.Unmarshal(message.Payload(), &payload); err != nil { + logx.Warnln(err) + return + } + if payload.ClientID == "" { + 
logx.Warnln("missing client_id in parent station request") + return + } + ctx := contextx.WithAccountID(context.Background(), accountID) + snapshot, err := handlers.BabyStationList.GetSnapshot(ctx) + if err != nil { + logx.Errorln(err) + return + } + for _, babyStation := range snapshot.List() { + controlTopic := fmt.Sprintf("accounts/%s/clients/%s/control_inbox", accountID, payload.ClientID) + controlMessage := controlMessageBabyStationAnnouncement{ + ControlMessageBase: ControlMessageBase{ + Type: "baby_station_announcement", + AtMillis: time.Now().UnixMilli(), + }, + BabyStationAnnouncement: babyStationAnnouncement{ + SessionID: babyStation.SessionID, + ClientID: babyStation.ClientID, + ConnectionID: babyStation.Connection.ID, + Name: babyStation.Name, + StartedAtMillis: babyStation.StartedAt.UnixMilli(), + }, + } + encoded, err := json.Marshal(controlMessage) + if err != nil { + logx.Errorln(err) + continue + } + if err := mqttx.Wait(handlers.MQTTClient.Publish(controlTopic, 1, false, encoded)); err != nil { + logx.Errorln(err) + } + } +} + +func (handlers *Handlers) handleWebRTCInboxMessage(client mqtt.Client, message mqtt.Message) { + defer message.Ack() + if !clientWebRTCInboxTopicRegex.MatchString(message.Topic()) { + logx.Warnln("invalid webrtc inbox topic:", message.Topic()) + return + } + var inbox webrtcInboxMessage + if err := json.Unmarshal(message.Payload(), &inbox); err != nil { + logx.Warnln(err) + return + } + if inbox.ConnectionID == "" { + return + } + connection, ok := handlers.ConnectionHub.Get(inbox.ConnectionID) + if !ok { + return + } + if inbox.FromConnectionID == "" { + // Frontend websocket signalling still routes by from_connection_id. 
+ return + } + signal, err := json.Marshal(map[string]any{ + "from_connection_id": inbox.FromConnectionID, + "data": inbox.Data, + }) + if err != nil { + logx.Errorln(err) + return + } + if err := connection.WriteJSON(OutgoingMessage{ + Type: MessageTypeSignal, + Signal: signal, + }); err != nil { + logx.Errorln(err) + } +} + +func parseAccountID(topic string, pattern *regexp.Regexp) (string, bool) { + matches := pattern.FindStringSubmatch(topic) + if len(matches) != 2 { + return "", false + } + return matches[1], true +} + +type babyStationAnnouncement struct { + SessionID string `json:"session_id"` + ClientID string `json:"client_id"` + ConnectionID string `json:"connection_id"` + Name string `json:"name"` + StartedAtMillis int64 `json:"started_at_millis"` +} + +type parentStationAnnouncementRequest struct { + ClientID string `json:"client_id"` +} + +type webrtcInboxMessage struct { + FromClientID string `json:"from_client_id"` + FromConnectionID string `json:"from_connection_id"` + ConnectionID string `json:"connection_id"` + Data json.RawMessage `json:"data"` +} + +type ControlMessageBase struct { + Type string `json:"type"` + AtMillis int64 `json:"at_millis"` +} + +type controlMessageBabyStationAnnouncement struct { + ControlMessageBase + BabyStationAnnouncement babyStationAnnouncement `json:"baby_station_announcement"` +} diff --git a/golang/cmd/backend/PendingSessionStarts.go b/golang/cmd/backend/PendingSessionStarts.go new file mode 100644 index 0000000..4dd7d7b --- /dev/null +++ b/golang/cmd/backend/PendingSessionStarts.go @@ -0,0 +1,91 @@ +package main + +import "sync" + +type pendingSessionStart struct { + connectionID string + input CreateSessionInput +} + +type PendingSessionStarts struct { + mutex sync.Mutex + entriesByAccount map[string][]pendingSessionStart + maxPerAccount int +} + +func NewPendingSessionStarts() *PendingSessionStarts { + return &PendingSessionStarts{ + entriesByAccount: make(map[string][]pendingSessionStart), + maxPerAccount: 2, + 
} +} + +func (store *PendingSessionStarts) Put(accountID string, input CreateSessionInput) { + store.mutex.Lock() + defer store.mutex.Unlock() + entries := store.entriesByAccount[accountID] + entries = removeConnectionID(entries, input.ConnectionID) + entries = append(entries, pendingSessionStart{ + connectionID: input.ConnectionID, + input: input, + }) + if len(entries) > store.maxPerAccount { + entries = entries[len(entries)-store.maxPerAccount:] + } + store.entriesByAccount[accountID] = entries +} + +func (store *PendingSessionStarts) Take(accountID string, connectionID string) (CreateSessionInput, bool) { + store.mutex.Lock() + defer store.mutex.Unlock() + entries := store.entriesByAccount[accountID] + for i, pending := range entries { + if pending.connectionID != connectionID { + continue + } + store.entriesByAccount[accountID] = append(entries[:i], entries[i+1:]...) + if len(store.entriesByAccount[accountID]) == 0 { + delete(store.entriesByAccount, accountID) + } + return pending.input, true + } + return CreateSessionInput{}, false +} + +func removeConnectionID(entries []pendingSessionStart, connectionID string) []pendingSessionStart { + for i, entry := range entries { + if entry.connectionID != connectionID { + continue + } + return append(entries[:i], entries[i+1:]...) 
+ } + return entries +} + +func (store *PendingSessionStarts) lenForAccount(accountID string) int { + store.mutex.Lock() + defer store.mutex.Unlock() + return len(store.entriesByAccount[accountID]) +} + +func (store *PendingSessionStarts) contains(accountID string, connectionID string) bool { + store.mutex.Lock() + defer store.mutex.Unlock() + for _, entry := range store.entriesByAccount[accountID] { + if entry.connectionID == connectionID { + return true + } + } + return false +} + +func (store *PendingSessionStarts) connectionsInOrder(accountID string) []string { + store.mutex.Lock() + defer store.mutex.Unlock() + entries := store.entriesByAccount[accountID] + out := make([]string, 0, len(entries)) + for _, entry := range entries { + out = append(out, entry.connectionID) + } + return out +} diff --git a/golang/cmd/backend/PendingSessionStarts_test.go b/golang/cmd/backend/PendingSessionStarts_test.go new file mode 100644 index 0000000..2f948e4 --- /dev/null +++ b/golang/cmd/backend/PendingSessionStarts_test.go @@ -0,0 +1,60 @@ +package main + +import "testing" + +func TestPendingSessionStartsLRUPerAccount(t *testing.T) { + store := NewPendingSessionStarts() + accountID := "account-1" + + store.Put(accountID, CreateSessionInput{ConnectionID: "c1", Name: "one", StartedAtMillis: 1}) + store.Put(accountID, CreateSessionInput{ConnectionID: "c2", Name: "two", StartedAtMillis: 2}) + store.Put(accountID, CreateSessionInput{ConnectionID: "c3", Name: "three", StartedAtMillis: 3}) + + if store.contains(accountID, "c1") { + t.Fatalf("expected oldest entry c1 to be evicted") + } + if !store.contains(accountID, "c2") || !store.contains(accountID, "c3") { + t.Fatalf("expected c2 and c3 to be retained") + } + order := store.connectionsInOrder(accountID) + if len(order) != 2 || order[0] != "c2" || order[1] != "c3" { + t.Fatalf("unexpected order: %#v", order) + } +} + +func TestPendingSessionStartsTakeAndPromote(t *testing.T) { + store := NewPendingSessionStarts() + accountID := 
"account-1" + + store.Put(accountID, CreateSessionInput{ConnectionID: "c1", Name: "one", StartedAtMillis: 1}) + store.Put(accountID, CreateSessionInput{ConnectionID: "c2", Name: "two", StartedAtMillis: 2}) + store.Put(accountID, CreateSessionInput{ConnectionID: "c1", Name: "one-new", StartedAtMillis: 3}) + + order := store.connectionsInOrder(accountID) + if len(order) != 2 || order[0] != "c2" || order[1] != "c1" { + t.Fatalf("unexpected order after promote: %#v", order) + } + + input, ok := store.Take(accountID, "c1") + if !ok { + t.Fatalf("expected c1 to be present") + } + if input.Name != "one-new" { + t.Fatalf("unexpected input returned: %#v", input) + } + if store.lenForAccount(accountID) != 1 { + t.Fatalf("expected one entry remaining") + } +} + +func TestPendingSessionStartsIsolatedByAccount(t *testing.T) { + store := NewPendingSessionStarts() + store.Put("a1", CreateSessionInput{ConnectionID: "c1", Name: "one", StartedAtMillis: 1}) + store.Put("a2", CreateSessionInput{ConnectionID: "c1", Name: "other", StartedAtMillis: 1}) + + _, okA1 := store.Take("a1", "c1") + _, okA2 := store.Take("a2", "c1") + if !okA1 || !okA2 { + t.Fatalf("expected both accounts to have isolated entries") + } +} diff --git a/golang/cmd/backend/UsageStats.go b/golang/cmd/backend/UsageStats.go index a4ebc82..421df9e 100644 --- a/golang/cmd/backend/UsageStats.go +++ b/golang/cmd/backend/UsageStats.go @@ -182,7 +182,7 @@ func applyClientDisconnectedEvent(ctx context.Context, stats *UsageStats, event return } disconnectTime := time.Unix(event.UnixTimestamp, 0) - if disconnectWasClean(clientDisconnectedData.WebSocketCloseCode) { + if disconnectWasClean(clientDisconnectedData.Disconnected.Reason) { duration := disconnectTime.Sub(sessionInfo.StartTime) stats.durationByAccountID[event.AccountID] += duration stats.removeActiveSession(sessionInfo) @@ -271,11 +271,6 @@ func (stats *UsageStats) removeDisconnectedSessionByID(accountID string, session return nil, false } -func 
disconnectWasClean(closeCode int) bool { - switch closeCode { - case 1000, 1001: - return true - default: - return false - } +func disconnectWasClean(reason string) bool { + return reason == "clean" } diff --git a/golang/cmd/backend/UsageStats_test.go b/golang/cmd/backend/UsageStats_test.go index 3c4a578..2baf8f9 100644 --- a/golang/cmd/backend/UsageStats_test.go +++ b/golang/cmd/backend/UsageStats_test.go @@ -82,7 +82,6 @@ func TestUsageStats(t *testing.T) { clientConnectedData := ClientConnectedEventData{ ClientID: hostClientID, ConnectionID: hostConnectionID, - RequestID: uuid.NewV4().String(), } data, err := json.Marshal(clientConnectedData) So(err, ShouldBeNil) @@ -97,11 +96,10 @@ func TestUsageStats(t *testing.T) { Convey("host connection ends", func() { Convey("clean", func() { clientDisconnectedData := ClientDisconnectedEventData{ - ClientID: hostClientID, - ConnectionID: hostConnectionID, - RequestID: clientConnectedData.RequestID, - WebSocketCloseCode: 1000, + ClientID: hostClientID, + ConnectionID: hostConnectionID, } + clientDisconnectedData.Disconnected.Reason = "clean" data, err := json.Marshal(clientDisconnectedData) So(err, ShouldBeNil) _, err = log.Append(ctx, eventlog.AppendInput{ @@ -120,11 +118,10 @@ func TestUsageStats(t *testing.T) { }) Convey("unclean", func() { clientDisconnectedData := ClientDisconnectedEventData{ - ClientID: hostClientID, - ConnectionID: hostConnectionID, - RequestID: clientConnectedData.RequestID, - WebSocketCloseCode: 1006, + ClientID: hostClientID, + ConnectionID: hostConnectionID, } + clientDisconnectedData.Disconnected.Reason = "unexpected" data, err := json.Marshal(clientDisconnectedData) So(err, ShouldBeNil) _, err = log.Append(ctx, eventlog.AppendInput{ @@ -144,7 +141,6 @@ func TestUsageStats(t *testing.T) { clientConnectedData := ClientConnectedEventData{ ClientID: hostClientID, ConnectionID: hostConnectionID, - RequestID: uuid.NewV4().String(), } data, err := json.Marshal(clientConnectedData) So(err, ShouldBeNil) 
@@ -184,7 +180,6 @@ func TestUsageStats(t *testing.T) { clientConnectedData := ClientConnectedEventData{ ClientID: hostClientID, ConnectionID: hostConnectionID, - RequestID: uuid.NewV4().String(), } data, err := json.Marshal(clientConnectedData) So(err, ShouldBeNil) diff --git a/golang/cmd/backend/server.go b/golang/cmd/backend/server.go index 7739423..2bcbd23 100644 --- a/golang/cmd/backend/server.go +++ b/golang/cmd/backend/server.go @@ -32,6 +32,7 @@ import ( "github.com/Ryan-A-B/beddybytes/golang/internal/mqttx" "github.com/Ryan-A-B/beddybytes/golang/internal/resetpassword" "github.com/Ryan-A-B/beddybytes/golang/internal/sessionlist" + "github.com/Ryan-A-B/beddybytes/golang/internal/sessionstartdecider" "github.com/Ryan-A-B/beddybytes/golang/internal/sessionstore" "github.com/Ryan-A-B/beddybytes/golang/internal/store" ) @@ -66,15 +67,18 @@ type Client struct { } type Handlers struct { - Upgrader websocket.Upgrader - ClientStore ClientStore - ConnectionFactory ConnectionFactory - SessionProjection SessionProjection - SessionList *sessionlist.SessionList - BabyStationList *babystationlist.BabyStationList - EventLog eventlog.EventLog - MQTTClient mqtt.Client - UsageStats *UsageStats + Upgrader websocket.Upgrader + ClientStore ClientStore + ConnectionFactory ConnectionFactory + SessionProjection SessionProjection + SessionList *sessionlist.SessionList + SessionStartDecider *sessionstartdecider.Decider + PendingSessionStarts *PendingSessionStarts + BabyStationList *babystationlist.BabyStationList + ConnectionHub *ConnectionHub + EventLog eventlog.EventLog + MQTTClient mqtt.Client + UsageStats *UsageStats Key interface{} } @@ -276,11 +280,16 @@ func main() { SessionList: sessionlist.New(ctx, sessionlist.NewInput{ Log: eventLog, }), + SessionStartDecider: sessionstartdecider.NewDecider(sessionstartdecider.NewDeciderInput{ + EventLog: eventLog, + }), + PendingSessionStarts: NewPendingSessionStarts(), BabyStationList: babystationlist.New(babystationlist.NewInput{ 
EventLog: eventLog, }), - EventLog: eventLog, - MQTTClient: mqttClient, + ConnectionHub: NewConnectionHub(), + EventLog: eventLog, + MQTTClient: mqttClient, UsageStats: NewUsageStats(ctx, NewUsageStatsInput{ Log: eventLog, }), @@ -300,9 +309,15 @@ func main() { ConnectionStore: connectionstore.NewDecider(connectionstore.NewDeciderInput{ EventLog: eventLog, }), + SessionList: handlers.SessionList, + EventLog: eventLog, }) log.Fatal("connectionstoresync.Run exited") }() + go func() { + handlers.RunMQTTSync(ctx) + log.Fatal("handlers.RunMQTTSync exited") + }() router := mux.NewRouter() router.Use(internal.LoggingMiddleware) handlers.AddRoutes(router.NewRoute().Subrouter()) diff --git a/golang/cmd/backend/sessions.go b/golang/cmd/backend/sessions.go index fcc679c..4a6bf2e 100644 --- a/golang/cmd/backend/sessions.go +++ b/golang/cmd/backend/sessions.go @@ -3,6 +3,8 @@ package main import ( "context" "encoding/json" + "fmt" + "io" "log" "net/http" "time" @@ -14,6 +16,7 @@ import ( "github.com/Ryan-A-B/beddybytes/golang/internal/eventlog" "github.com/Ryan-A-B/beddybytes/golang/internal/fatal" "github.com/Ryan-A-B/beddybytes/golang/internal/httpx" + "github.com/Ryan-A-B/beddybytes/golang/internal/mqttx" "github.com/Ryan-A-B/beddybytes/golang/internal/sessions" "github.com/Ryan-A-B/beddybytes/golang/internal/sessionstore" ) @@ -49,6 +52,27 @@ type Session struct { StartedAt time.Time `json:"started_at"` } +type CreateSessionInput struct { + SessionID string `json:"session_id"` + ClientID string `json:"client_id"` + ConnectionID string `json:"connection_id"` + Name string `json:"name"` + StartedAtMillis int64 `json:"started_at_millis"` +} + +func (input *CreateSessionInput) validate() error { + if input.ConnectionID == "" { + return merry.New("connection id is empty").WithHTTPCode(http.StatusBadRequest) + } + if input.Name == "" { + return merry.New("session name is empty").WithHTTPCode(http.StatusBadRequest) + } + if input.StartedAtMillis <= 0 { + return 
merry.New("started_at_millis must be positive").WithHTTPCode(http.StatusBadRequest) + } + return nil +} + func (handlers *Handlers) StartSession(responseWriter http.ResponseWriter, request *http.Request) { var err error defer func() { @@ -58,31 +82,59 @@ func (handlers *Handlers) StartSession(responseWriter http.ResponseWriter, reque } }() ctx := request.Context() - vars := mux.Vars(request) - sessionID := vars["session_id"] - var session StartSessionEventData - err = json.NewDecoder(request.Body).Decode(&session) + accountID := contextx.GetAccountID(ctx) + sessionID := mux.Vars(request)["session_id"] + requestBody, err := io.ReadAll(request.Body) if err != nil { err = merry.WithHTTPCode(err, http.StatusBadRequest) return } - err = session.validate() + input, err := parseStartSessionInput(requestBody, sessionID) if err != nil { return } - if session.ID != sessionID { - err = merry.Errorf("session id in path does not match session id in body").WithHTTPCode(http.StatusBadRequest) + if input.ClientID == "" { + connection, ok := handlers.ConnectionHub.Get(input.ConnectionID) + if ok && connection.AccountID == accountID { + input.ClientID = connection.ClientID + } + } + if input.ClientID == "" { + handlers.PendingSessionStarts.Put(accountID, input) return } - _, err = handlers.EventLog.Append(ctx, eventlog.AppendInput{ - Type: EventTypeSessionStarted, - AccountID: contextx.GetAccountID(ctx), - Data: fatal.UnlessMarshalJSON(session), - }) - if err != nil { + if err = handlers.publishBabyStationAnnouncement(accountID, input); err != nil { return } - // TODO set header with logical clock of the start event +} + +func parseStartSessionInput(data []byte, sessionID string) (CreateSessionInput, error) { + var legacyInput StartSessionEventData + if err := json.Unmarshal(data, &legacyInput); err != nil { + return CreateSessionInput{}, merry.WithHTTPCode(err, http.StatusBadRequest) + } + if err := legacyInput.validate(); err != nil { + return CreateSessionInput{}, err + } + if 
legacyInput.ID != sessionID { + return CreateSessionInput{}, merry.Errorf("session id in path does not match session id in body").WithHTTPCode(http.StatusBadRequest) + } + return CreateSessionInput{ + SessionID: legacyInput.ID, + ClientID: "", + ConnectionID: legacyInput.HostConnectionID, + Name: legacyInput.Name, + StartedAtMillis: legacyInput.StartedAt.UnixMilli(), + }, nil +} + +func (handlers *Handlers) publishBabyStationAnnouncement(accountID string, input CreateSessionInput) error { + topic := fmt.Sprintf("accounts/%s/baby_stations", accountID) + payload, err := json.Marshal(input) + if err != nil { + return err + } + return mqttx.Wait(handlers.MQTTClient.Publish(topic, 1, false, payload)) } type EndSessionEventData struct { @@ -98,18 +150,19 @@ func (handlers *Handlers) EndSession(responseWriter http.ResponseWriter, request } }() ctx := request.Context() - vars := mux.Vars(request) - sessionID := vars["session_id"] - // TODO check we know about the session? - // expect a header with the logical clock of the start event - _, err = handlers.EventLog.Append(ctx, eventlog.AppendInput{ - Type: EventTypeSessionEnded, - AccountID: contextx.GetAccountID(ctx), - Data: fatal.UnlessMarshalJSON(&EndSessionEventData{ID: sessionID}), + sessionID := mux.Vars(request)["session_id"] + accountID := contextx.GetAccountID(ctx) + data, err := json.Marshal(EndSessionEventData{ + ID: sessionID, }) if err != nil { return } + _, err = handlers.EventLog.Append(ctx, eventlog.AppendInput{ + Type: EventTypeSessionEnded, + AccountID: accountID, + Data: data, + }) } type SessionProjection struct { diff --git a/golang/cmd/backend/sessions_test.go b/golang/cmd/backend/sessions_test.go index 52b5101..5fbae8d 100644 --- a/golang/cmd/backend/sessions_test.go +++ b/golang/cmd/backend/sessions_test.go @@ -2,15 +2,314 @@ package main import ( "context" + "encoding/json" + "net/http" + "net/http/httptest" "os" "path/filepath" "runtime" + "strings" "testing" + "time" + mqtt 
"github.com/eclipse/paho.mqtt.golang" + "github.com/gorilla/mux" + + "github.com/Ryan-A-B/beddybytes/golang/internal/babystationlist" + "github.com/Ryan-A-B/beddybytes/golang/internal/connections" + "github.com/Ryan-A-B/beddybytes/golang/internal/contextx" "github.com/Ryan-A-B/beddybytes/golang/internal/eventlog" + "github.com/Ryan-A-B/beddybytes/golang/internal/sessionstartdecider" "github.com/Ryan-A-B/beddybytes/golang/internal/sessionstore" ) +func TestParseStartSessionInputLegacyPayload(t *testing.T) { + sessionID := "session-123" + startedAt := time.Now().UTC().Truncate(time.Second) + payload := StartSessionEventData{ + ID: sessionID, + Name: "nursery", + HostConnectionID: "connection-123", + StartedAt: startedAt, + } + data, err := json.Marshal(payload) + if err != nil { + t.Fatal(err) + } + output, err := parseStartSessionInput(data, sessionID) + if err != nil { + t.Fatal(err) + } + if output.ClientID != "" { + t.Fatalf("expected empty client id for legacy payload, got %q", output.ClientID) + } + if output.SessionID != payload.ID { + t.Fatalf("unexpected session id: %q", output.SessionID) + } + if output.ConnectionID != payload.HostConnectionID { + t.Fatalf("unexpected connection id: %q", output.ConnectionID) + } + if output.Name != payload.Name { + t.Fatalf("unexpected name: %q", output.Name) + } + if output.StartedAtMillis != payload.StartedAt.UnixMilli() { + t.Fatalf("unexpected started_at_millis: %d", output.StartedAtMillis) + } +} + +func TestParseStartSessionInputRejectsRefactorPayload(t *testing.T) { + sessionID := "session-123" + payload := CreateSessionInput{ + SessionID: sessionID, + ClientID: "client-123", + ConnectionID: "connection-123", + Name: "nursery", + StartedAtMillis: time.Now().UnixMilli(), + } + data, err := json.Marshal(payload) + if err != nil { + t.Fatal(err) + } + if _, err := parseStartSessionInput(data, sessionID); err == nil { + t.Fatal("expected refactor payload to be rejected") + } +} + +func 
TestEndSessionAllowsRestartOnSameConnection(t *testing.T) { + ctx := context.Background() + folderPath, err := os.MkdirTemp("", "TestEndSessionAllowsRestartOnSameConnection-*") + if err != nil { + t.Fatal(err) + } + eventLog := eventlog.NewFileEventLog(&eventlog.NewFileEventLogInput{ + FolderPath: folderPath, + }) + decider := sessionstartdecider.NewDecider(sessionstartdecider.NewDeciderInput{ + EventLog: eventLog, + }) + handlers := Handlers{ + EventLog: eventLog, + } + accountID := "account-1" + first := sessionstartdecider.Session{ + AccountID: accountID, + ID: "session-1", + Name: "nursery", + HostConnectionID: "connection-1", + StartedAt: time.Unix(1, 0).UTC(), + } + if err := decider.Put(ctx, first); err != nil { + t.Fatal(err) + } + + request := httptest.NewRequest(http.MethodDelete, "/sessions/session-1", nil) + request = request.WithContext(contextx.WithAccountID(request.Context(), accountID)) + request = mux.SetURLVars(request, map[string]string{ + "session_id": first.ID, + }) + responseWriter := httptest.NewRecorder() + handlers.EndSession(responseWriter, request) + if responseWriter.Code != http.StatusOK { + t.Fatalf("unexpected status code: %d", responseWriter.Code) + } + + second := first + second.ID = "session-2" + second.StartedAt = second.StartedAt.Add(time.Minute) + if err := decider.Put(ctx, second); err != nil { + t.Fatalf("expected restart after delete to succeed, got %v", err) + } +} + +func TestSecondBabyStationAnnouncementShowsInSnapshotAfterLegacyDelete(t *testing.T) { + ctx := context.Background() + folderPath, err := os.MkdirTemp("", "TestSecondBabyStationAnnouncementShowsInSnapshotAfterLegacyDelete-*") + if err != nil { + t.Fatal(err) + } + eventLog := eventlog.NewFileEventLog(&eventlog.NewFileEventLogInput{ + FolderPath: folderPath, + }) + accountID := "account-1" + clientID := "client-1" + connectionID := "connection-1" + handlers := Handlers{ + EventLog: eventLog, + SessionStartDecider: 
sessionstartdecider.NewDecider(sessionstartdecider.NewDeciderInput{ + EventLog: eventLog, + }), + } + if _, err := eventLog.Append(contextx.WithAccountID(ctx, accountID), eventlog.AppendInput{ + Type: connections.EventTypeConnected, + AccountID: accountID, + Data: mustJSON(t, connections.EventConnected{ + ClientID: clientID, + ConnectionID: connectionID, + }), + }); err != nil { + t.Fatal(err) + } + + handlers.handleBabyStationsMessage(nil, testMQTTMessage{ + topic: "accounts/account-1/baby_stations", + payload: mustJSON(t, babyStationAnnouncement{ + SessionID: "session-1", + ClientID: clientID, + ConnectionID: connectionID, + Name: "first", + StartedAtMillis: time.Unix(1, 0).UnixMilli(), + }), + }) + + request := httptest.NewRequest(http.MethodDelete, "/sessions/session-1", nil) + request = request.WithContext(contextx.WithAccountID(request.Context(), accountID)) + request = mux.SetURLVars(request, map[string]string{"session_id": "session-1"}) + responseWriter := httptest.NewRecorder() + handlers.EndSession(responseWriter, request) + if responseWriter.Code != http.StatusOK { + t.Fatalf("unexpected delete status code: %d", responseWriter.Code) + } + + handlers.handleBabyStationsMessage(nil, testMQTTMessage{ + topic: "accounts/account-1/baby_stations", + payload: mustJSON(t, babyStationAnnouncement{ + SessionID: "session-2", + ClientID: clientID, + ConnectionID: connectionID, + Name: "second", + StartedAtMillis: time.Unix(2, 0).UnixMilli(), + }), + }) + + list := babystationlist.New(babystationlist.NewInput{ + EventLog: eventLog, + }) + output, err := list.GetSnapshot(contextx.WithAccountID(ctx, accountID)) + if err != nil { + t.Fatal(err) + } + if got := len(output.Snapshot.List()); got != 1 { + t.Fatalf("expected one baby station, got %d", got) + } + if got := output.Snapshot.List()[0].SessionID; got != "session-2" { + t.Fatalf("expected session-2, got %q", got) + } + if got := output.Snapshot.List()[0].Name; got != "second" { + t.Fatalf("expected second session 
name, got %q", got) + } +} + +func TestEventsStreamIncludesDeleteThenRestartOnSameConnection(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + folderPath, err := os.MkdirTemp("", "TestEventsStreamIncludesDeleteThenRestartOnSameConnection-*") + if err != nil { + t.Fatal(err) + } + eventLog := eventlog.NewFileEventLog(&eventlog.NewFileEventLogInput{ + FolderPath: folderPath, + }) + accountID := "account-1" + clientID := "client-1" + connectionID := "connection-1" + handlers := Handlers{ + EventLog: eventLog, + SessionStartDecider: sessionstartdecider.NewDecider(sessionstartdecider.NewDeciderInput{ + EventLog: eventLog, + }), + } + accountCtx := contextx.WithAccountID(ctx, accountID) + if _, err := eventLog.Append(accountCtx, eventlog.AppendInput{ + Type: connections.EventTypeConnected, + AccountID: accountID, + Data: mustJSON(t, connections.EventConnected{ + ClientID: clientID, + ConnectionID: connectionID, + }), + }); err != nil { + t.Fatal(err) + } + handlers.handleBabyStationsMessage(nil, testMQTTMessage{ + topic: "accounts/account-1/baby_stations", + payload: mustJSON(t, babyStationAnnouncement{ + SessionID: "session-1", + ClientID: clientID, + ConnectionID: connectionID, + Name: "first", + StartedAtMillis: time.Unix(1, 0).UnixMilli(), + }), + }) + + request := httptest.NewRequest(http.MethodGet, "/events?from_cursor=2", nil) + request = request.WithContext(accountCtx) + responseWriter := httptest.NewRecorder() + done := make(chan struct{}) + go func() { + defer close(done) + handlers.GetEvents(responseWriter, request) + }() + + deleteRequest := httptest.NewRequest(http.MethodDelete, "/sessions/session-1", nil) + deleteRequest = deleteRequest.WithContext(contextx.WithAccountID(context.Background(), accountID)) + deleteRequest = mux.SetURLVars(deleteRequest, map[string]string{"session_id": "session-1"}) + deleteResponseWriter := httptest.NewRecorder() + handlers.EndSession(deleteResponseWriter, deleteRequest) + if 
deleteResponseWriter.Code != http.StatusOK { + t.Fatalf("unexpected delete status code: %d", deleteResponseWriter.Code) + } + handlers.handleBabyStationsMessage(nil, testMQTTMessage{ + topic: "accounts/account-1/baby_stations", + payload: mustJSON(t, babyStationAnnouncement{ + SessionID: "session-2", + ClientID: clientID, + ConnectionID: connectionID, + Name: "second", + StartedAtMillis: time.Unix(2, 0).UnixMilli(), + }), + }) + + time.Sleep(50 * time.Millisecond) + cancel() + <-done + + body := responseWriter.Body.String() + if !strings.Contains(body, `"type":"session.ended"`) { + t.Fatalf("expected session.ended in event stream, got %q", body) + } + if !strings.Contains(body, `"type":"session.started"`) { + t.Fatalf("expected session.started in event stream, got %q", body) + } + if !strings.Contains(body, `"id":"session-2"`) { + t.Fatalf("expected restarted session payload in event stream, got %q", body) + } +} + +type testMQTTMessage struct { + topic string + payload []byte +} + +func (message testMQTTMessage) Duplicate() bool { return false } +func (message testMQTTMessage) Qos() byte { return 1 } +func (message testMQTTMessage) Retained() bool { return false } +func (message testMQTTMessage) Topic() string { return message.topic } +func (message testMQTTMessage) MessageID() uint16 { + return 0 +} +func (message testMQTTMessage) Payload() []byte { return message.payload } +func (message testMQTTMessage) Ack() {} + +var _ mqtt.Message = testMQTTMessage{} + +func mustJSON(t *testing.T, value any) []byte { + t.Helper() + data, err := json.Marshal(value) + if err != nil { + t.Fatal(err) + } + return data +} + func BenchmarkSessionProjectionRealData(b *testing.B) { ctx := context.Background() eventLogPath := findBackendRealEventLogPath(b) diff --git a/golang/cmd/monitor-connection-status/main.go b/golang/cmd/monitor-connection-status/main.go index 0d394b4..6e7c682 100644 --- a/golang/cmd/monitor-connection-status/main.go +++ 
b/golang/cmd/monitor-connection-status/main.go @@ -21,7 +21,7 @@ func main() { if err != nil { panic(err) } - const topic = "accounts/+/connections/+/status" + const topic = "accounts/+/clients/+/status" err = mqttx.Wait(client.Subscribe(topic, 1, handleMessage)) if err != nil { panic(err) diff --git a/golang/internal/babystationlist/BabyStationList.go b/golang/internal/babystationlist/BabyStationList.go index 7aa00fa..5270e5a 100644 --- a/golang/internal/babystationlist/BabyStationList.go +++ b/golang/internal/babystationlist/BabyStationList.go @@ -21,10 +21,15 @@ func (snapshot *Snapshot) List() []BabyStation { babyStations := make([]BabyStation, 0, len(snapshot.ConnectionByID)) for connectionID, connection := range snapshot.ConnectionByID { sessionID, ok := snapshot.SessionIDByConnectionID[connectionID] - fatal.Unless(ok, "session not found: "+sessionID) + if !ok { + continue + } session, ok := snapshot.SessionByID[sessionID] - fatal.Unless(ok, "session not found: "+sessionID) + if !ok { + continue + } babyStation := BabyStation{ + SessionID: session.ID, Name: session.Name, ClientID: connection.ClientID, Connection: BabyStationConnection{ @@ -39,6 +44,7 @@ func (snapshot *Snapshot) List() []BabyStation { } type BabyStation struct { + SessionID string `json:"session_id"` Name string `json:"name"` ClientID string `json:"client_id"` Connection BabyStationConnection `json:"connection"` @@ -133,6 +139,9 @@ func (babyStationList *BabyStationList) applySessionStarted(event *eventlog.Even StartedAt: data.StartedAt, } snapshot := babyStationList.getOrCreateSnapshot(event.AccountID) + if previousSessionID, ok := snapshot.SessionIDByConnectionID[data.HostConnectionID]; ok && previousSessionID != data.ID { + delete(snapshot.SessionByID, previousSessionID) + } snapshot.SessionByID[data.ID] = &session snapshot.SessionIDByConnectionID[data.HostConnectionID] = data.ID } @@ -156,9 +165,8 @@ func (babyStationList *BabyStationList) applyConnected(event *eventlog.Event) { 
fatal.OnError(err) snapshot := babyStationList.getOrCreateSnapshot(event.AccountID) connection := Connection{ - ClientID: data.ClientID, - ID: data.ConnectionID, - RequestID: data.RequestID, + ClientID: data.ClientID, + ID: data.ConnectionID, } snapshot.ConnectionByID[data.ConnectionID] = &connection } diff --git a/golang/internal/babystationlist/BabyStationList_test.go b/golang/internal/babystationlist/BabyStationList_test.go index cc161d2..004854c 100644 --- a/golang/internal/babystationlist/BabyStationList_test.go +++ b/golang/internal/babystationlist/BabyStationList_test.go @@ -67,7 +67,6 @@ func TestBabyStationList(t *testing.T) { Data: fatal.UnlessMarshalJSON(connections.EventConnected{ ClientID: clientID, ConnectionID: connectionID, - RequestID: uuid.NewV4().String(), }), }) So(err, ShouldBeNil) @@ -106,7 +105,6 @@ func TestBabyStationList(t *testing.T) { Data: fatal.UnlessMarshalJSON(connections.EventConnected{ ClientID: clientID, ConnectionID: connectionID, - RequestID: uuid.NewV4().String(), }), }) So(err, ShouldBeNil) @@ -144,7 +142,6 @@ func TestBabyStationList(t *testing.T) { Data: fatal.UnlessMarshalJSON(connections.EventConnected{ ClientID: clientID, ConnectionID: connectionID, - RequestID: uuid.NewV4().String(), }), }) So(err, ShouldBeNil) @@ -220,7 +217,6 @@ func TestBabyStationList(t *testing.T) { Data: fatal.UnlessMarshalJSON(connections.EventConnected{ ClientID: clientID1, ConnectionID: connectionID1, - RequestID: uuid.NewV4().String(), }), }) So(err, ShouldBeNil) @@ -231,7 +227,6 @@ func TestBabyStationList(t *testing.T) { Data: fatal.UnlessMarshalJSON(connections.EventConnected{ ClientID: clientID2, ConnectionID: connectionID2, - RequestID: uuid.NewV4().String(), }), }) So(err, ShouldBeNil) @@ -264,6 +259,92 @@ func TestBabyStationList(t *testing.T) { So(babyStation.StartedAt, ShouldHappenWithin, time.Millisecond, sessionStartedAt1) } }) + Convey("Starting a new session on the same connection replaces the old one", func() { + clientID := 
uuid.NewV4().String() + connectionID := uuid.NewV4().String() + first := babystationlist.StartSessionEventData{ + ID: uuid.NewV4().String(), + Name: "First", + HostConnectionID: connectionID, + StartedAt: time.Now(), + } + _, err := eventLog.Append(ctx, eventlog.AppendInput{ + Type: babystationlist.EventTypeSessionStarted, + AccountID: accountID, + Data: fatal.UnlessMarshalJSON(first), + }) + So(err, ShouldBeNil) + _, err = eventLog.Append(ctx, eventlog.AppendInput{ + Type: connections.EventTypeConnected, + AccountID: accountID, + Data: fatal.UnlessMarshalJSON(connections.EventConnected{ + ClientID: clientID, + ConnectionID: connectionID, + }), + }) + So(err, ShouldBeNil) + + second := babystationlist.StartSessionEventData{ + ID: uuid.NewV4().String(), + Name: "Second", + HostConnectionID: connectionID, + StartedAt: time.Now().Add(time.Minute), + } + _, err = eventLog.Append(ctx, eventlog.AppendInput{ + Type: babystationlist.EventTypeSessionStarted, + AccountID: accountID, + Data: fatal.UnlessMarshalJSON(second), + }) + So(err, ShouldBeNil) + + output, err := babyStationList.GetSnapshot(ctx) + So(err, ShouldBeNil) + So(output.Snapshot.SessionByID, ShouldHaveLength, 1) + So(output.Snapshot.SessionIDByConnectionID, ShouldHaveLength, 1) + So(output.Snapshot.ConnectionByID, ShouldHaveLength, 1) + So(output.Snapshot.List(), ShouldHaveLength, 1) + babyStation := output.Snapshot.List()[0] + So(babyStation.SessionID, ShouldEqual, second.ID) + So(babyStation.Name, ShouldEqual, second.Name) + }) + Convey("Ending a session while the client stays connected should leave an empty list", func() { + clientID := uuid.NewV4().String() + connectionID := uuid.NewV4().String() + session := babystationlist.StartSessionEventData{ + ID: uuid.NewV4().String(), + Name: "First", + HostConnectionID: connectionID, + StartedAt: time.Now(), + } + _, err := eventLog.Append(ctx, eventlog.AppendInput{ + Type: babystationlist.EventTypeSessionStarted, + AccountID: accountID, + Data: 
fatal.UnlessMarshalJSON(session), + }) + So(err, ShouldBeNil) + _, err = eventLog.Append(ctx, eventlog.AppendInput{ + Type: connections.EventTypeConnected, + AccountID: accountID, + Data: fatal.UnlessMarshalJSON(connections.EventConnected{ + ClientID: clientID, + ConnectionID: connectionID, + }), + }) + So(err, ShouldBeNil) + _, err = eventLog.Append(ctx, eventlog.AppendInput{ + Type: babystationlist.EventTypeSessionEnded, + AccountID: accountID, + Data: fatal.UnlessMarshalJSON(babystationlist.EndSessionEventData{ + ID: session.ID, + }), + }) + So(err, ShouldBeNil) + + output, err := babyStationList.GetSnapshot(ctx) + So(err, ShouldBeNil) + So(func() { _ = output.Snapshot.List() }, ShouldNotPanic) + So(output.Snapshot.List(), ShouldHaveLength, 0) + }) }) } diff --git a/golang/internal/connections/Event.go b/golang/internal/connections/Event.go index a18ed27..b70178a 100644 --- a/golang/internal/connections/Event.go +++ b/golang/internal/connections/Event.go @@ -7,10 +7,15 @@ type EventConnected struct { ClientID string `json:"client_id"` ConnectionID string `json:"connection_id"` RequestID string `json:"request_id"` + AtMillis int64 `json:"at_millis"` } type EventDisconnected struct { ClientID string `json:"client_id"` ConnectionID string `json:"connection_id"` RequestID string `json:"request_id"` + AtMillis int64 `json:"at_millis"` + Disconnected struct { + Reason string `json:"reason"` + } `json:"disconnected"` } diff --git a/golang/internal/connectionstore/Decider.go b/golang/internal/connectionstore/Decider.go index 6486929..953a5c1 100644 --- a/golang/internal/connectionstore/Decider.go +++ b/golang/internal/connectionstore/Decider.go @@ -15,10 +15,11 @@ import ( var ErrDuplicate = errors.New("duplicate") type Connection struct { - ID string - AccountID string - ClientID string - RequestID string + ID string + AccountID string + ClientID string + AtMillis int64 + DisconnectReason string } type ApplyFunc func(ctx context.Context, event *eventlog.Event) error @@ 
-64,7 +65,7 @@ func (decider *Decider) Put(ctx context.Context, connection Connection) error { data, err := json.Marshal(connections.EventConnected{ ClientID: connection.ClientID, ConnectionID: connection.ID, - RequestID: connection.RequestID, + AtMillis: connection.AtMillis, }) fatal.OnError(err) _, err = decider.eventLog.Append(ctx, eventlog.AppendInput{ @@ -90,7 +91,12 @@ func (decider *Decider) Delete(ctx context.Context, connection Connection) error data, err := json.Marshal(connections.EventDisconnected{ ClientID: connection.ClientID, ConnectionID: connection.ID, - RequestID: connection.RequestID, + AtMillis: connection.AtMillis, + Disconnected: struct { + Reason string `json:"reason"` + }{ + Reason: connection.DisconnectReason, + }, }) fatal.OnError(err) _, err = decider.eventLog.Append(ctx, eventlog.AppendInput{ @@ -136,9 +142,9 @@ func (decider *Decider) applyConnected(ctx context.Context, event *eventlog.Even ID: data.ConnectionID, AccountID: event.AccountID, ClientID: data.ClientID, - RequestID: data.RequestID, } key := decider.getKey(connection) + delete(decider.disconnectedKeySet, key) decider.connectedKeySet[key] = struct{}{} return nil } @@ -151,13 +157,13 @@ func (decider *Decider) applyDisconnected(ctx context.Context, event *eventlog.E ID: data.ConnectionID, AccountID: event.AccountID, ClientID: data.ClientID, - RequestID: data.RequestID, } key := decider.getKey(connection) + delete(decider.connectedKeySet, key) decider.disconnectedKeySet[key] = struct{}{} return nil } func (decider *Decider) getKey(connection Connection) string { - return fmt.Sprintf("accounts/%s/clients/%s/connections/%s/requests/%s", connection.AccountID, connection.ClientID, connection.ID, connection.RequestID) + return fmt.Sprintf("accounts/%s/clients/%s/connections/%s", connection.AccountID, connection.ClientID, connection.ID) } diff --git a/golang/internal/connectionstore/Decider_test.go b/golang/internal/connectionstore/Decider_test.go index 9e9bfee..c3f97b5 100644 --- 
a/golang/internal/connectionstore/Decider_test.go +++ b/golang/internal/connectionstore/Decider_test.go @@ -28,7 +28,6 @@ func TestDecider(t *testing.T) { ID: uuid.NewV4().String(), AccountID: uuid.NewV4().String(), ClientID: uuid.NewV4().String(), - RequestID: uuid.NewV4().String(), } Convey("Put", func() { err = decider.Put(ctx, connection) @@ -44,7 +43,6 @@ func TestDecider(t *testing.T) { ID: uuid.NewV4().String(), AccountID: uuid.NewV4().String(), ClientID: uuid.NewV4().String(), - RequestID: uuid.NewV4().String(), } err = decider.Put(ctx, connection) So(err, ShouldBeNil) @@ -56,6 +54,12 @@ func TestDecider(t *testing.T) { So(err, ShouldEqual, connectionstore.ErrDuplicate) }) }) + Convey("Reconnect after disconnect", func() { + err = decider.Delete(ctx, connection) + So(err, ShouldBeNil) + err = decider.Put(ctx, connection) + So(err, ShouldBeNil) + }) }) Convey("Multiple Connections", func() { Convey("Multiple Accounts", func() { @@ -64,13 +68,11 @@ func TestDecider(t *testing.T) { ID: uuid.NewV4().String(), AccountID: uuid.NewV4().String(), ClientID: uuid.NewV4().String(), - RequestID: uuid.NewV4().String(), }, { ID: uuid.NewV4().String(), AccountID: uuid.NewV4().String(), ClientID: uuid.NewV4().String(), - RequestID: uuid.NewV4().String(), }, } for _, connection := range connections { diff --git a/golang/internal/connectionstoresync/Sync.go b/golang/internal/connectionstoresync/Sync.go index 7477c91..b449f92 100644 --- a/golang/internal/connectionstoresync/Sync.go +++ b/golang/internal/connectionstoresync/Sync.go @@ -6,25 +6,30 @@ import ( "log" "regexp" - "github.com/Ryan-A-B/beddybytes/golang/internal/connections" "github.com/Ryan-A-B/beddybytes/golang/internal/connectionstore" + "github.com/Ryan-A-B/beddybytes/golang/internal/contextx" + "github.com/Ryan-A-B/beddybytes/golang/internal/eventlog" "github.com/Ryan-A-B/beddybytes/golang/internal/fatal" "github.com/Ryan-A-B/beddybytes/golang/internal/logx" - 
"github.com/Ryan-A-B/beddybytes/golang/internal/messages" "github.com/Ryan-A-B/beddybytes/golang/internal/mqttx" + "github.com/Ryan-A-B/beddybytes/golang/internal/sessionlist" mqtt "github.com/eclipse/paho.mqtt.golang" ) var connectionStore *connectionstore.Decider -var handlers = map[messages.MessageType]func(ctx context.Context, input handlerInput){ - connections.MessageTypeConnected: handleConnectedMessage, - connections.MessageTypeDisconnected: handleDisconnectedMessage, +var sessionListStore *sessionlist.SessionList +var eventLogStore eventlog.EventLog +var handlers = map[string]func(ctx context.Context, input handlerInput){ + "connected": handleConnectedMessage, + "disconnected": handleDisconnectedMessage, } -var topicRegex = regexp.MustCompile(`^accounts/([^/]+)/connections/([^/]+)/status$`) +var topicRegex = regexp.MustCompile(`^accounts/([^/]+)/clients/([^/]+)/status$`) type RunInput struct { MQTTClient mqtt.Client ConnectionStore *connectionstore.Decider + SessionList *sessionlist.SessionList + EventLog eventlog.EventLog } func Run(ctx context.Context, input RunInput) { @@ -32,55 +37,86 @@ func Run(ctx context.Context, input RunInput) { panic("already running") } connectionStore = input.ConnectionStore - err := mqttx.Wait(input.MQTTClient.Subscribe("accounts/+/connections/+/status", 1, handleMessage)) + sessionListStore = input.SessionList + eventLogStore = input.EventLog + err := mqttx.Wait(input.MQTTClient.Subscribe("accounts/+/clients/+/status", 1, handleMessage)) fatal.OnError(err) <-ctx.Done() } func handleMessage(client mqtt.Client, message mqtt.Message) { ctx := context.Background() - var frame connections.MessageFrame - err := json.Unmarshal(message.Payload(), &frame) + var status statusMessage + err := json.Unmarshal(message.Payload(), &status) if err != nil { logx.Warnln(err) return } var accountID string - var connectionID string + var clientID string matches := topicRegex.FindStringSubmatch(message.Topic()) fatal.Unless(len(matches) == 3, 
"failed to parse topic: "+message.Topic()) - accountID, connectionID = matches[1], matches[2] - handle, ok := handlers[frame.Type] + accountID, clientID = matches[1], matches[2] + handle, ok := handlers[status.Type] if !ok { - logx.Warnln("unhandled message type:", frame.Type) + logx.Warnln("unhandled message type:", status.Type) return } handle(ctx, handlerInput{ - AccountID: accountID, - ConnectionID: connectionID, - Frame: frame, + AccountID: accountID, + ClientID: clientID, + Status: status, }) message.Ack() } func handleConnectedMessage(ctx context.Context, input handlerInput) { err := connectionStore.Put(ctx, connectionstore.Connection{ - ID: input.ConnectionID, + ID: input.Status.ConnectionID, AccountID: input.AccountID, - ClientID: input.Frame.Connected.ClientID, - RequestID: input.Frame.Connected.RequestID, + ClientID: input.ClientID, + AtMillis: input.Status.AtMillis, }) if err != nil { + if err == connectionstore.ErrDuplicate { + return + } log.Fatal(err) } } func handleDisconnectedMessage(ctx context.Context, input handlerInput) { err := connectionStore.Delete(ctx, connectionstore.Connection{ - ID: input.ConnectionID, + ID: input.Status.ConnectionID, + AccountID: input.AccountID, + ClientID: input.ClientID, + AtMillis: input.Status.AtMillis, + DisconnectReason: input.Status.Disconnected.Reason, + }) + if err != nil { + if err == connectionstore.ErrDuplicate { + return + } + log.Fatal(err) + } + if input.Status.Disconnected.Reason != "clean" { + return + } + sessionCtx := contextx.WithAccountID(ctx, input.AccountID) + session, ok := sessionListStore.GetByConnectionID(sessionCtx, input.Status.ConnectionID) + if !ok { + return + } + data, err := json.Marshal(struct { + ID string `json:"id"` + }{ + ID: session.ID, + }) + fatal.OnError(err) + _, err = eventLogStore.Append(sessionCtx, eventlog.AppendInput{ + Type: "session.ended", AccountID: input.AccountID, - ClientID: input.Frame.Disconnected.ClientID, - RequestID: input.Frame.Disconnected.RequestID, + 
Data: data, }) if err != nil { log.Fatal(err) @@ -88,7 +124,16 @@ func handleDisconnectedMessage(ctx context.Context, input handlerInput) { } type handlerInput struct { - AccountID string - ConnectionID string - Frame connections.MessageFrame + AccountID string + ClientID string + Status statusMessage +} + +type statusMessage struct { + Type string `json:"type"` + ConnectionID string `json:"connection_id"` + AtMillis int64 `json:"at_millis"` + Disconnected struct { + Reason string `json:"reason"` + } `json:"disconnected"` } diff --git a/golang/internal/sessionlist/SessionList.go b/golang/internal/sessionlist/SessionList.go index 01d7db0..3b97fd3 100644 --- a/golang/internal/sessionlist/SessionList.go +++ b/golang/internal/sessionlist/SessionList.go @@ -57,6 +57,26 @@ func (sessionList *SessionList) List(ctx context.Context) (output ListOutput) { return } +func (sessionList *SessionList) GetByConnectionID(ctx context.Context, connectionID string) (session *Session, ok bool) { + sessionList.catchUp(ctx) + accountID := contextx.GetAccountID(ctx) + return sessionList.getSessionByConnectionID(accountID, connectionID) +} + +func (sessionList *SessionList) Get(ctx context.Context, sessionID string) (session *Session, ok bool) { + sessionList.catchUp(ctx) + accountID := contextx.GetAccountID(ctx) + index := sessionList.search(accountID, sessionID) + if index == len(sessionList.sessions) { + return nil, false + } + session = sessionList.sessions[index] + if session.AccountID != accountID || session.ID != sessionID { + return nil, false + } + return session, true +} + func (sessionList *SessionList) catchUp(ctx context.Context) { sessionList.mutex.Lock() defer sessionList.mutex.Unlock() diff --git a/golang/internal/sessionstartdecider/Decider.go b/golang/internal/sessionstartdecider/Decider.go new file mode 100644 index 0000000..4af950a --- /dev/null +++ b/golang/internal/sessionstartdecider/Decider.go @@ -0,0 +1,144 @@ +package sessionstartdecider + +import ( + "context" + 
"encoding/json" + "errors" + "fmt" + "sync" + "time" + + "github.com/Ryan-A-B/beddybytes/golang/internal/eventlog" +) + +const EventTypeSessionStarted = "session.started" +const EventTypeSessionEnded = "session.ended" + +var ErrDuplicate = errors.New("duplicate") + +type Session struct { + AccountID string + ID string + Name string + HostConnectionID string + StartedAt time.Time +} + +type Decider struct { + eventLog eventlog.EventLog + mutex sync.Mutex + cursor int64 + sessionKeyByConnectionKey map[string]string + connectionKeyBySessionKey map[string]string + endedSessionKeySet map[string]struct{} +} + +type NewDeciderInput struct { + EventLog eventlog.EventLog +} + +func NewDecider(input NewDeciderInput) *Decider { + return &Decider{ + eventLog: input.EventLog, + sessionKeyByConnectionKey: make(map[string]string), + connectionKeyBySessionKey: make(map[string]string), + endedSessionKeySet: make(map[string]struct{}), + } +} + +func (decider *Decider) Put(ctx context.Context, session Session) error { + decider.mutex.Lock() + defer decider.mutex.Unlock() + + if err := decider.catchUp(ctx); err != nil { + return err + } + connectionKey := decider.getKey(session.AccountID, session.HostConnectionID) + sessionKey := decider.getSessionKey(session.AccountID, session.ID) + if _, ok := decider.endedSessionKeySet[sessionKey]; ok { + return ErrDuplicate + } + if existingSessionKey, ok := decider.sessionKeyByConnectionKey[connectionKey]; ok && existingSessionKey == sessionKey { + return ErrDuplicate + } + data, err := json.Marshal(sessionStartedEventData{ + ID: session.ID, + Name: session.Name, + HostConnectionID: session.HostConnectionID, + StartedAt: session.StartedAt, + }) + if err != nil { + return err + } + _, err = decider.eventLog.Append(ctx, eventlog.AppendInput{ + Type: EventTypeSessionStarted, + AccountID: session.AccountID, + Data: data, + }) + return err +} + +func (decider *Decider) catchUp(ctx context.Context) error { + iterator := 
decider.eventLog.GetEventIterator(ctx, eventlog.GetEventIteratorInput{ + FromCursor: decider.cursor, + }) + for iterator.Next(ctx) { + event := iterator.Event() + if event.Type != EventTypeSessionStarted { + if event.Type == EventTypeSessionEnded { + var data sessionEndedEventData + if err := json.Unmarshal(event.Data, &data); err != nil { + return err + } + sessionKey := decider.getSessionKey(event.AccountID, data.ID) + connectionKey, ok := decider.connectionKeyBySessionKey[sessionKey] + if ok { + delete(decider.sessionKeyByConnectionKey, connectionKey) + delete(decider.connectionKeyBySessionKey, sessionKey) + } + decider.endedSessionKeySet[sessionKey] = struct{}{} + } + decider.cursor = event.LogicalClock + continue + } + var data sessionStartedEventData + if err := json.Unmarshal(event.Data, &data); err != nil { + return err + } + connectionKey := decider.getKey(event.AccountID, data.HostConnectionID) + sessionKey := decider.getSessionKey(event.AccountID, data.ID) + if _, ended := decider.endedSessionKeySet[sessionKey]; ended { + decider.cursor = event.LogicalClock + continue + } + if replacedSessionKey, ok := decider.sessionKeyByConnectionKey[connectionKey]; ok && replacedSessionKey != sessionKey { + delete(decider.connectionKeyBySessionKey, replacedSessionKey) + } + if replacedConnectionKey, ok := decider.connectionKeyBySessionKey[sessionKey]; ok && replacedConnectionKey != connectionKey { + delete(decider.sessionKeyByConnectionKey, replacedConnectionKey) + } + decider.sessionKeyByConnectionKey[connectionKey] = sessionKey + decider.connectionKeyBySessionKey[sessionKey] = connectionKey + decider.cursor = event.LogicalClock + } + return iterator.Err() +} + +func (decider *Decider) getKey(accountID string, connectionID string) string { + return fmt.Sprintf("accounts/%s/connections/%s", accountID, connectionID) +} + +func (decider *Decider) getSessionKey(accountID string, sessionID string) string { + return fmt.Sprintf("accounts/%s/sessions/%s", accountID, 
sessionID) +} + +type sessionStartedEventData struct { + ID string `json:"id"` + Name string `json:"name"` + HostConnectionID string `json:"host_connection_id"` + StartedAt time.Time `json:"started_at"` +} + +type sessionEndedEventData struct { + ID string `json:"id"` +} diff --git a/golang/internal/sessionstartdecider/Decider_test.go b/golang/internal/sessionstartdecider/Decider_test.go new file mode 100644 index 0000000..bb3b9d1 --- /dev/null +++ b/golang/internal/sessionstartdecider/Decider_test.go @@ -0,0 +1,132 @@ +package sessionstartdecider_test + +import ( + "context" + "encoding/json" + "os" + "testing" + "time" + + "github.com/Ryan-A-B/beddybytes/golang/internal/eventlog" + "github.com/Ryan-A-B/beddybytes/golang/internal/sessionstartdecider" +) + +func TestDeciderAllowsRestartAfterSessionEnded(t *testing.T) { + ctx := context.Background() + folderPath, err := os.MkdirTemp("", "TestDeciderAllowsRestartAfterSessionEnded-*") + if err != nil { + t.Fatal(err) + } + eventLog := eventlog.NewFileEventLog(&eventlog.NewFileEventLogInput{ + FolderPath: folderPath, + }) + decider := sessionstartdecider.NewDecider(sessionstartdecider.NewDeciderInput{ + EventLog: eventLog, + }) + session := sessionstartdecider.Session{ + AccountID: "account-1", + ID: "session-1", + Name: "nursery", + HostConnectionID: "connection-1", + StartedAt: time.Unix(1, 0).UTC(), + } + if err := decider.Put(ctx, session); err != nil { + t.Fatal(err) + } + data, err := json.Marshal(struct { + ID string `json:"id"` + }{ + ID: session.ID, + }) + if err != nil { + t.Fatal(err) + } + if _, err := eventLog.Append(ctx, eventlog.AppendInput{ + Type: "session.ended", + AccountID: session.AccountID, + Data: data, + }); err != nil { + t.Fatal(err) + } + restarted := session + restarted.ID = "session-2" + restarted.StartedAt = restarted.StartedAt.Add(time.Minute) + if err := decider.Put(ctx, restarted); err != nil { + t.Fatalf("expected session start after clean end to succeed, got %v", err) + } +} + +func 
TestDeciderAllowsRestartWithoutSessionEnded(t *testing.T) { + ctx := context.Background() + folderPath, err := os.MkdirTemp("", "TestDeciderAllowsRestartWithoutSessionEnded-*") + if err != nil { + t.Fatal(err) + } + eventLog := eventlog.NewFileEventLog(&eventlog.NewFileEventLogInput{ + FolderPath: folderPath, + }) + decider := sessionstartdecider.NewDecider(sessionstartdecider.NewDeciderInput{ + EventLog: eventLog, + }) + first := sessionstartdecider.Session{ + AccountID: "account-1", + ID: "session-1", + Name: "nursery", + HostConnectionID: "connection-1", + StartedAt: time.Unix(1, 0).UTC(), + } + if err := decider.Put(ctx, first); err != nil { + t.Fatal(err) + } + second := first + second.ID = "session-2" + second.StartedAt = second.StartedAt.Add(time.Minute) + if err := decider.Put(ctx, second); err != nil { + t.Fatalf("expected replacement start without session.ended, got %v", err) + } +} + +func TestDeciderRejectsLateStartForEndedSession(t *testing.T) { + ctx := context.Background() + folderPath, err := os.MkdirTemp("", "TestDeciderRejectsLateStartForEndedSession-*") + if err != nil { + t.Fatal(err) + } + eventLog := eventlog.NewFileEventLog(&eventlog.NewFileEventLogInput{ + FolderPath: folderPath, + }) + decider := sessionstartdecider.NewDecider(sessionstartdecider.NewDeciderInput{ + EventLog: eventLog, + }) + session := sessionstartdecider.Session{ + AccountID: "account-1", + ID: "session-1", + Name: "nursery", + HostConnectionID: "connection-1", + StartedAt: time.Unix(1, 0).UTC(), + } + data, err := json.Marshal(struct { + ID string `json:"id"` + }{ + ID: session.ID, + }) + if err != nil { + t.Fatal(err) + } + if _, err := eventLog.Append(ctx, eventlog.AppendInput{ + Type: "session.ended", + AccountID: session.AccountID, + Data: data, + }); err != nil { + t.Fatal(err) + } + if err := decider.Put(ctx, session); err != sessionstartdecider.ErrDuplicate { + t.Fatalf("expected late start for ended session to be rejected, got %v", err) + } + restarted := session 
+ restarted.ID = "session-2" + restarted.StartedAt = restarted.StartedAt.Add(time.Minute) + if err := decider.Put(ctx, restarted); err != nil { + t.Fatalf("expected new session after stale late start to succeed, got %v", err) + } +}