Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -141,12 +141,13 @@ known-third-party = ["pillow", "tqdm", "onnxruntime", "huggingface_hub"]
[tool.ruff.lint.per-file-ignores]
"**/__init__.py" = ["I001", "F401", "CPY001"]
"src/**.py" = ["D", "T201", "S101", "ANN", "BLE001", "S106", "S113", "S501"]
"scripts/**.py" = ["D", "T201", "S101", "ANN", "BLE001", "S106", "S113", "S501"]
".github/**.py" = ["D", "T201", "ANN", "S", "PYI024"]
"tests/**.py" = ["D103", "CPY001", "S101", "T201", "ANN001", "ANN201", "ANN202", "ARG001", "S113"]
"pyro_camera_api/**.py" = ["D", "T201", "S101", "ANN", "BLE001", "S113", "S501", "S404", "S603", "S405", "S314", "E402", "RUF029"]
"pyroengine/core.py" = ["BLE001"]
"pyroengine/sensors.py" = ["S113", "S501", "ANN"]
"watchdog/**.py" = ["CPY001", "S108", "S310", "S404", "S603", "S607", "BLE001", "LOG015"]
"watchdog/**.py" = ["CPY001", "S108", "S310", "S404", "S603", "S607", "BLE001", "LOG015", "T201", "FURB105"]

[tool.ruff.format]
quote-style = "double"
Expand Down
70 changes: 70 additions & 0 deletions scripts/setup_patrol_presets.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
# Copyright (C) 2022-2026, Pyronear.

# This program is licensed under the Apache License 2.0.
# See LICENSE or go to <https://opensource.org/licenses/Apache-2.0> for full license details.

"""

Check notice on line 6 in scripts/setup_patrol_presets.py

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

scripts/setup_patrol_presets.py#L6

Multi-line docstring summary should start at the first line (D212)
Setup patrol presets on a PTZ camera via the pyro_camera_api client.

Sequence:
1. Go to pose 10
2. Move left 70° -> save as preset 0
3. Move right 45° -> save as preset 1
4. Move right 45° -> save as preset 2
5. Move right 45° -> save as preset 3
"""

from __future__ import annotations

import argparse
import time

from pyro_camera_api_client import PyroCameraAPIClient

START_POSE = 10
LEFT_DEG = 70.0
STEP_DEG = 45.0


def wait(seconds: float) -> None:
time.sleep(seconds)


def main() -> None:
parser = argparse.ArgumentParser(description="Setup patrol presets on a PTZ camera.")
parser.add_argument("--cam-ip", required=True, help="Camera IP, e.g. 192.168.1.11")
parser.add_argument("--login", default="admin", help="Camera login (handled server-side, kept for reference)")
parser.add_argument("--pwd", default="@Pyronear", help="Camera password (handled server-side, kept for reference)")
parser.add_argument("--base-url", default="http://192.168.1.99:8080", help="Camera API base URL")
parser.add_argument("--settle", type=float, default=4.0, help="Seconds to wait after each move before saving")
args = parser.parse_args()

# Note: --login and --pwd are accepted for parity with operator notes,
# but the camera API service uses its own server-side credentials.
_ = (args.login, args.pwd)

client = PyroCameraAPIClient(base_url=args.base_url, timeout=30.0)

print(f"[1/5] Moving to pose {START_POSE}")
client.move_camera(camera_ip=args.cam_ip, pose_id=START_POSE)
wait(args.settle + 4.0)

print(f"[2/5] Moving left {LEFT_DEG}°, saving as preset 0")
client.move_camera(camera_ip=args.cam_ip, direction="Left", degrees=LEFT_DEG)
wait(args.settle)
client.set_preset(camera_ip=args.cam_ip, idx=0)
print(" preset 0 saved")

for preset_id in (1, 2, 3):
print(f"[{preset_id + 2}/5] Moving right {STEP_DEG}°, saving as preset {preset_id}")
client.move_camera(camera_ip=args.cam_ip, direction="Right", degrees=STEP_DEG)
wait(args.settle)
client.set_preset(camera_ip=args.cam_ip, idx=preset_id)
print(f" preset {preset_id} saved")

print("Done. Listing presets:")
print(client.list_presets(camera_ip=args.cam_ip))


if __name__ == "__main__":
main()
153 changes: 153 additions & 0 deletions watchdog/main_pi/relay_check.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
#!/usr/bin/env python3
"""

Check notice on line 2 in watchdog/main_pi/relay_check.py

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

watchdog/main_pi/relay_check.py#L2

Multi-line docstring summary should start at the first line (D212)
End-to-end relay test for the main Pi watchdog setup.

For each relay, this script:
1) pings the controlled device to confirm it is reachable,
2) drives the GPIO LOW for POWER_OFF_TIME seconds (cut power),
3) verifies the device drops during the cut,
4) drives the GPIO back HIGH (restore power) and waits for the device to come back.

Install dependencies (once per Pi):
sudo apt install python3-rpi-lgpio

Run on the main Pi:
python3 /home/pi/pyro-engine/watchdog/main_pi/relay_check.py

Exits non-zero if any relay fails the verification.
"""

