Skip to content

Commit e52bbce

Browse files
Sbussisoclaude
andcommitted
feat: receive and store motion detection events from CloudNodes
Handles the new "event" WS message type with "motion_detected" command, persisting MotionEvent rows per org. Adds REST endpoints for querying motion events with camera filtering, time windows, and per-camera aggregate stats. - MotionEvent model (motion_events table) - WS handler for motion_detected events - GET /api/motion/events + /api/motion/events/stats - Motion events included in log retention cleanup - 5 new tests (26 total, all passing) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 7f6e823 commit e52bbce

6 files changed

Lines changed: 280 additions & 6 deletions

File tree

backend/app/api/motion.py

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
"""
2+
Motion Events API — query motion detection events reported by CloudNodes.
3+
"""
4+
5+
import logging
6+
from datetime import datetime, timedelta, timezone
7+
from typing import Optional
8+
9+
from fastapi import APIRouter, Depends, Query
10+
from sqlalchemy.orm import Session
11+
12+
from app.core.auth import AuthUser, require_view
13+
from app.core.database import get_db
14+
from app.models.models import MotionEvent
15+
16+
logger = logging.getLogger(__name__)
17+
18+
router = APIRouter(prefix="/api/motion", tags=["motion"])
19+
20+
21+
@router.get("/events")
22+
async def list_motion_events(
23+
camera_id: Optional[str] = None,
24+
hours: int = Query(default=24, le=168),
25+
limit: int = Query(default=100, le=500),
26+
offset: int = Query(default=0, ge=0),
27+
user: AuthUser = Depends(require_view),
28+
db: Session = Depends(get_db),
29+
):
30+
"""List recent motion events, optionally filtered by camera."""
31+
since = datetime.now(tz=timezone.utc).replace(tzinfo=None) - timedelta(hours=hours)
32+
33+
query = db.query(MotionEvent).filter(
34+
MotionEvent.org_id == user.org_id,
35+
MotionEvent.timestamp >= since,
36+
)
37+
38+
if camera_id:
39+
query = query.filter(MotionEvent.camera_id == camera_id)
40+
41+
total = query.count()
42+
events = (
43+
query.order_by(MotionEvent.timestamp.desc())
44+
.offset(offset)
45+
.limit(limit)
46+
.all()
47+
)
48+
49+
return {
50+
"total": total,
51+
"limit": limit,
52+
"offset": offset,
53+
"hours": hours,
54+
"events": [e.to_dict() for e in events],
55+
}
56+
57+
58+
@router.get("/events/stats")
59+
async def motion_stats(
60+
hours: int = Query(default=24, le=168),
61+
user: AuthUser = Depends(require_view),
62+
db: Session = Depends(get_db),
63+
):
64+
"""Aggregate motion stats per camera over the given time window."""
65+
from sqlalchemy import func
66+
67+
since = datetime.now(tz=timezone.utc).replace(tzinfo=None) - timedelta(hours=hours)
68+
69+
rows = (
70+
db.query(
71+
MotionEvent.camera_id,
72+
func.count(MotionEvent.id).label("count"),
73+
func.max(MotionEvent.score).label("peak_score"),
74+
func.max(MotionEvent.timestamp).label("latest"),
75+
)
76+
.filter(
77+
MotionEvent.org_id == user.org_id,
78+
MotionEvent.timestamp >= since,
79+
)
80+
.group_by(MotionEvent.camera_id)
81+
.all()
82+
)
83+
84+
return {
85+
"hours": hours,
86+
"cameras": [
87+
{
88+
"camera_id": r.camera_id,
89+
"event_count": r.count,
90+
"peak_score": r.peak_score,
91+
"latest": r.latest.isoformat() if r.latest else None,
92+
}
93+
for r in rows
94+
],
95+
}

backend/app/api/ws.py

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
from sqlalchemy.orm import Session
3030

3131
from app.core.database import SessionLocal
32-
from app.models import CameraNode, Camera
32+
from app.models import CameraNode, Camera, MotionEvent
3333

3434
logger = logging.getLogger(__name__)
3535

