Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added

- **`ctx.schedule_timer_until(deadline)`** — A new absolute-deadline timer
variant that fires at a wall-clock `SystemTime` instead of a relative
`Duration`. A deadline already in the past fires immediately. It is a thin
constructor over the existing timer machinery (the durable record is already
an absolute `fire_at_ms`), so there are no changes to the action/event schema
or replay semantics. `schedule_timer` now shares the same internal code path.

### Changed

- **`ctx.new_guid()` now returns a standard UUID v4.** The previous
Expand Down
38 changes: 36 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3535,10 +3535,44 @@ impl OrchestrationContext {
pub fn schedule_timer(&self, delay: std::time::Duration) -> DurableFuture<()> {
let delay_ms = delay.as_millis() as u64;

let now = {
let inner = self.inner.lock().expect("Mutex should not be poisoned");
inner.now_ms()
};
let fire_at_ms = now.saturating_add(delay_ms);
self.schedule_timer_at_ms(fire_at_ms)
}

/// Schedule a timer that fires at an absolute wall-clock deadline.
///
/// Equivalent to [`schedule_timer`](Self::schedule_timer) but anchored to an
/// absolute point in time rather than a delay from "now". A deadline already in
/// the past fires immediately (next turn). The deadline is recorded durably and
/// replayed verbatim, exactly like the existing relative timer.
///
/// This is useful when the caller already has an absolute target time (a cron
/// tick, an SLA, a scheduled-at timestamp) and wants to avoid the manual
/// `deadline - now` clamp and the extra recorded `utc_now` reading that the
/// relative form would require.
///
/// # Example
/// ```ignore
/// use std::time::{Duration, SystemTime};
/// let deadline = SystemTime::now() + Duration::from_secs(30);
/// ctx.schedule_timer_until(deadline).await;
/// ```
pub fn schedule_timer_until(&self, deadline: std::time::SystemTime) -> DurableFuture<()> {
let fire_at_ms = deadline
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0);
self.schedule_timer_at_ms(fire_at_ms)
}

/// Shared timer constructor over an absolute ms-since-epoch fire time.
fn schedule_timer_at_ms(&self, fire_at_ms: u64) -> DurableFuture<()> {
let mut inner = self.inner.lock().expect("Mutex should not be poisoned");

let now = inner.now_ms();
let fire_at_ms = now.saturating_add(delay_ms);
let token = inner.emit_action(Action::CreateTimer {
scheduling_event_id: 0,
fire_at_ms,
Expand Down
87 changes: 87 additions & 0 deletions tests/timer_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -603,3 +603,90 @@ async fn timer_fires_at_correct_time_after_previous_timer() {
_ => panic!("Unexpected status: {status:?}"),
}
}

// ============================================================================
// ABSOLUTE-DEADLINE TIMER TESTS (schedule_timer_until)
// ============================================================================

#[tokio::test]
async fn schedule_timer_until_fires_at_deadline() {
let (store, _td) = create_sqlite_store().await;

const DELAY_MS: u64 = 50;
let orch = |ctx: OrchestrationContext, _input: String| async move {
let deadline = std::time::SystemTime::now() + Duration::from_millis(DELAY_MS);
ctx.schedule_timer_until(deadline).await;
Ok("done".to_string())
};

let reg = OrchestrationRegistry::builder()
.register("TimerUntil", orch)
.build();
let acts = ActivityRegistry::builder().build();
let rt = runtime::Runtime::start_with_store(store.clone(), acts, reg).await;
let client = duroxide::Client::new(store.clone());

let start = std::time::Instant::now();
client
.start_orchestration("inst-until", "TimerUntil", "")
.await
.unwrap();

let status = client
.wait_for_orchestration("inst-until", Duration::from_secs(5))
.await
.unwrap();
let elapsed = start.elapsed().as_millis() as u64;

assert!(
elapsed >= DELAY_MS,
"Timer fired too early: expected >={DELAY_MS}ms, got {elapsed}ms"
);

match status {
duroxide::runtime::OrchestrationStatus::Completed { output, .. } => {
assert_eq!(output, "done");
}
other => panic!("Unexpected status: {other:?}"),
}

drop(rt);
}

#[tokio::test]
async fn schedule_timer_until_past_deadline_fires_immediately() {
let (store, _td) = create_sqlite_store().await;

// A deadline well in the past should fire immediately (next turn).
let orch = |ctx: OrchestrationContext, _input: String| async move {
let deadline = std::time::UNIX_EPOCH; // far in the past
ctx.schedule_timer_until(deadline).await;
Ok("done".to_string())
};

let reg = OrchestrationRegistry::builder()
.register("TimerUntilPast", orch)
.build();
let acts = ActivityRegistry::builder().build();
let rt = runtime::Runtime::start_with_store(store.clone(), acts, reg).await;
let client = duroxide::Client::new(store.clone());

client
.start_orchestration("inst-until-past", "TimerUntilPast", "")
.await
.unwrap();

let status = client
.wait_for_orchestration("inst-until-past", Duration::from_secs(5))
.await
.unwrap();

match status {
duroxide::runtime::OrchestrationStatus::Completed { output, .. } => {
assert_eq!(output, "done");
}
other => panic!("Unexpected status: {other:?}"),
}

drop(rt);
}
Loading