Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
69e85ed
feat: add risk-api client and sequence confidence helper
MateoLostanlen May 4, 2026
72e4990
feat: schedule daily risk-api refresh in app lifespan
MateoLostanlen May 4, 2026
5357a9a
feat: filter alerts and sequences by risk-driven confidence threshold
MateoLostanlen May 4, 2026
a003b89
feat: skip slack alert when sequence max conf below risk threshold
MateoLostanlen May 4, 2026
7e3650f
feat: query risk-api per date for from_date endpoints
MateoLostanlen May 4, 2026
dd2921d
test: cover risk-driven filtering of alerts and sequences
MateoLostanlen May 4, 2026
71da132
chore: apply ruff fixes
MateoLostanlen May 4, 2026
c48824d
fix: clamp risk-refresh hour to 0..23 to avoid retry loop
MateoLostanlen May 4, 2026
a1cc3e2
feat: scope from_date risk lookup to caller organization, normalize f…
MateoLostanlen May 4, 2026
8750cbf
chore: rename risk-api env vars to RISK_API_URL/LOGIN/PWD
MateoLostanlen May 4, 2026
9d56748
feat: add max_conf column to sequences with backfill migration
MateoLostanlen May 5, 2026
d1e9a8c
feat: maintain sequence max_conf at ingest with atomic update
MateoLostanlen May 5, 2026
ce6f2d7
refactor: read max_conf from sequence row instead of parsing detections
MateoLostanlen May 5, 2026
507ed53
test: seed max_conf directly on test sequences
MateoLostanlen May 5, 2026
104bdc9
fix: portable max_conf bump and validate fwi thresholds in [0,1]
MateoLostanlen May 5, 2026
b6a4e6b
feat: add risk_score query param to override fwi class on alerts endp…
MateoLostanlen May 5, 2026
0bf0b03
refactor: collapse risk filter helpers into one and use literal type …
MateoLostanlen May 5, 2026
36353bc
fix: restore refresh() cache-replace on empty list and harden payload…
MateoLostanlen May 5, 2026
3869113
feat: push risk filter into SQL WHERE for exact pagination
MateoLostanlen May 5, 2026
be1af2c
refactor: replace fwi conf settings with FWI_MIN_CONF dict in risk mo…
MateoLostanlen May 5, 2026
27f7cf7
test: pagination on /sequences/all/fromdate keeps page full when filt…
MateoLostanlen May 5, 2026
aa20bdb
fix: drop max_conf clause collapse and route pagination test through …
MateoLostanlen May 5, 2026
fb0745a
fix: compute sequence max_conf from primary bbox only, ignore sibling…
MateoLostanlen May 5, 2026
9c14549
chore: apply ruff format
MateoLostanlen May 5, 2026
3bd419a
fix: silence mypy on case() and chained where() over join()
MateoLostanlen May 5, 2026
0607a81
fix: annotate case() result as Any to satisfy mypy and reformat with …
MateoLostanlen May 5, 2026
cbdd642
test: cover _seconds_until_next_utc_hour and risk_score override on /…
MateoLostanlen May 5, 2026
712d579
chore: address risk filter review comments
MateoLostanlen May 5, 2026
416e2c3
fix: satisfy mypy for risk filter queries
MateoLostanlen May 5, 2026
5da133d
test: cover risk refresh lifecycle
MateoLostanlen May 5, 2026
06d540a
test: cover sequence risk filter endpoints
MateoLostanlen May 5, 2026
e77bd89
test: cover risk service http paths
MateoLostanlen May 5, 2026
addca3b
test: cover alerts/fromdate risk filter and mixed-seq alert
MateoLostanlen May 5, 2026
d12e74c
test: parametrize keep-all assertion across moderate/high/very_high/e…
MateoLostanlen May 5, 2026
2b5e9f2
test: pin fail-open on null max_conf and unknown cameras
MateoLostanlen May 5, 2026
ce064b8
test: parametrize alerts no-filter override across moderate/high/very…
MateoLostanlen May 5, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@ POSTHOG_KEY=
SUPPORT_EMAIL=
TELEGRAM_TOKEN=

