def reboot_camera(self, camera_ip: str) -> Dict[str, Any]:
    """
    Ask the camera API to reboot the camera registered under ``camera_ip``.

    Issues ``POST /control/reboot/{camera_ip}`` through ``self._request`` and
    returns the decoded JSON body of the reply.

    Reboot is supported for the Reolink and Linovision adapters. The server
    answers HTTP 501 for adapters that do not implement reboot; how that
    status is surfaced to the caller is decided by ``self._request``.
    """
    endpoint = f"/control/reboot/{camera_ip}"
    return self._request("POST", endpoint).json()
def reboot_camera(self, timeout: float = 10.0) -> bool:
    """
    Send the ``Reboot`` command to the camera and report whether it was accepted.

    Args:
        timeout: Seconds to wait for the camera's HTTP reply before giving up.
            Reboot is typically requested for a camera that already misbehaves,
            so the request must not be allowed to block the caller forever.

    Returns:
        True when the first entry of the parsed response has ``"code" == 0``.
        False when ``_handle_response`` yields nothing or the response does
        not have the expected shape.

    Raises:
        requests.exceptions.RequestException: on connection failure or when
            ``timeout`` elapses (propagated unchanged to the caller).
    """
    url = self._build_url("Reboot")
    data = [{"cmd": "Reboot"}]
    # verify=False is kept from the original call (certificate checks are
    # disabled for the camera endpoint); the timeout bounds the wait.
    response = requests.post(url, json=data, verify=False, timeout=timeout)  # nosec: B501
    response_data = self._handle_response(response, "Camera reboot initiated successfully.")
    if not response_data:
        return False
    try:
        return response_data[0]["code"] == 0
    except (KeyError, IndexError, TypeError):
        # Unexpected payload shape: treat as "not accepted" rather than crash.
        return False
b/pyro_camera_api/pyro_camera_api/camera/registry.py index f1269782..dc883445 100644 --- a/pyro_camera_api/pyro_camera_api/camera/registry.py +++ b/pyro_camera_api/pyro_camera_api/camera/registry.py @@ -28,6 +28,10 @@ PATROL_THREADS: Dict[str, threading.Thread] = {} PATROL_FLAGS: Dict[str, threading.Event] = {} +# Stuck-detector threading state, later managed in camera.stuck_detector +STUCK_CHECK_THREADS: Dict[str, threading.Thread] = {} +STUCK_CHECK_FLAGS: Dict[str, threading.Event] = {} + def build_camera_object(key: str, conf: dict) -> Optional[BaseCamera]: """ diff --git a/pyro_camera_api/pyro_camera_api/camera/stuck_detector.py b/pyro_camera_api/pyro_camera_api/camera/stuck_detector.py new file mode 100644 index 00000000..aafeded5 --- /dev/null +++ b/pyro_camera_api/pyro_camera_api/camera/stuck_detector.py @@ -0,0 +1,187 @@ +# Copyright (C) 2022-2026, Pyronear. + +# This program is licensed under the Apache License 2.0. +# See LICENSE or go to https://www.apache.org/licenses/LICENSE-2.0 for full license details. + + +""" +PTZ stuck-camera detector. + +Every CHECK_INTERVAL seconds, compute pairwise pHash distances across the +most recent per-pose images produced by the patrol loop. A turret that has +frozen returns near-identical frames for all poses, giving a very small +maximum pairwise distance. After CONSECUTIVE_HITS_BEFORE_REBOOT consecutive +low-distance checks, reboot the camera. + +Thresholds were calibrated on real sdis-77 captures: stuck-episode max +pairwise Hamming <= 6, working-patrol min pairwise Hamming >= 17. 
+""" + +from __future__ import annotations + +import logging +import threading +from typing import Dict, List + +import cv2 +import numpy as np +from PIL import Image + +from pyro_camera_api.camera.registry import CAMERA_REGISTRY, PATROL_FLAGS, PATROL_THREADS + +logger = logging.getLogger(__name__) + +CHECK_INTERVAL = 30 * 60.0 # seconds between checks +INITIAL_DELAY = 3 * 60.0 # delay before the first check, lets patrol populate last_images +STUCK_MAX_HAMMING = 10 # max pairwise distance below which we suspect stuck +CONSECUTIVE_HITS_BEFORE_REBOOT = 2 +MIN_POSES_FOR_CHECK = 2 +# Fog / low-light scenes collapse to near-uniform gray and produce unstable pHashes. +# Skip the check when the mean per-image gray variance falls below this threshold. +# Calibrated on sun_test data: foggy rounds mean variance <= 332, stuck rounds mean >= 854. +MIN_MEAN_VARIANCE_FOR_CHECK = 500.0 + +CONSECUTIVE_HITS: Dict[str, int] = {} + + +def _phash(img: Image.Image, hash_size: int = 8, highfreq_factor: int = 4) -> np.ndarray: + """Classic pHash: downscale to grayscale, DCT, threshold low-freq block on its median.""" + img_size = hash_size * highfreq_factor + gray = img.convert("L").resize((img_size, img_size), Image.Resampling.LANCZOS) + arr = np.asarray(gray, dtype=np.float32) + dct = cv2.dct(arr) + low = dct[:hash_size, :hash_size] + med = np.median(low[1:, 1:]) + return (low > med).flatten() + + +def _hamming(a: np.ndarray, b: np.ndarray) -> int: + return int(np.count_nonzero(a != b)) + + +def _mean_gray_variance(images: List[Image.Image]) -> float: + return float(np.mean([np.asarray(im.convert("L"), dtype=np.float32).var() for im in images])) + + +def _max_pairwise_hamming(images: List[Image.Image]) -> int: + hashes = [_phash(im) for im in images] + n = len(hashes) + return max(_hamming(hashes[i], hashes[j]) for i in range(n) for j in range(i + 1, n)) + + +def _patrol_is_running(camera_ip: str) -> bool: + thr = PATROL_THREADS.get(camera_ip) + flag = PATROL_FLAGS.get(camera_ip) + 
def _patrol_is_running(camera_ip: str) -> bool:
    """Return True when the camera's patrol thread is alive and its stop flag is not set."""
    thr = PATROL_THREADS.get(camera_ip)
    flag = PATROL_FLAGS.get(camera_ip)
    return bool(thr and thr.is_alive() and flag and not flag.is_set())


