Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
163 changes: 161 additions & 2 deletions rclrs/src/clock.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use crate::{error::ToResult, rcl_bindings::*, time::Time, to_rclrs_result};
use std::sync::{Arc, Mutex};
use crate::{error::ToResult, rcl_bindings::*, time::Time, to_rclrs_result, Context};
use std::{
sync::{Arc, Mutex},
time::Duration,
};

/// Enum to describe clock type. Redefined for readability and to eliminate the uninitialized case
/// from the `rcl_clock_type_t` enum in the binding.
Expand Down Expand Up @@ -38,6 +41,20 @@ pub struct ClockSource {
rcl_clock: Arc<Mutex<rcl_clock_t>>,
}

/// How long a sleep loop waits between checks for shutdown and clock progress.
/// This caps the wakeup latency for detecting shutdown and, under sim time, the
/// latency for noticing that the clock has advanced.
const SLEEP_POLL_TICK: Duration = Duration::from_millis(10);

/// The outcome of a single iteration of a sleep loop.
enum SleepStep {
/// The sleep is finished. The boolean is the value to return: `true` if the
/// target time was reached, `false` if the context became invalid.
Done(bool),
/// The target time has not been reached yet. Wait this long and check again.
Wait(Duration),
}

impl Clock {
/// Creates a new Clock with `ClockType::SystemTime`
pub fn system() -> Self {
Expand Down Expand Up @@ -111,6 +128,98 @@ impl Clock {
clock: Arc::downgrade(&self.rcl_clock),
}
}

/// Returns `true` if this is a [`ClockType::RosTime`] clock that is currently
/// being driven by an external time source, i.e. `use_sim_time` is active.
///
/// System and steady clocks always return `false`.
pub fn ros_time_is_active(&self) -> bool {
if !matches!(self.kind, ClockType::RosTime) {
return false;
}
let mut clock = self.rcl_clock.lock().unwrap();
let mut is_enabled = false;
// SAFETY: The clock is valid and the out-pointer is a valid reference.
let ret = unsafe { rcl_is_enabled_ros_time_override(&mut *clock, &mut is_enabled) };
debug_assert_eq!(ret, 0);
is_enabled
}

/// Sleeps until this clock reaches the time `until`, according to the clock's type.
///
/// Returns `true` once the target time is reached. Returns `false` if `context`
/// becomes invalid, for example because ROS was shut down, before the target time
/// is reached.
///
/// Returns immediately if the target time is already in the past.
///
/// # Notes for [`ClockType::RosTime`]
/// When ROS time is active, i.e. `use_sim_time:=true`, this can block forever if the
/// external `/clock` source never advances to `until`. Time only advances while the
/// executor that owns the node's `/clock` subscription is spinning, so calling this
/// from a callback on a single-threaded executor will deadlock under sim time. Prefer
/// [`Clock::async_sleep_until`] or driving sim-time sleeps from a separate thread.
pub fn sleep_until(&self, until: Time, context: &Context) -> bool {
// Evaluated once: a time-source change mid-sleep is not detected by this
// implementation. See issue #604 for a jump-callback-based follow-up.
let wall_based = !self.ros_time_is_active();
loop {
match self.sleep_step(wall_based, until.nsec, context) {
SleepStep::Done(reached) => return reached,
SleepStep::Wait(tick) => std::thread::sleep(tick),
}
}
}

/// Sleeps for the relative duration `duration`, according to the clock's type.
///
/// Equivalent to `self.sleep_until(self.now() + duration, context)`. See
/// [`Clock::sleep_until`] for the return value and sim-time caveats.
pub fn sleep_for(&self, duration: Duration, context: &Context) -> bool {
self.sleep_until(self.now() + duration, context)
}

/// The async counterpart of [`Clock::sleep_until`].
///
/// This yields to the async runtime between checks instead of blocking the thread,
/// so it can be awaited inside an async callback without preventing the executor from
/// spinning. That makes it safe to use under sim time, where the `/clock` subscription
/// must keep being processed for the clock to advance.
pub async fn async_sleep_until(&self, until: Time, context: &Context) -> bool {
let wall_based = !self.ros_time_is_active();
loop {
match self.sleep_step(wall_based, until.nsec, context) {
SleepStep::Done(reached) => return reached,
SleepStep::Wait(tick) => async_std::task::sleep(tick).await,
}
}
}

/// The async counterpart of [`Clock::sleep_for`].
///
/// Equivalent to `self.async_sleep_until(self.now() + duration, context).await`.
pub async fn async_sleep_for(&self, duration: Duration, context: &Context) -> bool {
self.async_sleep_until(self.now() + duration, context).await
}

/// Computes one iteration of a sleep loop. Shared by the sync and async sleep methods.
fn sleep_step(&self, wall_based: bool, until_nsec: i64, context: &Context) -> SleepStep {
let now = self.now().nsec;
if now >= until_nsec {
return SleepStep::Done(true);
}
if !context.ok() {
return SleepStep::Done(false);
}
let tick = if wall_based {
// For system and steady clocks `until_nsec - now` is a real elapsed
// duration, so we can wait most of it, capped so we still notice shutdown.
Duration::from_nanos((until_nsec - now) as u64).min(SLEEP_POLL_TICK)
} else {
SLEEP_POLL_TICK
};
SleepStep::Wait(tick)
}
}

impl Drop for ClockSource {
Expand Down Expand Up @@ -212,4 +321,54 @@ mod tests {
// Ros time is set, should return the value that was set
assert_eq!(clock.now().nsec, set_time);
}

#[test]
fn sleep_for_steady_time_waits() {
let context = Context::default();
let clock = Clock::steady();
let start = clock.now().nsec;
let reached = clock.sleep_for(Duration::from_millis(50), &context);
let elapsed = clock.now().nsec - start;
assert!(reached);
assert!(elapsed >= 50_000_000);
}

#[test]
fn sleep_until_past_returns_immediately() {
let context = Context::default();
let clock = Clock::system();
let until = clock.now() - Duration::from_secs(1);
assert!(clock.sleep_until(until, &context));
}

#[test]
fn sleep_for_ros_time_advances_with_source() {
use std::thread;
let context = Context::default();
let (clock, source) = Clock::with_source();
source.set_ros_time_override(0);

let advancer = thread::spawn(move || {
thread::sleep(Duration::from_millis(50));
// Push ROS time well past the 5s target.
source.set_ros_time_override(10_000_000_000);
});

let reached = clock.sleep_for(Duration::from_secs(5), &context);
advancer.join().unwrap();
assert!(reached);
}

#[tokio::test]
async fn async_sleep_for_steady_time_waits() {
let context = Context::default();
let clock = Clock::steady();
let start = clock.now().nsec;
let reached = clock
.async_sleep_for(Duration::from_millis(50), &context)
.await;
let elapsed = clock.now().nsec - start;
assert!(reached);
assert!(elapsed >= 50_000_000);
}
}
Loading