Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
204 changes: 140 additions & 64 deletions src/routers/occupancy.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from fastapi import APIRouter, Depends, HTTPException, Query, Request, status
from pydantic import ValidationError
from sqlalchemy import func, text
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.exc import IntegrityError, SQLAlchemyError
from sqlalchemy.orm import Session

from ..database import get_db
Expand Down Expand Up @@ -47,7 +47,6 @@ def _confidence_level(confidence: float) -> ConfidenceLevel | None:
else:
return ConfidenceLevel.very_low


def _to_utc_naive(value: datetime) -> datetime:
if value.tzinfo is None or value.tzinfo.utcoffset(value) is None:
return value
Expand All @@ -65,6 +64,26 @@ def _is_newer_or_same_observation(
return _to_utc_naive(incoming_observed_at) >= _to_utc_naive(current_zone_updated_at)


def _clamp_int(value: int, min_value: int, max_value: int | None = None) -> int:
value = max(value, min_value)

if max_value is not None:
value = min(value, max_value)

return value


def _clamp_float(value: float, min_value: float, max_value: float) -> float:
return max(min_value, min(value, max_value))


def _confidence_level_value(confidence: float) -> str:
level = _confidence_level(confidence)

if level is None:
return "very_low"

return level.value
def _serialize_obs(obs: OccupancyObservation, db: Session) -> OccupancyObservationResponse:
return OccupancyObservationResponse(
observation_id=obs.observation_id,
Expand Down Expand Up @@ -325,89 +344,146 @@ async def create_observation(
detail={"error_description": "Zone not found"},
)

zone_camera_id = cast(int, zone.camera_id)
zone_id = body.zone_id
zone_camera_id = cast(int | None, zone.camera_id)
zone_partner_id = cast(int | None, zone.partner_id)
zone_capacity = cast(int, zone.capacity)
zone_occupancy_updated_at = cast(datetime | None, zone.occupancy_updated_at)
current_user_id = cast(int | None, current_user.user_id)

zone_capacity = cast(int, zone.capacity)

capacity = body.capacity if body.capacity is not None else zone_capacity
occupied = min(body.occupied, capacity)
confidence = body.confidence

cl = _confidence_level(confidence)
now = datetime.now(timezone.utc)
capacity = _clamp_int(capacity, min_value=0)

# Если источник прислал стабильный source_ref, то повторный запрос
# должен обновлять существующее наблюдение, а не падать с 409.
obs: OccupancyObservation | None = None
occupied = _clamp_int(body.occupied, min_value=0, max_value=capacity)
confidence = _clamp_float(body.confidence, min_value=0.0, max_value=1.0)
confidence_level = _confidence_level_value(confidence)

if body.source_ref:
obs = db.query(OccupancyObservation).filter(
OccupancyObservation.source_type == body.source_type,
OccupancyObservation.source_ref == body.source_ref,
).one_or_none()
now = datetime.now(timezone.utc)

# Если source_ref нет, считаем это новым наблюдением.
if obs is None:
obs = OccupancyObservation(
zone_id=body.zone_id,
camera_id=zone_camera_id,
partner_id=zone_partner_id,
source_type=body.source_type,
source_ref=body.source_ref,
capacity=capacity,
occupied=occupied,
confidence=confidence,
confidence_level=cl,
observed_at=body.observed_at,
ingested_at=now,
metadata_json=body.metadata,
created_by_user_id=current_user_id,
try:
result = db.execute(
text(
"""
INSERT INTO occupancy_observations (
zone_id,
camera_id,
partner_id,
source_type,
source_ref,
capacity,
occupied,
confidence,
confidence_level,
observed_at,
ingested_at,
metadata,
created_by_user_id
)
VALUES (
:zone_id,
:camera_id,
:partner_id,
:source_type,
:source_ref,
:capacity,
:occupied,
:confidence,
:confidence_level,
:observed_at,
:ingested_at,
CAST(:metadata AS jsonb),
:created_by_user_id
)
ON CONFLICT (source_type, source_ref)
DO UPDATE SET
zone_id = EXCLUDED.zone_id,
camera_id = EXCLUDED.camera_id,
partner_id = EXCLUDED.partner_id,
capacity = EXCLUDED.capacity,
occupied = EXCLUDED.occupied,
confidence = EXCLUDED.confidence,
confidence_level = EXCLUDED.confidence_level,
observed_at = EXCLUDED.observed_at,
ingested_at = EXCLUDED.ingested_at,
metadata = EXCLUDED.metadata,
created_by_user_id = COALESCE(
occupancy_observations.created_by_user_id,
EXCLUDED.created_by_user_id
)
RETURNING observation_id
"""
),
{
"zone_id": zone_id,
"camera_id": zone_camera_id,
"partner_id": zone_partner_id,
"source_type": body.source_type,
"source_ref": body.source_ref,
"capacity": capacity,
"occupied": occupied,
"confidence": confidence,
"confidence_level": confidence_level,
"observed_at": body.observed_at,
"ingested_at": now,
"metadata": json.dumps(body.metadata),
"created_by_user_id": current_user_id,
},
)
db.add(obs)
else:
obs.zone_id = body.zone_id
obs.camera_id = zone_camera_id
obs.partner_id = zone_partner_id
obs.capacity = capacity
obs.occupied = occupied
obs.confidence = confidence
obs.confidence_level = cl
obs.observed_at = body.observed_at
obs.ingested_at = now
obs.metadata_json = body.metadata

if obs.created_by_user_id is None:
obs.created_by_user_id = current_user_id

# Обновляем денормализованные поля parking_zones только если пришло
# самое свежее наблюдение или наблюдение с тем же временем.
#
# Важно: body.observed_at может быть timezone-aware из-за "Z" в ISO-строке,
# а parking_zones.occupancy_updated_at в БД — TIMESTAMP без timezone.
# Поэтому напрямую их не сравниваем.
if _is_newer_or_same_observation(body.observed_at, zone_occupancy_updated_at):
zone.occupied = occupied
zone.confidence = confidence
zone.confidence_level = cl
zone.occupancy_updated_at = _to_utc_naive(body.observed_at)
observation_id = result.scalar_one()

if _is_newer_or_same_observation(body.observed_at, zone_occupancy_updated_at):
db.execute(
text(
"""
UPDATE parking_zones
SET
occupied = :occupied,
confidence = :confidence,
confidence_level = CAST(:confidence_level AS confidence_level_types),
occupancy_updated_at = :occupancy_updated_at,
updated_at = NOW()
WHERE parking_zone_id = :zone_id
"""
),
{
"zone_id": zone_id,
"occupied": occupied,
"confidence": confidence,
"confidence_level": confidence_level,
"occupancy_updated_at": _to_utc_naive(body.observed_at),
},
)

try:
db.commit()
db.refresh(obs)

except IntegrityError as exc:
db.rollback()
orig = getattr(exc, "orig", None)
diag = getattr(orig, "diag", None)
raise HTTPException(
status.HTTP_422_UNPROCESSABLE_ENTITY,
detail={
"error_description": "Occupancy payload could not be saved",
"error_type": "IntegrityError",
"pg_error": str(orig),
"constraint": getattr(diag, "constraint_name", None),
"table": getattr(diag, "table_name", None),
"column": getattr(diag, "column_name", None),
},
)

except SQLAlchemyError as exc:
db.rollback()
raise HTTPException(
status.HTTP_422_UNPROCESSABLE_ENTITY,
detail={
"error_description": "Occupancy payload could not be saved",
"details": exc.__class__.__name__,
"error_type": exc.__class__.__name__,
},
)

return {"observation_id": obs.observation_id}
return {"observation_id": observation_id}


# ---------------------------------------------------------------------------
Expand Down
Loading