@@ -162,6 +162,14 @@ async def node_websocket(
162162
if correlation_id:
163163
manager.resolve_command(correlation_id, data.get("payload", {}))
164164

165+
elif msg_type == "event":
166+
command = data.get("command")
167+
payload = data.get("payload", {})
168+
if command == "motion_detected":
169+
await _handle_motion_event(node_id, org_id, payload)
170+
else:
171+
logger.debug("Unhandled event command from node %s: %s", node_id, command)
172+
165173
else:
166174
logger.warning("Unknown WS message type from node %s: %s", node_id, msg_type)
167175
await ws.send_json({
@@ -217,3 +225,48 @@ async def _handle_heartbeat(node_id: str, node_db_id: int, org_id: str, payload:
217225
db.rollback()
218226
finally:
219227
db.close()
228+
229+
230+
# ── Motion Event Handler ─────────────────────────────────────────────
231+
232+
async def _handle_motion_event(node_id: str, org_id: str, payload: dict):
233+
"""Persist a motion detection event reported by a CloudNode."""
234+
camera_id = payload.get("camera_id")
235+
score = payload.get("score")
236+
segment_seq = payload.get("segment_seq")
237+
event_ts = payload.get("timestamp") # ISO 8601 from node
238+
239+
if not camera_id or score is None:
240+
logger.warning("Motion event from node %s missing camera_id or score", node_id)
241+
return
242+
243+
ts = None
244+
if event_ts:
245+
try:
246+
ts = datetime.fromisoformat(event_ts).replace(tzinfo=None)
247+
except (ValueError, TypeError):
248+
pass
249+
if ts is None:
250+
ts = datetime.now(tz=timezone.utc).replace(tzinfo=None)
251+
252+
db: Session = SessionLocal()
253+
try:
254+
event = MotionEvent(
255+
org_id=org_id,
256+
camera_id=camera_id,
257+
node_id=node_id,
258+
score=int(score),
259+
segment_seq=int(segment_seq) if segment_seq is not None else None,
260+
timestamp=ts,
261+
)
262+
db.add(event)
263+
db.commit()
264+
logger.info(
265+
"Motion event: camera=%s score=%d%% node=%s",
266+
camera_id, int(score), node_id,
267+
)
268+
except Exception as e:
269+
logger.error("Failed to save motion event: %s", e)
270+
db.rollback()
271+
finally:
272+
db.close()

backend/app/main.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
from app.core.config import settings
1414
from app.core.database import Base, engine, SessionLocal
1515
from app.core.limiter import limiter
16-
from app.api import cameras, webhooks, nodes, audit, hls, ws, install, mcp_keys, mcp_activity, incidents
16+
from app.api import cameras, webhooks, nodes, audit, hls, ws, install, mcp_keys, mcp_activity, incidents, motion
1717
from app.mcp.server import mcp
1818

1919
logger = logging.getLogger(__name__)
@@ -105,6 +105,7 @@ async def security_headers(request: Request, call_next):
105105
app.include_router(mcp_keys.router)
106106
app.include_router(mcp_activity.router)
107107
app.include_router(incidents.router)
108+
app.include_router(motion.router)
108109

109110
# Mount MCP server at /mcp
110111
app.mount("/mcp", mcp_app)
@@ -121,7 +122,7 @@ async def security_headers(request: Request, call_next):
121122
async def _log_cleanup_loop():
122123
"""Background task: delete old logs and free segment caches
123124
for cameras that have been offline."""
124-
from app.models.models import StreamAccessLog, McpActivityLog, AuditLog, Camera
125+
from app.models.models import StreamAccessLog, McpActivityLog, AuditLog, MotionEvent, Camera
125126

126127
while True:
127128
await asyncio.sleep(LOG_CLEANUP_INTERVAL_HOURS * 3600)
@@ -134,12 +135,13 @@ async def _log_cleanup_loop():
134135
stream_count = db.query(StreamAccessLog).filter(StreamAccessLog.accessed_at < cutoff).delete()
135136
mcp_count = db.query(McpActivityLog).filter(McpActivityLog.timestamp < cutoff).delete()
136137
audit_count = db.query(AuditLog).filter(AuditLog.timestamp < cutoff).delete()
138+
motion_count = db.query(MotionEvent).filter(MotionEvent.timestamp < cutoff).delete()
137139
db.commit()
138-
total = stream_count + mcp_count + audit_count
140+
total = stream_count + mcp_count + audit_count + motion_count
139141
if total > 0:
140142
logger.info(
141-
"[Cleanup] Deleted %d old logs (stream=%d, mcp=%d, audit=%d, retention=%dd)",
142-
total, stream_count, mcp_count, audit_count, LOG_RETENTION_DAYS,
143+
"[Cleanup] Deleted %d old logs (stream=%d, mcp=%d, audit=%d, motion=%d, retention=%dd)",
144+
total, stream_count, mcp_count, audit_count, motion_count, LOG_RETENTION_DAYS,
143145
)
144146
finally:
145147
db.close()

backend/app/models/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
McpActivityLog,
99
Incident,
1010
IncidentEvidence,
11+
MotionEvent,
1112
)
1213

1314
__all__ = [
@@ -20,4 +21,5 @@
2021
"McpActivityLog",
2122
"Incident",
2223
"IncidentEvidence",
24+
"MotionEvent",
2325
]

backend/app/models/models.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -359,3 +359,36 @@ def to_dict(self) -> dict:
359359
}
360360

361361

362+
class MotionEvent(Base):
363+
"""A motion detection event reported by a CloudNode.
364+
365+
Created when a node's FFmpeg scene-change analysis exceeds the
366+
configured threshold for a camera segment.
367+
"""
368+
369+
__tablename__ = "motion_events"
370+
371+
id = Column(Integer, primary_key=True)
372+
org_id = Column(String(100), nullable=False, index=True)
373+
camera_id = Column(String(100), nullable=False, index=True)
374+
node_id = Column(String(100), nullable=False, index=True)
375+
score = Column(Integer, nullable=False) # 0-100 (normalised)
376+
segment_seq = Column(Integer, nullable=True)
377+
timestamp = Column(
378+
DateTime,
379+
default=lambda: datetime.now(tz=timezone.utc).replace(tzinfo=None),
380+
index=True,
381+
)
382+
383+
def to_dict(self) -> dict:
384+
return {
385+
"id": self.id,
386+
"org_id": self.org_id,
387+
"camera_id": self.camera_id,
388+
"node_id": self.node_id,
389+
"score": self.score,
390+
"segment_seq": self.segment_seq,
391+
"timestamp": self.timestamp.isoformat() if self.timestamp else None,
392+
}
393+
394+

backend/tests/test_motion.py

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
"""Tests for motion detection event endpoints."""
2+
3+
from datetime import datetime, timedelta, timezone
4+
from app.models.models import MotionEvent
5+
6+
7+
def test_list_motion_events_empty(viewer_client):
8+
"""No motion events returns empty list."""
9+
resp = viewer_client.get("/api/motion/events")
10+
assert resp.status_code == 200
11+
data = resp.json()
12+
assert data["total"] == 0
13+
assert data["events"] == []
14+
15+
16+
def test_list_motion_events_with_data(viewer_client, db):
17+
"""Motion events are returned in reverse chronological order."""
18+
now = datetime.now(tz=timezone.utc).replace(tzinfo=None)
19+
for i in range(3):
20+
db.add(MotionEvent(
21+
org_id="org_test123",
22+
camera_id="cam_front",
23+
node_id="node_1",
24+
score=50 + i * 10,
25+
segment_seq=100 + i,
26+
timestamp=now - timedelta(minutes=10 - i),
27+
))
28+
db.commit()
29+
30+
resp = viewer_client.get("/api/motion/events")
31+
assert resp.status_code == 200
32+
data = resp.json()
33+
assert data["total"] == 3
34+
assert len(data["events"]) == 3
35+
# Most recent first
36+
assert data["events"][0]["score"] == 70
37+
assert data["events"][2]["score"] == 50
38+
39+
40+
def test_list_motion_events_camera_filter(viewer_client, db):
41+
"""Filtering by camera_id returns only matching events."""
42+
now = datetime.now(tz=timezone.utc).replace(tzinfo=None)
43+
db.add(MotionEvent(org_id="org_test123", camera_id="cam_front", node_id="n1", score=80, timestamp=now))
44+
db.add(MotionEvent(org_id="org_test123", camera_id="cam_back", node_id="n1", score=60, timestamp=now))
45+
db.commit()
46+
47+
resp = viewer_client.get("/api/motion/events?camera_id=cam_front")
48+
assert resp.status_code == 200
49+
data = resp.json()
50+
assert data["total"] == 1
51+
assert data["events"][0]["camera_id"] == "cam_front"
52+
53+
54+
def test_motion_events_org_isolation(viewer_client, db):
55+
"""Events from other orgs are not visible."""
56+
now = datetime.now(tz=timezone.utc).replace(tzinfo=None)
57+
db.add(MotionEvent(org_id="org_test123", camera_id="cam_1", node_id="n1", score=90, timestamp=now))
58+
db.add(MotionEvent(org_id="org_other", camera_id="cam_2", node_id="n2", score=85, timestamp=now))
59+
db.commit()
60+
61+
resp = viewer_client.get("/api/motion/events")
62+
data = resp.json()
63+
assert data["total"] == 1
64+
assert data["events"][0]["camera_id"] == "cam_1"
65+
66+
67+
def test_motion_stats(viewer_client, db):
68+
"""Stats endpoint returns per-camera aggregates."""
69+
now = datetime.now(tz=timezone.utc).replace(tzinfo=None)
70+
for score in [40, 60, 80]:
71+
db.add(MotionEvent(
72+
org_id="org_test123", camera_id="cam_front", node_id="n1",
73+
score=score, timestamp=now - timedelta(minutes=score),
74+
))
75+
db.add(MotionEvent(
76+
org_id="org_test123", camera_id="cam_back", node_id="n1",
77+
score=95, timestamp=now,
78+
))
79+
db.commit()
80+
81+
resp = viewer_client.get("/api/motion/events/stats")
82+
assert resp.status_code == 200
83+
data = resp.json()
84+
cameras = {c["camera_id"]: c for c in data["cameras"]}
85+
assert len(cameras) == 2
86+
assert cameras["cam_front"]["event_count"] == 3
87+
assert cameras["cam_front"]["peak_score"] == 80
88+
assert cameras["cam_back"]["event_count"] == 1
89+
assert cameras["cam_back"]["peak_score"] == 95

0 commit comments

Comments
 (0)