def stuck_check_loop(camera_ip: str, stop_flag: threading.Event) -> None:
    """
    Periodically check one camera for a frozen PTZ turret and reboot it if stuck.

    The loop sleeps on ``stop_flag`` (first INITIAL_DELAY, then CHECK_INTERVAL),
    so setting the flag wakes it immediately and ends the loop. On each wake-up:

    1. Skip and reset the hit counter if the patrol is not running.
    2. Collect the non-None images from ``cam.last_images`` (pose ``-1`` is
       excluded); skip if fewer than MIN_POSES_FOR_CHECK are available.
    3. Skip and reset the counter on low-variance scenes (mean grayscale
       variance below MIN_MEAN_VARIANCE_FOR_CHECK).
    4. Count a hit when the maximum pairwise pHash distance is below
       STUCK_MAX_HAMMING; otherwise reset the counter.
    5. After CONSECUTIVE_HITS_BEFORE_REBOOT hits, call ``cam.reboot_camera()``.
       On success the cached images and the counter are cleared; on failure
       the counter is kept and the next check runs after INITIAL_DELAY.

    Args:
        camera_ip: Key of the camera in CAMERA_REGISTRY.
        stop_flag: Event that terminates the loop when set.

    Returns immediately (after logging) if the adapter has no
    ``reboot_camera`` attribute. An unknown ``camera_ip`` propagates the
    lookup error from ``CAMERA_REGISTRY[camera_ip]``.
    """
    cam = CAMERA_REGISTRY[camera_ip]

    if not hasattr(cam, "reboot_camera"):
        logger.info("[%s] Stuck detector disabled: adapter does not support reboot", camera_ip)
        return

    logger.info(
        "[%s] Stuck detector started (initial=%ds, interval=%ds, threshold=%d, consecutive=%d)",
        camera_ip,
        int(INITIAL_DELAY),
        int(CHECK_INTERVAL),
        STUCK_MAX_HAMMING,
        CONSECUTIVE_HITS_BEFORE_REBOOT,
    )

    CONSECUTIVE_HITS[camera_ip] = 0
    next_delay = INITIAL_DELAY

    # Event.wait returns False on timeout (run a check) and True once the flag is set (exit).
    while not stop_flag.wait(next_delay):
        next_delay = CHECK_INTERVAL
        if not _patrol_is_running(camera_ip):
            logger.info("[%s] Stuck check skipped: patrol not running", camera_ip)
            CONSECUTIVE_HITS[camera_ip] = 0
            continue

        # Snapshot the dict before filtering: the patrol thread fills last_images
        # concurrently, and iterating the live view while it inserts a new pose
        # raises RuntimeError, which would silently kill this detector thread.
        pose_images = list(cam.last_images.items())
        images = [im for pose, im in pose_images if pose != -1 and im is not None]
        if len(images) < MIN_POSES_FOR_CHECK:
            logger.info(
                "[%s] Stuck check skipped: only %d pose images available",
                camera_ip,
                len(images),
            )
            continue

        try:
            mean_var = _mean_gray_variance(images)
        except Exception as exc:
            logger.warning("[%s] Stuck check failed computing variance: %s", camera_ip, exc)
            continue

        if mean_var < MIN_MEAN_VARIANCE_FOR_CHECK:
            logger.info(
                "[%s] Stuck check skipped: low-variance scene (mean=%.0f < %d, likely fog/night)",
                camera_ip,
                mean_var,
                int(MIN_MEAN_VARIANCE_FOR_CHECK),
            )
            CONSECUTIVE_HITS[camera_ip] = 0
            continue

        try:
            max_dist = _max_pairwise_hamming(images)
        except Exception as exc:
            logger.warning("[%s] Stuck check failed: %s", camera_ip, exc)
            continue

        logger.info(
            "[%s] Stuck check: max pHash distance=%d across %d poses (threshold=%d)",
            camera_ip,
            max_dist,
            len(images),
            STUCK_MAX_HAMMING,
        )

        if max_dist < STUCK_MAX_HAMMING:
            CONSECUTIVE_HITS[camera_ip] += 1
            logger.warning(
                "[%s] Possible stuck PTZ: max pHash distance=%d across %d poses (hit %d/%d)",
                camera_ip,
                max_dist,
                len(images),
                CONSECUTIVE_HITS[camera_ip],
                CONSECUTIVE_HITS_BEFORE_REBOOT,
            )
            if CONSECUTIVE_HITS[camera_ip] >= CONSECUTIVE_HITS_BEFORE_REBOOT:
                logger.error(
                    "[%s] Rebooting camera due to stuck PTZ detection (max distance=%d)",
                    camera_ip,
                    max_dist,
                )
                ok = False
                try:
                    ok = bool(cam.reboot_camera())
                except Exception as exc:
                    logger.error("[%s] Reboot raised: %s", camera_ip, exc)

                if ok:
                    # Drop the stale frames so the next check only sees post-reboot images.
                    cam.last_images.clear()
                    CONSECUTIVE_HITS[camera_ip] = 0
                else:
                    logger.error(
                        "[%s] Reboot command rejected by camera, will retry on next check",
                        camera_ip,
                    )
                    # keep the hit counter so we retry, use fast-confirm cadence
                    next_delay = INITIAL_DELAY
            else:
                # confirm the hit quickly rather than waiting a full interval
                next_delay = INITIAL_DELAY
        else:
            if CONSECUTIVE_HITS[camera_ip] > 0:
                logger.info(
                    "[%s] Stuck detector cleared: max distance=%d",
                    camera_ip,
                    max_dist,
                )
            CONSECUTIVE_HITS[camera_ip] = 0

    logger.info("[%s] Stuck detector exited cleanly", camera_ip)
