diff --git a/src/routers/occupancy.py b/src/routers/occupancy.py index 0c038c6..28d853f 100644 --- a/src/routers/occupancy.py +++ b/src/routers/occupancy.py @@ -8,7 +8,7 @@ from fastapi import APIRouter, Depends, HTTPException, Query, Request, status from pydantic import ValidationError from sqlalchemy import func, text -from sqlalchemy.exc import SQLAlchemyError +from sqlalchemy.exc import IntegrityError, SQLAlchemyError from sqlalchemy.orm import Session from ..database import get_db @@ -47,7 +47,6 @@ def _confidence_level(confidence: float) -> ConfidenceLevel | None: else: return ConfidenceLevel.very_low - def _to_utc_naive(value: datetime) -> datetime: if value.tzinfo is None or value.tzinfo.utcoffset(value) is None: return value @@ -65,6 +64,26 @@ def _is_newer_or_same_observation( return _to_utc_naive(incoming_observed_at) >= _to_utc_naive(current_zone_updated_at) +def _clamp_int(value: int, min_value: int, max_value: int | None = None) -> int: + value = max(value, min_value) + + if max_value is not None: + value = min(value, max_value) + + return value + + +def _clamp_float(value: float, min_value: float, max_value: float) -> float: + return max(min_value, min(value, max_value)) + + +def _confidence_level_value(confidence: float) -> str: + level = _confidence_level(confidence) + + if level is None: + return "very_low" + + return level.value def _serialize_obs(obs: OccupancyObservation, db: Session) -> OccupancyObservationResponse: return OccupancyObservationResponse( observation_id=obs.observation_id, @@ -325,89 +344,146 @@ async def create_observation( detail={"error_description": "Zone not found"}, ) - zone_camera_id = cast(int, zone.camera_id) + zone_id = body.zone_id + zone_camera_id = cast(int | None, zone.camera_id) zone_partner_id = cast(int | None, zone.partner_id) + zone_capacity = cast(int, zone.capacity) zone_occupancy_updated_at = cast(datetime | None, zone.occupancy_updated_at) current_user_id = cast(int | None, current_user.user_id) - zone_capacity = cast(int, zone.capacity) - capacity = body.capacity if body.capacity is not None else zone_capacity - occupied = min(body.occupied, capacity) - confidence = body.confidence - - cl = _confidence_level(confidence) - now = datetime.now(timezone.utc) + capacity = _clamp_int(capacity, min_value=0) - # Если источник прислал стабильный source_ref, то повторный запрос - # должен обновлять существующее наблюдение, а не падать с 409. - obs: OccupancyObservation | None = None + occupied = _clamp_int(body.occupied, min_value=0, max_value=capacity) + confidence = _clamp_float(body.confidence, min_value=0.0, max_value=1.0) + confidence_level = _confidence_level_value(confidence) - if body.source_ref: - obs = db.query(OccupancyObservation).filter( - OccupancyObservation.source_type == body.source_type, - OccupancyObservation.source_ref == body.source_ref, - ).one_or_none() + now = datetime.now(timezone.utc) - # Если source_ref нет, считаем это новым наблюдением. - if obs is None: - obs = OccupancyObservation( - zone_id=body.zone_id, - camera_id=zone_camera_id, - partner_id=zone_partner_id, - source_type=body.source_type, - source_ref=body.source_ref, - capacity=capacity, - occupied=occupied, - confidence=confidence, - confidence_level=cl, - observed_at=body.observed_at, - ingested_at=now, - metadata_json=body.metadata, - created_by_user_id=current_user_id, + try: + result = db.execute( + text( + """ + INSERT INTO occupancy_observations ( + zone_id, + camera_id, + partner_id, + source_type, + source_ref, + capacity, + occupied, + confidence, + confidence_level, + observed_at, + ingested_at, + metadata, + created_by_user_id + ) + VALUES ( + :zone_id, + :camera_id, + :partner_id, + :source_type, + :source_ref, + :capacity, + :occupied, + :confidence, + :confidence_level, + :observed_at, + :ingested_at, + CAST(:metadata AS jsonb), + :created_by_user_id + ) + ON CONFLICT (source_type, source_ref) + DO UPDATE SET + zone_id = EXCLUDED.zone_id, + camera_id = EXCLUDED.camera_id, + partner_id = EXCLUDED.partner_id, + capacity = EXCLUDED.capacity, + occupied = EXCLUDED.occupied, + confidence = EXCLUDED.confidence, + confidence_level = EXCLUDED.confidence_level, + observed_at = EXCLUDED.observed_at, + ingested_at = EXCLUDED.ingested_at, + metadata = EXCLUDED.metadata, + created_by_user_id = COALESCE( + occupancy_observations.created_by_user_id, + EXCLUDED.created_by_user_id + ) + RETURNING observation_id + """ + ), + { + "zone_id": zone_id, + "camera_id": zone_camera_id, + "partner_id": zone_partner_id, + "source_type": body.source_type, + "source_ref": body.source_ref, + "capacity": capacity, + "occupied": occupied, + "confidence": confidence, + "confidence_level": confidence_level, + "observed_at": body.observed_at, + "ingested_at": now, + "metadata": json.dumps(body.metadata), + "created_by_user_id": current_user_id, + }, ) - db.add(obs) - else: - obs.zone_id = body.zone_id - obs.camera_id = zone_camera_id - obs.partner_id = zone_partner_id - obs.capacity = capacity - obs.occupied = occupied - obs.confidence = confidence - obs.confidence_level = cl - obs.observed_at = body.observed_at - obs.ingested_at = now - obs.metadata_json = body.metadata - - if obs.created_by_user_id is None: - obs.created_by_user_id = current_user_id - # Обновляем денормализованные поля parking_zones только если пришло - # самое свежее наблюдение или наблюдение с тем же временем. - # - # Важно: body.observed_at может быть timezone-aware из-за "Z" в ISO-строке, - # а parking_zones.occupancy_updated_at в БД — TIMESTAMP без timezone. - # Поэтому напрямую их не сравниваем. - if _is_newer_or_same_observation(body.observed_at, zone_occupancy_updated_at): - zone.occupied = occupied - zone.confidence = confidence - zone.confidence_level = cl - zone.occupancy_updated_at = _to_utc_naive(body.observed_at) + observation_id = result.scalar_one() + + if _is_newer_or_same_observation(body.observed_at, zone_occupancy_updated_at): + db.execute( + text( + """ + UPDATE parking_zones + SET + occupied = :occupied, + confidence = :confidence, + confidence_level = CAST(:confidence_level AS confidence_level_types), + occupancy_updated_at = :occupancy_updated_at, + updated_at = NOW() + WHERE parking_zone_id = :zone_id + """ + ), + { + "zone_id": zone_id, + "occupied": occupied, + "confidence": confidence, + "confidence_level": confidence_level, + "occupancy_updated_at": _to_utc_naive(body.observed_at), + }, + ) - try: db.commit() - db.refresh(obs) + + except IntegrityError as exc: + db.rollback() + orig = getattr(exc, "orig", None) + diag = getattr(orig, "diag", None) + raise HTTPException( + status.HTTP_422_UNPROCESSABLE_ENTITY, + detail={ + "error_description": "Occupancy payload could not be saved", + "error_type": "IntegrityError", + "pg_error": str(orig), + "constraint": getattr(diag, "constraint_name", None), + "table": getattr(diag, "table_name", None), + "column": getattr(diag, "column_name", None), + }, + ) + except SQLAlchemyError as exc: db.rollback() raise HTTPException( status.HTTP_422_UNPROCESSABLE_ENTITY, detail={ "error_description": "Occupancy payload could not be saved", - "details": exc.__class__.__name__, + "error_type": exc.__class__.__name__, }, ) - return {"observation_id": obs.observation_id} + return {"observation_id": observation_id} # ---------------------------------------------------------------------------