diff --git a/application/backend/app/routers/robot.py b/application/backend/app/routers/robot.py
index 6565e92..625ead4 100644
--- a/application/backend/app/routers/robot.py
+++ b/application/backend/app/routers/robot.py
@@ -2,6 +2,7 @@
from sqlalchemy.orm import Session
from sqlalchemy import desc
from datetime import datetime
+import time
from app.database import get_db
from app.models.robot import RobotStatus
from app.models.scan import ScanRecord
@@ -68,7 +69,7 @@ class MissionConfig(BaseModel):
sampling_precision_m: float = 5.0
area_width: Optional[float] = None
area_height: Optional[float] = None
- row_spacing: float = 0.8
+ row_spacing: float = 0.5
waypoint_spacing: float = 0.5
origin_x: float = 0.0
origin_y: float = 0.0
@@ -78,7 +79,7 @@ class MissionConfig(BaseModel):
def _calc_total_waypoints(area_width: float, area_height: float,
row_spacing: float, waypoint_spacing: float,
- wall_margin: float = 0.20) -> int:
+ wall_margin: float = 0.30) -> int:
"""Match the coverage_planner.py waypoint generation logic (centered grid)."""
import math
ew = area_width - 2 * wall_margin # centered: same effective width
@@ -151,18 +152,17 @@ async def start_coverage_mission(config: MissionConfig = None, db: Session = Dep
)
cmd = ['bash', '-c', ros_cmd]
- # Start the mission as a background process (may fail on non-ROS hosts; continue for capture)
- try:
- mission_process = subprocess.Popen(
- cmd,
- env=clean_env,
- preexec_fn=os.setsid if hasattr(os, "setsid") else None,
- )
- except Exception:
- mission_process = None # noqa: PLW0602
+ mission_process = subprocess.Popen(
+ cmd,
+ env=clean_env,
+ preexec_fn=os.setsid if hasattr(os, "setsid") else None,
+ )
+ time.sleep(1.0)
+ if mission_process.poll() is not None:
+ raise RuntimeError(f"roslaunch exited immediately with return code {mission_process.returncode}")
# Start waypoint capture loop (one sample per '/coverage/capture_ready'=true message).
- start_capture_loop(scan_id, total_points=total_points)
+ start_capture_loop(scan_id, total_points=total_points, require_ros=True)
progress = get_capture_progress()
return {
@@ -177,6 +177,17 @@ async def start_coverage_mission(config: MissionConfig = None, db: Session = Dep
}
except Exception as e:
+ if mission_process is not None and mission_process.poll() is None:
+ try:
+ pgid = os.getpgid(mission_process.pid)
+ if pgid != os.getpgrp():
+ os.killpg(pgid, signal.SIGTERM)
+ else:
+ mission_process.terminate()
+ mission_process.wait(timeout=5)
+ except Exception:
+ pass
+ mission_process = None # noqa: PLW0602
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to start mission: {str(e)}"
@@ -248,6 +259,18 @@ async def get_mission_status():
"message": "No mission has been started",
}
+ if active_scan_id and progress.get("status") == "completed":
+ return {
+ "status": "completed",
+ "running": False,
+ "pid": mission_process.pid if mission_process and mission_process.poll() is None else None,
+ "scan_id": active_scan_id,
+ "captured_points": progress["captured_points"],
+ "total_points": progress["total_points"],
+ "progress_percent": progress["progress_percent"],
+ "message": "Mission completed",
+ }
+
if active_scan_id and progress.get("status") == "running":
return {
"status": "running",
diff --git a/application/backend/app/services/ros_sensor_bridge.py b/application/backend/app/services/ros_sensor_bridge.py
index deb9bfd..61aaca4 100644
--- a/application/backend/app/services/ros_sensor_bridge.py
+++ b/application/backend/app/services/ros_sensor_bridge.py
@@ -16,8 +16,10 @@
import io
import os
import logging
+import socket
import threading
from typing import Optional, Dict, Any
+from urllib.parse import urlparse
logger = logging.getLogger(__name__)
@@ -38,12 +40,15 @@
"rgb_image_path": None, # kept for waypoint capture compatibility
"voltage": None,
"battery_percent": None,
+ "coverage_total_points": None,
+ "coverage_complete": False,
"capture_ready_queue": [],
"lock": threading.Lock(),
"capture_ready_condition": threading.Condition(),
}
_ros_thread: Optional[threading.Thread] = None
_ros_stop = threading.Event()
+DEFAULT_LOCAL_ROS_MASTER_URI = "http://127.0.0.1:11311"
def voltage_to_percent(voltage: float) -> int:
@@ -55,6 +60,32 @@ def voltage_to_percent(voltage: float) -> int:
return max(0, min(100, int(round(percent))))
+def _can_reach_ros_master(uri: str, timeout_sec: float = 0.5) -> bool:
+ if not uri:
+ return False
+ try:
+ parsed = urlparse(uri)
+ host = parsed.hostname
+ port = parsed.port or 11311
+ if not host:
+ return False
+ with socket.create_connection((host, port), timeout=timeout_sec):
+ return True
+ except OSError:
+ return False
+
+
+def get_ros_master_uri() -> str:
+ from app.config import settings
+
+ configured = (settings.ROS_MASTER_URI or os.environ.get("ROS_MASTER_URI") or "").strip()
+ if configured:
+ return configured
+ if _can_reach_ros_master(DEFAULT_LOCAL_ROS_MASTER_URI):
+ return DEFAULT_LOCAL_ROS_MASTER_URI
+ return ""
+
+
def get_live_rgb_bytes() -> Optional[bytes]:
"""Return latest RGB JPEG bytes from cache, or None."""
with _ros_cache["lock"]:
@@ -97,7 +128,7 @@ def _ros_subscriber_thread(thermal_image_save_dir: str, rgb_image_save_dir: str)
"""Run rospy node and subscribe to sensor topics; update _ros_cache."""
try:
import rospy
- from std_msgs.msg import Float64, Float32, Bool
+ from std_msgs.msg import Float64, Float32, Bool, Int32
from sensor_msgs.msg import Image
except ImportError as e:
logger.warning("rospy not available, ROS sensor bridge disabled: %s", e)
@@ -233,6 +264,14 @@ def cb_capture_ready(msg):
_ros_cache["capture_ready_queue"].append(True)
_ros_cache["capture_ready_condition"].notify_all()
+ def cb_coverage_total(msg):
+ with _ros_cache["lock"]:
+ _ros_cache["coverage_total_points"] = int(msg.data)
+
+ def cb_coverage_complete(msg):
+ with _ros_cache["lock"]:
+ _ros_cache["coverage_complete"] = bool(msg.data)
+
def _set_voltage(voltage_value):
with _ros_cache["lock"]:
_ros_cache["voltage"] = voltage_value
@@ -260,6 +299,8 @@ def cb_voltage_battery(msg):
rospy.Subscriber("/sensors/sht40/humidity", Float64, cb_hum, queue_size=1)
rospy.Subscriber("/sensors/thermal/mean", Float64, cb_thermal_mean, queue_size=1)
rospy.Subscriber("/coverage/capture_ready", Bool, cb_capture_ready, queue_size=50)
+ rospy.Subscriber("/coverage/total_points", Int32, cb_coverage_total, queue_size=10)
+ rospy.Subscriber("/coverage/complete", Bool, cb_coverage_complete, queue_size=10)
topic_types = {}
try:
topic_types = {name: t for name, t in rospy.get_published_topics()}
@@ -286,11 +327,15 @@ def cb_voltage_battery(msg):
def start_ros_bridge(thermal_image_save_dir: str, rgb_image_save_dir: str) -> bool:
- """Start the ROS subscriber thread if ROS_MASTER_URI is set. Return True if using ROS."""
+ """Start the ROS subscriber thread when a reachable ROS master is available."""
global _ros_thread
from app.config import settings
- uri = (settings.ROS_MASTER_URI or os.environ.get("ROS_MASTER_URI") or "").strip()
+ uri = get_ros_master_uri()
if not uri:
+ logger.warning("ROS bridge not started: no ROS master configured or reachable")
+ return False
+ if not _can_reach_ros_master(uri):
+ logger.warning("ROS bridge not started: ROS master %s is not reachable", uri)
return False
if _ros_thread is not None and _ros_thread.is_alive():
return True
@@ -326,9 +371,25 @@ def get_latest_from_ros() -> Dict[str, Any]:
"rgb_image_path": _ros_cache["rgb_image_path"],
"voltage": _ros_cache["voltage"],
"battery_percent": _ros_cache["battery_percent"],
+ "coverage_total_points": _ros_cache["coverage_total_points"],
+ "coverage_complete": bool(_ros_cache["coverage_complete"]),
+ }
+
+
+def get_coverage_state() -> Dict[str, Any]:
+ with _ros_cache["lock"]:
+ return {
+ "total_points": _ros_cache["coverage_total_points"],
+ "complete": bool(_ros_cache["coverage_complete"]),
}
+def clear_coverage_state() -> None:
+ with _ros_cache["lock"]:
+ _ros_cache["coverage_total_points"] = None
+ _ros_cache["coverage_complete"] = False
+
+
def wait_for_next_capture_ready(timeout_sec: float = 0.5) -> bool:
"""Wait and consume one capture trigger from /coverage/capture_ready."""
condition = _ros_cache["capture_ready_condition"]
@@ -348,9 +409,7 @@ def clear_capture_ready_queue() -> None:
def is_ros_configured() -> bool:
- from app.config import settings
- uri = (settings.ROS_MASTER_URI or os.environ.get("ROS_MASTER_URI") or "").strip()
- return bool(uri)
+ return bool(get_ros_master_uri())
def get_required_topics_status() -> Dict[str, Any]:
diff --git a/application/backend/app/services/waypoint_capture_service.py b/application/backend/app/services/waypoint_capture_service.py
index 999710f..c140ad1 100644
--- a/application/backend/app/services/waypoint_capture_service.py
+++ b/application/backend/app/services/waypoint_capture_service.py
@@ -24,8 +24,10 @@
is_ros_configured,
start_ros_bridge,
get_latest_from_ros,
+ get_coverage_state,
wait_for_next_capture_ready,
clear_capture_ready_queue,
+ clear_coverage_state,
save_live_rgb_to_file,
save_live_thermal_to_file,
)
@@ -112,6 +114,15 @@ def _mark_scan_completed(scan_id: int) -> None:
db.close()
+def _sync_total_points_from_ros() -> Optional[int]:
+ metrics = get_coverage_state()
+ total_points = metrics.get("total_points")
+ if total_points is not None:
+ with _capture_state_lock:
+ _capture_state["total_points"] = int(total_points)
+ return total_points
+
+
def _capture_loop_impl(scan_id: int):
stop_event = _capture_state["stop_event"]
if not stop_event:
@@ -128,6 +139,18 @@ def _capture_loop_impl(scan_id: int):
simulate = not use_ros and not _sht40_script.exists()
while not stop_event.is_set():
+ if use_ros:
+ total_points = _sync_total_points_from_ros()
+ coverage_state = get_coverage_state()
+ with _capture_state_lock:
+ captured_points = int(_capture_state.get("captured_points") or 0)
+ if coverage_state.get("complete") and total_points is not None and captured_points >= int(total_points):
+ with _capture_state_lock:
+ _capture_state["status"] = "completed"
+ _mark_scan_completed(scan_id)
+ stop_event.set()
+ break
+
capture_ready = wait_for_next_capture_ready(timeout_sec=0.5) if use_ros else False
if stop_event.is_set():
break
@@ -221,8 +244,11 @@ def _capture_loop_impl(scan_id: int):
finally:
db.close()
+ ros_total_points = _sync_total_points_from_ros() if use_ros else None
with _capture_state_lock:
_capture_state["captured_points"] = sequence_index + 1
+ if ros_total_points is not None:
+ _capture_state["total_points"] = int(ros_total_points)
total_points = _capture_state["total_points"]
sequence_index += 1
if total_points and sequence_index >= total_points:
@@ -237,7 +263,7 @@ def _capture_loop_impl(scan_id: int):
_capture_state["status"] = "stopped"
-def start_capture_loop(scan_id: int, total_points: Optional[int] = None) -> None:
+def start_capture_loop(scan_id: int, total_points: Optional[int] = None, require_ros: bool = False) -> None:
"""Start background thread that captures on each ROS '/coverage/capture_ready'=true event."""
old_thread = _capture_state["thread"]
if old_thread is not None and old_thread.is_alive():
@@ -249,8 +275,11 @@ def start_capture_loop(scan_id: int, total_points: Optional[int] = None) -> None
rgb_dir = os.path.join(settings.UPLOAD_DIR, "realsense_latest")
os.makedirs(thermal_dir, exist_ok=True)
os.makedirs(rgb_dir, exist_ok=True)
- use_ros = is_ros_configured() and start_ros_bridge(thermal_dir, rgb_dir)
+ use_ros = start_ros_bridge(thermal_dir, rgb_dir)
+ if require_ros and not use_ros:
+ raise RuntimeError("ROS bridge unavailable; refusing to simulate mission captures")
clear_capture_ready_queue()
+ clear_coverage_state()
logger.warning("Starting capture loop: scan_id=%d, total_points=%s, use_ros=%s", scan_id, total_points, use_ros)
_capture_state["use_ros"] = use_ros
_capture_state["stop_event"] = threading.Event()
@@ -295,6 +324,10 @@ def get_capture_progress() -> dict:
total_points = _capture_state.get("total_points")
status = _capture_state.get("status") or "idle"
last_capture_ready = _capture_state.get("last_capture_ready")
+ use_ros = bool(_capture_state.get("use_ros"))
+ ros_total_points = get_coverage_state().get("total_points") if use_ros else None
+ if ros_total_points is not None:
+ total_points = int(ros_total_points)
progress_percent = 0.0
if total_points and total_points > 0:
progress_percent = min(100.0, (captured_points / total_points) * 100.0)
diff --git a/application/src/App.jsx b/application/src/App.jsx
index bcae852..30e72ac 100644
--- a/application/src/App.jsx
+++ b/application/src/App.jsx
@@ -190,6 +190,11 @@ function App() {
if (progress?.status === 'completed' && !finishHandledRef.current) {
finishHandledRef.current = true
+ try {
+ await apiClient.stopCoverageMission()
+ } catch (stopError) {
+ console.error('Failed to stop completed mission cleanly:', stopError)
+ }
setIsScanning(false)
setRobotStatus(prev => ({ ...prev, operatingState: 'Idle' }))
setScanPhase('Scan complete')
@@ -275,7 +280,7 @@ function App() {
const handleConfirmStartScan = async ({
areaSize, precision, totalPoints: configuredTotal,
- originX = 0, originY = 0, rowSpacing = 0.8, dwellTime = 2.0, waypointTimeout = 30.0
+ originX = 0, originY = 0, rowSpacing = 0.5, dwellTime = 2.0, waypointTimeout = 30.0
}) => {
setShowScanConfigModal(false)
try {
@@ -506,4 +511,3 @@ function App() {
}
export default App
-
diff --git a/application/src/components/ScanConfigModal.jsx b/application/src/components/ScanConfigModal.jsx
index 248b946..8c19ef9 100644
--- a/application/src/components/ScanConfigModal.jsx
+++ b/application/src/components/ScanConfigModal.jsx
@@ -7,7 +7,7 @@ const SCAN_SECONDS_PER_SQUARE_METER = 5
const SCAN_SECONDS_PER_POINT = 8
const FUEL_ESTIMATE_SECONDS_PER_POINT = 60
-const WALL_MARGIN = 0.20
+const WALL_MARGIN = 0.45
function calcTotalPoints(areaSize, precision, rowSpacing) {
const ew = areaSize - 2 * WALL_MARGIN
@@ -32,7 +32,7 @@ function ScanConfigModal({ open, onCancel, onConfirm }) {
const [precision, setPrecision] = useState(0.5)
const [originX, setOriginX] = useState(0)
const [originY, setOriginY] = useState(0)
- const [rowSpacing, setRowSpacing] = useState(0.8)
+ const [rowSpacing, setRowSpacing] = useState(0.5)
const [dwellTime, setDwellTime] = useState(2.0)
const [waypointTimeout, setWaypointTimeout] = useState(30.0)
@@ -86,7 +86,7 @@ function ScanConfigModal({ open, onCancel, onConfirm }) {
-
Origin (robot start position in meters)
+
Area Center in Odom (meters)
diff --git a/catkin_ws/src/pyroscope_navigation/config/costmap_common.yaml b/catkin_ws/src/pyroscope_navigation/config/costmap_common.yaml
index 9b62936..e59c9b4 100644
--- a/catkin_ws/src/pyroscope_navigation/config/costmap_common.yaml
+++ b/catkin_ws/src/pyroscope_navigation/config/costmap_common.yaml
@@ -20,8 +20,8 @@ obstacle_layer:
# Inflation layer params
inflation_layer:
- inflation_radius: 0.12
- cost_scaling_factor: 5.0
+ inflation_radius: 0.20
+ cost_scaling_factor: 2.0
# Map resolution
resolution: 0.05
diff --git a/catkin_ws/src/pyroscope_navigation/config/dwa_planner.yaml b/catkin_ws/src/pyroscope_navigation/config/dwa_planner.yaml
index 041d94b..26f7fad 100644
--- a/catkin_ws/src/pyroscope_navigation/config/dwa_planner.yaml
+++ b/catkin_ws/src/pyroscope_navigation/config/dwa_planner.yaml
@@ -2,37 +2,38 @@
# Slower speeds, larger tolerances, longer simulation horizon
DWAPlannerROS:
- # Robot velocity limits (conservative for uneven terrain)
- max_vel_x: 0.45
- min_vel_x: -0.10
+ # Robot velocity limits (small indoor arena, prioritize control over speed)
+ max_vel_x: 0.25
+ min_vel_x: 0.05
max_vel_y: 0.0
min_vel_y: 0.0
- max_vel_theta: 1.2
- min_vel_theta: -1.2
- min_vel_trans: 0.08
- acc_lim_x: 1.5
+ max_vel_theta: 0.8
+ min_vel_theta: -0.8
+ min_vel_trans: 0.04
+ acc_lim_x: 0.8
acc_lim_y: 0.0
- acc_lim_theta: 3.0
+ acc_lim_theta: 1.5
- # Goal tolerances (relaxed for outdoor GPS/odom drift)
- yaw_goal_tolerance: 0.4
- xy_goal_tolerance: 0.40
+ # Goal tolerances (tight enough for real coverage, still achievable with wheel odom)
+ yaw_goal_tolerance: 0.30
+ xy_goal_tolerance: 0.15
latch_xy_goal_tolerance: true
- # Forward simulation (longer horizon for outdoor planning)
+ # Forward simulation
sim_time: 2.0
sim_granularity: 0.05
- vx_samples: 12
+ vx_samples: 10
vy_samples: 1
- vtheta_samples: 20
+ vtheta_samples: 12
- # Trajectory scoring -- balance reaching goals vs avoiding walls
- path_distance_bias: 20.0
- goal_distance_bias: 28.0
+ # Trajectory scoring
+ path_distance_bias: 8.0
+ goal_distance_bias: 16.0
occdist_scale: 0.3
# Oscillation prevention
- oscillation_reset_dist: 0.10
+ oscillation_reset_dist: 0.20
# Stop before hitting things
- forward_point_distance: 0.15
+ forward_point_distance: 0.10
+ stop_time_buffer: 0.25
diff --git a/catkin_ws/src/pyroscope_navigation/config/global_costmap.yaml b/catkin_ws/src/pyroscope_navigation/config/global_costmap.yaml
index 9be8d99..1377f1b 100644
--- a/catkin_ws/src/pyroscope_navigation/config/global_costmap.yaml
+++ b/catkin_ws/src/pyroscope_navigation/config/global_costmap.yaml
@@ -4,8 +4,8 @@
global_costmap:
global_frame: odom
robot_base_frame: base_link
- update_frequency: 2.0
- publish_frequency: 1.0
+ update_frequency: 4.0
+ publish_frequency: 2.0
transform_tolerance: 3.0
static_map: false
rolling_window: true
diff --git a/catkin_ws/src/pyroscope_navigation/launch/coverage_mission_nav.launch b/catkin_ws/src/pyroscope_navigation/launch/coverage_mission_nav.launch
index 40ded62..94a2fd7 100644
--- a/catkin_ws/src/pyroscope_navigation/launch/coverage_mission_nav.launch
+++ b/catkin_ws/src/pyroscope_navigation/launch/coverage_mission_nav.launch
@@ -2,10 +2,20 @@
-
-
+
+
+
+
+
+
+
+
+
+
+
+
@@ -14,7 +24,7 @@
args="0 0 0.15 0 0 0 base_link laser 100"/>
-
+
@@ -32,18 +42,18 @@
-
-
-
+
+
+
-
-
+
+
-
+
@@ -51,6 +61,16 @@
+
+
+
+
+
+
+
+
+
+
diff --git a/catkin_ws/src/pyroscope_navigation/scripts/coverage_planner.py b/catkin_ws/src/pyroscope_navigation/scripts/coverage_planner.py
index 800f3f1..4fbfb48 100755
--- a/catkin_ws/src/pyroscope_navigation/scripts/coverage_planner.py
+++ b/catkin_ws/src/pyroscope_navigation/scripts/coverage_planner.py
@@ -1,169 +1,146 @@
#!/usr/bin/env python
"""
-Coverage Path Planner for Pyroscope
-Generates a boustrophedon (lawnmower) pattern over a rectangular area
-and sends waypoints to move_base for execution with obstacle avoidance.
-Pauses at each waypoint for thermal camera capture.
+Costmap-aware coverage planner for Pyroscope.
+
+This node does not precommit to a blind lawnmower list. It samples the live
+global costmap inside the requested square, keeps only safe free-space targets,
+and then repeatedly chooses the next uncovered target that has the shortest
+reachable plan from the robot's current pose.
"""
-import rospy
import math
+
import actionlib
+import rospy
import tf
+from geometry_msgs.msg import PoseStamped
from move_base_msgs.msg import MoveBaseAction, MoveBaseGoal
-from geometry_msgs.msg import Twist
from nav_msgs.msg import OccupancyGrid
-from sensor_msgs.msg import LaserScan
-from std_msgs.msg import Bool, String
+from nav_msgs.srv import GetPlan
+from std_msgs.msg import Bool, Int32, String
+
+
+WALL_MARGIN = 0.30 # meters
-# Margin inset from area boundary so waypoints never land on walls.
-# Must be >= robot half-width (0.125m) + inflation_radius (0.15m).
-WALL_MARGIN = 0.20 # meters
+class CoverageTarget(object):
+ def __init__(self, target_id, x, y, row, col):
+ self.target_id = target_id
+ self.x = x
+ self.y = y
+ self.row = row
+ self.col = col
+ self.covered = False
+ self.skipped = False
+ self.failures = 0
+ self.currently_safe = True
-class CoveragePlanner:
+class CoveragePlanner(object):
def __init__(self):
rospy.init_node('coverage_planner', anonymous=False)
+ self.ready = False
- # Area parameters
+ # Mission geometry
self.area_width = rospy.get_param('~area_width', 10.0)
self.area_height = rospy.get_param('~area_height', 10.0)
self.row_spacing = rospy.get_param('~row_spacing', 1.0)
self.waypoint_spacing = rospy.get_param('~waypoint_spacing', 1.0)
self.origin_x = rospy.get_param('~origin_x', 0.0)
self.origin_y = rospy.get_param('~origin_y', 0.0)
+ self.wall_margin = rospy.get_param('~wall_margin', WALL_MARGIN)
- # Timing parameters
+ # Timing and planning
self.dwell_time = rospy.get_param('~dwell_time', 3.0)
self.waypoint_timeout = rospy.get_param('~waypoint_timeout', 60.0)
- self.max_waypoint_failures = int(rospy.get_param('~max_waypoint_failures', 3))
- self.pose_stale_timeout = rospy.get_param('~pose_stale_timeout', 0.75)
- self.scan_stale_timeout = rospy.get_param('~scan_stale_timeout', 1.0)
- self.costmap_stale_timeout = rospy.get_param('~costmap_stale_timeout', 2.0)
-
- # Recovery parameters
- self.recovery_turn_speed = rospy.get_param('~recovery_turn_speed', 0.7)
- self.recovery_turn_angle = rospy.get_param('~recovery_turn_angle', 1.0)
- self.recovery_backup_speed = rospy.get_param('~recovery_backup_speed', -0.12)
- self.recovery_backup_distance = rospy.get_param('~recovery_backup_distance', 0.35)
- self.recovery_clearance = rospy.get_param('~recovery_clearance', 0.30)
- self.waypoint_validation_radius = rospy.get_param('~waypoint_validation_radius', 0.18)
- self.waypoint_cost_threshold = int(rospy.get_param('~waypoint_cost_threshold', 80))
- self.scan_blocked_margin = rospy.get_param('~scan_blocked_margin', 0.20)
- self.scan_blocked_sector = rospy.get_param('~scan_blocked_sector', 0.20)
+ self.pose_stale_timeout = rospy.get_param('~pose_stale_timeout', 2.0)
+ self.costmap_stale_timeout = rospy.get_param('~costmap_stale_timeout', 5.0)
+ self.make_plan_tolerance = rospy.get_param('~make_plan_tolerance', 0.25)
+ self.progress_log_interval = rospy.get_param('~progress_log_interval', 2.0)
+ self.progress_epsilon = rospy.get_param('~progress_epsilon', 0.05)
+ self.stall_timeout = rospy.get_param('~stall_timeout', 30.0)
+ self.no_target_retry_limit = int(rospy.get_param('~no_target_retry_limit', 15))
+ self.no_target_retry_sleep = rospy.get_param('~no_target_retry_sleep', 3.0)
+ self.costmap_settle_time = rospy.get_param('~costmap_settle_time', 3.0)
+
+ # Coverage target safety
+ self.target_cost_threshold = int(rospy.get_param('~target_cost_threshold', 85))
+ self.target_check_radius = rospy.get_param('~target_check_radius', 0.10)
+ self.max_target_failures = int(rospy.get_param('~max_target_failures', 8))
+ self.failure_penalty = rospy.get_param('~failure_penalty', 0.75)
+ self.row_change_penalty = rospy.get_param('~row_change_penalty', 0.10)
# State
- self.waypoints = []
- self.current_index = 0
- self.latest_scan = None
- self.latest_scan_stamp = None
+ self.targets = []
+ self.target_lookup = {}
+ self.next_target_id = 0
+ self.sweep_target_ids = []
+ self.sweep_cursor = 0
+ self.sequence_initialized = False
self.latest_costmap = None
self.latest_costmap_stamp = None
+ self.last_selected_row = None
# Publishers
- self.cmd_vel_pub = rospy.Publisher('/cmd_vel', Twist, queue_size=1)
self.capture_ready_pub = rospy.Publisher('/coverage/capture_ready', Bool, queue_size=1)
- self.progress_pub = rospy.Publisher('/coverage/progress', String, queue_size=1)
- self.complete_pub = rospy.Publisher('/coverage/complete', Bool, queue_size=1)
- self.scan_sub = rospy.Subscriber('/scan', LaserScan, self.scan_callback, queue_size=1)
- self.costmap_sub = rospy.Subscriber('/move_base/global_costmap/costmap', OccupancyGrid, self.costmap_callback, queue_size=1)
- self.tf_listener = tf.TransformListener()
+ self.progress_pub = rospy.Publisher('/coverage/progress', String, queue_size=1, latch=True)
+ self.total_points_pub = rospy.Publisher('/coverage/total_points', Int32, queue_size=1, latch=True)
+ self.complete_pub = rospy.Publisher('/coverage/complete', Bool, queue_size=1, latch=True)
+
+ # Subscribers
+ self.costmap_sub = rospy.Subscriber(
+ '/move_base/global_costmap/costmap',
+ OccupancyGrid,
+ self.costmap_callback,
+ queue_size=1,
+ )
- # move_base action client
+ # move_base interfaces
+ self.tf_listener = tf.TransformListener()
self.move_base_client = actionlib.SimpleActionClient('move_base', MoveBaseAction)
+ self.make_plan_srv = None
+ self.clear_costmaps_srv = None
+
+ self.complete_pub.publish(Bool(data=False))
+ self.total_points_pub.publish(Int32(data=0))
+ self.progress_pub.publish(String(data='Waiting for costmap...'))
+
rospy.loginfo("Waiting for move_base action server...")
- if not self.move_base_client.wait_for_server(rospy.Duration(30.0)):
- rospy.logfatal("move_base action server not available -- aborting")
+ if not self.move_base_client.wait_for_server(rospy.Duration(60.0)):
+ rospy.logfatal("move_base action server not available -- inspect move_base startup logs")
rospy.signal_shutdown("move_base unavailable")
return
- rospy.loginfo("Connected to move_base action server")
+ try:
+ rospy.wait_for_service('/move_base/make_plan', timeout=5.0)
+ self.make_plan_srv = rospy.ServiceProxy('/move_base/make_plan', GetPlan)
+ rospy.loginfo("Connected to /move_base/make_plan")
+ except rospy.ROSException:
+ rospy.logwarn("/move_base/make_plan not available; planner will fall back to Euclidean ordering")
- # Generate waypoints
- self.generate_waypoints()
- self.validate_waypoint_distances()
+ try:
+ from std_srvs.srv import Empty
+ rospy.wait_for_service('/move_base/clear_costmaps', timeout=2.0)
+ self.clear_costmaps_srv = rospy.ServiceProxy('/move_base/clear_costmaps', Empty)
+ except Exception:
+ self.clear_costmaps_srv = None
- rospy.loginfo("Coverage Planner initialized")
- rospy.loginfo(" Area: %.1f x %.1f m (margin: %.2fm from walls)",
- self.area_width, self.area_height, WALL_MARGIN)
- rospy.loginfo(" Row spacing: %.1f m, Waypoint spacing: %.1f m",
+ rospy.loginfo("Coverage planner configured")
+ rospy.loginfo(" Area: %.2f x %.2f m centered at (%.2f, %.2f)",
+ self.area_width, self.area_height, self.origin_x, self.origin_y)
+ rospy.loginfo(" Sampling: row_spacing=%.2f m waypoint_spacing=%.2f m",
self.row_spacing, self.waypoint_spacing)
- rospy.loginfo(" Origin: (%.1f, %.1f)", self.origin_x, self.origin_y)
- rospy.loginfo(" Total waypoints: %d", len(self.waypoints))
- rospy.loginfo(" Dwell time: %.1f s", self.dwell_time)
- rospy.loginfo(" Max retries per waypoint: %d", self.max_waypoint_failures)
-
- def generate_waypoints(self):
- """Generate boustrophedon (lawnmower) waypoints with wall margin inset."""
- self.waypoints = []
-
- # Center the grid on the origin, then inset by wall margin
- min_x = self.origin_x - self.area_width / 2.0 + WALL_MARGIN
- max_x = self.origin_x + self.area_width / 2.0 - WALL_MARGIN
- min_y = self.origin_y - self.area_height / 2.0 + WALL_MARGIN
- max_y = self.origin_y + self.area_height / 2.0 - WALL_MARGIN
-
- # If the area is too small for any margin, just use the center
- if max_x <= min_x or max_y <= min_y:
- cx = self.origin_x + self.area_width / 2.0
- cy = self.origin_y + self.area_height / 2.0
- self.waypoints.append((cx, cy))
- rospy.logwarn("Area too small for margin -- using center point only (%.2f, %.2f)", cx, cy)
- return
-
- effective_width = max_x - min_x
- effective_height = max_y - min_y
-
- num_rows = max(1, int(math.ceil(effective_height / self.row_spacing)) + 1)
- num_cols = max(1, int(math.ceil(effective_width / self.waypoint_spacing)) + 1)
-
- for row in range(num_rows):
- y = min_y + row * self.row_spacing
- if y > max_y:
- y = max_y
-
- # Alternate direction for lawnmower pattern
- if row % 2 == 0:
- col_range = range(num_cols)
- else:
- col_range = range(num_cols - 1, -1, -1)
-
- for col in col_range:
- x = min_x + col * self.waypoint_spacing
- if x > max_x:
- x = max_x
-
- self.waypoints.append((x, y))
-
- rospy.loginfo("Generated %d waypoints in %d rows (inset %.2fm from walls)",
- len(self.waypoints), num_rows, WALL_MARGIN)
-
- def validate_waypoint_distances(self):
- """Warn if consecutive waypoints exceed global costmap range"""
- costmap_half = 7.5 # half of 15m rolling window
- for i in range(1, len(self.waypoints)):
- x0, y0 = self.waypoints[i - 1]
- x1, y1 = self.waypoints[i]
- dist = math.sqrt((x1 - x0) ** 2 + (y1 - y0) ** 2)
- if dist > costmap_half:
- rospy.logwarn(
- "Waypoints %d->%d are %.1fm apart (exceeds %.1fm costmap radius) "
- "-- move_base may fail to plan",
- i, i + 1, dist, costmap_half
- )
-
- def scan_callback(self, msg):
- self.latest_scan = msg
- self.latest_scan_stamp = msg.header.stamp if msg.header.stamp != rospy.Time() else rospy.Time.now()
+ rospy.loginfo(" Safety: wall_margin=%.2f m target_cost_threshold=%d target_check_radius=%.2f m",
+ self.wall_margin, self.target_cost_threshold, self.target_check_radius)
+ self.ready = True
def costmap_callback(self, msg):
self.latest_costmap = msg
- self.latest_costmap_stamp = msg.header.stamp if msg.header.stamp != rospy.Time() else rospy.Time.now()
-
- def stop_robot(self):
- self.cmd_vel_pub.publish(Twist())
+ if msg.header.stamp != rospy.Time():
+ self.latest_costmap_stamp = msg.header.stamp
+ else:
+ self.latest_costmap_stamp = rospy.Time.now()
def get_robot_pose(self):
try:
@@ -185,60 +162,54 @@ def wait_for_fresh_pose(self, timeout):
rate.sleep()
return False
- def get_scan_age(self):
- if self.latest_scan_stamp is None:
- return None
- return abs((rospy.Time.now() - self.latest_scan_stamp).to_sec())
-
def get_costmap_age(self):
if self.latest_costmap_stamp is None:
return None
return abs((rospy.Time.now() - self.latest_costmap_stamp).to_sec())
- def normalize_angle(self, angle):
- return math.atan2(math.sin(angle), math.cos(angle))
-
- def angle_in_sector(self, angle, start, end):
- angle = self.normalize_angle(angle)
- start = self.normalize_angle(start)
- end = self.normalize_angle(end)
- if start <= end:
- return start <= angle <= end
- return angle >= start or angle <= end
-
- def sector_clearance(self, start_angle, end_angle):
- if self.latest_scan is None:
- return None
+ def costmap_is_fresh(self):
+ costmap_age = self.get_costmap_age()
+ return costmap_age is not None and costmap_age <= self.costmap_stale_timeout
- values = []
- angle = self.latest_scan.angle_min
- for distance in self.latest_scan.ranges:
- if self.angle_in_sector(angle, start_angle, end_angle):
- if math.isinf(distance):
- values.append(self.latest_scan.range_max)
- elif not math.isnan(distance):
- values.append(distance)
- angle += self.latest_scan.angle_increment
-
- return min(values) if values else None
-
- def choose_turn_direction(self):
- left_clearance = self.sector_clearance(math.radians(20), math.radians(110))
- right_clearance = self.sector_clearance(math.radians(-110), math.radians(-20))
-
- if left_clearance is None and right_clearance is None:
- return 1.0
- if right_clearance is None:
- return 1.0
- if left_clearance is None:
- return -1.0
- return 1.0 if left_clearance >= right_clearance else -1.0
+ def wait_for_costmap(self, timeout):
+ deadline = rospy.Time.now() + rospy.Duration(timeout)
+ rate = rospy.Rate(10)
+ while rospy.Time.now() < deadline and not rospy.is_shutdown():
+ if self.latest_costmap is not None and self.costmap_is_fresh():
+ return True
+ rate.sleep()
+ return False
- def costmap_cell(self, mx, my):
- info = self.latest_costmap.info
- if mx < 0 or my < 0 or mx >= info.width or my >= info.height:
- return None
- return self.latest_costmap.data[my * info.width + mx]
+ def build_pose(self, x, y, yaw):
+ pose = PoseStamped()
+ pose.header.frame_id = 'odom'
+ pose.header.stamp = rospy.Time.now()
+ pose.pose.position.x = x
+ pose.pose.position.y = y
+ pose.pose.position.z = 0.0
+ quat = tf.transformations.quaternion_from_euler(0.0, 0.0, yaw)
+ pose.pose.orientation.x = quat[0]
+ pose.pose.orientation.y = quat[1]
+ pose.pose.orientation.z = quat[2]
+ pose.pose.orientation.w = quat[3]
+ return pose
+
+ def axis_points(self, min_value, max_value, spacing):
+ points = []
+ value = min_value
+ while value <= max_value + 1e-6:
+ points.append(min(value, max_value))
+ value += spacing
+ if not points:
+ points = [min_value]
+ elif abs(points[-1] - max_value) > 1e-6:
+ points.append(max_value)
+
+ deduped = []
+ for point in points:
+ if not deduped or abs(point - deduped[-1]) > 1e-6:
+ deduped.append(point)
+ return deduped
def world_to_costmap(self, x, y):
if self.latest_costmap is None:
@@ -247,267 +218,539 @@ def world_to_costmap(self, x, y):
info = self.latest_costmap.info
mx = int(math.floor((x - info.origin.position.x) / info.resolution))
my = int(math.floor((y - info.origin.position.y) / info.resolution))
+
if mx < 0 or my < 0 or mx >= info.width or my >= info.height:
return None
- return mx, my
+ return (mx, my)
- def waypoint_cost_invalid(self, x, y):
- costmap_age = self.get_costmap_age()
- if self.latest_costmap is None or costmap_age is None or costmap_age > self.costmap_stale_timeout:
+ def costmap_cell(self, mx, my):
+ info = self.latest_costmap.info
+ if mx < 0 or my < 0 or mx >= info.width or my >= info.height:
+ return None
+ return self.latest_costmap.data[my * info.width + mx]
+
+ def is_target_safe(self, x, y):
+ if self.latest_costmap is None:
return False
center = self.world_to_costmap(x, y)
if center is None:
return False
+ info = self.latest_costmap.info
+ radius_cells = int(math.ceil(self.target_check_radius / info.resolution))
mx, my = center
- radius_cells = int(math.ceil(self.waypoint_validation_radius / self.latest_costmap.info.resolution))
for cx in range(mx - radius_cells, mx + radius_cells + 1):
for cy in range(my - radius_cells, my + radius_cells + 1):
cost = self.costmap_cell(cx, cy)
if cost is None:
- continue
- if cost < 0 or cost >= self.waypoint_cost_threshold:
- rospy.logwarn(
- "Skipping waypoint (%.2f, %.2f): costmap cell %d near target exceeds threshold %d",
- x, y, cost, self.waypoint_cost_threshold
- )
- return True
+ continue # neighbour outside costmap grid; skip
+ if cost < 0:
+ continue # unknown cell; treat as potentially free
+ if cost >= self.target_cost_threshold:
+ return False
+ return True
- return False
+ def refresh_targets_from_costmap(self):
+ previous_known = len(self.targets)
+ previous_total = self.count_total_targets()
+ previous_pending = self.count_pending_targets()
+
+ min_x = self.origin_x - self.area_width / 2.0 + self.wall_margin
+ max_x = self.origin_x + self.area_width / 2.0 - self.wall_margin
+ min_y = self.origin_y - self.area_height / 2.0 + self.wall_margin
+ max_y = self.origin_y + self.area_height / 2.0 - self.wall_margin
+
+ if max_x <= min_x or max_y <= min_y:
+ rospy.logwarn_throttle(5.0, "Area too small after wall margin; using center point fallback")
+ key = (0, 0)
+ target = self.target_lookup.get(key)
+ if target is None and self.is_target_safe(self.origin_x, self.origin_y):
+ target = CoverageTarget(self.next_target_id, self.origin_x, self.origin_y, 0, 0)
+ self.next_target_id += 1
+ self.target_lookup[key] = target
+ if target is not None:
+ target.currently_safe = self.is_target_safe(self.origin_x, self.origin_y)
+ target.x = self.origin_x
+ target.y = self.origin_y
+ self.targets = sorted(self.target_lookup.values(), key=lambda existing: (existing.row, existing.col))
+ return
+
+ x_points = self.axis_points(min_x, max_x, self.waypoint_spacing)
+ y_points = self.axis_points(min_y, max_y, self.row_spacing)
+
+ for row_index, y in enumerate(y_points):
+ for col_index, x in enumerate(x_points):
+ key = (row_index, col_index)
+ currently_safe = self.is_target_safe(x, y)
+ target = self.target_lookup.get(key)
+
+ if target is None:
+ target = CoverageTarget(self.next_target_id, x, y, row_index, col_index)
+ self.next_target_id += 1
+ self.target_lookup[key] = target
+
+ target.x = x
+ target.y = y
+ target.currently_safe = currently_safe
+
+ self.targets = sorted(self.target_lookup.values(), key=lambda target: (target.row, target.col))
+ if len(self.targets) != previous_known:
+ self.sequence_initialized = False
+
+ current_total = self.count_total_targets()
+ current_pending = self.count_pending_targets()
+ if current_total != previous_total or current_pending != previous_pending:
+ rospy.loginfo(
+ "Coverage target set refreshed: total=%d pending=%d covered=%d skipped=%d",
+ current_total,
+ current_pending,
+ self.count_covered_targets(),
+ self.count_skipped_targets(),
+ )
+
+ def count_known_targets(self):
+ return len(self.targets)
+
+ def count_total_targets(self):
+ return len([target for target in self.targets if not target.skipped])
+
+ def count_covered_targets(self):
+ return len([target for target in self.targets if target.covered])
+
+ def count_pending_targets(self):
+ return len([target for target in self.targets if not target.covered and not target.skipped])
+
+ def count_active_targets(self):
+ return len([
+ target for target in self.targets
+ if not target.covered and not target.skipped and target.currently_safe
+ ])
+
+ def count_skipped_targets(self):
+ return len([target for target in self.targets if target.skipped])
+
+ def build_sweep_target_ids(self):
+ sequence = []
+ rows = sorted({target.row for target in self.targets})
+ for row in rows:
+ row_targets = [target for target in self.targets if target.row == row]
+ row_targets.sort(key=lambda target: target.col, reverse=bool(row % 2))
+ sequence.extend([target.target_id for target in row_targets])
+ self.sweep_target_ids = sequence
+
+ def initialize_sweep_sequence(self):
+ self.build_sweep_target_ids()
+ self.sequence_initialized = True
+ self.sweep_cursor = 0
+
+ if not self.sweep_target_ids:
+ return
- def waypoint_scan_blocked(self, x, y):
pose = self.get_robot_pose()
- scan_age = self.get_scan_age()
- if pose is None or scan_age is None or scan_age > self.scan_stale_timeout:
- return False
+ if pose is None:
+ return
- dx = x - pose[0]
- dy = y - pose[1]
- target_distance = math.sqrt(dx ** 2 + dy ** 2)
- if target_distance <= 0.05:
- return False
+ best_index = 0
+ best_distance = None
+ for index, target_id in enumerate(self.sweep_target_ids):
+ target = next((candidate for candidate in self.targets if candidate.target_id == target_id), None)
+ if target is None or target.covered or target.skipped:
+ continue
+ distance = math.sqrt((target.x - pose[0]) ** 2 + (target.y - pose[1]) ** 2)
+ if best_distance is None or distance < best_distance:
+ best_distance = distance
+ best_index = index
+ self.sweep_cursor = best_index
+
+ def ordered_pending_targets(self):
+ if not self.sequence_initialized:
+ self.initialize_sweep_sequence()
+
+ if not self.sweep_target_ids:
+ return []
+
+ target_by_id = {target.target_id: target for target in self.targets}
+ ordered = []
+ total = len(self.sweep_target_ids)
+ for offset in range(total):
+ index = (self.sweep_cursor + offset) % total
+ target = target_by_id.get(self.sweep_target_ids[index])
+ if target is None or target.covered or target.skipped:
+ continue
+ ordered.append(target)
+ return ordered
- target_bearing = self.normalize_angle(math.atan2(dy, dx) - pose[2])
- clearance = self.sector_clearance(
- target_bearing - self.scan_blocked_sector,
- target_bearing + self.scan_blocked_sector
+ def advance_sweep_cursor(self, target):
+ if not self.sweep_target_ids:
+ return
+ try:
+ index = self.sweep_target_ids.index(target.target_id)
+ except ValueError:
+ return
+ self.sweep_cursor = (index + 1) % len(self.sweep_target_ids)
+
+ def publish_total_points(self):
+ self.total_points_pub.publish(Int32(data=self.count_total_targets()))
+
+ def publish_progress(self):
+ covered = self.count_covered_targets()
+ total = self.count_total_targets()
+ skipped = self.count_skipped_targets()
+ active = self.count_active_targets()
+ if total > 0:
+ percent = int(round((100.0 * covered) / total))
+ else:
+ percent = 100
+ message = "%d/%d targets covered (%d%%, %d skipped, %d active)" % (
+ covered,
+ total,
+ percent,
+ skipped,
+ active,
)
- if clearance is None:
- return False
+ self.progress_pub.publish(String(data=message))
- if clearance + self.scan_blocked_margin < target_distance:
+ def path_length(self, poses):
+ if not poses:
+ return None
+ total = 0.0
+ for i in range(1, len(poses)):
+ p0 = poses[i - 1].pose.position
+ p1 = poses[i].pose.position
+ total += math.sqrt((p1.x - p0.x) ** 2 + (p1.y - p0.y) ** 2)
+ return total
+
+ def plan_length_to_target(self, start_pose, target):
+ if self.make_plan_srv is None:
+ dx = target.x - start_pose.pose.position.x
+ dy = target.y - start_pose.pose.position.y
+ return math.sqrt(dx ** 2 + dy ** 2)
+
+ goal = self.build_pose(target.x, target.y, 0.0)
+ try:
+ response = self.make_plan_srv(start_pose, goal, self.make_plan_tolerance)
+ except rospy.ServiceException:
+ return None
+
+ if not response.plan.poses:
+ return None
+ return self.path_length(response.plan.poses)
+
+ def choose_next_target_once(self):
+ self.refresh_targets_from_costmap()
+ if not self.costmap_is_fresh():
+ rospy.logwarn("Global costmap is stale while selecting the next coverage target")
+ return None
+
+ pose = self.get_robot_pose()
+ if pose is None or pose[3] > self.pose_stale_timeout:
+ rospy.logwarn("Robot pose is stale while selecting the next coverage target")
+ return None
+
+ start_pose = self.build_pose(pose[0], pose[1], pose[2])
+ best_target = None
+ best_score = None
+
+ pending = self.ordered_pending_targets()
+ fallback_target = None
+ planned_candidate_count = 0
+
+ for target in pending:
+ if not target.currently_safe:
+ continue
+
+ if fallback_target is None:
+ fallback_target = target
+
+ plan_length = self.plan_length_to_target(start_pose, target)
+ if plan_length is None:
+ continue
+ planned_candidate_count += 1
+
+ row_penalty = 0.0
+ if self.last_selected_row is not None and target.row != self.last_selected_row:
+ row_penalty = abs(target.row - self.last_selected_row) * self.row_change_penalty
+
+ score = plan_length + (target.failures * self.failure_penalty) + row_penalty
+ if best_score is None or score < best_score:
+ best_score = score
+ best_target = target
+
+ if best_target is not None:
+ return best_target
+
+ if fallback_target is not None:
rospy.logwarn(
- "Skipping waypoint (%.2f, %.2f): scan shows obstacle at %.2fm before target distance %.2fm",
- x, y, clearance, target_distance
+ "make_plan found no path for ordered safe targets; falling back to sweep target %d at (%.2f, %.2f)",
+ fallback_target.target_id,
+ fallback_target.x,
+ fallback_target.y,
)
- return True
+ return fallback_target
- return False
+ if self.count_pending_targets() > 0:
+ rospy.logwarn(
+ "No active safe targets remain in the live costmap (pending=%d active=%d planned=%d)",
+ self.count_pending_targets(),
+ self.count_active_targets(),
+ planned_candidate_count,
+ )
- def waypoint_is_valid(self, x, y):
- if self.waypoint_cost_invalid(x, y):
- return False
- if self.waypoint_scan_blocked(x, y):
- return False
- return True
+ return best_target
- def rotate_for_recovery(self, turn_direction):
- cmd = Twist()
- cmd.angular.z = turn_direction * abs(self.recovery_turn_speed)
+ def choose_next_target(self):
+ target = self.choose_next_target_once()
+ if target is not None:
+ return target
+ if self.clear_costmaps_srv is not None:
+ try:
+ rospy.logwarn("No reachable target found; clearing costmaps and retrying")
+ self.clear_costmaps_srv()
+ rospy.sleep(2.0)
+ self.refresh_targets_from_costmap()
+ except Exception:
+ pass
+ return self.choose_next_target_once()
+
+ return None
+
+ def send_move_base_goal(self, target):
pose = self.get_robot_pose()
- if pose is not None and pose[3] <= self.pose_stale_timeout:
- start_yaw = pose[2]
- timeout = rospy.Time.now() + rospy.Duration(4.0)
- rate = rospy.Rate(15)
- while not rospy.is_shutdown() and rospy.Time.now() < timeout:
- pose = self.get_robot_pose()
- if pose is None:
- break
- yaw_delta = abs(self.normalize_angle(pose[2] - start_yaw))
- if yaw_delta >= self.recovery_turn_angle:
- break
- self.cmd_vel_pub.publish(cmd)
- rate.sleep()
- else:
- duration = self.recovery_turn_angle / max(abs(self.recovery_turn_speed), 0.1)
- end_time = rospy.Time.now() + rospy.Duration(duration)
- rate = rospy.Rate(15)
- while not rospy.is_shutdown() and rospy.Time.now() < end_time:
- self.cmd_vel_pub.publish(cmd)
- rate.sleep()
-
- self.stop_robot()
-
- def backup_for_recovery(self):
- cmd = Twist()
- cmd.linear.x = min(self.recovery_backup_speed, -0.05)
- pose = self.get_robot_pose()
- target_distance = abs(self.recovery_backup_distance)
-
- if pose is not None and pose[3] <= self.pose_stale_timeout:
- start_x = pose[0]
- start_y = pose[1]
- timeout = rospy.Time.now() + rospy.Duration(6.0)
- rate = rospy.Rate(15)
- while not rospy.is_shutdown() and rospy.Time.now() < timeout:
- scan_age = self.get_scan_age()
- rear_clearance = self.sector_clearance(math.radians(140), math.radians(-140))
- if scan_age is not None and scan_age <= self.scan_stale_timeout and rear_clearance is not None and rear_clearance < self.recovery_clearance:
- rospy.logwarn("Stopping recovery backup early -- rear clearance only %.2fm", rear_clearance)
- break
-
- pose = self.get_robot_pose()
- if pose is None:
- break
- distance = math.sqrt((pose[0] - start_x) ** 2 + (pose[1] - start_y) ** 2)
- if distance >= target_distance:
- break
- self.cmd_vel_pub.publish(cmd)
- rate.sleep()
- else:
- duration = target_distance / max(abs(cmd.linear.x), 0.05)
- end_time = rospy.Time.now() + rospy.Duration(duration)
- rate = rospy.Rate(15)
- while not rospy.is_shutdown() and rospy.Time.now() < end_time:
- scan_age = self.get_scan_age()
- rear_clearance = self.sector_clearance(math.radians(140), math.radians(-140))
- if scan_age is not None and scan_age <= self.scan_stale_timeout and rear_clearance is not None and rear_clearance < self.recovery_clearance:
- rospy.logwarn("Stopping timed recovery backup early -- rear clearance only %.2fm", rear_clearance)
- break
- self.cmd_vel_pub.publish(cmd)
- rate.sleep()
-
- self.stop_robot()
-
- def perform_escape_recovery(self):
- rospy.logwarn("Running escape recovery: cancel goal, clear costmaps, rotate toward open side, then back up")
- self.move_base_client.cancel_goal()
- self.stop_robot()
- self.clear_costmaps()
- rospy.sleep(0.5)
-
- if self.get_scan_age() is None or self.get_scan_age() > self.scan_stale_timeout:
- rospy.logwarn("Laser scan is stale during recovery; using timed turn")
-
- turn_direction = self.choose_turn_direction()
- self.rotate_for_recovery(turn_direction)
- self.backup_for_recovery()
- self.clear_costmaps()
- rospy.sleep(1.0)
+ if pose is None or pose[3] > self.pose_stale_timeout:
+ rospy.logwarn("Robot pose is stale before goal dispatch")
+ return False
+
+ if not self.costmap_is_fresh():
+ rospy.logwarn("Global costmap is stale before goal dispatch")
+ return False
- def send_move_base_goal(self, x, y):
- """Send a goal to move_base and wait for result. Returns True on success."""
- if not self.wait_for_fresh_pose(2.0):
- rospy.logwarn("Robot pose is stale before goal dispatch; refusing to send goal until TF recovers")
+ target.currently_safe = self.is_target_safe(target.x, target.y)
+ if not target.currently_safe:
+ rospy.logwarn("Target %d at (%.2f, %.2f) is no longer safe before dispatch",
+ target.target_id, target.x, target.y)
return False
+ goal_yaw = pose[2]
+
goal = MoveBaseGoal()
- goal.target_pose.header.frame_id = "odom"
+ goal.target_pose.header.frame_id = 'odom'
goal.target_pose.header.stamp = rospy.Time.now()
- goal.target_pose.pose.position.x = x
- goal.target_pose.pose.position.y = y
+ goal.target_pose.pose.position.x = target.x
+ goal.target_pose.pose.position.y = target.y
goal.target_pose.pose.position.z = 0.0
- goal.target_pose.pose.orientation.w = 1.0
+
+ quat = tf.transformations.quaternion_from_euler(0.0, 0.0, goal_yaw)
+ goal.target_pose.pose.orientation.x = quat[0]
+ goal.target_pose.pose.orientation.y = quat[1]
+ goal.target_pose.pose.orientation.z = quat[2]
+ goal.target_pose.pose.orientation.w = quat[3]
+
+ start_distance = math.sqrt((target.x - pose[0]) ** 2 + (target.y - pose[1]) ** 2)
+ best_distance = start_distance
+ start_time = rospy.Time.now()
+ last_progress_time = start_time
+ last_log_time = start_time
self.move_base_client.send_goal(goal)
- finished = self.move_base_client.wait_for_result(rospy.Duration(self.waypoint_timeout))
+ rospy.loginfo(
+ "Tracking target %d at (%.2f, %.2f): start_distance=%.2f m timeout=%.1f s stall_timeout=%.1f s",
+ target.target_id,
+ target.x,
+ target.y,
+ start_distance,
+ self.waypoint_timeout,
+ self.stall_timeout,
+ )
- if not finished:
- self.move_base_client.cancel_goal()
- rospy.logwarn("move_base timed out reaching (%.2f, %.2f)", x, y)
- return False
+ rate = rospy.Rate(4)
+ while not rospy.is_shutdown():
+ if self.move_base_client.wait_for_result(rospy.Duration(0.0)):
+ break
+
+ now = rospy.Time.now()
+ pose = self.get_robot_pose()
+ if pose is not None and pose[3] <= self.pose_stale_timeout:
+ current_distance = math.sqrt((target.x - pose[0]) ** 2 + (target.y - pose[1]) ** 2)
+ if current_distance < best_distance - self.progress_epsilon:
+ improvement = best_distance - current_distance
+ best_distance = current_distance
+ last_progress_time = now
+ rospy.loginfo(
+ "Target %d progress: distance %.2f m (improved %.2f m, elapsed %.1f s)",
+ target.target_id,
+ current_distance,
+ improvement,
+ (now - start_time).to_sec(),
+ )
+ elif (now - last_log_time).to_sec() >= self.progress_log_interval:
+ rospy.loginfo(
+ "Target %d status: distance %.2f m best %.2f m elapsed %.1f s stalled %.1f s",
+ target.target_id,
+ current_distance,
+ best_distance,
+ (now - start_time).to_sec(),
+ (now - last_progress_time).to_sec(),
+ )
+ last_log_time = now
+
+ elapsed = (now - start_time).to_sec()
+ stalled = (now - last_progress_time).to_sec()
+ if elapsed >= self.waypoint_timeout:
+ self.move_base_client.cancel_goal()
+ rospy.logwarn(
+ "move_base timed out reaching target %d at (%.2f, %.2f): best_distance=%.2f m elapsed=%.1f s",
+ target.target_id,
+ target.x,
+ target.y,
+ best_distance,
+ elapsed,
+ )
+ return False
+
+ if stalled >= self.stall_timeout:
+ self.move_base_client.cancel_goal()
+ rospy.logwarn(
+ "move_base stalled on target %d at (%.2f, %.2f): best_distance=%.2f m stalled=%.1f s",
+ target.target_id,
+ target.x,
+ target.y,
+ best_distance,
+ stalled,
+ )
+ return False
+
+ rate.sleep()
state = self.move_base_client.get_state()
if state == actionlib.GoalStatus.SUCCEEDED:
+ rospy.loginfo(
+ "Reached target %d at (%.2f, %.2f): start_distance=%.2f m best_distance=%.2f m",
+ target.target_id,
+ target.x,
+ target.y,
+ start_distance,
+ best_distance,
+ )
return True
- else:
- rospy.logwarn("move_base failed for (%.2f, %.2f) -- state %d", x, y, state)
- return False
- def clear_costmaps(self):
- """Ask move_base to clear costmaps -- helps recover from stuck states."""
- try:
- from std_srvs.srv import Empty
- rospy.wait_for_service('/move_base/clear_costmaps', timeout=2.0)
- clear = rospy.ServiceProxy('/move_base/clear_costmaps', Empty)
- clear()
- rospy.loginfo("Costmaps cleared")
- except Exception:
- rospy.logwarn("Could not clear costmaps")
+ rospy.logwarn("move_base failed for target %d at (%.2f, %.2f) -- state %d",
+ target.target_id, target.x, target.y, state)
+ return False
- def publish_progress(self):
- """Publish current progress"""
- total = len(self.waypoints)
- msg = "{}/{} waypoints ({}%)".format(
- self.current_index, total,
- int(100.0 * self.current_index / total) if total > 0 else 0
- )
- self.progress_pub.publish(String(data=msg))
+ def capture_target(self):
+ rospy.loginfo("Dwelling for %.1f s and triggering capture", self.dwell_time)
+ self.capture_ready_pub.publish(Bool(data=True))
+ rospy.sleep(self.dwell_time)
+ self.capture_ready_pub.publish(Bool(data=False))
+
+ def complete_mission(self):
+ covered = self.count_covered_targets()
+ skipped = self.count_skipped_targets()
+ total = self.count_total_targets()
+ rospy.loginfo("Coverage mission complete: covered=%d total=%d skipped=%d", covered, total, skipped)
+ self.publish_total_points()
+ self.publish_progress()
+ self.complete_pub.publish(Bool(data=True))
+ rospy.sleep(1.0)
def run(self):
- # Wait for subscribers to connect and costmap to populate
- rospy.loginfo("Waiting 5s for costmap to populate from lidar...")
- rospy.sleep(5.0)
-
- rospy.loginfo("Starting coverage mission!")
-
- while self.current_index < len(self.waypoints) and not rospy.is_shutdown():
- x, y = self.waypoints[self.current_index]
- waypoint_failures = 0
+ if not self.ready or rospy.is_shutdown():
+ return
- if not self.waypoint_is_valid(x, y):
- rospy.logwarn("Skipping waypoint %d/%d because the target is invalid in the live environment",
- self.current_index + 1, len(self.waypoints))
- self.current_index += 1
- continue
+ rospy.loginfo("Waiting for a fresh robot pose...")
+ if not self.wait_for_fresh_pose(10.0):
+ rospy.logfatal("No fresh robot pose available; aborting coverage mission")
+ self.complete_pub.publish(Bool(data=True))
+ return
- while waypoint_failures < self.max_waypoint_failures and not rospy.is_shutdown():
- rospy.loginfo("Waypoint %d/%d attempt %d/%d: (%.2f, %.2f)",
- self.current_index + 1, len(self.waypoints),
- waypoint_failures + 1, self.max_waypoint_failures, x, y)
- self.publish_progress()
+ rospy.loginfo("Waiting for the global costmap to populate...")
+ if not self.wait_for_costmap(10.0):
+ rospy.logfatal("No fresh global costmap available; aborting coverage mission")
+ self.complete_pub.publish(Bool(data=True))
+ return
- if not self.waypoint_is_valid(x, y):
- waypoint_failures = self.max_waypoint_failures
- break
+ rospy.loginfo("Letting costmap settle for %.1f seconds before generating targets...",
+ self.costmap_settle_time)
+ rospy.sleep(self.costmap_settle_time)
+ self.refresh_targets_from_costmap()
+ self.publish_total_points()
+ self.publish_progress()
- success = self.send_move_base_goal(x, y)
- if success:
- rospy.loginfo("Reached waypoint %d/%d",
- self.current_index + 1, len(self.waypoints))
+ if self.count_known_targets() == 0:
+ rospy.logwarn("No coverage targets found inside the requested square")
+ self.complete_mission()
+ return
- # Only capture data at successfully reached waypoints
- rospy.loginfo("Dwelling for %.1f s (thermal capture)", self.dwell_time)
- self.capture_ready_pub.publish(Bool(data=True))
- rospy.sleep(self.dwell_time)
- self.capture_ready_pub.publish(Bool(data=False))
- break
+ active = self.count_active_targets()
+ pending = self.count_pending_targets()
+ rospy.loginfo("Initial target census: total=%d pending=%d active=%d",
+ self.count_known_targets(), pending, active)
- waypoint_failures += 1
- rospy.logwarn("Failed waypoint %d/%d on attempt %d/%d",
- self.current_index + 1, len(self.waypoints),
- waypoint_failures, self.max_waypoint_failures)
+ no_target_retries = 0
- if waypoint_failures < self.max_waypoint_failures:
- self.perform_escape_recovery()
+ while not rospy.is_shutdown():
+ self.refresh_targets_from_costmap()
+ self.publish_total_points()
+ self.publish_progress()
- if waypoint_failures >= self.max_waypoint_failures:
- rospy.logwarn("Skipping waypoint %d/%d after %d failed attempts",
- self.current_index + 1, len(self.waypoints),
- self.max_waypoint_failures)
+ if self.count_pending_targets() == 0:
+ break
- self.current_index += 1
+ target = self.choose_next_target()
+ if target is None:
+ pending_count = self.count_pending_targets()
+ active_count = self.count_active_targets()
+ if pending_count > 0 and no_target_retries < self.no_target_retry_limit:
+ no_target_retries += 1
+ rospy.logwarn(
+ "No selectable target right now (pending=%d active=%d). Waiting %.1fs for costmap/planner recovery before retry %d/%d",
+ pending_count,
+ active_count,
+ self.no_target_retry_sleep,
+ no_target_retries,
+ self.no_target_retry_limit,
+ )
+ rospy.sleep(self.no_target_retry_sleep)
+ continue
- # Mission complete
- rospy.loginfo("Coverage mission complete! %d/%d waypoints visited",
- self.current_index, len(self.waypoints))
- self.complete_pub.publish(Bool(data=True))
- self.publish_progress()
+ rospy.logwarn(
+ "No remaining reachable coverage targets; ending mission after %d retries (pending=%d active=%d)",
+ no_target_retries,
+ pending_count,
+ active_count,
+ )
+ for pending in self.targets:
+ if not pending.covered and not pending.skipped:
+ pending.skipped = True
+ self.publish_total_points()
+ self.publish_progress()
+ break
+
+ no_target_retries = 0
+ rospy.loginfo("Selected target %d at (%.2f, %.2f) row=%d col=%d failures=%d",
+ target.target_id, target.x, target.y,
+ target.row, target.col, target.failures)
+ self.advance_sweep_cursor(target)
+
+ success = self.send_move_base_goal(target)
+ if success:
+ target.covered = True
+ self.last_selected_row = target.row
+ self.publish_progress()
+ self.capture_target()
+ else:
+ target.failures += 1
+ if target.failures >= self.max_target_failures:
+ target.skipped = True
+ self.publish_total_points()
+ rospy.logwarn("Skipping target %d after %d failures",
+ target.target_id, self.max_target_failures)
+ self.publish_progress()
- rospy.spin()
+ self.complete_mission()
if __name__ == '__main__':
diff --git a/catkin_ws/src/pyroscope_navigation/scripts/test_coverage_planner.py b/catkin_ws/src/pyroscope_navigation/scripts/test_coverage_planner.py
new file mode 100644
index 0000000..ce0ad2d
--- /dev/null
+++ b/catkin_ws/src/pyroscope_navigation/scripts/test_coverage_planner.py
@@ -0,0 +1,358 @@
+#!/usr/bin/env python
+
+"""
+Unit tests for coverage_planner.CoveragePlanner.
+
+These tests exercise the target-safety and target-generation logic without
+requiring a running ROS graph. A lightweight stub replaces rospy so the
+planner class can be imported and tested in a plain Python environment.
+"""
+
+import math
+import sys
+import types
+import unittest
+
+# ---------------------------------------------------------------------------
+# Minimal rospy stub so coverage_planner.py can be imported outside ROS.
+# Only the symbols actually used at import / construction time are provided.
+# ---------------------------------------------------------------------------
+_rospy = types.ModuleType('rospy')
+_rospy.ROSInterruptException = type('ROSInterruptException', (Exception,), {})
+_rospy.ROSException = type('ROSException', (Exception,), {})
+
+
+class _FakeTime(object):
+ def __init__(self, secs=0):
+ self._secs = secs
+
+ def to_sec(self):
+ return float(self._secs)
+
+ def __sub__(self, other):
+ return _FakeDuration(self._secs - other._secs)
+
+ def __add__(self, other):
+ return _FakeTime(self._secs + other._secs)
+
+ def __lt__(self, other):
+ return self._secs < other._secs
+
+ def __ne__(self, other):
+ return self._secs != other._secs
+
+ def __call__(self, secs=0, nsecs=0):
+ return _FakeTime(secs)
+
+
+class _FakeDuration(object):
+ def __init__(self, secs=0):
+ self._secs = secs
+
+ def to_sec(self):
+ return float(self._secs)
+
+
+_rospy.Time = _FakeTime
+_rospy.Duration = _FakeDuration
+
+_rospy.get_param = lambda name, default=None: default
+_rospy.loginfo = lambda *a, **kw: None
+_rospy.logwarn = lambda *a, **kw: None
+_rospy.logwarn_throttle = lambda *a, **kw: None
+_rospy.logfatal = lambda *a, **kw: None
+_rospy.is_shutdown = lambda: False
+_rospy.signal_shutdown = lambda msg: None
+_rospy.sleep = lambda d: None
+_rospy.Rate = lambda hz: type('R', (), {'sleep': lambda s: None})()
+_rospy.init_node = lambda *a, **kw: None
+
+
+class _FakePublisher(object):
+ def __init__(self, *a, **kw):
+ pass
+
+ def publish(self, msg=None):
+ pass
+
+
+class _FakeSubscriber(object):
+ def __init__(self, *a, **kw):
+ pass
+
+
+_rospy.Publisher = _FakePublisher
+_rospy.Subscriber = _FakeSubscriber
+
+
+# Stub out every ROS dependency that coverage_planner imports at module level
+sys.modules['rospy'] = _rospy
+sys.modules['actionlib'] = types.ModuleType('actionlib')
+
+# Fake SimpleActionClient
+_sac = type('SimpleActionClient', (), {
+ '__init__': lambda self, *a, **kw: None,
+ 'wait_for_server': lambda self, timeout=None: True,
+})
+sys.modules['actionlib'].SimpleActionClient = _sac
+sys.modules['actionlib'].GoalStatus = type('GoalStatus', (), {'SUCCEEDED': 3})
+
+sys.modules['tf'] = types.ModuleType('tf')
+sys.modules['tf'].Exception = Exception
+sys.modules['tf'].LookupException = type('LookupException', (Exception,), {})
+sys.modules['tf'].ConnectivityException = type('ConnectivityException', (Exception,), {})
+sys.modules['tf'].ExtrapolationException = type('ExtrapolationException', (Exception,), {})
+sys.modules['tf'].TransformListener = lambda: None
+_tf_trans = types.ModuleType('tf.transformations')
+_tf_trans.euler_from_quaternion = lambda q: (0, 0, 0)
+_tf_trans.quaternion_from_euler = lambda r, p, y: (0, 0, 0, 1)
+sys.modules['tf'].transformations = _tf_trans
+sys.modules['tf.transformations'] = _tf_trans
+
+sys.modules['geometry_msgs'] = types.ModuleType('geometry_msgs')
+sys.modules['geometry_msgs.msg'] = types.ModuleType('geometry_msgs.msg')
+sys.modules['geometry_msgs.msg'].PoseStamped = type('PoseStamped', (), {})
+
+sys.modules['move_base_msgs'] = types.ModuleType('move_base_msgs')
+sys.modules['move_base_msgs.msg'] = types.ModuleType('move_base_msgs.msg')
+sys.modules['move_base_msgs.msg'].MoveBaseAction = type('MoveBaseAction', (), {})
+sys.modules['move_base_msgs.msg'].MoveBaseGoal = type('MoveBaseGoal', (), {})
+
+sys.modules['nav_msgs'] = types.ModuleType('nav_msgs')
+sys.modules['nav_msgs.msg'] = types.ModuleType('nav_msgs.msg')
+sys.modules['nav_msgs.msg'].OccupancyGrid = type('OccupancyGrid', (), {})
+sys.modules['nav_msgs.srv'] = types.ModuleType('nav_msgs.srv')
+sys.modules['nav_msgs.srv'].GetPlan = type('GetPlan', (), {})
+
+sys.modules['std_msgs'] = types.ModuleType('std_msgs')
+sys.modules['std_msgs.msg'] = types.ModuleType('std_msgs.msg')
+sys.modules['std_msgs.msg'].Bool = type('Bool', (), {'__init__': lambda s, **kw: None})
+sys.modules['std_msgs.msg'].Int32 = type('Int32', (), {'__init__': lambda s, **kw: None})
+sys.modules['std_msgs.msg'].String = type('String', (), {'__init__': lambda s, **kw: None})
+
+# ---------------------------------------------------------------------------
+# Now we can safely import the module under test.
+# ---------------------------------------------------------------------------
+import os
+sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
+from coverage_planner import CoveragePlanner, CoverageTarget # noqa: E402
+
+
+# ---------------------------------------------------------------------------
+# Helper to build a fake costmap (nav_msgs/OccupancyGrid-like object).
+# ---------------------------------------------------------------------------
+class FakeCostmapInfo(object):
+ def __init__(self, width, height, resolution, origin_x, origin_y):
+ self.width = width
+ self.height = height
+ self.resolution = resolution
+
+ class Pos(object):
+ def __init__(self, x, y):
+ self.x = x
+ self.y = y
+
+ class Origin(object):
+ def __init__(self, x, y):
+ self.position = Pos(x, y)
+
+ self.origin = Origin(origin_x, origin_y)
+
+
+class FakeHeader(object):
+ def __init__(self):
+ self.stamp = _FakeTime(1)
+
+
+def make_costmap(width_cells, height_cells, resolution, origin_x, origin_y, default_cost=0):
+ """Return a fake OccupancyGrid with *default_cost* everywhere."""
+ cm = type('OG', (), {})()
+ cm.info = FakeCostmapInfo(width_cells, height_cells, resolution, origin_x, origin_y)
+ cm.data = [default_cost] * (width_cells * height_cells)
+ cm.header = FakeHeader()
+ return cm
+
+
+def set_cell(costmap, mx, my, cost):
+ costmap.data[my * costmap.info.width + mx] = cost
+
+
+# ---------------------------------------------------------------------------
+# Tests
+# ---------------------------------------------------------------------------
+class TestIsTargetSafe(unittest.TestCase):
+ """Verify is_target_safe against various costmap states."""
+
+ def _make_planner(self, threshold=85, check_radius=0.10):
+ planner = CoveragePlanner.__new__(CoveragePlanner)
+ planner.latest_costmap = None
+ planner.latest_costmap_stamp = None
+ planner.target_cost_threshold = threshold
+ planner.target_check_radius = check_radius
+ return planner
+
+ # -- basic cases --
+
+ def test_no_costmap_returns_false(self):
+ planner = self._make_planner()
+ self.assertFalse(planner.is_target_safe(0, 0))
+
+ def test_target_outside_costmap_returns_false(self):
+ planner = self._make_planner()
+ # Costmap covers [0, 5) x [0, 5) metres
+ planner.latest_costmap = make_costmap(100, 100, 0.05, 0.0, 0.0)
+ # Target at (10, 10) is outside
+ self.assertFalse(planner.is_target_safe(10.0, 10.0))
+
+ def test_free_space_returns_true(self):
+ planner = self._make_planner()
+ # 10x10 m costmap, all cost=0 (free)
+ planner.latest_costmap = make_costmap(200, 200, 0.05, -5.0, -5.0)
+ self.assertTrue(planner.is_target_safe(0.0, 0.0))
+
+ def test_obstacle_at_target_returns_false(self):
+ planner = self._make_planner(threshold=85)
+ planner.latest_costmap = make_costmap(200, 200, 0.05, -5.0, -5.0)
+ # Place obstacle (cost=100) at cell corresponding to (0, 0)
+ mx, my = planner.world_to_costmap(0.0, 0.0)
+ set_cell(planner.latest_costmap, mx, my, 100)
+ self.assertFalse(planner.is_target_safe(0.0, 0.0))
+
+ def test_obstacle_near_target_within_radius(self):
+ planner = self._make_planner(threshold=85, check_radius=0.10)
+ planner.latest_costmap = make_costmap(200, 200, 0.05, -5.0, -5.0)
+ mx, my = planner.world_to_costmap(0.0, 0.0)
+ # Place obstacle one cell away (within check_radius of 0.10m = 2 cells at 0.05 res)
+ set_cell(planner.latest_costmap, mx + 1, my, 100)
+ self.assertFalse(planner.is_target_safe(0.0, 0.0))
+
+ # -- unknown space handling (the key fix) --
+
+ def test_all_unknown_space_is_safe(self):
+ """Targets in completely unknown space should be considered safe.
+
+ This is the critical behaviour for mapless rolling-window navigation.
+ Unknown cells have cost -1 in the OccupancyGrid encoding.
+ """
+ planner = self._make_planner()
+ planner.latest_costmap = make_costmap(200, 200, 0.05, -5.0, -5.0, default_cost=-1)
+ self.assertTrue(planner.is_target_safe(0.0, 0.0))
+
+ def test_mixed_unknown_and_free_is_safe(self):
+ planner = self._make_planner()
+ planner.latest_costmap = make_costmap(200, 200, 0.05, -5.0, -5.0, default_cost=-1)
+ # Set center cell to free
+ mx, my = planner.world_to_costmap(0.0, 0.0)
+ set_cell(planner.latest_costmap, mx, my, 0)
+ self.assertTrue(planner.is_target_safe(0.0, 0.0))
+
+ def test_unknown_with_obstacle_is_unsafe(self):
+ planner = self._make_planner(threshold=85)
+ planner.latest_costmap = make_costmap(200, 200, 0.05, -5.0, -5.0, default_cost=-1)
+ mx, my = planner.world_to_costmap(0.0, 0.0)
+ set_cell(planner.latest_costmap, mx, my, 100)
+ self.assertFalse(planner.is_target_safe(0.0, 0.0))
+
+ # -- edge-of-costmap neighbour handling --
+
+ def test_target_near_costmap_edge_still_safe(self):
+ """A target at the edge of the costmap should not fail just because
+ some of its radius cells fall outside the grid."""
+ planner = self._make_planner(check_radius=0.10)
+ # Small costmap: 10x10 cells = 0.5x0.5 m starting at origin
+ planner.latest_costmap = make_costmap(10, 10, 0.05, 0.0, 0.0)
+ # Target at (0.025, 0.025) — cell (0, 0), edge of the grid
+ self.assertTrue(planner.is_target_safe(0.025, 0.025))
+
+ def test_cost_below_threshold_is_safe(self):
+ """Inflated cells with cost below the threshold should be safe."""
+ planner = self._make_planner(threshold=253)
+ planner.latest_costmap = make_costmap(200, 200, 0.05, -5.0, -5.0)
+ mx, my = planner.world_to_costmap(0.0, 0.0)
+ # Set cost to 99 (inscribed inflation in OccupancyGrid), below 253
+ set_cell(planner.latest_costmap, mx, my, 99)
+ self.assertTrue(planner.is_target_safe(0.0, 0.0))
+
+
+class TestRefreshTargets(unittest.TestCase):
+ """Verify target generation from costmap."""
+
+ def _make_planner(self, width=2.0, height=2.0, spacing=0.5, margin=0.3):
+ planner = CoveragePlanner.__new__(CoveragePlanner)
+ planner.area_width = width
+ planner.area_height = height
+ planner.row_spacing = spacing
+ planner.waypoint_spacing = spacing
+ planner.origin_x = 0.0
+ planner.origin_y = 0.0
+ planner.wall_margin = margin
+ planner.target_cost_threshold = 253
+ planner.target_check_radius = 0.05
+ planner.targets = []
+ planner.target_lookup = {}
+ planner.next_target_id = 0
+ planner.sweep_target_ids = []
+ planner.sweep_cursor = 0
+ planner.sequence_initialized = False
+ planner.latest_costmap = None
+ planner.latest_costmap_stamp = None
+ return planner
+
+ def test_generates_targets_in_free_space(self):
+ planner = self._make_planner(width=2.0, height=2.0, spacing=0.5, margin=0.3)
+ planner.latest_costmap = make_costmap(200, 200, 0.05, -5.0, -5.0)
+ planner.refresh_targets_from_costmap()
+ self.assertGreater(len(planner.targets), 0)
+ # All targets should be safe (no obstacles in the costmap)
+ for t in planner.targets:
+ self.assertTrue(t.currently_safe, "Target at (%.2f, %.2f) should be safe" % (t.x, t.y))
+
+ def test_generates_targets_in_unknown_space(self):
+ """Targets in unknown space should still be generated and marked safe."""
+ planner = self._make_planner(width=2.0, height=2.0, spacing=0.5, margin=0.3)
+ planner.latest_costmap = make_costmap(200, 200, 0.05, -5.0, -5.0, default_cost=-1)
+ planner.refresh_targets_from_costmap()
+ self.assertGreater(len(planner.targets), 0)
+ safe_count = sum(1 for t in planner.targets if t.currently_safe)
+ self.assertEqual(safe_count, len(planner.targets),
+ "All targets should be safe when space is unknown")
+
+ def test_targets_near_obstacle_are_unsafe(self):
+ planner = self._make_planner(width=2.0, height=2.0, spacing=0.5, margin=0.3)
+ planner.latest_costmap = make_costmap(200, 200, 0.05, -5.0, -5.0)
+ planner.target_cost_threshold = 85
+ # Place obstacle at origin
+ mx, my = planner.world_to_costmap(0.0, 0.0)
+ set_cell(planner.latest_costmap, mx, my, 100)
+ planner.refresh_targets_from_costmap()
+ # The target at (0, 0) should be unsafe
+ origin_target = None
+ for t in planner.targets:
+ if abs(t.x) < 0.01 and abs(t.y) < 0.01:
+ origin_target = t
+ break
+ if origin_target is not None:
+ self.assertFalse(origin_target.currently_safe)
+
+
+class TestSweepSequence(unittest.TestCase):
+ """Verify the lawnmower sweep ordering."""
+
+ def test_alternating_row_direction(self):
+ planner = CoveragePlanner.__new__(CoveragePlanner)
+ planner.targets = [
+ CoverageTarget(0, 0, 0, 0, 0),
+ CoverageTarget(1, 1, 0, 0, 1),
+ CoverageTarget(2, 2, 0, 0, 2),
+ CoverageTarget(3, 0, 1, 1, 0),
+ CoverageTarget(4, 1, 1, 1, 1),
+ CoverageTarget(5, 2, 1, 1, 2),
+ ]
+ planner.build_sweep_target_ids()
+ # Row 0 (even): left to right → ids 0, 1, 2
+ # Row 1 (odd): right to left → ids 5, 4, 3
+ self.assertEqual(planner.sweep_target_ids, [0, 1, 2, 5, 4, 3])
+
+
+if __name__ == '__main__':
+ unittest.main()