From ac3fdfb37e3d9277c0b1823d046cc237f7e62284 Mon Sep 17 00:00:00 2001 From: Mateo Date: Wed, 29 Apr 2026 12:20:56 +0200 Subject: [PATCH 01/10] feat: add end-to-end relay test for main Pi watchdog --- watchdog/main_pi/test_relays.py | 147 ++++++++++++++++++++++++++++++++ 1 file changed, 147 insertions(+) create mode 100644 watchdog/main_pi/test_relays.py diff --git a/watchdog/main_pi/test_relays.py b/watchdog/main_pi/test_relays.py new file mode 100644 index 00000000..d487dc2c --- /dev/null +++ b/watchdog/main_pi/test_relays.py @@ -0,0 +1,147 @@ +#!/usr/bin/env python3 +""" +End-to-end relay test for the main Pi watchdog setup. + +For each relay, this script: + 1) pings the controlled device to confirm it is reachable, + 2) drives the GPIO LOW for POWER_OFF_TIME seconds (cut power), + 3) verifies the device drops during the cut, + 4) drives the GPIO back HIGH (restore power) and waits for the device to come back. + +Run on the main Pi: + python3 /home/pi/pyro-engine/watchdog/main_pi/test_relays.py + +Exits non-zero if any relay fails the verification. +""" + +import subprocess +import sys +import time +from pathlib import Path + +import RPi.GPIO as GPIO + +# ================= CONFIG ================= + +RELAY_PIZERO = 16 + +_ENV_FILE = Path("/home/pi/watchdog.env") + + +def _load_env(path: Path) -> dict: + env: dict = {} + try: + for line in path.read_text().splitlines(): + line = line.strip() + if not line or line.startswith("#"): + continue + key, _, value = line.partition("=") + env[key.strip()] = value.strip() + except FileNotFoundError: + pass + return env + + +_env = _load_env(_ENV_FILE) + +PIZERO_IP: str = _env.get("PIZERO_IP", "192.168.1.98") + +PING_COUNT = 1 +PING_TIMEOUT = 2 +POWER_OFF_TIME = 15 + +DROP_GRACE = 5 +RETURN_TIMEOUT = 90 +RETURN_POLL = 3 + + +# ================ HELPERS ================= + + +def ping(ip: str) -> bool: + try: + subprocess.check_output( + ["ping", "-c", str(PING_COUNT), "-W", str(PING_TIMEOUT), ip], + stderr=subprocess.DEVNULL, + ) + return True + except subprocess.CalledProcessError: + return False + + +def wait_for_drop(ip: str, deadline_s: int) -> bool: + end = time.time() + deadline_s + while time.time() < end: + if not ping(ip): + return True + time.sleep(1) + return False + + +def wait_for_return(ip: str, deadline_s: int, poll_s: int) -> bool: + end = time.time() + deadline_s + while time.time() < end: + if ping(ip): + return True + time.sleep(poll_s) + return False + + +def test_relay(name: str, gpio_pin: int, target_ip: str) -> bool: + print(f"\n=== Testing relay '{name}' on GPIO {gpio_pin} (target {target_ip}) ===", flush=True) + + print(f"[BEFORE] ping {target_ip} ...", flush=True) + if not ping(target_ip): + print(f"[BEFORE] FAIL: {target_ip} unreachable before cut, aborting test", flush=True) + return False + print(f"[BEFORE] OK: {target_ip} reachable", flush=True) + + print(f"[CUT] GPIO {gpio_pin} -> LOW for {POWER_OFF_TIME}s", flush=True) + GPIO.output(gpio_pin, GPIO.LOW) + try: + dropped = wait_for_drop(target_ip, deadline_s=DROP_GRACE) + if not dropped: + print( + f"[CUT] FAIL: {target_ip} still reachable after {DROP_GRACE}s -- relay or wiring issue", + flush=True, + ) + time.sleep(max(0, POWER_OFF_TIME - DROP_GRACE)) + return False + print(f"[CUT] OK: {target_ip} dropped", flush=True) + time.sleep(max(0, POWER_OFF_TIME - DROP_GRACE)) + finally: + print(f"[RESTORE] GPIO {gpio_pin} -> HIGH", flush=True) + GPIO.output(gpio_pin, GPIO.HIGH) + + print(f"[AFTER] waiting up to {RETURN_TIMEOUT}s for {target_ip} to return ...", flush=True) + if not wait_for_return(target_ip, deadline_s=RETURN_TIMEOUT, poll_s=RETURN_POLL): + print(f"[AFTER] FAIL: {target_ip} did not come back within {RETURN_TIMEOUT}s", flush=True) + return False + print(f"[AFTER] OK: {target_ip} reachable again", flush=True) + return True + + +# ================== MAIN ================== + + +def main() -> int: + GPIO.setmode(GPIO.BCM) + GPIO.setwarnings(False) + GPIO.setup(RELAY_PIZERO, GPIO.OUT, initial=GPIO.HIGH) + + try: + ok = test_relay("pizero", RELAY_PIZERO, PIZERO_IP) + finally: + GPIO.output(RELAY_PIZERO, GPIO.HIGH) + GPIO.cleanup() + + print("", flush=True) + if not ok: + print("FAILED: pizero", flush=True) + return 1 + print("ALL RELAYS OK", flush=True) + return 0 + + +if __name__ == "__main__": + sys.exit(main()) From 762c26eb418f66ee59f4a5ee00896a17a549a7fb Mon Sep 17 00:00:00 2001 From: Mateo Date: Wed, 29 Apr 2026 12:21:30 +0200 Subject: [PATCH 02/10] feat: add end-to-end relay test for Pi Zero watchdog --- watchdog/pi_zero/test_relays.py | 187 ++++++++++++++++++++++++++++++++ 1 file changed, 187 insertions(+) create mode 100644 watchdog/pi_zero/test_relays.py diff --git a/watchdog/pi_zero/test_relays.py b/watchdog/pi_zero/test_relays.py new file mode 100644 index 00000000..d766286f --- /dev/null +++ b/watchdog/pi_zero/test_relays.py @@ -0,0 +1,187 @@ +#!/usr/bin/env python3 +""" +End-to-end relay test for the Pi Zero watchdog setup. + +For each selected relay, this script: + 1) pings the controlled device to confirm it is reachable, + 2) drives the GPIO LOW for POWER_OFF_TIME seconds (cut power), + 3) verifies the device drops during the cut, + 4) drives the GPIO back HIGH (restore power) and waits for the device to come back. + +Run on the Pi Zero: + python3 /home/pi/pyro-engine/watchdog/pi_zero/test_relays.py + python3 /home/pi/pyro-engine/watchdog/pi_zero/test_relays.py --relay main + python3 /home/pi/pyro-engine/watchdog/pi_zero/test_relays.py --relay cams + +The cams test pings only the first camera in CAM_IPS as a proxy for the 12V rail. +Exits non-zero if any relay fails the verification. +""" + +import argparse +import subprocess +import sys +import time +from pathlib import Path + +import RPi.GPIO as GPIO + +# ================= CONFIG ================= + +RELAY_MAIN = 16 +RELAY_CAMS = 26 + +_ENV_FILE = Path("/home/pi/watchdog.env") + + +def _load_env(path: Path) -> dict: + env: dict = {} + try: + for line in path.read_text().splitlines(): + line = line.strip() + if not line or line.startswith("#"): + continue + key, _, value = line.partition("=") + env[key.strip()] = value.strip() + except FileNotFoundError: + pass + return env + + +_env = _load_env(_ENV_FILE) + +MAIN_PI_IP: str = _env.get("MAIN_PI_IP", "192.168.1.99") + +_cam_ips_raw = _env.get("CAM_IPS", "") +CAM_IPS: list[str] = [ip.strip() for ip in _cam_ips_raw.split(",") if ip.strip()] or [ + "192.168.1.11", + "192.168.1.12", +] + +PING_COUNT = 1 +PING_TIMEOUT = 2 +POWER_OFF_TIME = 15 + +DROP_GRACE = 5 +RETURN_TIMEOUT = 90 +RETURN_POLL = 3 + + +# ================ HELPERS ================= + + +def ping(ip: str) -> bool: + try: + subprocess.check_output( + ["ping", "-c", str(PING_COUNT), "-W", str(PING_TIMEOUT), ip], + stderr=subprocess.DEVNULL, + ) + return True + except subprocess.CalledProcessError: + return False + + +def wait_for_drop(ip: str, deadline_s: int) -> bool: + end = time.time() + deadline_s + while time.time() < end: + if not ping(ip): + return True + time.sleep(1) + return False + + +def wait_for_return(ip: str, deadline_s: int, poll_s: int) -> bool: + end = time.time() + deadline_s + while time.time() < end: + if ping(ip): + return True + time.sleep(poll_s) + return False + + +def test_relay(name: str, gpio_pin: int, target_ip: str) -> bool: + print(f"\n=== Testing relay '{name}' on GPIO {gpio_pin} (target {target_ip}) ===", flush=True) + + print(f"[BEFORE] ping {target_ip} ...", flush=True) + if not ping(target_ip): + print(f"[BEFORE] FAIL: {target_ip} unreachable before cut, aborting test", flush=True) + return False + print(f"[BEFORE] OK: {target_ip} reachable", flush=True) + + print(f"[CUT] GPIO {gpio_pin} -> LOW for {POWER_OFF_TIME}s", flush=True) + GPIO.output(gpio_pin, GPIO.LOW) + try: + dropped = wait_for_drop(target_ip, deadline_s=DROP_GRACE) + if not dropped: + print( + f"[CUT] FAIL: {target_ip} still reachable after {DROP_GRACE}s -- relay or wiring issue", + flush=True, + ) + time.sleep(max(0, POWER_OFF_TIME - DROP_GRACE)) + return False + print(f"[CUT] OK: {target_ip} dropped", flush=True) + time.sleep(max(0, POWER_OFF_TIME - DROP_GRACE)) + finally: + print(f"[RESTORE] GPIO {gpio_pin} -> HIGH", flush=True) + GPIO.output(gpio_pin, GPIO.HIGH) + + print(f"[AFTER] waiting up to {RETURN_TIMEOUT}s for {target_ip} to return ...", flush=True) + if not wait_for_return(target_ip, deadline_s=RETURN_TIMEOUT, poll_s=RETURN_POLL): + print(f"[AFTER] FAIL: {target_ip} did not come back within {RETURN_TIMEOUT}s", flush=True) + return False + print(f"[AFTER] OK: {target_ip} reachable again", flush=True) + return True + + +# ================== MAIN ================== + + +def _cams_target_ip() -> str: + if not CAM_IPS: + raise SystemExit("No camera IPs configured (CAM_IPS); cannot run cams relay test.") + return CAM_IPS[0] + + +RELAYS = { + "main": (RELAY_MAIN, lambda: MAIN_PI_IP), + "cams": (RELAY_CAMS, _cams_target_ip), +} + + +def main() -> int: + parser = argparse.ArgumentParser(description="End-to-end relay test for Pi Zero") + parser.add_argument( + "--relay", + choices=[*RELAYS.keys(), "all"], + default="all", + help="Relay to test (default: all)", + ) + args = parser.parse_args() + + GPIO.setmode(GPIO.BCM) + GPIO.setwarnings(False) + for pin, _ in RELAYS.values(): + GPIO.setup(pin, GPIO.OUT, initial=GPIO.HIGH) + + selected = list(RELAYS.keys()) if args.relay == "all" else [args.relay] + + failures: list[str] = [] + try: + for name in selected: + pin, ip_getter = RELAYS[name] + if not test_relay(name, pin, ip_getter()): + failures.append(name) + finally: + for pin, _ in RELAYS.values(): + GPIO.output(pin, GPIO.HIGH) + GPIO.cleanup() + + print("", flush=True) + if failures: + print(f"FAILED: {', '.join(failures)}", flush=True) + return 1 + print("ALL RELAYS OK", flush=True) + return 0 + + +if __name__ == "__main__": + sys.exit(main()) From de741758b2892b531357ad03d04f590f57febb07 Mon Sep 17 00:00:00 2001 From: Mateo Date: Wed, 29 Apr 2026 12:25:40 +0200 Subject: [PATCH 03/10] refactor: rename relay test scripts to relay_check.py Avoid pytest auto-collection of files matching test_*.py pattern. --- watchdog/main_pi/{test_relays.py => relay_check.py} | 0 watchdog/pi_zero/{test_relays.py => relay_check.py} | 0 2 files changed, 0 insertions(+), 0 deletions(-) rename watchdog/main_pi/{test_relays.py => relay_check.py} (100%) rename watchdog/pi_zero/{test_relays.py => relay_check.py} (100%) diff --git a/watchdog/main_pi/test_relays.py b/watchdog/main_pi/relay_check.py similarity index 100% rename from watchdog/main_pi/test_relays.py rename to watchdog/main_pi/relay_check.py diff --git a/watchdog/pi_zero/test_relays.py b/watchdog/pi_zero/relay_check.py similarity index 100% rename from watchdog/pi_zero/test_relays.py rename to watchdog/pi_zero/relay_check.py From f5294df7ed216372ae41a3ba5b73960dda11c3eb Mon Sep 17 00:00:00 2001 From: Mateo Date: Wed, 29 Apr 2026 12:25:46 +0200 Subject: [PATCH 04/10] fix(relay-check): debounce drop detection and scope GPIO setup - Require two consecutive failed pings before declaring the target dropped to avoid transient packet-loss false positives. - Detect drop across the full POWER_OFF_TIME window so slow-discharging targets are not falsely flagged. - Pi Zero: only setup and restore the relays selected via --relay; do not touch unselected pins. - Drop dead CAM_IPS guard (CAM_IPS always falls back to a default list). --- watchdog/main_pi/relay_check.py | 19 ++++++++------- watchdog/pi_zero/relay_check.py | 41 ++++++++++++++++----------------- 2 files changed, 31 insertions(+), 29 deletions(-) diff --git a/watchdog/main_pi/relay_check.py b/watchdog/main_pi/relay_check.py index d487dc2c..52bdbd98 100644 --- a/watchdog/main_pi/relay_check.py +++ b/watchdog/main_pi/relay_check.py @@ -9,7 +9,7 @@ 4) drives the GPIO back HIGH (restore power) and waits for the device to come back. Run on the main Pi: - python3 /home/pi/pyro-engine/watchdog/main_pi/test_relays.py + python3 /home/pi/pyro-engine/watchdog/main_pi/relay_check.py Exits non-zero if any relay fails the verification. """ @@ -50,7 +50,6 @@ def _load_env(path: Path) -> dict: PING_TIMEOUT = 2 POWER_OFF_TIME = 15 -DROP_GRACE = 5 RETURN_TIMEOUT = 90 RETURN_POLL = 3 @@ -70,9 +69,10 @@ def ping(ip: str) -> bool: def wait_for_drop(ip: str, deadline_s: int) -> bool: + # require two consecutive failed pings to debounce transient packet loss end = time.time() + deadline_s while time.time() < end: - if not ping(ip): + if not ping(ip) and not ping(ip): return True time.sleep(1) return False @@ -98,17 +98,20 @@ def test_relay(name: str, gpio_pin: int, target_ip: str) -> bool: print(f"[CUT] GPIO {gpio_pin} -> LOW for {POWER_OFF_TIME}s", flush=True) GPIO.output(gpio_pin, GPIO.LOW) + cut_start = time.time() try: - dropped = wait_for_drop(target_ip, deadline_s=DROP_GRACE) + dropped = wait_for_drop(target_ip, deadline_s=POWER_OFF_TIME) if not dropped: print( - f"[CUT] FAIL: {target_ip} still reachable after {DROP_GRACE}s -- relay or wiring issue", + f"[CUT] FAIL: {target_ip} still reachable after {POWER_OFF_TIME}s -- relay or wiring issue", flush=True, ) - time.sleep(max(0, POWER_OFF_TIME - DROP_GRACE)) return False - print(f"[CUT] OK: {target_ip} dropped", flush=True) - time.sleep(max(0, POWER_OFF_TIME - DROP_GRACE)) + print(f"[CUT] OK: {target_ip} dropped after {time.time() - cut_start:.1f}s", flush=True) + # hold the cut for the remainder of POWER_OFF_TIME to ensure a real power cycle + remaining = POWER_OFF_TIME - (time.time() - cut_start) + if remaining > 0: + time.sleep(remaining) finally: print(f"[RESTORE] GPIO {gpio_pin} -> HIGH", flush=True) GPIO.output(gpio_pin, GPIO.HIGH) diff --git a/watchdog/pi_zero/relay_check.py b/watchdog/pi_zero/relay_check.py index d766286f..050c73ce 100644 --- a/watchdog/pi_zero/relay_check.py +++ b/watchdog/pi_zero/relay_check.py @@ -9,9 +9,9 @@ 4) drives the GPIO back HIGH (restore power) and waits for the device to come back. Run on the Pi Zero: - python3 /home/pi/pyro-engine/watchdog/pi_zero/test_relays.py - python3 /home/pi/pyro-engine/watchdog/pi_zero/test_relays.py --relay main - python3 /home/pi/pyro-engine/watchdog/pi_zero/test_relays.py --relay cams + python3 /home/pi/pyro-engine/watchdog/pi_zero/relay_check.py + python3 /home/pi/pyro-engine/watchdog/pi_zero/relay_check.py --relay main + python3 /home/pi/pyro-engine/watchdog/pi_zero/relay_check.py --relay cams The cams test pings only the first camera in CAM_IPS as a proxy for the 12V rail. Exits non-zero if any relay fails the verification. @@ -61,7 +61,6 @@ def _load_env(path: Path) -> dict: PING_TIMEOUT = 2 POWER_OFF_TIME = 15 -DROP_GRACE = 5 RETURN_TIMEOUT = 90 RETURN_POLL = 3 @@ -81,9 +80,10 @@ def ping(ip: str) -> bool: def wait_for_drop(ip: str, deadline_s: int) -> bool: + # require two consecutive failed pings to debounce transient packet loss end = time.time() + deadline_s while time.time() < end: - if not ping(ip): + if not ping(ip) and not ping(ip): return True time.sleep(1) return False @@ -109,17 +109,20 @@ def test_relay(name: str, gpio_pin: int, target_ip: str) -> bool: print(f"[CUT] GPIO {gpio_pin} -> LOW for {POWER_OFF_TIME}s", flush=True) GPIO.output(gpio_pin, GPIO.LOW) + cut_start = time.time() try: - dropped = wait_for_drop(target_ip, deadline_s=DROP_GRACE) + dropped = wait_for_drop(target_ip, deadline_s=POWER_OFF_TIME) if not dropped: print( - f"[CUT] FAIL: {target_ip} still reachable after {DROP_GRACE}s -- relay or wiring issue", + f"[CUT] FAIL: {target_ip} still reachable after {POWER_OFF_TIME}s -- relay or wiring issue", flush=True, ) - time.sleep(max(0, POWER_OFF_TIME - DROP_GRACE)) return False - print(f"[CUT] OK: {target_ip} dropped", flush=True) - time.sleep(max(0, POWER_OFF_TIME - DROP_GRACE)) + print(f"[CUT] OK: {target_ip} dropped after {time.time() - cut_start:.1f}s", flush=True) + # hold the cut for the remainder of POWER_OFF_TIME to ensure a real power cycle + remaining = POWER_OFF_TIME - (time.time() - cut_start) + if remaining > 0: + time.sleep(remaining) finally: print(f"[RESTORE] GPIO {gpio_pin} -> HIGH", flush=True) GPIO.output(gpio_pin, GPIO.HIGH) @@ -135,15 +138,9 @@ def test_relay(name: str, gpio_pin: int, target_ip: str) -> bool: # ================== MAIN ================== -def _cams_target_ip() -> str: - if not CAM_IPS: - raise SystemExit("No camera IPs configured (CAM_IPS); cannot run cams relay test.") - return CAM_IPS[0] - - RELAYS = { "main": (RELAY_MAIN, lambda: MAIN_PI_IP), - "cams": (RELAY_CAMS, _cams_target_ip), + "cams": (RELAY_CAMS, lambda: CAM_IPS[0]), } @@ -157,13 +154,14 @@ def main() -> int: ) args = parser.parse_args() + selected = list(RELAYS.keys()) if args.relay == "all" else [args.relay] + GPIO.setmode(GPIO.BCM) GPIO.setwarnings(False) - for pin, _ in RELAYS.values(): + for name in selected: + pin, _ = RELAYS[name] GPIO.setup(pin, GPIO.OUT, initial=GPIO.HIGH) - selected = list(RELAYS.keys()) if args.relay == "all" else [args.relay] - failures: list[str] = [] try: for name in selected: @@ -171,7 +169,8 @@ def main() -> int: if not test_relay(name, pin, ip_getter()): failures.append(name) finally: - for pin, _ in RELAYS.values(): + for name in selected: + pin, _ = RELAYS[name] GPIO.output(pin, GPIO.HIGH) GPIO.cleanup() From c8dda7a6f62ad66592a3d61d318b57676e92c3d5 Mon Sep 17 00:00:00 2001 From: Mateo Date: Wed, 29 Apr 2026 14:06:19 +0200 Subject: [PATCH 05/10] feat: add patrol preset setup script Adds scripts/setup_patrol_presets.py to configure PTZ presets 0-3 via the camera API. Updates ruff per-file-ignores to cover scripts/. --- pyproject.toml | 1 + scripts/setup_patrol_presets.py | 70 +++++++++++++++++++++++++++++++++ 2 files changed, 71 insertions(+) create mode 100644 scripts/setup_patrol_presets.py diff --git a/pyproject.toml b/pyproject.toml index 16699144..87834a27 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -141,6 +141,7 @@ known-third-party = ["pillow", "tqdm", "onnxruntime", "huggingface_hub"] [tool.ruff.lint.per-file-ignores] "**/__init__.py" = ["I001", "F401", "CPY001"] "src/**.py" = ["D", "T201", "S101", "ANN", "BLE001", "S106", "S113", "S501"] +"scripts/**.py" = ["D", "T201", "S101", "ANN", "BLE001", "S106", "S113", "S501"] ".github/**.py" = ["D", "T201", "ANN", "S", "PYI024"] "tests/**.py" = ["D103", "CPY001", "S101", "T201", "ANN001", "ANN201", "ANN202", "ARG001", "S113"] "pyro_camera_api/**.py" = ["D", "T201", "S101", "ANN", "BLE001", "S113", "S501", "S404", "S603", "S405", "S314", "E402", "RUF029"] diff --git a/scripts/setup_patrol_presets.py b/scripts/setup_patrol_presets.py new file mode 100644 index 00000000..627a1c04 --- /dev/null +++ b/scripts/setup_patrol_presets.py @@ -0,0 +1,70 @@ +# Copyright (C) 2022-2026, Pyronear. + +# This program is licensed under the Apache License 2.0. +# See LICENSE or go to for full license details. + +""" +Setup patrol presets on a PTZ camera via the pyro_camera_api client. + +Sequence: + 1. Go to pose 10 + 2. Move left 70° -> save as preset 0 + 3. Move right 45° -> save as preset 1 + 4. Move right 45° -> save as preset 2 + 5. Move right 45° -> save as preset 3 +""" + +from __future__ import annotations + +import argparse +import time + +from pyro_camera_api_client import PyroCameraAPIClient + +START_POSE = 10 +LEFT_DEG = 70.0 +STEP_DEG = 45.0 + + +def wait(seconds: float) -> None: + time.sleep(seconds) + + +def main() -> None: + parser = argparse.ArgumentParser(description="Setup patrol presets on a PTZ camera.") + parser.add_argument("--cam-ip", required=True, help="Camera IP, e.g. 192.168.1.11") + parser.add_argument("--login", default="admin", help="Camera login (handled server-side, kept for reference)") + parser.add_argument("--pwd", default="@Pyronear", help="Camera password (handled server-side, kept for reference)") + parser.add_argument("--base-url", default="http://192.168.1.99:8080", help="Camera API base URL") + parser.add_argument("--settle", type=float, default=4.0, help="Seconds to wait after each move before saving") + args = parser.parse_args() + + # Note: --login and --pwd are accepted for parity with operator notes, + # but the camera API service uses its own server-side credentials. + _ = (args.login, args.pwd) + + client = PyroCameraAPIClient(base_url=args.base_url, timeout=30.0) + + print(f"[1/5] Moving to pose {START_POSE}") + client.move_camera(camera_ip=args.cam_ip, pose_id=START_POSE) + wait(args.settle + 4.0) + + print(f"[2/5] Moving left {LEFT_DEG}°, saving as preset 0") + client.move_camera(camera_ip=args.cam_ip, direction="Left", degrees=LEFT_DEG) + wait(args.settle) + client.set_preset(camera_ip=args.cam_ip, idx=0) + print(" preset 0 saved") + + for preset_id in (1, 2, 3): + print(f"[{preset_id + 2}/5] Moving right {STEP_DEG}°, saving as preset {preset_id}") + client.move_camera(camera_ip=args.cam_ip, direction="Right", degrees=STEP_DEG) + wait(args.settle) + client.set_preset(camera_ip=args.cam_ip, idx=preset_id) + print(f" preset {preset_id} saved") + + print("Done. Listing presets:") + print(client.list_presets(camera_ip=args.cam_ip)) + + +if __name__ == "__main__": + main() From d3db0e8580b22f4116ca61c2295f2254c61ca954 Mon Sep 17 00:00:00 2001 From: Mateo Date: Wed, 29 Apr 2026 14:07:51 +0200 Subject: [PATCH 06/10] feat(watchdog): add requirements.txt with RPi.GPIO Watchdog and relay_check scripts on both main Pi and Pi Zero need RPi.GPIO; document it so deployment can pip install it. --- watchdog/requirements.txt | 1 + 1 file changed, 1 insertion(+) create mode 100644 watchdog/requirements.txt diff --git a/watchdog/requirements.txt b/watchdog/requirements.txt new file mode 100644 index 00000000..bd61b232 --- /dev/null +++ b/watchdog/requirements.txt @@ -0,0 +1 @@ +RPi.GPIO>=0.7.1 From 894ad90a109efa74fd0fc4c6bf0c86e137a8ab2d Mon Sep 17 00:00:00 2001 From: Mateo Date: Wed, 29 Apr 2026 14:09:15 +0200 Subject: [PATCH 07/10] docs(watchdog): add install dependencies note to all scripts Point operators at watchdog/requirements.txt so they install RPi.GPIO before running watchdog.py or relay_check.py on either Pi. --- watchdog/main_pi/relay_check.py | 3 +++ watchdog/main_pi/watchdog.py | 3 +++ watchdog/pi_zero/relay_check.py | 3 +++ watchdog/pi_zero/watchdog.py | 3 +++ 4 files changed, 12 insertions(+) diff --git a/watchdog/main_pi/relay_check.py b/watchdog/main_pi/relay_check.py index 52bdbd98..3b8823ef 100644 --- a/watchdog/main_pi/relay_check.py +++ b/watchdog/main_pi/relay_check.py @@ -8,6 +8,9 @@ 3) verifies the device drops during the cut, 4) drives the GPIO back HIGH (restore power) and waits for the device to come back. +Install dependencies (once per Pi): + pip install -r /home/pi/pyro-engine/watchdog/requirements.txt + Run on the main Pi: python3 /home/pi/pyro-engine/watchdog/main_pi/relay_check.py diff --git a/watchdog/main_pi/watchdog.py b/watchdog/main_pi/watchdog.py index 1cca9493..09e18551 100644 --- a/watchdog/main_pi/watchdog.py +++ b/watchdog/main_pi/watchdog.py @@ -3,6 +3,9 @@ Watchdog for the main Pi that pings the Pi Zero and power-cycles its relay after repeated failures, with cooldown and daily limits. +Install dependencies (once per Pi): + pip install -r /home/pi/pyro-engine/watchdog/requirements.txt + Cron setup (every 10 minutes at :05): 1) Edit crontab: crontab -e 2) Add the line: diff --git a/watchdog/pi_zero/relay_check.py b/watchdog/pi_zero/relay_check.py index 050c73ce..ffb78cd1 100644 --- a/watchdog/pi_zero/relay_check.py +++ b/watchdog/pi_zero/relay_check.py @@ -8,6 +8,9 @@ 3) verifies the device drops during the cut, 4) drives the GPIO back HIGH (restore power) and waits for the device to come back. +Install dependencies (once per Pi): + pip install -r /home/pi/pyro-engine/watchdog/requirements.txt + Run on the Pi Zero: python3 /home/pi/pyro-engine/watchdog/pi_zero/relay_check.py python3 /home/pi/pyro-engine/watchdog/pi_zero/relay_check.py --relay main diff --git a/watchdog/pi_zero/watchdog.py b/watchdog/pi_zero/watchdog.py index 68984d5f..9f670cf5 100644 --- a/watchdog/pi_zero/watchdog.py +++ b/watchdog/pi_zero/watchdog.py @@ -5,6 +5,9 @@ It checks the main Pi health endpoint and pings camera IPs, tracking failures and power-cycling relays after repeated failures with cooldown/daily limits. +Install dependencies (once per Pi): + pip install -r /home/pi/pyro-engine/watchdog/requirements.txt + Cron setup (every 10 minutes): 1) Edit crontab: crontab -e 2) Add the line: From c2bd2079c8160974e7322471370d6eeea4271bf8 Mon Sep 17 00:00:00 2001 From: Mateo Date: Wed, 29 Apr 2026 14:11:25 +0200 Subject: [PATCH 08/10] fix(watchdog): switch to rpi-lgpio for Python 3.13 / kernel 6 compat RPi.GPIO 0.7.1 has no piwheels wheel for Python 3.13 and fails to compile without python3-dev. rpi-lgpio is a drop-in replacement (same RPi.GPIO API), ships wheels, and works on Pi 5 / kernel 6.x where the legacy package is broken. --- watchdog/requirements.txt | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/watchdog/requirements.txt b/watchdog/requirements.txt index bd61b232..c59dcb97 100644 --- a/watchdog/requirements.txt +++ b/watchdog/requirements.txt @@ -1 +1,2 @@ -RPi.GPIO>=0.7.1 +rpi-lgpio>=0.6 + From 7f40b8ba7e038ba290182ff0e6a27da477dacf1e Mon Sep 17 00:00:00 2001 From: Mateo Date: Wed, 29 Apr 2026 14:12:46 +0200 Subject: [PATCH 09/10] fix(watchdog): install GPIO via apt instead of pip Cron runs scripts with /usr/bin/python3, so the GPIO library must be available to system Python. Use python3-rpi-lgpio (Debian package) so no venv or PEP 668 workaround is needed; drop requirements.txt. --- watchdog/main_pi/relay_check.py | 2 +- watchdog/main_pi/watchdog.py | 2 +- watchdog/pi_zero/relay_check.py | 2 +- watchdog/pi_zero/watchdog.py | 2 +- watchdog/requirements.txt | 2 -- 5 files changed, 4 insertions(+), 6 deletions(-) delete mode 100644 watchdog/requirements.txt diff --git a/watchdog/main_pi/relay_check.py b/watchdog/main_pi/relay_check.py index 3b8823ef..e18d7eaa 100644 --- a/watchdog/main_pi/relay_check.py +++ b/watchdog/main_pi/relay_check.py @@ -9,7 +9,7 @@ 4) drives the GPIO back HIGH (restore power) and waits for the device to come back. Install dependencies (once per Pi): - pip install -r /home/pi/pyro-engine/watchdog/requirements.txt + sudo apt install python3-rpi-lgpio Run on the main Pi: python3 /home/pi/pyro-engine/watchdog/main_pi/relay_check.py diff --git a/watchdog/main_pi/watchdog.py b/watchdog/main_pi/watchdog.py index 09e18551..6180d73d 100644 --- a/watchdog/main_pi/watchdog.py +++ b/watchdog/main_pi/watchdog.py @@ -4,7 +4,7 @@ after repeated failures, with cooldown and daily limits. Install dependencies (once per Pi): - pip install -r /home/pi/pyro-engine/watchdog/requirements.txt + sudo apt install python3-rpi-lgpio Cron setup (every 10 minutes at :05): 1) Edit crontab: crontab -e diff --git a/watchdog/pi_zero/relay_check.py b/watchdog/pi_zero/relay_check.py index ffb78cd1..fe76902e 100644 --- a/watchdog/pi_zero/relay_check.py +++ b/watchdog/pi_zero/relay_check.py @@ -9,7 +9,7 @@ 4) drives the GPIO back HIGH (restore power) and waits for the device to come back. Install dependencies (once per Pi): - pip install -r /home/pi/pyro-engine/watchdog/requirements.txt + sudo apt install python3-rpi-lgpio Run on the Pi Zero: python3 /home/pi/pyro-engine/watchdog/pi_zero/relay_check.py diff --git a/watchdog/pi_zero/watchdog.py b/watchdog/pi_zero/watchdog.py index 9f670cf5..faadfac4 100644 --- a/watchdog/pi_zero/watchdog.py +++ b/watchdog/pi_zero/watchdog.py @@ -6,7 +6,7 @@ and power-cycling relays after repeated failures with cooldown/daily limits. Install dependencies (once per Pi): - pip install -r /home/pi/pyro-engine/watchdog/requirements.txt + sudo apt install python3-rpi-lgpio Cron setup (every 10 minutes): 1) Edit crontab: crontab -e diff --git a/watchdog/requirements.txt b/watchdog/requirements.txt deleted file mode 100644 index c59dcb97..00000000 --- a/watchdog/requirements.txt +++ /dev/null @@ -1,2 +0,0 @@ -rpi-lgpio>=0.6 - From 7b5dce3f9fcf31676a34950d3fe47bfa5bd33b1e Mon Sep 17 00:00:00 2001 From: Mateo Date: Wed, 29 Apr 2026 14:41:09 +0200 Subject: [PATCH 10/10] docs(relay-check): document running pi_zero test from main Pi Add instructions for the common case where operators only have SSH to the main Pi: launch the test detached on the Pi Zero so the script survives the main Pi rebooting mid-cut, then read /tmp/relay_check.log after reconnect. Also ignore T201/FURB105 under watchdog/ since these CLI scripts intentionally use print(). --- pyproject.toml | 2 +- watchdog/pi_zero/relay_check.py | 12 ++++++++++++ 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 87834a27..91a4b6b8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -147,7 +147,7 @@ known-third-party = ["pillow", "tqdm", "onnxruntime", "huggingface_hub"] "pyro_camera_api/**.py" = ["D", "T201", "S101", "ANN", "BLE001", "S113", "S501", "S404", "S603", "S405", "S314", "E402", "RUF029"] "pyroengine/core.py" = ["BLE001"] "pyroengine/sensors.py" = ["S113", "S501", "ANN"] -"watchdog/**.py" = ["CPY001", "S108", "S310", "S404", "S603", "S607", "BLE001", "LOG015"] +"watchdog/**.py" = ["CPY001", "S108", "S310", "S404", "S603", "S607", "BLE001", "LOG015", "T201", "FURB105"] [tool.ruff.format] quote-style = "double" diff --git a/watchdog/pi_zero/relay_check.py b/watchdog/pi_zero/relay_check.py index fe76902e..79f1c828 100644 --- a/watchdog/pi_zero/relay_check.py +++ b/watchdog/pi_zero/relay_check.py @@ -16,6 +16,18 @@ python3 /home/pi/pyro-engine/watchdog/pi_zero/relay_check.py --relay main python3 /home/pi/pyro-engine/watchdog/pi_zero/relay_check.py --relay cams +Run from the main Pi (when you cannot reach the Pi Zero directly): + Testing the 'main' relay cuts power to the main Pi, which kills the SSH + chain to the Pi Zero, so the test must be detached on the Pi Zero. From + the main Pi: + + ssh pi@ \ + 'nohup python3 /home/pi/pyro-engine/watchdog/pi_zero/relay_check.py \ + > /tmp/relay_check.log 2>&1 < /dev/null & disown; exit' + + Wait ~3-4 minutes (15s cut + 90s return per relay), then reconnect and: + ssh pi@ 'cat /tmp/relay_check.log' + The cams test pings only the first camera in CAM_IPS as a proxy for the 12V rail. Exits non-zero if any relay fails the verification. """