Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,7 @@ MEDIAMTX_SERVER_IP=1.2.3.4
# Docker image tag for both services (defaults to "latest" if unset)
# Set to a specific version for pinned deployments, e.g. PYRO_ENGINE_VERSION=1.2.0
PYRO_ENGINE_VERSION=latest

# Stuck-PTZ detector (auto-reboots a PTZ camera that stops rotating during patrol).
# Enabled by default. Set to "false" to disable.
ENABLE_STUCK_DETECTOR=true
1 change: 1 addition & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ services:
CAM_USER: ${CAM_USER}
CAM_PWD: ${CAM_PWD}
MEDIAMTX_SERVER_IP: ${MEDIAMTX_SERVER_IP}
ENABLE_STUCK_DETECTOR: ${ENABLE_STUCK_DETECTOR:-true}
volumes:
- ./data:/usr/src/app/data
restart: always
Expand Down
8 changes: 8 additions & 0 deletions pyro_camera_api/client/pyro_camera_api_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,14 @@
resp = self._request("POST", f"/control/zoom/{camera_ip}/{level}")
return resp.json()

def reboot_camera(self, camera_ip: str) -> Dict[str, Any]:
"""

Check notice on line 213 in pyro_camera_api/client/pyro_camera_api_client/client.py

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

pyro_camera_api/client/pyro_camera_api_client/client.py#L213

1 blank line required between summary line and description (found 0) (D205)
Reboot a camera. Supported for Reolink and Linovision adapters.
Raises HTTP 501 for adapters that do not implement reboot.
"""
resp = self._request("POST", f"/control/reboot/{camera_ip}")
return resp.json()