# Risk API (daily fire-weather index per camera)
RISK_API_URL=
RISK_API_LOGIN=
RISK_API_PWD=
RISK_REFRESH_HOUR_UTC=4

# Production-only
ACME_EMAIL=
BACKEND_HOST=
Expand Down
4 changes: 4 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ services:
- S3_PROXY_URL=${S3_PROXY_URL}
- SERVER_NAME=${SERVER_NAME}
- PLATFORM_URL=${PLATFORM_URL:-https://platform.pyronear.org}
- RISK_API_URL=${RISK_API_URL}
- RISK_API_LOGIN=${RISK_API_LOGIN}
- RISK_API_PWD=${RISK_API_PWD}
- RISK_REFRESH_HOUR_UTC=${RISK_REFRESH_HOUR_UTC:-4}
volumes:
- ./src/:/app/
command: "sh -c 'alembic upgrade head && python app/db.py && uvicorn app.main:app --reload --host 0.0.0.0 --port 5050 --proxy-headers'"
Expand Down
88 changes: 70 additions & 18 deletions src/app/api/api_v1/endpoints/alerts.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,20 @@

from fastapi import APIRouter, Depends, HTTPException, Path, Query, Security, status
from sqlalchemy import asc, desc
from sqlalchemy.sql import ColumnElement
from sqlmodel import delete, func, select
from sqlmodel.ext.asyncio.session import AsyncSession

from app.api.dependencies import get_alert_crud, get_jwt
from app.core.time import utcnow
from app.crud import AlertCRUD
from app.db import get_session
from app.models import Alert, AlertSequence, Sequence, UserRole
from app.models import Alert, AlertSequence, Camera, Sequence, UserRole
from app.schemas.alerts import AlertReadWithSequences
from app.schemas.login import TokenPayload
from app.schemas.sequences import SequenceRead
from app.services.risk import FwiClass, risk_service
from app.services.sequence_confidence import max_conf_filter_clause
from app.services.sequence_counts import get_detection_counts_by_sequence_ids
from app.services.telemetry import telemetry_client

Expand All @@ -31,22 +34,44 @@ def verify_org_rights(organization_id: int, alert: Alert) -> None:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Access forbidden.")


async def _fetch_sequences_by_alert_ids(
    session: AsyncSession,
    alert_ids: List[int],
    seq_filter: Union[ColumnElement[bool], None] = None,
) -> Dict[int, List[Sequence]]:
    """Fetch the sequences attached to each alert, newest ``last_seen_at`` first.

    Args:
        session: Async DB session.
        alert_ids: Alert ids to resolve; an empty list short-circuits to ``{}``.
        seq_filter: Optional extra WHERE clause (e.g. the risk-driven
            ``max_conf`` filter) applied to the joined sequences.

    Returns:
        Mapping of ``alert_id`` to its sequences, ordered by alert id then by
        ``last_seen_at`` descending within each alert.
    """
    mapping: Dict[int, List[Sequence]] = {}
    if not alert_ids:
        return mapping
    seq_stmt: Any = (
        select(AlertSequence.alert_id, Sequence)
        .join(Sequence, cast(Any, Sequence.id == AlertSequence.sequence_id))
        .where(AlertSequence.alert_id.in_(alert_ids))  # type: ignore[attr-defined]
    )
    if seq_filter is not None:
        seq_stmt = seq_stmt.where(seq_filter)
    # Apply ordering once, after the optional filter, so it is never duplicated.
    seq_stmt = seq_stmt.order_by(cast(Any, AlertSequence.alert_id), desc(cast(Any, Sequence.last_seen_at)))
    res = await session.exec(seq_stmt)
    for alert_id, sequence in res.all():
        mapping.setdefault(int(alert_id), []).append(sequence)
    return mapping


async def _resolve_fwi_class_per_camera(
    session: AsyncSession,
    organization_id: int,
    target_date: Union[date, None] = None,
    override_class: Union[str, None] = None,
) -> Dict[int, Union[str, None]]:
    """Resolve ``{camera_id: fwi_class}`` for the organization.

    Precedence: explicit ``override_class`` > per-date risk-api lookup >
    today's in-memory cache.

    Args:
        session: Async DB session, used only on the override path to enumerate
            the organization's cameras.
        organization_id: Organization whose cameras are resolved.
        target_date: When set, query the risk service for that specific date.
        override_class: When set, apply this FWI class to every camera and
            skip any risk-api lookup.

    Returns:
        Mapping of camera id to its FWI class (``None`` when unknown).
    """
    if override_class is not None:
        cam_ids = (await session.exec(select(Camera.id).where(Camera.organization_id == organization_id))).all()
        return dict.fromkeys(cam_ids, override_class)
    if target_date is not None:
        scores = await risk_service.get_scores_for_date(target_date, organization_id=organization_id)
        return dict(scores)
    # NOTE(review): the cached path is not scoped to the organization here —
    # presumably the camera-id keys only ever match the caller's org; confirm.
    return dict(risk_service.scores())


def _serialize_sequence(sequence: Sequence, detections_count: int = 0) -> SequenceRead:
    """Build the read schema for a sequence row, attaching its detection count."""
    payload = sequence.model_dump()
    return SequenceRead(detections_count=detections_count, **payload)

Expand Down Expand Up @@ -113,24 +138,39 @@ async def fetch_alert_sequences(
summary="Fetch all the alerts with unlabeled sequences from the last 24 hours",
)
async def fetch_latest_unlabeled_alerts(
risk_score: Union[FwiClass, None] = Query(
None, description="Override FWI class applied to every sequence; bypasses risk-api lookup."
),
session: AsyncSession = Depends(get_session),
token_payload: TokenPayload = Security(get_jwt, scopes=[UserRole.ADMIN, UserRole.AGENT, UserRole.USER]),
) -> List[AlertReadWithSequences]:
telemetry_client.capture(token_payload.sub, event="alerts-fetch-latest")

alerts_stmt: Any = select(Alert).join(AlertSequence, cast(Any, AlertSequence.alert_id == Alert.id))
alerts_stmt = alerts_stmt.join(Sequence, cast(Any, Sequence.id == AlertSequence.sequence_id))
alerts_stmt = (
alerts_stmt.where(Alert.organization_id == token_payload.organization_id)
.where(Sequence.last_seen_at > utcnow() - timedelta(hours=24))
.where(Sequence.is_wildfire.is_(None)) # type: ignore[union-attr]
fwi_classes_by_camera = await _resolve_fwi_class_per_camera(
session, token_payload.organization_id, override_class=risk_score
)
seq_filter = max_conf_filter_clause(fwi_classes_by_camera)

seq_match: Any = cast(
Any,
select(AlertSequence.alert_id).join(Sequence, cast(Any, Sequence.id == AlertSequence.sequence_id)),
)
seq_match = (
seq_match.where(Sequence.last_seen_at > utcnow() - timedelta(hours=24)).where(Sequence.is_wildfire.is_(None)) # type: ignore[union-attr]
)
if seq_filter is not None:
seq_match = seq_match.where(seq_filter)

alerts_stmt: Any = (
select(Alert)
.where(Alert.organization_id == token_payload.organization_id)
.where(cast(Any, Alert.id).in_(seq_match))
.order_by(Alert.started_at.desc()) # type: ignore[attr-defined]
.limit(15)
)
alerts_res = await session.exec(alerts_stmt)
alerts = alerts_res.unique().all()
alerts = list((await session.exec(alerts_stmt)).all())
alert_ids = [alert.id for alert in alerts]
seq_map = await _fetch_sequences_by_alert_ids(session, alert_ids)
seq_map = await _fetch_sequences_by_alert_ids(session, alert_ids, seq_filter)
detection_counts = await get_detection_counts_by_sequence_ids(
session,
list({sequence.id for sequences in seq_map.values() for sequence in sequences}),
Expand All @@ -143,23 +183,35 @@ async def fetch_alerts_from_date(
from_date: date = Query(),
limit: Union[int, None] = Query(15, description="Maximum number of alerts to fetch"),
offset: Union[int, None] = Query(0, description="Number of alerts to skip before starting to fetch"),
risk_score: Union[FwiClass, None] = Query(
None, description="Override FWI class applied to every sequence; bypasses risk-api lookup."
),
session: AsyncSession = Depends(get_session),
token_payload: TokenPayload = Security(get_jwt, scopes=[UserRole.ADMIN, UserRole.AGENT, UserRole.USER]),
) -> List[AlertReadWithSequences]:
telemetry_client.capture(token_payload.sub, event="alerts-fetch-from-date")

fwi_classes_by_camera = await _resolve_fwi_class_per_camera(
session, token_payload.organization_id, target_date=from_date, override_class=risk_score
)
seq_filter = max_conf_filter_clause(fwi_classes_by_camera)

alerts_stmt: Any = (
select(Alert)
.where(Alert.organization_id == token_payload.organization_id)
.where(func.date(Alert.started_at) == from_date)
.order_by(Alert.started_at.desc()) # type: ignore[attr-defined]
.limit(limit)
.offset(offset)
)
alerts_res = await session.exec(alerts_stmt)
alerts = alerts_res.all()
if seq_filter is not None:
seq_match: Any = select(AlertSequence.alert_id).join(
Sequence, cast(Any, Sequence.id == AlertSequence.sequence_id)
)
seq_match = seq_match.where(seq_filter)
alerts_stmt = alerts_stmt.where(cast(Any, Alert.id).in_(seq_match))
alerts_stmt = alerts_stmt.order_by(Alert.started_at.desc()).limit(limit).offset(offset) # type: ignore[attr-defined]

alerts = list((await session.exec(alerts_stmt)).all())
alert_ids = [alert.id for alert in alerts]
seq_map = await _fetch_sequences_by_alert_ids(session, alert_ids)
seq_map = await _fetch_sequences_by_alert_ids(session, alert_ids, seq_filter)
detection_counts = await get_detection_counts_by_sequence_ids(
session,
list({sequence.id for sequences in seq_map.values() for sequence in sequences}),
Expand Down
30 changes: 25 additions & 5 deletions src/app/api/api_v1/endpoints/detections.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@


import json
import logging
import re
from ast import literal_eval
from datetime import datetime, timedelta
Expand Down Expand Up @@ -56,11 +57,15 @@
from app.schemas.sequences import SequenceUpdate
from app.services.cones import resolve_cone
from app.services.overlap import compute_overlap, haversine_km
from app.services.risk import risk_service
from app.services.sequence_confidence import max_conf_from_bboxes
from app.services.slack import slack_client
from app.services.storage import s3_service, upload_file
from app.services.telegram import telegram_client
from app.services.telemetry import telemetry_client

logger = logging.getLogger("uvicorn.error")

router = APIRouter()


Expand Down Expand Up @@ -427,6 +432,10 @@ async def create_detection(
if matched_sequence is not None:
await sequences.update(matched_sequence.id, SequenceUpdate(last_seen_at=det.created_at))
det = await detections.update(det.id, DetectionSequence(sequence_id=matched_sequence.id))
# Only the primary bbox tracks the sequence; siblings in others_bboxes are unrelated detections.
det_max_conf = max_conf_from_bboxes(det.bbox)
if det_max_conf is not None:
await sequences.bump_max_conf(matched_sequence.id, det_max_conf)
else:
det_filters: List[tuple[str, Any]] = [
("camera_id", token_payload.sub),
Expand Down Expand Up @@ -455,6 +464,7 @@ async def create_detection(
if len(overlapping_dets) >= settings.SEQUENCE_MIN_INTERVAL_DETS:
first_det = min(overlapping_dets, key=lambda item: item.created_at)
cone_azimuth, cone_angle = resolve_cone(pose.azimuth, first_det.bbox, camera.angle_of_view)
seq_max_conf = max_conf_from_bboxes(*[d.bbox for d in overlapping_dets])
sequence_ = await sequences.create(
Sequence(
camera_id=token_payload.sub,
Expand All @@ -464,6 +474,7 @@ async def create_detection(
cone_angle=cone_angle,
started_at=first_det.created_at,
last_seen_at=det.created_at,
max_conf=seq_max_conf,
)
)
for det_ in overlapping_dets:
Expand All @@ -490,11 +501,20 @@ async def create_detection(
if org is None:
org = cast(Organization, await organizations.get(token_payload.organization_id, strict=True))
if org.slack_hook:
slack_payload = jsonable_encoder(det)
slack_payload["sequence_azimuth"] = sequence_.sequence_azimuth
background_tasks.add_task(
slack_client.notify, org.slack_hook, json.dumps(slack_payload), camera.name, alert_id
)
min_conf = risk_service.min_confidence(camera.id)
if min_conf is None or sequence_.max_conf is None or sequence_.max_conf >= min_conf:
slack_payload = jsonable_encoder(det)
slack_payload["sequence_azimuth"] = sequence_.sequence_azimuth
background_tasks.add_task(
slack_client.notify, org.slack_hook, json.dumps(slack_payload), camera.name, alert_id
)
else:
logger.info(
"Skipping Slack notification for camera %s: max conf %.3f < threshold %.3f",
camera.name,
sequence_.max_conf,
min_conf,
)

created.append(det)

Expand Down
65 changes: 43 additions & 22 deletions src/app/api/api_v1/endpoints/sequences.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
from app.schemas.login import TokenPayload
from app.schemas.sequences import SequenceLabel, SequenceRead
from app.services.overlap import compute_overlap
from app.services.risk import FwiClass, risk_service
from app.services.sequence_confidence import max_conf_filter_clause
from app.services.sequence_counts import get_detection_counts_by_sequence_ids
from app.services.storage import s3_service
from app.services.telemetry import telemetry_client
Expand Down Expand Up @@ -146,22 +148,32 @@ async def fetch_sequence_detections(
summary="Fetch all the unlabeled sequences from the last 24 hours",
)
async def fetch_latest_unlabeled_sequences(
    risk_score: Union[FwiClass, None] = Query(
        None, description="Override FWI class applied to every sequence; bypasses risk-api lookup."
    ),
    session: AsyncSession = Depends(get_session),
    token_payload: TokenPayload = Security(get_jwt, scopes=[UserRole.ADMIN, UserRole.AGENT, UserRole.USER]),
) -> List[SequenceRead]:
    """Return up to 15 unlabeled sequences seen in the last 24h for the caller's org.

    Sequences are filtered by the risk-driven confidence threshold; ``risk_score``
    overrides the per-camera FWI class for every camera.
    """
    telemetry_client.capture(token_payload.sub, event="sequence-fetch-latest")

    camera_ids = (
        await session.exec(select(Camera.id).where(Camera.organization_id == token_payload.organization_id))
    ).all()
    # Override applies one FWI class to every camera; otherwise use today's cached scores.
    classes: dict[int, Union[str, None]] = (
        dict.fromkeys(camera_ids, risk_score) if risk_score is not None else dict(risk_service.scores())
    )

    stmt: Any = (
        select(Sequence)
        .where(Sequence.started_at > utcnow() - timedelta(hours=24))
        .where(Sequence.camera_id.in_(camera_ids))  # type: ignore[attr-defined]
        .where(Sequence.is_wildfire.is_(None))  # type: ignore[union-attr]
    )
    seq_filter = max_conf_filter_clause(classes)
    if seq_filter is not None:
        stmt = stmt.where(seq_filter)
    stmt = stmt.order_by(Sequence.started_at.desc()).limit(15)  # type: ignore[attr-defined]

    fetched_sequences = (await session.exec(stmt)).all()
    counts = await get_detection_counts_by_sequence_ids(session, [sequence.id for sequence in fetched_sequences])
    return [_serialize_sequence(sequence, counts.get(sequence.id, 0)) for sequence in fetched_sequences]

async def fetch_sequences_from_date(
    from_date: date = Query(),
    limit: Union[int, None] = Query(15, description="Maximum number of sequences to fetch"),
    offset: Union[int, None] = Query(0, description="Number of sequences to skip before starting to fetch"),
    risk_score: Union[FwiClass, None] = Query(
        None, description="Override FWI class applied to every sequence; bypasses risk-api lookup."
    ),
    session: AsyncSession = Depends(get_session),
    token_payload: TokenPayload = Security(get_jwt, scopes=[UserRole.ADMIN, UserRole.AGENT, UserRole.USER]),
) -> List[SequenceRead]:
    """Return the paginated sequences started on ``from_date`` for the caller's org.

    Sequences are filtered by the risk-driven confidence threshold resolved for
    that date; ``risk_score`` overrides the per-camera FWI class for every camera.
    """
    telemetry_client.capture(token_payload.sub, event="sequence-fetch-from-date")
    # Limit to cameras in the same organization
    camera_ids = (
        await session.exec(select(Camera.id).where(Camera.organization_id == token_payload.organization_id))
    ).all()
    if risk_score is not None:
        classes: dict[int, Union[str, None]] = dict.fromkeys(camera_ids, risk_score)
    else:
        scores = await risk_service.get_scores_for_date(from_date, organization_id=token_payload.organization_id)
        classes = dict(scores)

    stmt: Any = (
        select(Sequence).where(func.date(Sequence.started_at) == from_date).where(Sequence.camera_id.in_(camera_ids))  # type: ignore[attr-defined]
    )
    seq_filter = max_conf_filter_clause(classes)
    if seq_filter is not None:
        stmt = stmt.where(seq_filter)
    # Filter goes into the WHERE clause before limit/offset so pages stay full.
    stmt = stmt.order_by(Sequence.started_at.desc()).limit(limit).offset(offset)  # type: ignore[attr-defined]

    fetched_sequences = (await session.exec(stmt)).all()
    counts = await get_detection_counts_by_sequence_ids(session, [sequence.id for sequence in fetched_sequences])
    return [_serialize_sequence(sequence, counts.get(sequence.id, 0)) for sequence in fetched_sequences]

Expand Down
6 changes: 6 additions & 0 deletions src/app/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,12 @@ def sqlachmey_uri(cls, v: str) -> str:
TELEGRAM_TOKEN: Union[str, None] = os.environ.get("TELEGRAM_TOKEN")
PLATFORM_URL: str = os.environ.get("PLATFORM_URL", "")

# Risk API (daily fire-weather index per camera)
RISK_API_URL: Union[str, None] = os.environ.get("RISK_API_URL")
RISK_API_LOGIN: Union[str, None] = os.environ.get("RISK_API_LOGIN")
RISK_API_PWD: Union[str, None] = os.environ.get("RISK_API_PWD")
RISK_REFRESH_HOUR_UTC: int = int(os.environ.get("RISK_REFRESH_HOUR_UTC") or 4)

# Error monitoring
SENTRY_DSN: Union[str, None] = os.environ.get("SENTRY_DSN")
SERVER_NAME: str = os.environ.get("SERVER_NAME", socket.gethostname())
Expand Down
17 changes: 16 additions & 1 deletion src/app/crud/crud_sequence.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@
# See LICENSE or go to <https://www.apache.org/licenses/LICENSE-2.0> for full license details.


from typing import Union
from typing import Any, Union, cast

from sqlalchemy import case, or_, update
from sqlmodel.ext.asyncio.session import AsyncSession

from app.crud.base import BaseCRUD
Expand All @@ -18,3 +19,17 @@
class SequenceCRUD(BaseCRUD[Sequence, Sequence, Union[SequenceUpdate, SequenceLabel]]):
    """CRUD accessor for ``Sequence`` rows."""

    def __init__(self, session: AsyncSession) -> None:
        super().__init__(session, Sequence)

    async def bump_max_conf(self, sequence_id: int, candidate: float) -> None:
        """Atomically raise ``sequences.max_conf`` to ``candidate`` if higher (or set if NULL).

        Uses a portable CASE expression so it runs on SQLite as well as Postgres.
        The comparison and assignment happen inside a single UPDATE statement,
        so concurrent writers cannot overwrite a higher value with a lower one.

        Args:
            sequence_id: Primary key of the sequence to update.
            candidate: Confidence value to compare against the stored maximum.
        """
        max_conf_col = cast(Any, Sequence.max_conf)
        bumped: Any = cast(Any, case)(
            (or_(max_conf_col.is_(None), max_conf_col < candidate), candidate),
            else_=max_conf_col,
        )
        stmt: Any = update(Sequence).where(cast(Any, Sequence.id) == sequence_id).values(max_conf=bumped)
        await self.session.exec(stmt)
        await self.session.commit()
Loading
Loading