diff --git a/pyproject.toml b/pyproject.toml index 16699144..91a4b6b8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -141,12 +141,13 @@ known-third-party = ["pillow", "tqdm", "onnxruntime", "huggingface_hub"] [tool.ruff.lint.per-file-ignores] "**/__init__.py" = ["I001", "F401", "CPY001"] "src/**.py" = ["D", "T201", "S101", "ANN", "BLE001", "S106", "S113", "S501"] +"scripts/**.py" = ["D", "T201", "S101", "ANN", "BLE001", "S106", "S113", "S501"] ".github/**.py" = ["D", "T201", "ANN", "S", "PYI024"] "tests/**.py" = ["D103", "CPY001", "S101", "T201", "ANN001", "ANN201", "ANN202", "ARG001", "S113"] "pyro_camera_api/**.py" = ["D", "T201", "S101", "ANN", "BLE001", "S113", "S501", "S404", "S603", "S405", "S314", "E402", "RUF029"] "pyroengine/core.py" = ["BLE001"] "pyroengine/sensors.py" = ["S113", "S501", "ANN"] -"watchdog/**.py" = ["CPY001", "S108", "S310", "S404", "S603", "S607", "BLE001", "LOG015"] +"watchdog/**.py" = ["CPY001", "S108", "S310", "S404", "S603", "S607", "BLE001", "LOG015", "T201", "FURB105"] [tool.ruff.format] quote-style = "double" diff --git a/scripts/setup_patrol_presets.py b/scripts/setup_patrol_presets.py new file mode 100644 index 00000000..627a1c04 --- /dev/null +++ b/scripts/setup_patrol_presets.py @@ -0,0 +1,70 @@ +# Copyright (C) 2022-2026, Pyronear. + +# This program is licensed under the Apache License 2.0. +# See LICENSE or go to for full license details. + +""" +Setup patrol presets on a PTZ camera via the pyro_camera_api client. + +Sequence: + 1. Go to pose 10 + 2. Move left 70° -> save as preset 0 + 3. Move right 45° -> save as preset 1 + 4. Move right 45° -> save as preset 2 + 5. Move right 45° -> save as preset 3 +""" + +from __future__ import annotations + +import argparse +import time + +from pyro_camera_api_client import PyroCameraAPIClient + +START_POSE = 10 +LEFT_DEG = 70.0 +STEP_DEG = 45.0 + + +def wait(seconds: float) -> None: + time.sleep(seconds) + + +def main() -> None: + parser = argparse.ArgumentParser(description="Setup patrol presets on a PTZ camera.") + parser.add_argument("--cam-ip", required=True, help="Camera IP, e.g. 192.168.1.11") + parser.add_argument("--login", default="admin", help="Camera login (handled server-side, kept for reference)") + parser.add_argument("--pwd", default="@Pyronear", help="Camera password (handled server-side, kept for reference)") + parser.add_argument("--base-url", default="http://192.168.1.99:8080", help="Camera API base URL") + parser.add_argument("--settle", type=float, default=4.0, help="Seconds to wait after each move before saving") + args = parser.parse_args() + + # Note: --login and --pwd are accepted for parity with operator notes, + # but the camera API service uses its own server-side credentials. + _ = (args.login, args.pwd) + + client = PyroCameraAPIClient(base_url=args.base_url, timeout=30.0) + + print(f"[1/5] Moving to pose {START_POSE}") + client.move_camera(camera_ip=args.cam_ip, pose_id=START_POSE) + wait(args.settle + 4.0) + + print(f"[2/5] Moving left {LEFT_DEG}°, saving as preset 0") + client.move_camera(camera_ip=args.cam_ip, direction="Left", degrees=LEFT_DEG) + wait(args.settle) + client.set_preset(camera_ip=args.cam_ip, idx=0) + print(" preset 0 saved") + + for preset_id in (1, 2, 3): + print(f"[{preset_id + 2}/5] Moving right {STEP_DEG}°, saving as preset {preset_id}") + client.move_camera(camera_ip=args.cam_ip, direction="Right", degrees=STEP_DEG) + wait(args.settle) + client.set_preset(camera_ip=args.cam_ip, idx=preset_id) + print(f" preset {preset_id} saved") + + print("Done. Listing presets:") + print(client.list_presets(camera_ip=args.cam_ip)) + + +if __name__ == "__main__": + main() diff --git a/watchdog/main_pi/relay_check.py b/watchdog/main_pi/relay_check.py new file mode 100644 index 00000000..e18d7eaa --- /dev/null +++ b/watchdog/main_pi/relay_check.py @@ -0,0 +1,153 @@ +#!/usr/bin/env python3 +""" +End-to-end relay test for the main Pi watchdog setup. + +For each relay, this script: + 1) pings the controlled device to confirm it is reachable, + 2) drives the GPIO LOW for POWER_OFF_TIME seconds (cut power), + 3) verifies the device drops during the cut, + 4) drives the GPIO back HIGH (restore power) and waits for the device to come back. + +Install dependencies (once per Pi): + sudo apt install python3-rpi-lgpio + +Run on the main Pi: + python3 /home/pi/pyro-engine/watchdog/main_pi/relay_check.py + +Exits non-zero if any relay fails the verification. +""" + +import subprocess +import sys +import time +from pathlib import Path + +import RPi.GPIO as GPIO + +# ================= CONFIG ================= + +RELAY_PIZERO = 16 + +_ENV_FILE = Path("/home/pi/watchdog.env") + + +def _load_env(path: Path) -> dict: + env: dict = {} + try: + for line in path.read_text().splitlines(): + line = line.strip() + if not line or line.startswith("#"): + continue + key, _, value = line.partition("=") + env[key.strip()] = value.strip() + except FileNotFoundError: + pass + return env + + +_env = _load_env(_ENV_FILE) + +PIZERO_IP: str = _env.get("PIZERO_IP", "192.168.1.98") + +PING_COUNT = 1 +PING_TIMEOUT = 2 +POWER_OFF_TIME = 15 + +RETURN_TIMEOUT = 90 +RETURN_POLL = 3 + + +# ================ HELPERS ================= + + +def ping(ip: str) -> bool: + try: + subprocess.check_output( + ["ping", "-c", str(PING_COUNT), "-W", str(PING_TIMEOUT), ip], + stderr=subprocess.DEVNULL, + ) + return True + except subprocess.CalledProcessError: + return False + + +def wait_for_drop(ip: str, deadline_s: int) -> bool: + # require two consecutive failed pings to debounce transient packet loss + end = time.time() + deadline_s + while time.time() < end: + if not ping(ip) and not ping(ip): + return True + time.sleep(1) + return False + + +def wait_for_return(ip: str, deadline_s: int, poll_s: int) -> bool: + end = time.time() + deadline_s + while time.time() < end: + if ping(ip): + return True + time.sleep(poll_s) + return False + + +def test_relay(name: str, gpio_pin: int, target_ip: str) -> bool: + print(f"\n=== Testing relay '{name}' on GPIO {gpio_pin} (target {target_ip}) ===", flush=True) + + print(f"[BEFORE] ping {target_ip} ...", flush=True) + if not ping(target_ip): + print(f"[BEFORE] FAIL: {target_ip} unreachable before cut, aborting test", flush=True) + return False + print(f"[BEFORE] OK: {target_ip} reachable", flush=True) + + print(f"[CUT] GPIO {gpio_pin} -> LOW for {POWER_OFF_TIME}s", flush=True) + GPIO.output(gpio_pin, GPIO.LOW) + cut_start = time.time() + try: + dropped = wait_for_drop(target_ip, deadline_s=POWER_OFF_TIME) + if not dropped: + print( + f"[CUT] FAIL: {target_ip} still reachable after {POWER_OFF_TIME}s -- relay or wiring issue", + flush=True, + ) + return False + print(f"[CUT] OK: {target_ip} dropped after {time.time() - cut_start:.1f}s", flush=True) + # hold the cut for the remainder of POWER_OFF_TIME to ensure a real power cycle + remaining = POWER_OFF_TIME - (time.time() - cut_start) + if remaining > 0: + time.sleep(remaining) + finally: + print(f"[RESTORE] GPIO {gpio_pin} -> HIGH", flush=True) + GPIO.output(gpio_pin, GPIO.HIGH) + + print(f"[AFTER] waiting up to {RETURN_TIMEOUT}s for {target_ip} to return ...", flush=True) + if not wait_for_return(target_ip, deadline_s=RETURN_TIMEOUT, poll_s=RETURN_POLL): + print(f"[AFTER] FAIL: {target_ip} did not come back within {RETURN_TIMEOUT}s", flush=True) + return False + print(f"[AFTER] OK: {target_ip} reachable again", flush=True) + return True + + +# ================== MAIN ================== + + +def main() -> int: + GPIO.setmode(GPIO.BCM) + GPIO.setwarnings(False) + GPIO.setup(RELAY_PIZERO, GPIO.OUT, initial=GPIO.HIGH) + + try: + ok = test_relay("pizero", RELAY_PIZERO, PIZERO_IP) + finally: + GPIO.output(RELAY_PIZERO, GPIO.HIGH) + GPIO.cleanup() + + print("", flush=True) + if not ok: + print("FAILED: pizero", flush=True) + return 1 + print("ALL RELAYS OK", flush=True) + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/watchdog/main_pi/watchdog.py b/watchdog/main_pi/watchdog.py index 1cca9493..6180d73d 100644 --- a/watchdog/main_pi/watchdog.py +++ b/watchdog/main_pi/watchdog.py @@ -3,6 +3,9 @@ Watchdog for the main Pi that pings the Pi Zero and power-cycles its relay after repeated failures, with cooldown and daily limits. +Install dependencies (once per Pi): + sudo apt install python3-rpi-lgpio + Cron setup (every 10 minutes at :05): 1) Edit crontab: crontab -e 2) Add the line: diff --git a/watchdog/pi_zero/relay_check.py b/watchdog/pi_zero/relay_check.py new file mode 100644 index 00000000..79f1c828 --- /dev/null +++ b/watchdog/pi_zero/relay_check.py @@ -0,0 +1,201 @@ +#!/usr/bin/env python3 +""" +End-to-end relay test for the Pi Zero watchdog setup. + +For each selected relay, this script: + 1) pings the controlled device to confirm it is reachable, + 2) drives the GPIO LOW for POWER_OFF_TIME seconds (cut power), + 3) verifies the device drops during the cut, + 4) drives the GPIO back HIGH (restore power) and waits for the device to come back. + +Install dependencies (once per Pi): + sudo apt install python3-rpi-lgpio + +Run on the Pi Zero: + python3 /home/pi/pyro-engine/watchdog/pi_zero/relay_check.py + python3 /home/pi/pyro-engine/watchdog/pi_zero/relay_check.py --relay main + python3 /home/pi/pyro-engine/watchdog/pi_zero/relay_check.py --relay cams + +Run from the main Pi (when you cannot reach the Pi Zero directly): + Testing the 'main' relay cuts power to the main Pi, which kills the SSH + chain to the Pi Zero, so the test must be detached on the Pi Zero. From + the main Pi: + + ssh pi@ \ + 'nohup python3 /home/pi/pyro-engine/watchdog/pi_zero/relay_check.py \ + > /tmp/relay_check.log 2>&1 < /dev/null & disown; exit' + + Wait ~3-4 minutes (15s cut + 90s return per relay), then reconnect and: + ssh pi@ 'cat /tmp/relay_check.log' + +The cams test pings only the first camera in CAM_IPS as a proxy for the 12V rail. +Exits non-zero if any relay fails the verification. +""" + +import argparse +import subprocess +import sys +import time +from pathlib import Path + +import RPi.GPIO as GPIO + +# ================= CONFIG ================= + +RELAY_MAIN = 16 +RELAY_CAMS = 26 + +_ENV_FILE = Path("/home/pi/watchdog.env") + + +def _load_env(path: Path) -> dict: + env: dict = {} + try: + for line in path.read_text().splitlines(): + line = line.strip() + if not line or line.startswith("#"): + continue + key, _, value = line.partition("=") + env[key.strip()] = value.strip() + except FileNotFoundError: + pass + return env + + +_env = _load_env(_ENV_FILE) + +MAIN_PI_IP: str = _env.get("MAIN_PI_IP", "192.168.1.99") + +_cam_ips_raw = _env.get("CAM_IPS", "") +CAM_IPS: list[str] = [ip.strip() for ip in _cam_ips_raw.split(",") if ip.strip()] or [ + "192.168.1.11", + "192.168.1.12", +] + +PING_COUNT = 1 +PING_TIMEOUT = 2 +POWER_OFF_TIME = 15 + +RETURN_TIMEOUT = 90 +RETURN_POLL = 3 + + +# ================ HELPERS ================= + + +def ping(ip: str) -> bool: + try: + subprocess.check_output( + ["ping", "-c", str(PING_COUNT), "-W", str(PING_TIMEOUT), ip], + stderr=subprocess.DEVNULL, + ) + return True + except subprocess.CalledProcessError: + return False + + +def wait_for_drop(ip: str, deadline_s: int) -> bool: + # require two consecutive failed pings to debounce transient packet loss + end = time.time() + deadline_s + while time.time() < end: + if not ping(ip) and not ping(ip): + return True + time.sleep(1) + return False + + +def wait_for_return(ip: str, deadline_s: int, poll_s: int) -> bool: + end = time.time() + deadline_s + while time.time() < end: + if ping(ip): + return True + time.sleep(poll_s) + return False + + +def test_relay(name: str, gpio_pin: int, target_ip: str) -> bool: + print(f"\n=== Testing relay '{name}' on GPIO {gpio_pin} (target {target_ip}) ===", flush=True) + + print(f"[BEFORE] ping {target_ip} ...", flush=True) + if not ping(target_ip): + print(f"[BEFORE] FAIL: {target_ip} unreachable before cut, aborting test", flush=True) + return False + print(f"[BEFORE] OK: {target_ip} reachable", flush=True) + + print(f"[CUT] GPIO {gpio_pin} -> LOW for {POWER_OFF_TIME}s", flush=True) + GPIO.output(gpio_pin, GPIO.LOW) + cut_start = time.time() + try: + dropped = wait_for_drop(target_ip, deadline_s=POWER_OFF_TIME) + if not dropped: + print( + f"[CUT] FAIL: {target_ip} still reachable after {POWER_OFF_TIME}s -- relay or wiring issue", + flush=True, + ) + return False + print(f"[CUT] OK: {target_ip} dropped after {time.time() - cut_start:.1f}s", flush=True) + # hold the cut for the remainder of POWER_OFF_TIME to ensure a real power cycle + remaining = POWER_OFF_TIME - (time.time() - cut_start) + if remaining > 0: + time.sleep(remaining) + finally: + print(f"[RESTORE] GPIO {gpio_pin} -> HIGH", flush=True) + GPIO.output(gpio_pin, GPIO.HIGH) + + print(f"[AFTER] waiting up to {RETURN_TIMEOUT}s for {target_ip} to return ...", flush=True) + if not wait_for_return(target_ip, deadline_s=RETURN_TIMEOUT, poll_s=RETURN_POLL): + print(f"[AFTER] FAIL: {target_ip} did not come back within {RETURN_TIMEOUT}s", flush=True) + return False + print(f"[AFTER] OK: {target_ip} reachable again", flush=True) + return True + + +# ================== MAIN ================== + + +RELAYS = { + "main": (RELAY_MAIN, lambda: MAIN_PI_IP), + "cams": (RELAY_CAMS, lambda: CAM_IPS[0]), +} + + +def main() -> int: + parser = argparse.ArgumentParser(description="End-to-end relay test for Pi Zero") + parser.add_argument( + "--relay", + choices=[*RELAYS.keys(), "all"], + default="all", + help="Relay to test (default: all)", + ) + args = parser.parse_args() + + selected = list(RELAYS.keys()) if args.relay == "all" else [args.relay] + + GPIO.setmode(GPIO.BCM) + GPIO.setwarnings(False) + for name in selected: + pin, _ = RELAYS[name] + GPIO.setup(pin, GPIO.OUT, initial=GPIO.HIGH) + + failures: list[str] = [] + try: + for name in selected: + pin, ip_getter = RELAYS[name] + if not test_relay(name, pin, ip_getter()): + failures.append(name) + finally: + for name in selected: + pin, _ = RELAYS[name] + GPIO.output(pin, GPIO.HIGH) + GPIO.cleanup() + + print("", flush=True) + if failures: + print(f"FAILED: {', '.join(failures)}", flush=True) + return 1 + print("ALL RELAYS OK", flush=True) + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/watchdog/pi_zero/watchdog.py b/watchdog/pi_zero/watchdog.py index 68984d5f..faadfac4 100644 --- a/watchdog/pi_zero/watchdog.py +++ b/watchdog/pi_zero/watchdog.py @@ -5,6 +5,9 @@ It checks the main Pi health endpoint and pings camera IPs, tracking failures and power-cycling relays after repeated failures with cooldown/daily limits. +Install dependencies (once per Pi): + sudo apt install python3-rpi-lgpio + Cron setup (every 10 minutes): 1) Edit crontab: crontab -e 2) Add the line: