From 28cc21e5c5ee8c06fa125b3f0ee74a369081d3a7 Mon Sep 17 00:00:00 2001 From: Pino de Candia <32303022+pinodeca@users.noreply.github.com> Date: Tue, 23 Jun 2026 11:08:16 +0000 Subject: [PATCH] Add schedule_timer_until absolute-deadline timer Add OrchestrationContext::schedule_timer_until(deadline), an absolute wall-clock deadline timer variant that complements the existing relative schedule_timer. A deadline already in the past fires immediately. The change is purely additive: it reuses the existing timer future and replay path via a shared schedule_timer_at_ms helper, with no changes to the action/event schema, the provider's visible_at handling, or replay semantics. Closes #34 --- CHANGELOG.md | 9 +++++ src/lib.rs | 38 ++++++++++++++++++- tests/timer_tests.rs | 87 ++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 132 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index efe9995..18833e9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- **`ctx.schedule_timer_until(deadline)`** — A new absolute-deadline timer + variant that fires at a wall-clock `SystemTime` instead of a relative + `Duration`. A deadline already in the past fires immediately. It is a thin + constructor over the existing timer machinery (the durable record is already + an absolute `fire_at_ms`), so there are no changes to the action/event schema + or replay semantics. `schedule_timer` now shares the same internal code path. + ### Changed - **`ctx.new_guid()` now returns a standard UUID v4.** The previous diff --git a/src/lib.rs b/src/lib.rs index e465352..4e13c4a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -3535,10 +3535,44 @@ impl OrchestrationContext { pub fn schedule_timer(&self, delay: std::time::Duration) -> DurableFuture<()> { let delay_ms = delay.as_millis() as u64; + let now = { + let inner = self.inner.lock().expect("Mutex should not be poisoned"); + inner.now_ms() + }; + let fire_at_ms = now.saturating_add(delay_ms); + self.schedule_timer_at_ms(fire_at_ms) + } + + /// Schedule a timer that fires at an absolute wall-clock deadline. + /// + /// Equivalent to [`schedule_timer`](Self::schedule_timer) but anchored to an + /// absolute point in time rather than a delay from "now". A deadline already in + /// the past fires immediately (next turn). The deadline is recorded durably and + /// replayed verbatim, exactly like the existing relative timer. + /// + /// This is useful when the caller already has an absolute target time (a cron + /// tick, an SLA, a scheduled-at timestamp) and wants to avoid the manual + /// `deadline - now` clamp and the extra recorded `utc_now` reading that the + /// relative form would require. + /// + /// # Example + /// ```ignore + /// use std::time::{Duration, SystemTime}; + /// let deadline = SystemTime::now() + Duration::from_secs(30); + /// ctx.schedule_timer_until(deadline).await; + /// ``` + pub fn schedule_timer_until(&self, deadline: std::time::SystemTime) -> DurableFuture<()> { + let fire_at_ms = deadline + .duration_since(std::time::UNIX_EPOCH) + .map(|d| d.as_millis() as u64) + .unwrap_or(0); + self.schedule_timer_at_ms(fire_at_ms) + } + + /// Shared timer constructor over an absolute ms-since-epoch fire time. + fn schedule_timer_at_ms(&self, fire_at_ms: u64) -> DurableFuture<()> { let mut inner = self.inner.lock().expect("Mutex should not be poisoned"); - let now = inner.now_ms(); - let fire_at_ms = now.saturating_add(delay_ms); let token = inner.emit_action(Action::CreateTimer { scheduling_event_id: 0, fire_at_ms, diff --git a/tests/timer_tests.rs b/tests/timer_tests.rs index 767d50a..f6a054d 100644 --- a/tests/timer_tests.rs +++ b/tests/timer_tests.rs @@ -603,3 +603,90 @@ async fn timer_fires_at_correct_time_after_previous_timer() { _ => panic!("Unexpected status: {status:?}"), } } + +// ============================================================================ +// ABSOLUTE-DEADLINE TIMER TESTS (schedule_timer_until) +// ============================================================================ + +#[tokio::test] +async fn schedule_timer_until_fires_at_deadline() { + let (store, _td) = create_sqlite_store().await; + + const DELAY_MS: u64 = 50; + let orch = |ctx: OrchestrationContext, _input: String| async move { + let deadline = std::time::SystemTime::now() + Duration::from_millis(DELAY_MS); + ctx.schedule_timer_until(deadline).await; + Ok("done".to_string()) + }; + + let reg = OrchestrationRegistry::builder() + .register("TimerUntil", orch) + .build(); + let acts = ActivityRegistry::builder().build(); + let rt = runtime::Runtime::start_with_store(store.clone(), acts, reg).await; + let client = duroxide::Client::new(store.clone()); + + let start = std::time::Instant::now(); + client + .start_orchestration("inst-until", "TimerUntil", "") + .await + .unwrap(); + + let status = client + .wait_for_orchestration("inst-until", Duration::from_secs(5)) + .await + .unwrap(); + let elapsed = start.elapsed().as_millis() as u64; + + assert!( + elapsed >= DELAY_MS, + "Timer fired too early: expected >={DELAY_MS}ms, got {elapsed}ms" + ); + + match status { + duroxide::runtime::OrchestrationStatus::Completed { output, .. } => { + assert_eq!(output, "done"); + } + other => panic!("Unexpected status: {other:?}"), + } + + drop(rt); +} + +#[tokio::test] +async fn schedule_timer_until_past_deadline_fires_immediately() { + let (store, _td) = create_sqlite_store().await; + + // A deadline well in the past should fire immediately (next turn). + let orch = |ctx: OrchestrationContext, _input: String| async move { + let deadline = std::time::UNIX_EPOCH; // far in the past + ctx.schedule_timer_until(deadline).await; + Ok("done".to_string()) + }; + + let reg = OrchestrationRegistry::builder() + .register("TimerUntilPast", orch) + .build(); + let acts = ActivityRegistry::builder().build(); + let rt = runtime::Runtime::start_with_store(store.clone(), acts, reg).await; + let client = duroxide::Client::new(store.clone()); + + client + .start_orchestration("inst-until-past", "TimerUntilPast", "") + .await + .unwrap(); + + let status = client + .wait_for_orchestration("inst-until-past", Duration::from_secs(5)) + .await + .unwrap(); + + match status { + duroxide::runtime::OrchestrationStatus::Completed { output, .. } => { + assert_eq!(output, "done"); + } + other => panic!("Unexpected status: {other:?}"), + } + + drop(rt); +}