Skip to content

Commit 9a9c695

Browse files
committed
feat(integration): Phase 3 — motion SSE for Home Assistant binary_sensors
GET /api/integration/motion/stream — a Server-Sent Events feed of motion detections across all of the org's cameras, the source for HA motion binary_sensors. Reuses the org-wide motion pipeline the dashboard already consumes (nodes push motion over WS → broadcast), emitting the same {type:"motion", camera_id, node_id, score, timestamp} events with a 25s keepalive. Crucially it uses a SEPARATE subscriber pool (integration_motion_broadcaster) from the dashboard, so a persistent HA connection never consumes a dashboard SSE slot — the motion publisher in ws.py now fans out to both pools. Capped at INTEGRATION_MAX_SSE_SUBSCRIBERS (10) streams per org, independent of the dashboard's per-tier cap. No plan gate (all tiers). Tests: broadcaster is a distinct instance + delivers; the two pools are independent (filling one doesn't block the other); cap is per-org; endpoint 401 without a key and 429 when the pool is full. Full suite 660 passed. With this the backend HA story is complete: cameras (LAN-direct video + snapshots + recording) + motion. Remaining: Phase 1b key UI; Phase 2b off-LAN proxy; Phase 4 the HACS component.
1 parent 773c140 commit 9a9c695

4 files changed

Lines changed: 189 additions & 4 deletions

File tree

backend/app/api/integration.py

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,14 @@
1010
``app/api/mcp_keys.py`` for why the two key kinds can't cross surfaces.
1111
"""
1212

13+
import asyncio
1314
import hashlib
15+
import json
1416
import logging
1517
import secrets
1618

1719
from fastapi import APIRouter, Depends, HTTPException, Request, Response
20+
from fastapi.responses import StreamingResponse
1821
from sqlalchemy.orm import Session
1922

2023
from app.core.audit import audit_label, write_audit
@@ -31,6 +34,13 @@
3134

3235
KEY_PREFIX = "osi_"
3336

37+
# Per-org cap on concurrent integration motion-SSE streams. This is a
38+
# SEPARATE pool from the dashboard (see integration_motion_broadcaster in
39+
# motion.py), so an HA connection never consumes a dashboard subscriber
40+
# slot. A small fixed cap is plenty — a home runs one or two HA instances —
41+
# and bounds memory against a scripted connect loop.
42+
INTEGRATION_MAX_SSE_SUBSCRIBERS = 10
43+
3444

3545
def _generate_key() -> str:
3646
"""Generate a random integration API key: ``osi_`` + 32 hex chars."""
@@ -355,3 +365,63 @@ async def integration_status(
355365
"items": node_items,
356366
},
357367
}
368+
369+
370+
@router.get("/motion/stream")
371+
@limiter.limit("60/minute")
372+
async def integration_motion_stream(
373+
request: Request,
374+
user: AuthUser = Depends(require_integration_org),
375+
):
376+
"""Server-Sent Events feed of motion detections across ALL of the org's
377+
cameras — the source for Home Assistant motion ``binary_sensor``s.
378+
379+
Reuses the same org-wide motion pipeline the dashboard consumes (nodes
380+
push motion over WS → broadcast), but via a SEPARATE subscriber pool
381+
(``integration_motion_broadcaster``) so a persistent HA connection never
382+
eats into the dashboard's per-tier SSE cap.
383+
384+
Each event is ``{type:"motion", camera_id, node_id, score, timestamp}``;
385+
a ``": keepalive"`` comment is sent every 25s to hold the connection
386+
open. Capped at ``INTEGRATION_MAX_SSE_SUBSCRIBERS`` streams per org (429
387+
past that).
388+
"""
389+
from app.api.motion import integration_motion_broadcaster
390+
391+
org_id = user.org_id
392+
queue = integration_motion_broadcaster.subscribe(
393+
org_id, INTEGRATION_MAX_SSE_SUBSCRIBERS
394+
)
395+
if queue is None:
396+
raise HTTPException(
397+
status_code=429,
398+
detail=(
399+
f"Too many open integration motion streams for this org "
400+
f"(cap: {INTEGRATION_MAX_SSE_SUBSCRIBERS}). Close unused "
401+
f"connections and retry."
402+
),
403+
)
404+
405+
async def event_generator():
406+
try:
407+
yield f"data: {json.dumps({'type': 'connected', 'org_id': org_id})}\n\n"
408+
while True:
409+
try:
410+
event = await asyncio.wait_for(queue.get(), timeout=25.0)
411+
yield f"data: {json.dumps(event)}\n\n"
412+
except TimeoutError:
413+
yield ": keepalive\n\n"
414+
except asyncio.CancelledError:
415+
pass
416+
finally:
417+
integration_motion_broadcaster.unsubscribe(org_id, queue)
418+
419+
return StreamingResponse(
420+
event_generator(),
421+
media_type="text/event-stream",
422+
headers={
423+
"Cache-Control": "no-cache",
424+
"Connection": "keep-alive",
425+
"X-Accel-Buffering": "no",
426+
},
427+
)

backend/app/api/motion.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,13 @@ def unsubscribe(self, org_id: str, q: asyncio.Queue):
9292
# Singleton — imported by ws.py to broadcast events.
9393
motion_broadcaster = MotionBroadcaster()
9494

95+
# Second, independent pool for the Home Assistant integration SSE
96+
# (app/api/integration.py). The motion publisher in ws.py fans out to BOTH,
97+
# so integration subscribers get the same org-wide events — but a persistent
98+
# HA connection lives in its own pool and never consumes a dashboard SSE
99+
# subscriber slot (the dashboard cap and the integration cap are separate).
100+
integration_motion_broadcaster = MotionBroadcaster()
101+
95102

96103
@router.get("/events")
97104
async def list_motion_events(

backend/app/api/ws.py

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -591,17 +591,24 @@ async def _handle_motion_event(node_id: str, org_id: str, payload: dict):
591591
camera_id, score_int, node_id,
592592
)
593593

594-
# Broadcast to the motion SSE stream so live dashboards show
594+
# Broadcast to the motion SSE streams so live dashboards show
595595
# toasts immediately; the inbox notification below handles the
596596
# durable history.
597-
from app.api.motion import motion_broadcaster
598-
motion_broadcaster.notify(org_id, {
597+
from app.api.motion import (
598+
integration_motion_broadcaster,
599+
motion_broadcaster,
600+
)
601+
motion_payload = {
599602
"type": "motion",
600603
"camera_id": camera_id,
601604
"node_id": node_id,
602605
"score": score_int,
603606
"timestamp": ts.isoformat(),
604-
})
607+
}
608+
motion_broadcaster.notify(org_id, motion_payload)
609+
# Mirror to the Home Assistant integration SSE (separate pool so HA
610+
# doesn't consume dashboard subscriber slots — see motion.py).
611+
integration_motion_broadcaster.notify(org_id, motion_payload)
605612

606613

607614
# Also emit an inbox notification so the user can see motion
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
"""Phase 3 integration motion-SSE tests.
2+
3+
The integration motion feed reuses the org-wide motion pipeline but via a
4+
SEPARATE subscriber pool so a persistent Home Assistant connection never
5+
consumes a dashboard SSE slot. These cover the separate-pool design, the
6+
per-org cap, and the endpoint's auth + 429 (the streaming success path is
7+
left to the dashboard SSE's own coverage — it's an identical generator —
8+
since a TestClient GET on an infinite stream would block).
9+
"""
10+
11+
import hashlib
12+
13+
import pytest
14+
15+
from app.api.integration import INTEGRATION_MAX_SSE_SUBSCRIBERS
16+
from app.api.motion import integration_motion_broadcaster, motion_broadcaster
17+
from app.models.models import McpApiKey
18+
19+
ORG = "org_test123"
20+
RAW_KEY = "osi_phase3testkey00000000000000000000"
21+
22+
23+
@pytest.fixture(autouse=True)
24+
def _clear_pools():
25+
"""Both broadcasters are module singletons — reset their subscriber
26+
pools around each test so cap/separation assertions are deterministic."""
27+
integration_motion_broadcaster._subscribers.clear()
28+
motion_broadcaster._subscribers.clear()
29+
yield
30+
integration_motion_broadcaster._subscribers.clear()
31+
motion_broadcaster._subscribers.clear()
32+
33+
34+
def _make_integration_key(db, org=ORG, raw=RAW_KEY):
35+
db.add(McpApiKey(
36+
org_id=org,
37+
key_hash=hashlib.sha256(raw.encode()).hexdigest(),
38+
name="Home Assistant",
39+
kind="integration",
40+
))
41+
db.commit()
42+
43+
44+
# ── Separate-pool design ────────────────────────────────────────────
45+
46+
def test_integration_broadcaster_is_a_separate_instance():
47+
assert integration_motion_broadcaster is not motion_broadcaster
48+
49+
50+
def test_integration_broadcaster_delivers_events():
51+
q = integration_motion_broadcaster.subscribe(ORG, INTEGRATION_MAX_SSE_SUBSCRIBERS)
52+
payload = {"type": "motion", "camera_id": "cam_a", "score": 80}
53+
integration_motion_broadcaster.notify(ORG, payload)
54+
assert q.get_nowait() == payload
55+
56+
57+
def test_pools_are_independent():
58+
"""Filling the integration pool to its cap must NOT block the dashboard
59+
pool (and vice versa) — HA connections and dashboard tabs don't contend."""
60+
for _ in range(INTEGRATION_MAX_SSE_SUBSCRIBERS):
61+
assert integration_motion_broadcaster.subscribe(
62+
ORG, INTEGRATION_MAX_SSE_SUBSCRIBERS
63+
) is not None
64+
# Integration pool is now full...
65+
assert integration_motion_broadcaster.subscribe(
66+
ORG, INTEGRATION_MAX_SSE_SUBSCRIBERS
67+
) is None
68+
# ...but the dashboard pool is entirely untouched.
69+
assert motion_broadcaster.subscribe(ORG, 5) is not None
70+
71+
72+
def test_cap_is_per_org():
73+
"""One org filling its integration pool doesn't affect another org."""
74+
for _ in range(INTEGRATION_MAX_SSE_SUBSCRIBERS):
75+
integration_motion_broadcaster.subscribe(ORG, INTEGRATION_MAX_SSE_SUBSCRIBERS)
76+
assert integration_motion_broadcaster.subscribe(ORG, INTEGRATION_MAX_SSE_SUBSCRIBERS) is None
77+
# A different org has its own budget.
78+
assert integration_motion_broadcaster.subscribe(
79+
"org_other", INTEGRATION_MAX_SSE_SUBSCRIBERS
80+
) is not None
81+
82+
83+
# ── Endpoint ────────────────────────────────────────────────────────
84+
85+
def test_motion_stream_requires_key(unauthenticated_client):
86+
# 401 is raised by the auth dependency before any streaming begins.
87+
assert unauthenticated_client.get("/api/integration/motion/stream").status_code == 401
88+
89+
90+
def test_motion_stream_429_when_pool_full(unauthenticated_client, db):
91+
"""A valid key still 429s when the org's integration pool is full — the
92+
429 is raised before the StreamingResponse, so the GET returns cleanly."""
93+
_make_integration_key(db)
94+
for _ in range(INTEGRATION_MAX_SSE_SUBSCRIBERS):
95+
integration_motion_broadcaster.subscribe(ORG, INTEGRATION_MAX_SSE_SUBSCRIBERS)
96+
97+
resp = unauthenticated_client.get(
98+
"/api/integration/motion/stream",
99+
headers={"Authorization": f"Bearer {RAW_KEY}"},
100+
)
101+
assert resp.status_code == 429

0 commit comments

Comments
 (0)