import subprocess

Check notice on line 20 in watchdog/main_pi/relay_check.py

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

watchdog/main_pi/relay_check.py#L20

Consider possible security implications associated with the subprocess module.
import sys
import time
from pathlib import Path

import RPi.GPIO as GPIO

# ================= CONFIG =================

RELAY_PIZERO = 16

_ENV_FILE = Path("/home/pi/watchdog.env")


def _load_env(path: Path) -> dict:
env: dict = {}
try:
for line in path.read_text().splitlines():
line = line.strip()
if not line or line.startswith("#"):
continue
key, _, value = line.partition("=")
env[key.strip()] = value.strip()
except FileNotFoundError:
pass
return env


_env = _load_env(_ENV_FILE)

PIZERO_IP: str = _env.get("PIZERO_IP", "192.168.1.98")

PING_COUNT = 1
PING_TIMEOUT = 2
POWER_OFF_TIME = 15

RETURN_TIMEOUT = 90
RETURN_POLL = 3


# ================ HELPERS =================


def ping(ip: str) -> bool:
try:
subprocess.check_output(

Check warning on line 65 in watchdog/main_pi/relay_check.py

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

watchdog/main_pi/relay_check.py#L65

Starting a process with a partial executable path

Check warning on line 65 in watchdog/main_pi/relay_check.py

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

watchdog/main_pi/relay_check.py#L65

subprocess call - check for execution of untrusted input.
["ping", "-c", str(PING_COUNT), "-W", str(PING_TIMEOUT), ip],
stderr=subprocess.DEVNULL,
)
return True
except subprocess.CalledProcessError:
return False


def wait_for_drop(ip: str, deadline_s: int) -> bool:
# require two consecutive failed pings to debounce transient packet loss
end = time.time() + deadline_s
while time.time() < end:
if not ping(ip) and not ping(ip):
return True
time.sleep(1)
return False


def wait_for_return(ip: str, deadline_s: int, poll_s: int) -> bool:
end = time.time() + deadline_s
while time.time() < end:
if ping(ip):
return True
time.sleep(poll_s)
return False


def test_relay(name: str, gpio_pin: int, target_ip: str) -> bool:
print(f"\n=== Testing relay '{name}' on GPIO {gpio_pin} (target {target_ip}) ===", flush=True)

print(f"[BEFORE] ping {target_ip} ...", flush=True)
if not ping(target_ip):
print(f"[BEFORE] FAIL: {target_ip} unreachable before cut, aborting test", flush=True)
return False
print(f"[BEFORE] OK: {target_ip} reachable", flush=True)

print(f"[CUT] GPIO {gpio_pin} -> LOW for {POWER_OFF_TIME}s", flush=True)
GPIO.output(gpio_pin, GPIO.LOW)
cut_start = time.time()
try:
dropped = wait_for_drop(target_ip, deadline_s=POWER_OFF_TIME)
if not dropped:
print(
f"[CUT] FAIL: {target_ip} still reachable after {POWER_OFF_TIME}s -- relay or wiring issue",
flush=True,
)
return False
print(f"[CUT] OK: {target_ip} dropped after {time.time() - cut_start:.1f}s", flush=True)
# hold the cut for the remainder of POWER_OFF_TIME to ensure a real power cycle
remaining = POWER_OFF_TIME - (time.time() - cut_start)
if remaining > 0:
time.sleep(remaining)
finally:
print(f"[RESTORE] GPIO {gpio_pin} -> HIGH", flush=True)
GPIO.output(gpio_pin, GPIO.HIGH)

print(f"[AFTER] waiting up to {RETURN_TIMEOUT}s for {target_ip} to return ...", flush=True)
if not wait_for_return(target_ip, deadline_s=RETURN_TIMEOUT, poll_s=RETURN_POLL):
print(f"[AFTER] FAIL: {target_ip} did not come back within {RETURN_TIMEOUT}s", flush=True)
return False
print(f"[AFTER] OK: {target_ip} reachable again", flush=True)
return True


# ================== MAIN ==================


def main() -> int:
GPIO.setmode(GPIO.BCM)
GPIO.setwarnings(False)
GPIO.setup(RELAY_PIZERO, GPIO.OUT, initial=GPIO.HIGH)

try:
ok = test_relay("pizero", RELAY_PIZERO, PIZERO_IP)
finally:
GPIO.output(RELAY_PIZERO, GPIO.HIGH)
GPIO.cleanup()

print("", flush=True)
if not ok:
print("FAILED: pizero", flush=True)
return 1
print("ALL RELAYS OK", flush=True)
return 0


if __name__ == "__main__":
sys.exit(main())
3 changes: 3 additions & 0 deletions watchdog/main_pi/watchdog.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
Watchdog for the main Pi that pings the Pi Zero and power-cycles its relay
after repeated failures, with cooldown and daily limits.

Install dependencies (once per Pi):
sudo apt install python3-rpi-lgpio

Cron setup (every 10 minutes at :05):
1) Edit crontab: crontab -e
2) Add the line:
Expand Down
Loading
Loading