Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 17 additions & 18 deletions src/dsl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

//! DSL functions for defining durable SQL functions

use chrono::Utc;
use cron::Schedule as CronSchedule;
use pgrx::datum::DatumWithOid;
use pgrx::prelude::*;
Expand Down Expand Up @@ -272,29 +271,29 @@ pub fn sleep(seconds: i64) -> String {
}

/// Creates a wait-for-schedule node that waits until the next cron match.
/// The wait duration is computed at DSL time (when this function is called)
/// to ensure deterministic replay in the orchestration.
///
/// The cron expression is only *validated* here (at DSL time) so an invalid
/// expression fails fast at `df.start()`. The actual "next tick" is computed at
/// execution time inside the orchestration (see `execute_wait_schedule_node`),
/// using duroxide's deterministic clock. This is intentional: a cron schedule is
/// a function of the current wall-clock time, so it must be evaluated when the
/// node actually runs — not at `df.start()` time — otherwise any delay between
/// `df.start()` and execution (and, critically, every iteration of a recurring
/// `@>` loop) would wake at the wrong moment. Only the cron expression is stored
/// in the node config.
#[pg_extern(schema = "df")]
pub fn wait_for_schedule(cron_expr: &str) -> String {
// Validate eagerly so a bad expression is rejected at df.start() time. The
// "0 " prefix supplies the seconds field the `cron` crate expects; the same
// prefix is re-applied at execution time when the schedule is recomputed.
let cron_with_seconds = format!("0 {cron_expr}");
let schedule = match CronSchedule::from_str(&cron_with_seconds) {
Ok(s) => s,
Err(e) => pgrx::error!("Invalid cron expression '{}': {}", cron_expr, e),
};

// Compute wait duration NOW (at DSL time) for deterministic orchestration replay
let now = Utc::now();
let next = match schedule.upcoming(Utc).next() {
Some(t) => t,
None => pgrx::error!("No upcoming schedule found for '{}'", cron_expr),
};

let duration_secs = (next - now).num_seconds().max(0) as u64;
if let Err(e) = CronSchedule::from_str(&cron_with_seconds) {
pgrx::error!("Invalid cron expression '{}': {}", cron_expr, e);
}

// Store pre-computed seconds, not the cron expression
// Store only the cron expression. The wait is computed at execution time.
let config = serde_json::json!({
"cron_expr": cron_expr,
"wait_seconds": duration_secs
});

Durofut {
Expand Down
15 changes: 6 additions & 9 deletions src/explain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -647,18 +647,15 @@ fn format_node_display(node: &ExplainNode) -> String {
format!("SLEEP {seconds}s{name_suffix}")
}
"WAIT_SCHEDULE" => {
// Parse config to get cron expression and wait seconds
let (cron, secs) = node
// Parse config to get the cron expression. The next tick is computed
// at execution time (not stored), so only the cron expr is shown.
let cron = node
.query
.as_ref()
.and_then(|q| serde_json::from_str::<serde_json::Value>(q).ok())
.map(|cfg| {
let c = cfg["cron_expr"].as_str().unwrap_or("?").to_string();
let s = cfg["wait_seconds"].as_u64().unwrap_or(0);
(c, s)
})
.unwrap_or_else(|| ("?".to_string(), 0));
format!("WAIT '{cron}' ({secs}s){name_suffix}")
.and_then(|cfg| cfg["cron_expr"].as_str().map(|s| s.to_string()))
.unwrap_or_else(|| "?".to_string());
format!("WAIT '{cron}'{name_suffix}")
}
"HTTP" => {
// Parse config to get method and URL
Expand Down
18 changes: 18 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -906,6 +906,24 @@ mod tests {
let json = crate::dsl::wait_for_schedule("*/5 * * * *");
let fut = Durofut::from_json(&json);
assert_eq!(fut.node_type, "WAIT_SCHEDULE");

// The node stores only the cron expression; the next tick is computed at
// execution time, so there must be no pre-computed wait baked in at DSL time.
let config: serde_json::Value =
serde_json::from_str(fut.query.as_ref().expect("query must be set")).unwrap();
assert_eq!(
config["cron_expr"].as_str(),
Some("*/5 * * * *"),
"cron_expr should be preserved"
);
assert!(
config.get("wait_seconds").is_none(),
"config must not pre-compute wait_seconds at DSL time"
);
assert!(
config.get("target_timestamp").is_none(),
"config must not pre-compute a target_timestamp at DSL time"
);
}

#[pg_test]
Expand Down
49 changes: 41 additions & 8 deletions src/orchestrations/execute_function_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,11 @@
//! - Same input must always produce the same scheduling decisions

use std::collections::HashMap;
use std::str::FromStr;
use std::time::Duration;

use chrono::{DateTime, Utc};
use cron::Schedule as CronSchedule;
use duroxide::OrchestrationContext;

use crate::activities;
Expand Down Expand Up @@ -509,20 +512,50 @@ async fn execute_wait_schedule_node(
.as_ref()
.ok_or_else(|| format!("WAIT_SCHEDULE node {node_id} has no config"))?;

// Parse pre-computed config from DSL time
let config: serde_json::Value = serde_json::from_str(config_str)
.map_err(|e| format!("Invalid WAIT_SCHEDULE config: {e}"))?;

let wait_seconds = config["wait_seconds"]
.as_u64()
.ok_or_else(|| "WAIT_SCHEDULE missing wait_seconds".to_string())?;

let cron_expr = config["cron_expr"].as_str().unwrap_or("?");
let cron_expr = config["cron_expr"]
.as_str()
.ok_or_else(|| "WAIT_SCHEDULE missing cron_expr".to_string())?;

// A cron schedule is a function of "now", so the next tick MUST be computed
// when this node actually executes — not at df.start() time — so that any
// delay before execution, and every iteration of a recurring `@>` loop,
// targets the correct upcoming tick.
//
// `ctx.utc_now()` is duroxide's deterministic clock (the only sanctioned way
// to read wall-clock time in this deterministic file): the value is recorded
// in history and replayed verbatim. The cron math below is pure given `now`,
// so the whole computation is replay-safe. The "0 " prefix supplies the
// seconds field the `cron` crate expects (mirrors df.wait_for_schedule()).
let now: DateTime<Utc> = ctx
.utc_now()
.await
.map_err(|e| format!("WAIT_SCHEDULE failed to read deterministic clock: {e}"))?
.into();

let cron_with_seconds = format!("0 {cron_expr}");
let schedule = CronSchedule::from_str(&cron_with_seconds)
.map_err(|e| format!("Invalid cron expression '{cron_expr}': {e}"))?;
let next = schedule
.after(&now)
.next()
.ok_or_else(|| format!("No upcoming schedule found for '{cron_expr}'"))?;

// Clamp to zero if the tick is already in the past by the time we get here.
//
// NOTE: once duroxide gains an absolute-deadline timer
// (https://github.com/microsoft/duroxide/issues/34), this `now`-read +
// subtraction can be replaced with `ctx.schedule_timer_until(next)`, which
// targets the absolute tick directly and drops the extra utc_now() syscall.
let wait = (next - now).to_std().unwrap_or(Duration::ZERO);

ctx.trace_info(format!(
"Waiting {wait_seconds} seconds until schedule: {cron_expr}"
"Waiting {}s until next schedule tick {next} (cron: {cron_expr})",
wait.as_secs()
));
ctx.schedule_timer(Duration::from_secs(wait_seconds)).await;
ctx.schedule_timer(wait).await;

Ok(r#"{"scheduled": true}"#.to_string())
}
Expand Down