from pyro_camera_api.services.anonymizer_rtsp import AnonymizerWorker, BoxStore, LastFrameStore from pyro_camera_api.services.stream import set_app_for_stream, stop_stream_if_idle @@ -66,6 +74,22 @@ async def lifespan(app: FastAPI): PATROL_FLAGS[cam_id] = stop_flag thread.start() + stuck_detector_enabled = os.getenv("ENABLE_STUCK_DETECTOR", "true").strip().lower() in ( + "1", + "true", + "yes", + "on", + ) + if stuck_detector_enabled and getattr(cam, "cam_type", "static") == "ptz" and hasattr(cam, "reboot_camera"): + stuck_flag = threading.Event() + stuck_thread = threading.Thread(target=stuck_check_loop, args=(cam_id, stuck_flag), daemon=True) + STUCK_CHECK_THREADS[cam_id] = stuck_thread + STUCK_CHECK_FLAGS[cam_id] = stuck_flag + stuck_thread.start() + logger.info("Starting stuck detector for PTZ camera %s", cam_id) + elif not stuck_detector_enabled and getattr(cam, "cam_type", "static") == "ptz": + logger.info("Stuck detector disabled by ENABLE_STUCK_DETECTOR for %s", cam_id) + threading.Thread(target=stop_stream_if_idle, daemon=True).start() try: @@ -75,6 +99,10 @@ async def lifespan(app: FastAPI): logger.info("Stopping loop for camera %s", cam_id) flag.set() + for cam_id, flag in STUCK_CHECK_FLAGS.items(): + logger.info("Stopping stuck detector for camera %s", cam_id) + flag.set() + try: workers = getattr(app.state, "stream_workers", {}) for cam_id, p in list(workers.items()):