diff --git a/CHANGELOG.md b/CHANGELOG.md index a0b82a1..74a6086 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,10 @@ Pre-1.0 note: while `pg_durable` is in major version `0`, minor releases may inc ### Changed +- **`df.wait_for_schedule()` cron timing:** the next cron tick is now computed at execution time using duroxide's deterministic clock (`ctx.utc_now()`) inside the `execute_function_graph` orchestration, instead of being pre-computed at `df.start()` time. This makes recurring `@>` schedules and any start-to-execution delay target the correct upcoming tick (#130). + + > ⚠️ **Replay-breaking for in-flight `wait_for_schedule` instances.** This change adds a recorded `utc_now()` decision before the WAIT_SCHEDULE timer, altering the orchestration's history sequence. Any durable function that was started under a `<= 0.2.3` binary and is **mid-`wait_for_schedule`** (parked on its timer) when this `.so` is loaded will fail with a duroxide nondeterminism error on replay, because its recorded history no longer matches the new code. Drain or allow such in-flight `wait_for_schedule` instances to complete before upgrading. Instances that are not currently inside a `wait_for_schedule` node are unaffected. We accepted this break (rather than introducing orchestration versioning) given the early pre-1.0 stage of the project. + - **`df.grant_usage()` / `df.revoke_usage()`:** dropped the explicit per-function `EXECUTE` allowlist. Schema `USAGE` on `df` is the real access gate for ordinary `df.*` functions, so the helpers now grant/revoke schema `USAGE`, the table privileges, and `EXECUTE` only on the sensitive functions (`df.http`, `df.grant_usage`, `df.revoke_usage`). Function signatures are unchanged and existing privileges are unaffected (#242). ### Removed diff --git a/src/dsl.rs b/src/dsl.rs index d0d044f..58f7d2b 100644 --- a/src/dsl.rs +++ b/src/dsl.rs @@ -3,7 +3,6 @@ //! 
DSL functions for defining durable SQL functions -use chrono::Utc; use cron::Schedule as CronSchedule; use pgrx::datum::DatumWithOid; use pgrx::prelude::*; @@ -272,29 +271,29 @@ pub fn sleep(seconds: i64) -> String { } /// Creates a wait-for-schedule node that waits until the next cron match. -/// The wait duration is computed at DSL time (when this function is called) -/// to ensure deterministic replay in the orchestration. +/// +/// The cron expression is only *validated* here (at DSL time) so an invalid +/// expression fails fast at `df.start()`. The actual "next tick" is computed at +/// execution time inside the orchestration (see `execute_wait_schedule_node`), +/// using duroxide's deterministic clock. This is intentional: a cron schedule is +/// a function of the current wall-clock time, so it must be evaluated when the +/// node actually runs — not at `df.start()` time — otherwise any delay between +/// `df.start()` and execution (and, critically, every iteration of a recurring +/// `@>` loop) would wake at the wrong moment. Only the cron expression is stored +/// in the node config. #[pg_extern(schema = "df")] pub fn wait_for_schedule(cron_expr: &str) -> String { + // Validate eagerly so a bad expression is rejected at df.start() time. The + // "0 " prefix supplies the seconds field the `cron` crate expects; the same + // prefix is re-applied at execution time when the schedule is recomputed. 
let cron_with_seconds = format!("0 {cron_expr}"); - let schedule = match CronSchedule::from_str(&cron_with_seconds) { - Ok(s) => s, - Err(e) => pgrx::error!("Invalid cron expression '{}': {}", cron_expr, e), - }; - - // Compute wait duration NOW (at DSL time) for deterministic orchestration replay - let now = Utc::now(); - let next = match schedule.upcoming(Utc).next() { - Some(t) => t, - None => pgrx::error!("No upcoming schedule found for '{}'", cron_expr), - }; - - let duration_secs = (next - now).num_seconds().max(0) as u64; + if let Err(e) = CronSchedule::from_str(&cron_with_seconds) { + pgrx::error!("Invalid cron expression '{}': {}", cron_expr, e); + } - // Store pre-computed seconds, not the cron expression + // Store only the cron expression. The wait is computed at execution time. let config = serde_json::json!({ "cron_expr": cron_expr, - "wait_seconds": duration_secs }); Durofut { diff --git a/src/explain.rs b/src/explain.rs index 2ac8c3b..796c3ba 100644 --- a/src/explain.rs +++ b/src/explain.rs @@ -647,18 +647,15 @@ fn format_node_display(node: &ExplainNode) -> String { format!("SLEEP {seconds}s{name_suffix}") } "WAIT_SCHEDULE" => { - // Parse config to get cron expression and wait seconds - let (cron, secs) = node + // Parse config to get the cron expression. The next tick is computed + // at execution time (not stored), so only the cron expr is shown. 
+ let cron = node .query .as_ref() .and_then(|q| serde_json::from_str::<serde_json::Value>(q).ok()) - .map(|cfg| { - let c = cfg["cron_expr"].as_str().unwrap_or("?").to_string(); - let s = cfg["wait_seconds"].as_u64().unwrap_or(0); - (c, s) - }) - .unwrap_or_else(|| ("?".to_string(), 0)); - format!("WAIT '{cron}' ({secs}s){name_suffix}") + .and_then(|cfg| cfg["cron_expr"].as_str().map(|s| s.to_string())) + .unwrap_or_else(|| "?".to_string()); + format!("WAIT '{cron}'{name_suffix}") } "HTTP" => { // Parse config to get method and URL diff --git a/src/lib.rs b/src/lib.rs index 76b26a3..39e29b3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -906,6 +906,24 @@ mod tests { let json = crate::dsl::wait_for_schedule("*/5 * * * *"); let fut = Durofut::from_json(&json); assert_eq!(fut.node_type, "WAIT_SCHEDULE"); + + // The node stores only the cron expression; the next tick is computed at + // execution time, so there must be no pre-computed wait baked in at DSL time. + let config: serde_json::Value = + serde_json::from_str(fut.query.as_ref().expect("query must be set")).unwrap(); + assert_eq!( + config["cron_expr"].as_str(), + Some("*/5 * * * *"), + "cron_expr should be preserved" + ); + assert!( + config.get("wait_seconds").is_none(), + "config must not pre-compute wait_seconds at DSL time" + ); + assert!( + config.get("target_timestamp").is_none(), + "config must not pre-compute a target_timestamp at DSL time" + ); } #[pg_test] diff --git a/src/orchestrations/execute_function_graph.rs b/src/orchestrations/execute_function_graph.rs index e593610..d4286e0 100644 --- a/src/orchestrations/execute_function_graph.rs +++ b/src/orchestrations/execute_function_graph.rs @@ -9,8 +9,11 @@ //! 
- Same input must always produce the same scheduling decisions use std::collections::HashMap; +use std::str::FromStr; use std::time::Duration; +use chrono::{DateTime, Utc}; +use cron::Schedule as CronSchedule; use duroxide::OrchestrationContext; use crate::activities; @@ -509,20 +512,50 @@ async fn execute_wait_schedule_node( .as_ref() .ok_or_else(|| format!("WAIT_SCHEDULE node {node_id} has no config"))?; - // Parse pre-computed config from DSL time let config: serde_json::Value = serde_json::from_str(config_str) .map_err(|e| format!("Invalid WAIT_SCHEDULE config: {e}"))?; - let wait_seconds = config["wait_seconds"] - .as_u64() - .ok_or_else(|| "WAIT_SCHEDULE missing wait_seconds".to_string())?; - - let cron_expr = config["cron_expr"].as_str().unwrap_or("?"); + let cron_expr = config["cron_expr"] + .as_str() + .ok_or_else(|| "WAIT_SCHEDULE missing cron_expr".to_string())?; + + // A cron schedule is a function of "now", so the next tick MUST be computed + // when this node actually executes — not at df.start() time — so that any + // delay before execution, and every iteration of a recurring `@>` loop, + // targets the correct upcoming tick. + // + // `ctx.utc_now()` is duroxide's deterministic clock (the only sanctioned way + // to read wall-clock time in this deterministic file): the value is recorded + // in history and replayed verbatim. The cron math below is pure given `now`, + // so the whole computation is replay-safe. The "0 " prefix supplies the + // seconds field the `cron` crate expects (mirrors df.wait_for_schedule()). + let now: DateTime<Utc> = ctx + .utc_now() + .await + .map_err(|e| format!("WAIT_SCHEDULE failed to read deterministic clock: {e}"))? 
+ .into(); + + let cron_with_seconds = format!("0 {cron_expr}"); + let schedule = CronSchedule::from_str(&cron_with_seconds) + .map_err(|e| format!("Invalid cron expression '{cron_expr}': {e}"))?; + let next = schedule + .after(&now) + .next() + .ok_or_else(|| format!("No upcoming schedule found for '{cron_expr}'"))?; + + // Clamp to zero if the tick is already in the past by the time we get here. + // + // NOTE: once duroxide gains an absolute-deadline timer + // (https://github.com/microsoft/duroxide/issues/34), this `now`-read + + // subtraction can be replaced with `ctx.schedule_timer_until(next)`, which + // targets the absolute tick directly and drops the extra utc_now() syscall. + let wait = (next - now).to_std().unwrap_or(Duration::ZERO); ctx.trace_info(format!( - "Waiting {wait_seconds} seconds until schedule: {cron_expr}" + "Waiting {}s until next schedule tick {next} (cron: {cron_expr})", + wait.as_secs() )); - ctx.schedule_timer(Duration::from_secs(wait_seconds)).await; + ctx.schedule_timer(wait).await; Ok(r#"{"scheduled": true}"#.to_string()) } diff --git a/tests/e2e/sql/24_wait_for_schedule_exec_time.sql b/tests/e2e/sql/24_wait_for_schedule_exec_time.sql new file mode 100644 index 0000000..e398195 --- /dev/null +++ b/tests/e2e/sql/24_wait_for_schedule_exec_time.sql @@ -0,0 +1,125 @@ +-- Copyright (c) Microsoft Corporation. +-- Licensed under the PostgreSQL License. + +-- Tests: df.wait_for_schedule() computes the next cron tick at *execution time* +-- (issue #130), not at df.start() time. +-- +-- Design: two df.wait_for_schedule('* * * * *') in a row. The cron '* * * * *' +-- (via the internal "0 ..." seconds field) fires at second :00 of every +-- minute, so a correct implementation makes BOTH waits land on a minute +-- boundary. +-- +-- NEW (execution-time): the second wait is recomputed after the first one +-- completes, so it targets the *next* :00 boundary -> +-- both fires land within a few seconds of :00. 
+-- OLD (df.start()-time): both nodes bake wait_seconds = (60 - start_second) +-- at parse time. The first wait fires at :00, but the +-- second reuses that stale offset starting *from* :00, +-- firing at second ~= (60 - start_second) -- well off +-- the boundary. +-- +-- Asserting that BOTH fires land within 5s of *successive* minute boundaries +-- (i.e. each near :00 and exactly one minute apart) therefore PASSES on the new +-- implementation and FAILS on the old one. +-- +-- Pre-start guard: if df.start() runs within ~5s of a minute boundary, the old +-- implementation's second fire (60 - start_second) would itself be near :00 and +-- spuriously pass. We wait until the start second is in a safe window so the +-- "fails on old" property is deterministic. +-- +-- Runtime: up to ~2 minutes (two minute-boundary waits). + +DROP TABLE IF EXISTS wait_sched_exec_test; +CREATE TABLE wait_sched_exec_test (id SERIAL, fired_at TIMESTAMPTZ); + +-- Keep the start second away from the minute boundary (see header). +DO $$ +BEGIN + WHILE date_part('second', clock_timestamp()) < 5 + OR date_part('second', clock_timestamp()) > 50 LOOP + PERFORM pg_sleep(0.5); + END LOOP; +END $$; + +CREATE TEMP TABLE _test_state (instance_id TEXT); + +INSERT INTO _test_state SELECT df.start( + df.wait_for_schedule('* * * * *') + ~> 'INSERT INTO wait_sched_exec_test (fired_at) VALUES (clock_timestamp())' + ~> df.wait_for_schedule('* * * * *') + ~> 'INSERT INTO wait_sched_exec_test (fired_at) VALUES (clock_timestamp())', + 'test-wait-schedule-exec-time' +); + +DO $$ +DECLARE + inst_id TEXT; + status TEXT; + cnt INT; + f1 TIMESTAMPTZ; + f2 TIMESTAMPTZ; + b1 TIMESTAMPTZ; + b2 TIMESTAMPTZ; + off1 NUMERIC; + off2 NUMERIC; +BEGIN + SELECT instance_id INTO inst_id FROM _test_state; + RAISE NOTICE 'Testing instance: %', inst_id; + + -- Two minute-boundary waits (up to ~60s each) plus scheduling latency. 
+ SELECT df.await_instance(inst_id, 180) INTO status; + + IF status != 'completed' THEN + RAISE EXCEPTION 'TEST FAILED: status = %', status; + END IF; + + SELECT count(*) INTO cnt FROM wait_sched_exec_test; + IF cnt <> 2 THEN + RAISE EXCEPTION 'TEST FAILED: expected 2 fired rows, got %', cnt; + END IF; + + SELECT fired_at INTO f1 FROM wait_sched_exec_test ORDER BY id LIMIT 1; + SELECT fired_at INTO f2 FROM wait_sched_exec_test ORDER BY id DESC LIMIT 1; + + -- Nearest minute boundary to each fire (round to nearest minute: add 30s, + -- then truncate down). Works whether a fire lands just after :00 or just + -- before the next :00. + b1 := date_trunc('minute', f1 + interval '30 seconds'); + b2 := date_trunc('minute', f2 + interval '30 seconds'); + off1 := abs(extract(epoch FROM (f1 - b1))); + off2 := abs(extract(epoch FROM (f2 - b2))); + + RAISE NOTICE 'fire 1 at % (% s from boundary %)', f1, round(off1, 1), b1; + RAISE NOTICE 'fire 2 at % (% s from boundary %)', f2, round(off2, 1), b2; + + -- (1) Each wait must fire within 5s of a minute boundary. The new code lands + -- near :00 for both; the old start-time code fires the second wait at + -- second ~= (60 - start_second), well outside this window. + IF off1 > 5 THEN + RAISE EXCEPTION + 'TEST FAILED: first wait_for_schedule fired % s from the minute boundary, expected ' + 'within 5s. Cron wait appears computed at df.start() time, not execution time (#130).', + round(off1, 1); + END IF; + IF off2 > 5 THEN + RAISE EXCEPTION + 'TEST FAILED: second wait_for_schedule fired % s from the minute boundary, expected ' + 'within 5s. Cron wait appears computed at df.start() time, not execution time (#130).', + round(off2, 1); + END IF; + + -- (2) The two waits must land on *successive* minute boundaries (60s apart), + -- confirming the second wait advanced to the next tick rather than repeating + -- or skipping a boundary. 
+ IF b2 - b1 <> interval '1 minute' THEN + RAISE EXCEPTION + 'TEST FAILED: waits fired at boundaries % and %, expected successive minutes (60s ' + 'apart).', b1, b2; + END IF; + + RAISE NOTICE 'TEST PASSED: both waits fired within 5s of successive minute boundaries'; +END $$; + +DROP TABLE _test_state; +DROP TABLE wait_sched_exec_test; +SELECT 'TEST PASSED' AS result;