# ------------------------------------------------------------------
# Focus
# ------------------------------------------------------------------
Expand Down
5 changes: 3 additions & 2 deletions pyro_camera_api/pyro_camera_api/api/routes_cameras.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,9 +251,10 @@ def get_latest_image(
if cam is None:
raise HTTPException(status_code=404, detail="Unknown camera")

if pose not in cam.last_images or cam.last_images[pose] is None:
image = cam.last_images.get(pose)
if image is None:
return Response(status_code=status.HTTP_204_NO_CONTENT)

buffer = BytesIO()
cam.last_images[pose].save(buffer, format="JPEG", quality=quality)
image.save(buffer, format="JPEG", quality=quality)
return Response(buffer.getvalue(), media_type="image/jpeg")
34 changes: 34 additions & 0 deletions pyro_camera_api/pyro_camera_api/api/routes_control.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,40 @@ def set_preset(camera_ip: str, idx: Optional[int] = None):
return {"status": "preset_set", "camera_ip": camera_ip, "id": idx}


@router.post("/reboot/{camera_ip}")
def reboot_camera(camera_ip: str):
"""
Reboot a camera.

Supported for adapters that expose a reboot_camera method (Reolink,
Linovision). Used to recover cameras that occasionally get stuck
(e.g. PTZ stops responding during patrol). Returns 501 for adapters
that do not implement reboot.
"""
cam = CAMERA_REGISTRY.get(camera_ip)
if cam is None:
raise HTTPException(status_code=404, detail=f"Camera with IP '{camera_ip}' not found")

if not hasattr(cam, "reboot_camera"):
raise HTTPException(
status_code=501,
detail="Reboot is not implemented for this camera adapter",
)

try:
logger.warning("[%s] Rebooting camera", camera_ip)
ok = cam.reboot_camera()
except Exception as exc:
logger.error("[%s] Failed to reboot camera: %s", camera_ip, exc)
raise HTTPException(status_code=500, detail=str(exc))

if not ok:
logger.error("[%s] Camera rejected reboot command", camera_ip)
raise HTTPException(status_code=502, detail="Camera rejected reboot command")

return {"status": "rebooting", "camera_ip": camera_ip}


@router.post("/zoom/{camera_ip}/{level}")
def zoom_camera(camera_ip: str, level: int):
"""
Expand Down
10 changes: 8 additions & 2 deletions pyro_camera_api/pyro_camera_api/camera/adapters/reolink.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,11 +152,17 @@ def set_ptz_preset(self, idx: Optional[int] = None):
response = requests.post(url, json=data, verify=False) # nosec: B501
self._handle_response(response, f"Preset {name} set successfully.")

def reboot_camera(self):
def reboot_camera(self) -> bool:
url = self._build_url("Reboot")
data = [{"cmd": "Reboot"}]
response = requests.post(url, json=data, verify=False) # nosec: B501
return self._handle_response(response, "Camera reboot initiated successfully.")
response_data = self._handle_response(response, "Camera reboot initiated successfully.")
if not response_data:
return False
try:
return response_data[0]["code"] == 0
except (KeyError, IndexError, TypeError):
return False

def get_auto_focus(self):
url = self._build_url("GetAutoFocus")
Expand Down
4 changes: 4 additions & 0 deletions pyro_camera_api/pyro_camera_api/camera/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@
PATROL_THREADS: Dict[str, threading.Thread] = {}
PATROL_FLAGS: Dict[str, threading.Event] = {}

# Stuck-detector threading state, later managed in camera.stuck_detector
STUCK_CHECK_THREADS: Dict[str, threading.Thread] = {}
STUCK_CHECK_FLAGS: Dict[str, threading.Event] = {}


def build_camera_object(key: str, conf: dict) -> Optional[BaseCamera]:
"""
Expand Down
187 changes: 187 additions & 0 deletions pyro_camera_api/pyro_camera_api/camera/stuck_detector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
# Copyright (C) 2022-2026, Pyronear.

# This program is licensed under the Apache License 2.0.
# See LICENSE or go to <https://opensource.org/licenses/Apache-2.0> for full license details.


"""

Check notice on line 7 in pyro_camera_api/pyro_camera_api/camera/stuck_detector.py

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

pyro_camera_api/pyro_camera_api/camera/stuck_detector.py#L7

Multi-line docstring summary should start at the first line (D212)
PTZ stuck-camera detector.

Every CHECK_INTERVAL seconds, compute pairwise pHash distances across the
most recent per-pose images produced by the patrol loop. A turret that has
frozen returns near-identical frames for all poses, giving a very small
maximum pairwise distance. After CONSECUTIVE_HITS_BEFORE_REBOOT consecutive
low-distance checks, reboot the camera.

Thresholds were calibrated on real sdis-77 captures: stuck-episode max
pairwise Hamming <= 6, working-patrol min pairwise Hamming >= 17.
"""

from __future__ import annotations

import logging
import threading
from typing import Dict, List

import cv2
import numpy as np
from PIL import Image

from pyro_camera_api.camera.registry import CAMERA_REGISTRY, PATROL_FLAGS, PATROL_THREADS

logger = logging.getLogger(__name__)

CHECK_INTERVAL = 30 * 60.0 # seconds between checks
INITIAL_DELAY = 3 * 60.0 # delay before the first check, lets patrol populate last_images
STUCK_MAX_HAMMING = 10 # max pairwise distance below which we suspect stuck
CONSECUTIVE_HITS_BEFORE_REBOOT = 2
MIN_POSES_FOR_CHECK = 2
# Fog / low-light scenes collapse to near-uniform gray and produce unstable pHashes.
# Skip the check when the mean per-image gray variance falls below this threshold.
# Calibrated on sun_test data: foggy rounds mean variance <= 332, stuck rounds mean >= 854.
MIN_MEAN_VARIANCE_FOR_CHECK = 500.0

CONSECUTIVE_HITS: Dict[str, int] = {}


def _phash(img: Image.Image, hash_size: int = 8, highfreq_factor: int = 4) -> np.ndarray:
"""Classic pHash: downscale to grayscale, DCT, threshold low-freq block on its median."""
img_size = hash_size * highfreq_factor
gray = img.convert("L").resize((img_size, img_size), Image.Resampling.LANCZOS)
arr = np.asarray(gray, dtype=np.float32)
dct = cv2.dct(arr)
low = dct[:hash_size, :hash_size]
med = np.median(low[1:, 1:])
return (low > med).flatten()


def _hamming(a: np.ndarray, b: np.ndarray) -> int:
return int(np.count_nonzero(a != b))


def _mean_gray_variance(images: List[Image.Image]) -> float:
return float(np.mean([np.asarray(im.convert("L"), dtype=np.float32).var() for im in images]))


def _max_pairwise_hamming(images: List[Image.Image]) -> int:
hashes = [_phash(im) for im in images]
n = len(hashes)
return max(_hamming(hashes[i], hashes[j]) for i in range(n) for j in range(i + 1, n))


def _patrol_is_running(camera_ip: str) -> bool:
thr = PATROL_THREADS.get(camera_ip)
flag = PATROL_FLAGS.get(camera_ip)
return bool(thr and thr.is_alive() and flag and not flag.is_set())


def stuck_check_loop(camera_ip: str, stop_flag: threading.Event) -> None:

Check warning on line 78 in pyro_camera_api/pyro_camera_api/camera/stuck_detector.py

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

pyro_camera_api/pyro_camera_api/camera/stuck_detector.py#L78

stuck_check_loop is too complex (16) (MC0001)
cam = CAMERA_REGISTRY[camera_ip]

if not hasattr(cam, "reboot_camera"):
logger.info("[%s] Stuck detector disabled: adapter does not support reboot", camera_ip)
return

logger.info(
"[%s] Stuck detector started (initial=%ds, interval=%ds, threshold=%d, consecutive=%d)",
camera_ip,
int(INITIAL_DELAY),
int(CHECK_INTERVAL),
STUCK_MAX_HAMMING,
CONSECUTIVE_HITS_BEFORE_REBOOT,
)

CONSECUTIVE_HITS[camera_ip] = 0
next_delay = INITIAL_DELAY

while not stop_flag.wait(next_delay):
next_delay = CHECK_INTERVAL
if not _patrol_is_running(camera_ip):
logger.info("[%s] Stuck check skipped: patrol not running", camera_ip)
CONSECUTIVE_HITS[camera_ip] = 0
continue

images = [im for pose, im in cam.last_images.items() if pose != -1 and im is not None]
if len(images) < MIN_POSES_FOR_CHECK:
logger.info(
"[%s] Stuck check skipped: only %d pose images available",
camera_ip,
len(images),
)
continue

try:
mean_var = _mean_gray_variance(images)
except Exception as exc:
logger.warning("[%s] Stuck check failed computing variance: %s", camera_ip, exc)
continue

if mean_var < MIN_MEAN_VARIANCE_FOR_CHECK:
logger.info(
"[%s] Stuck check skipped: low-variance scene (mean=%.0f < %d, likely fog/night)",
camera_ip,
mean_var,
int(MIN_MEAN_VARIANCE_FOR_CHECK),
)
CONSECUTIVE_HITS[camera_ip] = 0
continue

try:
max_dist = _max_pairwise_hamming(images)
except Exception as exc:
logger.warning("[%s] Stuck check failed: %s", camera_ip, exc)
continue

logger.info(
"[%s] Stuck check: max pHash distance=%d across %d poses (threshold=%d)",
camera_ip,
max_dist,
len(images),
STUCK_MAX_HAMMING,
)

if max_dist < STUCK_MAX_HAMMING:
CONSECUTIVE_HITS[camera_ip] += 1
logger.warning(
"[%s] Possible stuck PTZ: max pHash distance=%d across %d poses (hit %d/%d)",
camera_ip,
max_dist,
len(images),
CONSECUTIVE_HITS[camera_ip],
CONSECUTIVE_HITS_BEFORE_REBOOT,
)
if CONSECUTIVE_HITS[camera_ip] >= CONSECUTIVE_HITS_BEFORE_REBOOT:
logger.error(
"[%s] Rebooting camera due to stuck PTZ detection (max distance=%d)",
camera_ip,
max_dist,
)
ok = False
try:
ok = bool(cam.reboot_camera())
except Exception as exc:
logger.error("[%s] Reboot raised: %s", camera_ip, exc)

if ok:
cam.last_images.clear()
CONSECUTIVE_HITS[camera_ip] = 0
else:
logger.error(
"[%s] Reboot command rejected by camera, will retry on next check",
camera_ip,
)
# keep the hit counter so we retry, use fast-confirm cadence
next_delay = INITIAL_DELAY
else:
# confirm the hit quickly rather than waiting a full interval
next_delay = INITIAL_DELAY
else:
if CONSECUTIVE_HITS[camera_ip] > 0:
logger.info(
"[%s] Stuck detector cleared: max distance=%d",
camera_ip,
max_dist,
)
CONSECUTIVE_HITS[camera_ip] = 0

logger.info("[%s] Stuck detector exited cleanly", camera_ip)
30 changes: 29 additions & 1 deletion pyro_camera_api/pyro_camera_api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from __future__ import annotations

import logging
import os
import threading
from contextlib import asynccontextmanager

Expand All @@ -20,7 +21,14 @@
from pyro_camera_api.api.routes_patrol import router as patrol_router
from pyro_camera_api.api.routes_stream import router as stream_router
from pyro_camera_api.camera.patrol import patrol_loop, static_loop
from pyro_camera_api.camera.registry import CAMERA_REGISTRY, PATROL_FLAGS, PATROL_THREADS
from pyro_camera_api.camera.registry import (
CAMERA_REGISTRY,
PATROL_FLAGS,
PATROL_THREADS,
STUCK_CHECK_FLAGS,
STUCK_CHECK_THREADS,
)
from pyro_camera_api.camera.stuck_detector import stuck_check_loop
from pyro_camera_api.core.logging import setup_logging
from pyro_camera_api.services.anonymizer_rtsp import AnonymizerWorker, BoxStore, LastFrameStore
from pyro_camera_api.services.stream import set_app_for_stream, stop_stream_if_idle
Expand Down Expand Up @@ -66,6 +74,22 @@ async def lifespan(app: FastAPI):
PATROL_FLAGS[cam_id] = stop_flag
thread.start()

stuck_detector_enabled = os.getenv("ENABLE_STUCK_DETECTOR", "true").strip().lower() in (
"1",
"true",
"yes",
"on",
)
if stuck_detector_enabled and getattr(cam, "cam_type", "static") == "ptz" and hasattr(cam, "reboot_camera"):
stuck_flag = threading.Event()
stuck_thread = threading.Thread(target=stuck_check_loop, args=(cam_id, stuck_flag), daemon=True)
STUCK_CHECK_THREADS[cam_id] = stuck_thread
STUCK_CHECK_FLAGS[cam_id] = stuck_flag
stuck_thread.start()
logger.info("Starting stuck detector for PTZ camera %s", cam_id)
elif not stuck_detector_enabled and getattr(cam, "cam_type", "static") == "ptz":
logger.info("Stuck detector disabled by ENABLE_STUCK_DETECTOR for %s", cam_id)

threading.Thread(target=stop_stream_if_idle, daemon=True).start()

try:
Expand All @@ -75,6 +99,10 @@ async def lifespan(app: FastAPI):
logger.info("Stopping loop for camera %s", cam_id)
flag.set()

for cam_id, flag in STUCK_CHECK_FLAGS.items():
logger.info("Stopping stuck detector for camera %s", cam_id)
flag.set()

try:
workers = getattr(app.state, "stream_workers", {})
for cam_id, p in list(workers.items()):
Expand Down
Loading