diff --git a/.fluree-memory/repo.ttl b/.fluree-memory/repo.ttl index a11257ef05..5077405a0f 100644 --- a/.fluree-memory/repo.ttl +++ b/.fluree-memory/repo.ttl @@ -1202,6 +1202,20 @@ mem:fact-01krb618x0axad5mj8jev4ej0s a mem:Fact ; mem:branch "feature/edge-annotations" ; mem:createdAt "2026-05-11T09:31:32.640122+00:00"^^xsd:dateTime . +mem:constraint-01kwscqt5qs9raejdt9drmr0xe a mem:Constraint ; + mem:content "Event-time validation belongs at the BUILD path (resolve_commit_times), NOT replay. resolve_commit_times rejects new commits with future/backwards event time or future receivedAt (bounded by MAX_EVENT_TIME_FUTURE_SKEW_MS=5min). apply_single_commit (incremental catch-up over already-durable commits) must only WARN on non-monotonic event time, never Err: history is immutable and the full-reload path load_novelty has no guard, so erroring wedges catch-up inconsistently. Separately: dual-stamp (db:receivedAt) is gated on received.is_some()||head.dual_stamp() — with_timestamp alone does NOT flip it (event-axis-only, e.g. fluree-memory git replay); the server pairs eventTime with with_received_at(now)." ; + mem:tag "commit" ; + mem:tag "dual-stamp" ; + mem:tag "event-time" ; + mem:tag "receivedat" ; + mem:tag "replay" ; + mem:scope mem:repo ; + mem:artifactRef "fluree-db-ledger/src/lib.rs" ; + mem:artifactRef "fluree-db-transact/src/commit.rs" ; + mem:branch "feature/event-time-commits" ; + mem:createdAt "2026-07-05T15:00:52.023159+00:00"^^xsd:dateTime ; + mem:rationale "Hard-erroring on replay broke incremental catch-up for pre-existing non-monotonic chains (NTP/raft skew), inconsistent with the unguarded full-reload path." . + mem:constraint-01kv5g47r2z640tkx6nxd1d8qm a mem:Constraint ; mem:content "To roll back / delete a freshly-created ledger you must call `drop_ledger(name, mode)` with the BARE name (it rejects a `:branch` suffix), NOT `drop_branch` — drop_branch refuses the root branch (source_branch=None). Use `split_ledger_id(id)` to strip the branch. Also: after a commits-only restore, `handle.snapshot.t` is the INDEX t (0, no index built), not the commit-head t — assert restore success via `nameservice().lookup(id).commit_t` or a query, not snapshot.t." ; mem:tag "drop" ; diff --git a/Cargo.lock b/Cargo.lock index e54f43064b..7459d496ff 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2551,6 +2551,7 @@ dependencies = [ "base64 0.22.1", "bs58", "bytes", + "chrono", "clap", "clap_complete", "colored", diff --git a/docs/api/endpoints.md b/docs/api/endpoints.md index 85eb1b2184..c135de9501 100644 --- a/docs/api/endpoints.md +++ b/docs/api/endpoints.md @@ -206,6 +206,20 @@ curl -X POST "http://localhost:8090/v1/fluree/update?ledger=mydb:main" \ }' ``` +JSON-LD transaction with a caller-supplied event time (backdated commit — see +[Time Travel: Event Time](../concepts/time-travel.md#event-time-backdated-commits); +must be RFC 3339, monotonically non-decreasing along the ledger's commit +chain, and not in the future): +```bash +curl -X POST "http://localhost:8090/v1/fluree/update?ledger=mydb:main" \ + -H "Content-Type: application/json" \ + -d '{ + "@context": { "ex": "http://example.org/ns/" }, + "@graph": [{ "@id": "ex:alice", "ex:role": "Engineer" }], + "opts": { "eventTime": "2021-03-15T00:00:00Z" } + }' +``` + SPARQL UPDATE (ledger-scoped endpoint): ```bash curl -X POST http://localhost:8090/v1/fluree/update/mydb:main \ diff --git a/docs/concepts/time-travel.md b/docs/concepts/time-travel.md index bd7ebb28b8..86fdb86d47 100644 --- a/docs/concepts/time-travel.md +++ b/docs/concepts/time-travel.md @@ -99,6 +99,74 @@ Query at a specific commit using `@commit:` with a commit ContentId: } ``` +## Event Time: Backdated Commits + +Every commit carries an **event time** (`db:time` in the txn-meta graph) — the +wall-clock instant the commit's changes are *about*. By default it is stamped +with the current time at commit, but a transaction may supply its own: + +```json +{ + "@context": { "ex": "http://example.org/ns/" }, + "@graph": [{ "@id": "ex:alice", "ex:role": "Engineer" }], + "opts": { "eventTime": "2021-03-15T00:00:00Z" } +} +``` + +This is how historical data gets *real* time travel: replay a year of history +as ordinary transactions, each stamped with the date the change actually +happened, and `@iso:` queries work over that custom timeline with no further +setup. (The Rust API equivalent is `CommitOpts::with_timestamp`.) + +Two rules keep the timeline coherent — both enforced at commit time: + +1. **Monotonic**: a commit's event time must be `>=` the head commit's event + time. `@iso:` resolution depends on this ordering; a fresh ledger's first + commit can carry any past time (there is no floor — pre-1970 works). +2. **No future times**: event time may not exceed the current wall clock + (plus a small skew allowance). Commits are immutable, so one future-dated + stamp would otherwise permanently pin the ledger's timeline ahead of + reality. + +The default stamp is also clamped to the head's event time, so a system clock +that steps backwards can no longer corrupt `@iso:` resolution. + +### Recorded Time: the Audit Axis (`@recorded:`) + +When the server pairs a caller-supplied event time with a second, +system-controlled timestamp — **`db:receivedAt`**, the wall-clock time the +commit was actually recorded — the ledger gains an audit axis. The HTTP +transact route does this automatically (it pairs `eventTime` with the current +time); programmatically it is enabled by `CommitOpts::with_received_at`. Once a +commit dual-stamps, every commit thereafter does too (sticky), so the audit +trail has no gaps. + +The audit axis is *not* triggered by event time alone. A ledger that only ever +backdates via `CommitOpts::with_timestamp` (for example fluree-memory's git +replay) is event-axis-only and carries no `db:receivedAt`; on it, `@recorded:` +falls back to the event axis. Ledgers that never supply event times carry no +extra metadata at all. + +The `@recorded:` selector time-travels along that axis: + +```json +{ + "@context": { "ex": "http://example.org/ns/" }, + "from": "ledger:main@recorded:2026-01-15T00:00:00Z", + "select": ["?name"], + "where": [{ "@id": "?person", "ex:name": "?name" }] +} +``` + +- `@iso:` answers *"what was true at this time?"* (event axis) +- `@recorded:` answers *"what had been loaded into the ledger by this + time?"* (audit axis) + +On a ledger that never used `eventTime`, the two axes are identical and +`@recorded:` behaves exactly like `@iso:`. Both timestamps live inside the +signed commit envelope, so a signature attests to the claimed event time and +the recording time together. + ## Temporal Data Model ### Immutable Facts @@ -142,9 +210,15 @@ Fluree primarily uses **transaction time** (when the fact was recorded in the da ``` This allows you to query by both: -- **Transaction time**: When was this recorded? (using `@t:`, `@iso:`, `@commit:`) +- **Transaction time**: When was this recorded? (using `@t:`, `@recorded:`, `@commit:`) - **Valid time**: When was this true? (using standard WHERE clause filters on `ex:validFrom`/`ex:validTo`) +For commit-granular valid time — importing historical data so `@iso:` time +travel works over the dates things actually changed — see +[Event Time: Backdated Commits](#event-time-backdated-commits) above. Explicit +`validFrom`/`validTo` properties remain the right model for *fact*-granular +intervals and overlapping validity. + ## Snapshot and Indexing ### Database Snapshots @@ -393,7 +467,8 @@ Query changes for a specific property across all subjects: Different time specifiers have different performance characteristics: - **@t:NNN** (fastest): Direct transaction number, no resolution needed -- **@iso:DATETIME**: O(log n) binary search through commit timestamps using POST index +- **@iso:DATETIME**: O(log n) binary search through commit event timestamps using POST index +- **@recorded:DATETIME**: Same POST-index probes over `db:receivedAt`; identical to `@iso:` cost (and identical *behavior* on ledgers that never used `eventTime`) - **@commit:CID**: Bounded SPOT scan, O(k) where k is commits matching prefix (use longer prefixes for better performance) ### Index Selection @@ -679,11 +754,12 @@ This recreates the exact state across multiple ledgers at the time the bug occur ### Time Travel Resolution -When you query with `@t:`, `@iso:`, or `@commit:`: +When you query with `@t:`, `@iso:`, `@recorded:`, or `@commit:`: 1. **@t:NNN** - Direct transaction number (fastest) -2. **@iso:DATETIME** - Binary search through commit timestamps using POST index -3. **@commit:CID** - Bounded SPOT scan to find matching commit +2. **@iso:DATETIME** - Binary search through commit event timestamps using POST index +3. **@recorded:DATETIME** - Same probes over `db:receivedAt` (audit axis), falling back to the event axis for history before the first backdated commit +4. **@commit:CID** - Bounded SPOT scan to find matching commit ### Query Execution diff --git a/fluree-db-api/src/dataset.rs b/fluree-db-api/src/dataset.rs index cafbbaa4df..bc2a7174bb 100644 --- a/fluree-db-api/src/dataset.rs +++ b/fluree-db-api/src/dataset.rs @@ -435,8 +435,14 @@ pub enum TimeSpec { AtT(i64), /// At a specific commit hash AtCommit(String), - /// At a specific ISO 8601 timestamp + /// At a specific ISO 8601 timestamp, resolved against commit *event + /// time* (`db:time` — user-suppliable for backdated historical loads) AtTime(String), + /// At a specific ISO 8601 timestamp, resolved against the wall-clock + /// time commits were *recorded* (`db:receivedAt`, audit axis). + /// Identical to `AtTime` on ledgers that never used caller-supplied + /// event times. + AtRecorded(String), /// "latest" keyword - resolves to current ledger t Latest, } @@ -457,6 +463,11 @@ impl TimeSpec { Self::AtTime(time.into()) } + /// Create at-recorded specification (audit axis) + pub fn at_recorded(time: impl Into) -> Self { + Self::AtRecorded(time.into()) + } + /// Create latest specification pub fn latest() -> Self { Self::Latest @@ -905,6 +916,7 @@ fn parse_ledger_id_time_travel( LedgerIdTimeSpec::AtT(t) => TimeSpec::AtT(t), LedgerIdTimeSpec::AtIso(value) => TimeSpec::AtTime(value), LedgerIdTimeSpec::AtCommit(value) => TimeSpec::AtCommit(value), + LedgerIdTimeSpec::AtRecorded(value) => TimeSpec::AtRecorded(value), }); Ok((format!("{identifier}{fragment_suffix}"), time_spec)) @@ -994,6 +1006,8 @@ fn parse_named_graph_object( if let Some(at_str) = at_val.as_str() { if let Some(commit_hash) = at_str.strip_prefix("commit:") { source.time_spec = Some(TimeSpec::AtCommit(commit_hash.to_string())); + } else if let Some(recorded) = at_str.strip_prefix("recorded:") { + source.time_spec = Some(TimeSpec::AtRecorded(recorded.to_string())); } else { source.time_spec = Some(TimeSpec::AtTime(at_str.to_string())); } @@ -1072,9 +1086,12 @@ fn parse_single_graph_source( } } else if let Some(at_val) = obj.get("at") { if let Some(at_str) = at_val.as_str() { - // Determine if it's a commit hash or timestamp + // Determine if it's a commit hash, recorded-axis + // timestamp, or event-time timestamp if let Some(commit_hash) = at_str.strip_prefix("commit:") { source.time_spec = Some(TimeSpec::AtCommit(commit_hash.to_string())); + } else if let Some(recorded) = at_str.strip_prefix("recorded:") { + source.time_spec = Some(TimeSpec::AtRecorded(recorded.to_string())); } else { // Assume ISO timestamp source.time_spec = Some(TimeSpec::AtTime(at_str.to_string())); diff --git a/fluree-db-api/src/ledger_view.rs b/fluree-db-api/src/ledger_view.rs index e0961142da..a02dbac0be 100644 --- a/fluree-db-api/src/ledger_view.rs +++ b/fluree-db-api/src/ledger_view.rs @@ -180,6 +180,10 @@ impl LedgerView { ns_record: self.ns_record, binary_store: self.binary_store.map(|store| TypeErasedStore(store)), spatial_indexes: None, + // Read-path conversion: head temporal metadata is only needed by + // the write path, which resolves it lazily via + // `ensure_head_temporal`. + head_temporal: None, } } } diff --git a/fluree-db-api/src/time_resolve.rs b/fluree-db-api/src/time_resolve.rs index f222cd06f7..178ecca16e 100644 --- a/fluree-db-api/src/time_resolve.rs +++ b/fluree-db-api/src/time_resolve.rs @@ -19,7 +19,7 @@ use fluree_db_core::{ range_bounded_with_overlay, range_with_overlay, Flake, FlakeValue, IndexType, LedgerSnapshot, ObjectBounds, RangeMatch, RangeOptions, RangeTest, Sid, TXN_META_GRAPH_ID, }; -use fluree_vocab::db::TIME as LEDGER_TIME; +use fluree_vocab::db::{RECEIVED_AT as LEDGER_RECEIVED_AT, TIME as LEDGER_TIME}; use fluree_vocab::namespaces::{FLUREE_COMMIT, FLUREE_DB}; use crate::error::{ApiError, Result}; @@ -75,35 +75,8 @@ where // Step 1: Check if any ledger#time flakes exist at all // (and get the earliest commit time) - let probe_opts = RangeOptions::default() - .with_to_t(current_t) - .with_flake_limit(1); - - let probe_match = RangeMatch::predicate(time_predicate.clone()); - - let earliest_flakes = if let Some(ovl) = overlay { - range_with_overlay( - snapshot, - TXN_META_GRAPH_ID, - ovl, - IndexType::Post, - RangeTest::Eq, - probe_match, - probe_opts, - ) - .await? - } else { - range_with_overlay( - snapshot, - TXN_META_GRAPH_ID, - &fluree_db_core::NoOverlay, - IndexType::Post, - RangeTest::Eq, - probe_match, - probe_opts, - ) - .await? - }; + let earliest_flakes = + probe_timestamp_axis(snapshot, overlay, time_predicate.clone(), current_t, None).await?; if earliest_flakes.is_empty() { // No commit timestamps exist - fall back to head (matches existing behavior) @@ -129,24 +102,72 @@ where // Step 2: Find the first commit AFTER the target time // Use object bounds with lower > target_epoch_ms (exclusive) - let search_opts = RangeOptions::default() + let after_flakes = probe_timestamp_axis( + snapshot, + overlay, + time_predicate, + current_t, + Some(target_epoch_ms), + ) + .await?; + + if after_flakes.is_empty() { + // Target is >= all commit times, return head + tracing::debug!("datetime_to_t: no commit after target; returning head"); + return Ok(current_t); + } + + // The flake we found is the first commit AFTER target. + // Its t is the transaction number of that commit. + // The previous transaction (t - 1) is what we want. + let after_t = after_flakes[0].t; + let after_o = &after_flakes[0].o; + tracing::debug!( + after_t, + ?after_o, + "datetime_to_t: first commit after target" + ); + + // Clamp to 0 minimum (t=0 may be valid for genesis-as-of queries) + let resolved_t = (after_t - 1).max(0); + tracing::debug!(resolved_t, "datetime_to_t: resolved t"); + + Ok(resolved_t) +} + +/// Single-flake POST probe over a commit-timestamp predicate in the txn-meta +/// graph. With `after` = `None` returns the earliest flake (POST orders by +/// object ascending); with `Some(target)` returns the first flake whose +/// value is strictly greater than `target`. +async fn probe_timestamp_axis( + snapshot: &LedgerSnapshot, + overlay: Option<&O>, + predicate: Sid, + current_t: i64, + after: Option, +) -> Result> +where + O: OverlayProvider + ?Sized, +{ + let mut opts = RangeOptions::default() .with_to_t(current_t) - .with_flake_limit(1) - .with_object_bounds( - ObjectBounds::new().with_lower(FlakeValue::Long(target_epoch_ms), false), // exclusive: > target + .with_flake_limit(1); + if let Some(target) = after { + opts = opts.with_object_bounds( + ObjectBounds::new().with_lower(FlakeValue::Long(target), false), // exclusive: > target ); + } + let range_match = RangeMatch::predicate(predicate); - let search_match = RangeMatch::predicate(time_predicate); - - let after_flakes = if let Some(ovl) = overlay { + let flakes = if let Some(ovl) = overlay { range_with_overlay( snapshot, TXN_META_GRAPH_ID, ovl, IndexType::Post, RangeTest::Eq, - search_match, - search_opts, + range_match, + opts, ) .await? } else { @@ -156,34 +177,98 @@ where &fluree_db_core::NoOverlay, IndexType::Post, RangeTest::Eq, - search_match, - search_opts, + range_match, + opts, ) .await? }; + Ok(flakes) +} - if after_flakes.is_empty() { - // Target is >= all commit times, return head - tracing::debug!("datetime_to_t: no commit after target; returning head"); - return Ok(current_t); - } +/// Resolve an ISO-8601 datetime against the *recorded* (audit) axis: +/// the wall-clock time each commit was actually recorded, as opposed to +/// its (possibly caller-supplied) event time. +/// +/// # Algorithm +/// +/// Ledgers only carry `db:receivedAt` flakes from their first +/// caller-supplied event time onward (sticky dual-stamp mode); before that +/// point — and on ledgers that never used the feature — the two axes are +/// identical, so resolution falls back to [`datetime_to_t`]: +/// +/// 1. No `db:receivedAt` flakes at all → `datetime_to_t` (axes coincide). +/// 2. Target before the earliest `receivedAt` → the answer lies in the +/// pre-flip segment: `datetime_to_t`, clamped to `flip_t - 1` (a +/// post-flip commit may carry a backdated *event* time ≤ target, which +/// the clamp excludes — it wasn't recorded yet). +/// 3. Otherwise → first commit with `receivedAt > target`, minus one; +/// head if none. +pub async fn recorded_to_t( + snapshot: &LedgerSnapshot, + overlay: Option<&O>, + target_epoch_ms: i64, + current_t: i64, +) -> Result +where + O: OverlayProvider + ?Sized, +{ + tracing::debug!( + target_epoch_ms, + current_t, + "recorded_to_t: resolving recorded-axis epoch-ms" + ); + let recv_predicate = Sid::new(FLUREE_DB, LEDGER_RECEIVED_AT); - // The flake we found is the first commit AFTER target. - // Its t is the transaction number of that commit. - // The previous transaction (t - 1) is what we want. - let after_t = after_flakes[0].t; - let after_o = &after_flakes[0].o; + let earliest_flakes = + probe_timestamp_axis(snapshot, overlay, recv_predicate.clone(), current_t, None).await?; + + let Some(earliest) = earliest_flakes.first() else { + // Never dual-stamped: the recorded axis is identical to event time. + tracing::debug!("recorded_to_t: no db:receivedAt flakes; falling back to event axis"); + return datetime_to_t(snapshot, overlay, target_epoch_ms, current_t).await; + }; + let FlakeValue::Long(earliest_recv) = earliest.o else { + return Ok(current_t); // Invalid timestamp type, fall back to head + }; + // receivedAt is monotonically non-decreasing along t, so the smallest + // value (first in POST object order) belongs to the flip commit. + let flip_t = earliest.t; tracing::debug!( - after_t, - ?after_o, - "datetime_to_t: first commit after target" + earliest_recv, + flip_t, + "recorded_to_t: dual-stamp flip point" ); - // Clamp to 0 minimum (t=0 may be valid for genesis-as-of queries) - let resolved_t = (after_t - 1).max(0); - tracing::debug!(resolved_t, "datetime_to_t: resolved t"); + if target_epoch_ms < earliest_recv { + if flip_t <= 1 { + // Dual-stamped from the first commit: nothing was recorded + // before the earliest receivedAt. + let target_iso = epoch_ms_to_iso(target_epoch_ms); + let earliest_iso = epoch_ms_to_iso(earliest_recv); + return Err(ApiError::internal(format!( + "There is no data recorded as of {target_iso} (earliest commit was recorded \ + at {earliest_iso})" + ))); + } + // Pre-flip segment: axes coincide there, but clamp below the flip + // point so backdated post-flip *event* times can't leak in. + let event_t = datetime_to_t(snapshot, overlay, target_epoch_ms, current_t).await?; + return Ok(event_t.min(flip_t - 1)); + } - Ok(resolved_t) + let after_flakes = probe_timestamp_axis( + snapshot, + overlay, + recv_predicate, + current_t, + Some(target_epoch_ms), + ) + .await?; + + match after_flakes.first() { + None => Ok(current_t), // recorded target >= all commits: head + Some(after) => Ok((after.t - 1).max(0)), + } } /// Resolve a commit prefix to a transaction number using bounded SPOT index scan. @@ -373,6 +458,16 @@ pub(crate) async fn resolve_time_spec( ) .await } + crate::TimeSpec::AtRecorded(iso) => { + let target_epoch_ms = iso_to_target_epoch_ms(iso)?; + recorded_to_t( + &ledger.snapshot, + Some(ledger.novelty.as_ref()), + target_epoch_ms, + current_t, + ) + .await + } crate::TimeSpec::AtCommit(commit_prefix) => { commit_to_t( &ledger.snapshot, @@ -385,6 +480,22 @@ pub(crate) async fn resolve_time_spec( } } +/// Parse an ISO-8601 timestamp to epoch milliseconds, ceiling sub-ms +/// precision to avoid truncation off-by-one (commit-timestamp flakes store +/// epoch milliseconds). +pub(crate) fn iso_to_target_epoch_ms(iso: &str) -> Result { + let dt = chrono::DateTime::parse_from_rfc3339(iso).map_err(|e| { + ApiError::internal(format!( + "Invalid ISO-8601 timestamp for time travel: {iso} ({e})" + )) + })?; + let mut target_epoch_ms = dt.timestamp_millis(); + if dt.timestamp_subsec_nanos() % 1_000_000 != 0 { + target_epoch_ms += 1; + } + Ok(target_epoch_ms) +} + #[cfg(test)] mod tests { #[test] diff --git a/fluree-db-api/src/tx.rs b/fluree-db-api/src/tx.rs index e9b5b861e8..53db50ff2d 100644 --- a/fluree-db-api/src/tx.rs +++ b/fluree-db-api/src/tx.rs @@ -2151,11 +2151,28 @@ impl crate::Fluree { /// Commit a staged transaction (persists commit record + publishes nameservice head). pub async fn commit_staged( &self, - view: StagedLedger, + mut view: StagedLedger, ns_registry: NamespaceRegistry, index_config: &IndexConfig, commit_opts: CommitOpts, ) -> Result<(CommitReceipt, LedgerState)> { + // Resolve head temporal metadata if it wasn't observed at load time + // (index == head, no novelty walk): the event-time monotonicity + // guard and sticky dual-stamp decision in `build_commit` need it. + // Gated on `None` so it costs nothing once resolved; uses the + // branch-aware store because a branched ledger's head commit may + // live in an ancestor's namespace. + if view.base().head_temporal.is_none() && view.base().head_commit_id.is_some() { + let ledger_id = view.db().ledger_id.clone(); + let store = self + .content_store_for_record_or_id(view.base().ns_record.as_ref(), &ledger_id) + .await?; + view.base_mut() + .ensure_head_temporal(store.as_ref()) + .await + .map_err(fluree_db_transact::TransactError::from)?; + } + let content_store = self.content_store(view.db().ledger_id.as_str()); let publisher = self.publisher()?; let (receipt, ledger) = commit_txn( diff --git a/fluree-db-api/src/tx_builder.rs b/fluree-db-api/src/tx_builder.rs index 4a962af535..b60c90eb7d 100644 --- a/fluree-db-api/src/tx_builder.rs +++ b/fluree-db-api/src/tx_builder.rs @@ -1228,7 +1228,7 @@ impl Fluree { .await?; let StageResult { - view, + mut view, ns_registry, txn_meta, graph_delta, @@ -1237,6 +1237,22 @@ impl Fluree { .with_txn_meta(txn_meta) .with_graph_delta(graph_delta.into_iter().collect()); + // Resolve head temporal metadata if it wasn't observed at load time + // (index == head, no novelty walk): the event-time monotonicity + // guard and sticky dual-stamp decision in `build_commit` need it. + // Gated on `None` so it costs nothing once resolved; uses the + // branch-aware store because a branched ledger's head commit may + // live in an ancestor's namespace. + if view.base().head_temporal.is_none() && view.base().head_commit_id.is_some() { + let store = self + .content_store_for_record_or_id(view.base().ns_record.as_ref(), ledger.id()) + .await?; + view.base_mut() + .ensure_head_temporal(store.as_ref()) + .await + .map_err(fluree_db_transact::TransactError::from)?; + } + // Resolve the raw-txn CID before invoking the build phase // (build_commit is now pure and does no I/O). A pre-resolved // `raw_txn_id` (carried e.g. through the Raft queue envelope diff --git a/fluree-db-api/src/view/dataset_builder.rs b/fluree-db-api/src/view/dataset_builder.rs index 2356f60c84..6ea0203f87 100644 --- a/fluree-db-api/src/view/dataset_builder.rs +++ b/fluree-db-api/src/view/dataset_builder.rs @@ -321,6 +321,16 @@ async fn resolve_history_endpoint_t( ) .await } + dataset::TimeSpec::AtRecorded(iso) => { + let target_epoch_ms = time_resolve::iso_to_target_epoch_ms(iso)?; + time_resolve::recorded_to_t( + &ledger.snapshot, + Some(ledger.novelty.as_ref()), + target_epoch_ms, + latest_t, + ) + .await + } dataset::TimeSpec::AtCommit(commit_prefix) => { time_resolve::commit_to_t( &ledger.snapshot, @@ -338,6 +348,7 @@ fn convert_time_spec(ts: &dataset::TimeSpec) -> Result { match ts { dataset::TimeSpec::AtT(t) => Ok(crate::TimeSpec::AtT(*t)), dataset::TimeSpec::AtTime(iso) => Ok(crate::TimeSpec::AtTime(iso.clone())), + dataset::TimeSpec::AtRecorded(iso) => Ok(crate::TimeSpec::AtRecorded(iso.clone())), dataset::TimeSpec::AtCommit(sha) => Ok(crate::TimeSpec::AtCommit(sha.clone())), dataset::TimeSpec::Latest => Ok(crate::TimeSpec::Latest), } diff --git a/fluree-db-api/src/view/fluree_ext.rs b/fluree-db-api/src/view/fluree_ext.rs index 6c364ccd40..d5b525b582 100644 --- a/fluree-db-api/src/view/fluree_ext.rs +++ b/fluree-db-api/src/view/fluree_ext.rs @@ -448,7 +448,8 @@ impl Fluree { /// Load a view at a flexible time specification. /// - /// Resolves `@t:`, `@iso:`, `@commit:`, or `latest` time specifications. + /// Resolves `@t:`, `@iso:`, `@recorded:`, `@commit:`, or `latest` time + /// specifications. pub(crate) async fn load_graph_db_at( &self, ledger_id: &str, @@ -485,6 +486,21 @@ impl Fluree { .await?; self.load_graph_db_at_t(ledger_id, resolved_t).await } + TimeSpec::AtRecorded(iso) => { + let handle = self.ledger_cached(ledger_id).await?; + let snapshot = handle.snapshot().await; + let ledger = snapshot.to_ledger_state(); + let current_t = ledger.t(); + let target_epoch_ms = time_resolve::iso_to_target_epoch_ms(&iso)?; + let resolved_t = time_resolve::recorded_to_t( + &ledger.snapshot, + Some(ledger.novelty.as_ref()), + target_epoch_ms, + current_t, + ) + .await?; + self.load_graph_db_at_t(ledger_id, resolved_t).await + } TimeSpec::AtCommit(commit_prefix) => { let handle = self.ledger_cached(ledger_id).await?; let snapshot = handle.snapshot().await; diff --git a/fluree-db-api/tests/grp_query_history.rs b/fluree-db-api/tests/grp_query_history.rs index 6698f750e9..c3f0345b06 100644 --- a/fluree-db-api/tests/grp_query_history.rs +++ b/fluree-db-api/tests/grp_query_history.rs @@ -1,6 +1,8 @@ #[path = "support/mod.rs"] mod support; +#[path = "it_event_time.rs"] +mod it_event_time; #[path = "it_query_history_combinations.rs"] mod it_query_history_combinations; #[path = "it_query_history_range.rs"] diff --git a/fluree-db-api/tests/it_event_time.rs b/fluree-db-api/tests/it_event_time.rs new file mode 100644 index 0000000000..8e8d41dda6 --- /dev/null +++ b/fluree-db-api/tests/it_event_time.rs @@ -0,0 +1,455 @@ +//! Event-time commits + recorded-axis time travel integration tests +//! +//! Covers: +//! - Caller-supplied event times (`CommitOpts::timestamp`) driving `@iso:` +//! time travel over backdated history +//! - Event-time monotonicity + future-bound rejection +//! - Sticky dual-stamp mode (`db:receivedAt`) and `@recorded:` resolution +//! - Plain ledgers staying free of receivedAt metadata, with `@recorded:` +//! falling back to the event axis + +#![cfg(feature = "native")] + +use crate::support::{genesis_ledger, normalize_rows, MemoryFluree, MemoryLedger}; +use chrono::{SecondsFormat, Utc}; +use fluree_db_api::{FlureeBuilder, IndexConfig}; +use fluree_db_core::{range_with_overlay, IndexType, RangeMatch, RangeTest}; +use fluree_db_transact::{CommitOpts, TxnOpts}; +use serde_json::{json, Value as JsonValue}; + +fn ctx_test() -> JsonValue { + json!({ + "test": "http://example.org/test#", + "name": "test:name", + "Person": "test:Person" + }) +} + +fn person_tx(id: &str, name: &str) -> JsonValue { + json!({ + "@context": ctx_test(), + "@graph": [{"@id": format!("test:{id}"), "@type": "Person", "name": name}] + }) +} + +fn test_index_config() -> IndexConfig { + IndexConfig { + reindex_min_bytes: 100_000, + reindex_max_bytes: 1_000_000_000, + } +} + +async fn insert_at( + fluree: &MemoryFluree, + ledger: MemoryLedger, + tx: &JsonValue, + opts: CommitOpts, +) -> Result { + fluree + .insert_with_opts(ledger, tx, TxnOpts::default(), opts, &test_index_config()) + .await + .map(|r| r.ledger) +} + +async fn query_names_at( + fluree: &MemoryFluree, + db_for_formatting: fluree_db_core::GraphDbRef<'_>, + from_spec: &str, +) -> Result, fluree_db_api::ApiError> { + let q = json!({ + "@context": ctx_test(), + "from": [from_spec], + "select": ["?name"], + "where": [{"@id":"?s","name":"?name"}], + "orderBy": ["?name"] + }); + let result = fluree.query_connection(&q).await?; + let jsonld = result.to_jsonld_async(db_for_formatting).await?; + Ok(normalize_rows(&jsonld)) +} + +/// Count `db:receivedAt` flakes in the txn-meta graph, optionally at one t. +async fn received_at_flakes( + ledger: &MemoryLedger, + at_t: Option, +) -> Vec { + let pred = fluree_db_core::Sid::new( + fluree_vocab::namespaces::FLUREE_DB, + fluree_vocab::db::RECEIVED_AT, + ); + let flakes = range_with_overlay( + &ledger.snapshot, + fluree_db_core::TXN_META_GRAPH_ID, + ledger.novelty.as_ref(), + IndexType::Post, + RangeTest::Eq, + RangeMatch::predicate(pred), + fluree_db_core::RangeOptions::default().with_to_t(ledger.t()), + ) + .await + .expect("receivedAt range scan"); + match at_t { + Some(t) => flakes.into_iter().filter(|f| f.t == t).collect(), + None => flakes, + } +} + +// ============================================================================= +// Backdated event times + @iso: time travel +// ============================================================================= + +#[tokio::test] +async fn backdated_event_times_drive_iso_time_travel() { + let fluree = FlureeBuilder::memory().build_memory(); + let ledger_id = "it/event-time-backdated:main"; + let ledger0 = genesis_ledger(&fluree, ledger_id); + + // Three commits with historical event times, years before "now". + let ledger1 = insert_at( + &fluree, + ledger0, + &person_tx("p1", "Alice"), + CommitOpts::default().with_timestamp("2020-01-01T00:00:00Z"), + ) + .await + .expect("backdated commit t=1"); + let ledger2 = insert_at( + &fluree, + ledger1, + &person_tx("p2", "Bob"), + CommitOpts::default().with_timestamp("2021-01-01T00:00:00Z"), + ) + .await + .expect("backdated commit t=2"); + let ledger3 = insert_at( + &fluree, + ledger2, + &person_tx("p3", "Carol"), + CommitOpts::default().with_timestamp("2022-01-01T00:00:00Z"), + ) + .await + .expect("backdated commit t=3"); + + let db = ledger3.as_graph_db_ref(0); + + // Time travel by event time lands between the historical commits. + let names = query_names_at( + &fluree, + db, + &format!("{ledger_id}@iso:2020-06-01T00:00:00Z"), + ) + .await + .expect("@iso mid-2020"); + assert_eq!(names, vec![json!(["Alice"])]); + + let names = query_names_at( + &fluree, + db, + &format!("{ledger_id}@iso:2021-06-01T00:00:00Z"), + ) + .await + .expect("@iso mid-2021"); + assert_eq!(names, vec![json!(["Alice"]), json!(["Bob"])]); + + // At/after the last event time: full state. + let names = query_names_at( + &fluree, + db, + &format!("{ledger_id}@iso:2025-01-01T00:00:00Z"), + ) + .await + .expect("@iso 2025"); + assert_eq!( + names, + vec![json!(["Alice"]), json!(["Bob"]), json!(["Carol"])] + ); + + // Before the earliest event time: error. + let err = query_names_at( + &fluree, + db, + &format!("{ledger_id}@iso:2019-01-01T00:00:00Z"), + ) + .await + .expect_err("@iso 2019 should error"); + assert!( + err.to_string().contains("no data as of"), + "unexpected error: {err}" + ); +} + +// ============================================================================= +// Guard rejections +// ============================================================================= + +#[tokio::test] +async fn event_time_earlier_than_head_is_rejected() { + let fluree = FlureeBuilder::memory().build_memory(); + let ledger_id = "it/event-time-monotonic:main"; + let ledger0 = genesis_ledger(&fluree, ledger_id); + + let ledger1 = insert_at( + &fluree, + ledger0, + &person_tx("p1", "Alice"), + CommitOpts::default().with_timestamp("2022-01-01T00:00:00Z"), + ) + .await + .expect("commit at 2022"); + + let err = insert_at( + &fluree, + ledger1, + &person_tx("p2", "Bob"), + CommitOpts::default().with_timestamp("2021-01-01T00:00:00Z"), + ) + .await + .expect_err("earlier event time must be rejected"); + assert!( + err.to_string().contains("monotonically non-decreasing"), + "unexpected error: {err}" + ); + + // Equal event time is allowed (non-decreasing, not strictly increasing). + let ledger1b = genesis_ledger(&fluree, "it/event-time-equal:main"); + let ledger2b = insert_at( + &fluree, + ledger1b, + &person_tx("p1", "Alice"), + CommitOpts::default().with_timestamp("2022-01-01T00:00:00Z"), + ) + .await + .expect("commit at 2022"); + insert_at( + &fluree, + ledger2b, + &person_tx("p2", "Bob"), + CommitOpts::default().with_timestamp("2022-01-01T00:00:00Z"), + ) + .await + .expect("equal event time is allowed"); +} + +#[tokio::test] +async fn future_event_time_is_rejected() { + let fluree = FlureeBuilder::memory().build_memory(); + let ledger_id = "it/event-time-future:main"; + let ledger0 = genesis_ledger(&fluree, ledger_id); + + let tomorrow = + (Utc::now() + chrono::Duration::days(1)).to_rfc3339_opts(SecondsFormat::Millis, true); + let err = insert_at( + &fluree, + ledger0, + &person_tx("p1", "Alice"), + CommitOpts::default().with_timestamp(tomorrow), + ) + .await + .expect_err("future event time must be rejected"); + assert!( + err.to_string().contains("future"), + "unexpected error: {err}" + ); + + let ledger0 = genesis_ledger(&fluree, "it/event-time-garbage:main"); + let err = insert_at( + &fluree, + ledger0, + &person_tx("p1", "Alice"), + CommitOpts::default().with_timestamp("not-a-timestamp"), + ) + .await + .expect_err("unparseable event time must be rejected"); + assert!( + err.to_string().contains("RFC 3339"), + "unexpected error: {err}" + ); +} + +// ============================================================================= +// Dual-stamp mode + @recorded: +// ============================================================================= + +#[tokio::test] +async fn dual_stamp_recorded_axis_resolution() { + let fluree = FlureeBuilder::memory().build_memory(); + let ledger_id = "it/event-time-recorded:main"; + let ledger0 = genesis_ledger(&fluree, ledger_id); + + // Fully backdated ledger: event times 2020/2021/2022, recorded times + // pinned to distinct 2026 instants (deterministic). + let ledger1 = insert_at( + &fluree, + ledger0, + &person_tx("p1", "Alice"), + CommitOpts::default() + .with_timestamp("2020-01-01T00:00:00Z") + .with_received_at("2026-01-01T00:00:00Z"), + ) + .await + .expect("t=1"); + let ledger2 = insert_at( + &fluree, + ledger1, + &person_tx("p2", "Bob"), + CommitOpts::default() + .with_timestamp("2021-01-01T00:00:00Z") + .with_received_at("2026-01-02T00:00:00Z"), + ) + .await + .expect("t=2"); + let ledger3 = insert_at( + &fluree, + ledger2, + &person_tx("p3", "Carol"), + CommitOpts::default() + .with_timestamp("2022-01-01T00:00:00Z") + .with_received_at("2026-01-03T00:00:00Z"), + ) + .await + .expect("t=3"); + + let db = ledger3.as_graph_db_ref(0); + + // Event axis: mid-2021 sees Alice + Bob. + let names = query_names_at( + &fluree, + db, + &format!("{ledger_id}@iso:2021-06-01T00:00:00Z"), + ) + .await + .expect("@iso mid-2021"); + assert_eq!(names, vec![json!(["Alice"]), json!(["Bob"])]); + + // Recorded axis: mid-recorded-day-2 sees Alice + Bob (t=2 recorded + // 2026-01-02, t=3 not until 01-03) even though all EVENT times are past. + let names = query_names_at( + &fluree, + db, + &format!("{ledger_id}@recorded:2026-01-02T12:00:00Z"), + ) + .await + .expect("@recorded day 2"); + assert_eq!(names, vec![json!(["Alice"]), json!(["Bob"])]); + + // Recorded axis after everything: full state. + let names = query_names_at( + &fluree, + db, + &format!("{ledger_id}@recorded:2026-02-01T00:00:00Z"), + ) + .await + .expect("@recorded after all"); + assert_eq!( + names, + vec![json!(["Alice"]), json!(["Bob"]), json!(["Carol"])] + ); + + // Recorded axis before anything was recorded: error, even though the + // EVENT axis has data for 2025 (backdated commits must not leak). + let err = query_names_at( + &fluree, + db, + &format!("{ledger_id}@recorded:2025-06-01T00:00:00Z"), + ) + .await + .expect_err("@recorded before first recording should error"); + assert!( + err.to_string().contains("no data recorded as of"), + "unexpected error: {err}" + ); +} + +#[tokio::test] +async fn dual_stamp_is_sticky_after_first_event_time() { + let fluree = FlureeBuilder::memory().build_memory(); + let ledger_id = "it/event-time-sticky:main"; + let ledger0 = genesis_ledger(&fluree, ledger_id); + + // t=1: plain commit — no receivedAt. + let ledger1 = insert_at( + &fluree, + ledger0, + &person_tx("p1", "Alice"), + CommitOpts::default(), + ) + .await + .expect("t=1 plain"); + assert!( + received_at_flakes(&ledger1, None).await.is_empty(), + "plain ledger must not carry receivedAt metadata" + ); + + // t=2: explicit received_at flips the ledger into dual-stamp mode. + // (Event time defaults to now, which is >= t=1's stamp.) + let ledger2 = insert_at( + &fluree, + ledger1, + &person_tx("p2", "Bob"), + CommitOpts::default().with_received_at(Utc::now().to_rfc3339()), + ) + .await + .expect("t=2 flip"); + assert_eq!( + received_at_flakes(&ledger2, Some(2)).await.len(), + 1, + "flip commit must carry receivedAt" + ); + + // t=3: plain commit opts — sticky mode must auto-stamp receivedAt. + let ledger3 = insert_at( + &fluree, + ledger2, + &person_tx("p3", "Carol"), + CommitOpts::default(), + ) + .await + .expect("t=3 plain after flip"); + assert_eq!( + received_at_flakes(&ledger3, Some(3)).await.len(), + 1, + "post-flip commit must auto-stamp receivedAt (sticky dual-stamp mode)" + ); +} + +#[tokio::test] +async fn plain_ledger_recorded_equals_iso() { + let fluree = FlureeBuilder::memory().build_memory(); + let ledger_id = "it/event-time-plain:main"; + let ledger0 = genesis_ledger(&fluree, ledger_id); + + let ledger1 = insert_at( + &fluree, + ledger0, + &person_tx("p1", "Alice"), + CommitOpts::default(), + ) + .await + .expect("t=1"); + tokio::time::sleep(std::time::Duration::from_millis(10)).await; + let mid = Utc::now().to_rfc3339_opts(SecondsFormat::Millis, true); + tokio::time::sleep(std::time::Duration::from_millis(10)).await; + let ledger2 = insert_at( + &fluree, + ledger1, + &person_tx("p2", "Bob"), + CommitOpts::default(), + ) + .await + .expect("t=2"); + + assert!( + received_at_flakes(&ledger2, None).await.is_empty(), + "plain ledger must not carry receivedAt metadata" + ); + + let db = ledger2.as_graph_db_ref(0); + let via_iso = query_names_at(&fluree, db, &format!("{ledger_id}@iso:{mid}")) + .await + .expect("@iso mid"); + let via_recorded = query_names_at(&fluree, db, &format!("{ledger_id}@recorded:{mid}")) + .await + .expect("@recorded mid"); + assert_eq!(via_iso, via_recorded, "axes must coincide on plain ledgers"); + assert_eq!(via_iso, vec![json!(["Alice"])]); +} diff --git a/fluree-db-cli/Cargo.toml b/fluree-db-cli/Cargo.toml index a1da94f7af..300e9acf23 100644 --- a/fluree-db-cli/Cargo.toml +++ b/fluree-db-cli/Cargo.toml @@ -66,6 +66,7 @@ fluree-db-mcp = { path = "../fluree-db-mcp" } fluree-db-server = { path = "../fluree-db-server", optional = true } clap = { workspace = true } +chrono.workspace = true tokio.workspace = true serde.workspace = true serde_json.workspace = true diff --git a/fluree-db-cli/src/commands/create.rs b/fluree-db-cli/src/commands/create.rs index a915cc7579..c37a53d039 100644 --- a/fluree-db-cli/src/commands/create.rs +++ b/fluree-db-cli/src/commands/create.rs @@ -1180,10 +1180,17 @@ pub async fn run_memory_import( .map_err(|e| CliError::Config(format!("failed to create ledger: {e}")))?; let schema = fluree_db_memory::schema::memory_schema_jsonld(); - fluree - .graph(ledger) - .transact() - .insert(&schema) + // Backdate the schema transaction to the first git commit's timestamp: + // commit event time is monotonically non-decreasing along the chain, so + // stamping the schema with "now" would reject every replayed commit. + let graph = fluree.graph(ledger); + let mut schema_txn = graph.transact().insert(&schema); + if let Some(first) = commits.first() { + schema_txn = schema_txn.commit_opts( + fluree_db_api::CommitOpts::default().with_timestamp(first.timestamp.clone()), + ); + } + schema_txn .commit() .await .map_err(|e| CliError::Config(format!("failed to transact schema: {e}")))?; @@ -1218,6 +1225,10 @@ pub async fn run_memory_import( } let mut last_t = 1u64; // t=1 is the schema transaction + // Git author dates are not guaranteed monotonic (rebases, merges, clock + // skew), but commit event time must be non-decreasing along the chain. + // Clamp: any date earlier than the previous one is bumped to it. + let mut last_event_time: Option> = None; for (i, commit) in commits.iter().enumerate() { // Extract TTL content at this commit let repo_ttl = git_show(&repo_root, &commit.sha, ".fluree-memory/repo.ttl")?; @@ -1256,8 +1267,23 @@ pub async fn run_memory_import( // Build commit metadata from the git commit. f:message is a user // claim — supply it via the txn-meta sidecar (works for update-shape // transactions which have no @graph envelope). - let commit_opts = - fluree_db_api::CommitOpts::default().with_timestamp(commit.timestamp.clone()); + let event_time = match chrono::DateTime::parse_from_rfc3339(&commit.timestamp) { + Ok(dt) => { + let clamped = match last_event_time { + Some(prev) if dt < prev => prev, + _ => dt, + }; + last_event_time = Some(clamped); + clamped.to_rfc3339() + } + // Unparseable git date: reuse the previous event time (keeps the + // chain monotonic) or fall back to the raw string for the first + // commit and let the commit guard produce the precise error. + Err(_) => last_event_time + .map(|prev| prev.to_rfc3339()) + .unwrap_or_else(|| commit.timestamp.clone()), + }; + let commit_opts = fluree_db_api::CommitOpts::default().with_timestamp(event_time); // Single transaction: retract all existing memory triples + insert new state. // The WHERE pivots on mem:content to target only memory instances (not schema). diff --git a/fluree-db-cli/src/commands/query.rs b/fluree-db-cli/src/commands/query.rs index 43fc9258a1..a17be1f7a1 100644 --- a/fluree-db-cli/src/commands/query.rs +++ b/fluree-db-cli/src/commands/query.rs @@ -185,6 +185,7 @@ fn time_spec_to_suffix(spec: &fluree_db_api::TimeSpec) -> String { fluree_db_api::TimeSpec::Latest => "@t:latest".to_string(), fluree_db_api::TimeSpec::AtT(t) => format!("@t:{t}"), fluree_db_api::TimeSpec::AtTime(iso) => format!("@iso:{iso}"), + fluree_db_api::TimeSpec::AtRecorded(iso) => format!("@recorded:{iso}"), fluree_db_api::TimeSpec::AtCommit(prefix) => format!("@commit:{prefix}"), } } diff --git a/fluree-db-core/src/ledger_id.rs b/fluree-db-core/src/ledger_id.rs index 52d3677257..46112f180e 100644 --- a/fluree-db-core/src/ledger_id.rs +++ b/fluree-db-core/src/ledger_id.rs @@ -13,10 +13,14 @@ pub const DEFAULT_BRANCH: &str = "main"; pub enum LedgerIdTimeSpec { /// @t: AtT(i64), - /// @iso: + /// @iso: — resolves against commit *event time* (`db:time`) AtIso(String), /// @commit: AtCommit(String), + /// @recorded: — resolves against the wall-clock time commits + /// were recorded (`db:receivedAt`, audit axis). Identical to `@iso:` on + /// ledgers that never used caller-supplied event times. + AtRecorded(String), } /// Parsed ledger ID parts with optional time-travel spec. @@ -81,7 +85,7 @@ pub fn format_ledger_id(name: &str, branch: &str) -> String { format!("{name}:{branch}") } -/// Parse a ledger ID with optional `@t:`, `@iso:`, or `@commit:` time-travel suffix. +/// Parse a ledger ID with optional `@t:`, `@iso:`, `@recorded:`, or `@commit:` time-travel suffix. pub fn parse_ledger_id_with_time(ledger_id: &str) -> Result { let (base, time) = split_time_travel_suffix(ledger_id)?; @@ -92,7 +96,7 @@ pub fn parse_ledger_id_with_time(ledger_id: &str) -> Result Result<(String, Option), LedgerIdParseError> { @@ -129,9 +133,14 @@ pub fn split_time_travel_suffix( )); } Some(LedgerIdTimeSpec::AtCommit(val.to_string())) + } else if let Some(val) = time_str.strip_prefix("recorded:") { + if val.is_empty() { + return Err(LedgerIdParseError::new("Missing value after '@recorded:'")); + } + Some(LedgerIdTimeSpec::AtRecorded(val.to_string())) } else { return Err(LedgerIdParseError::new(format!( - "Invalid time travel format: '{time_str}'. Expected @t:, @iso:, or @commit: prefix" + "Invalid time travel format: '{time_str}'. Expected @t:, @iso:, @recorded:, or @commit: prefix" ))); }; diff --git a/fluree-db-indexer/src/run_index/resolve/resolver.rs b/fluree-db-indexer/src/run_index/resolve/resolver.rs index 2b0d638ac6..87e0ca04d9 100644 --- a/fluree-db-indexer/src/run_index/resolve/resolver.rs +++ b/fluree-db-indexer/src/run_index/resolve/resolver.rs @@ -33,6 +33,16 @@ use fluree_vocab::{db, fluree}; /// (not imported) to avoid adding a transact → indexer layering inversion; /// the two sites are asserted in sync by the debug_assert in emit paths. const RESERVED_PREDICATE_NAMESPACES: &[u16] = &[FLUREE_DB, FLUREE_COMMIT, FLUREE_URN]; + +/// System-injected txn-meta predicates that legitimately live in a reserved +/// namespace: `build_commit` strips any user-supplied claim for these and +/// injects the system-controlled value (`f:identity` for provenance, +/// `db:receivedAt` for the dual-stamp audit axis). They must flow through +/// the resolver like ordinary entries so they stay queryable post-index. +fn is_system_txn_meta_entry(entry: &fluree_db_novelty::TxnMetaEntry) -> bool { + entry.predicate_ns == FLUREE_DB + && (entry.predicate_name == db::IDENTITY || entry.predicate_name == db::RECEIVED_AT) +} use num_bigint::BigInt; use rustc_hash::FxHashMap; use std::collections::HashMap; @@ -479,7 +489,8 @@ impl CommitResolver { writer: &mut W, ) -> Result { debug_assert!( - !RESERVED_PREDICATE_NAMESPACES.contains(&entry.predicate_ns), + !RESERVED_PREDICATE_NAMESPACES.contains(&entry.predicate_ns) + || is_system_txn_meta_entry(entry), "TxnMetaEntry in reserved namespace {} reached resolver — extract_txn_meta guard bypassed?", entry.predicate_ns ); @@ -1990,7 +2001,8 @@ impl SharedResolverState { chunk: &mut RebuildChunk, ) -> Result { debug_assert!( - !RESERVED_PREDICATE_NAMESPACES.contains(&entry.predicate_ns), + !RESERVED_PREDICATE_NAMESPACES.contains(&entry.predicate_ns) + || is_system_txn_meta_entry(entry), "TxnMetaEntry in reserved namespace {} reached resolver — extract_txn_meta guard bypassed?", entry.predicate_ns ); diff --git a/fluree-db-ledger/src/lib.rs b/fluree-db-ledger/src/lib.rs index 5c759d1cce..ce0f6fc1cf 100644 --- a/fluree-db-ledger/src/lib.rs +++ b/fluree-db-ledger/src/lib.rs @@ -72,6 +72,50 @@ pub struct IndexConfig { pub reindex_max_bytes: usize, } +/// Temporal metadata of the HEAD commit, tracked in memory so the commit +/// build path can enforce event-time monotonicity and sticky dual-stamp +/// emission with integer compares — no per-commit storage reads. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct HeadTemporal { + /// HEAD commit's event time (`Commit.time`) as epoch milliseconds. + pub event_time_ms: i64, + /// HEAD commit's `db:receivedAt` txn-meta value (epoch milliseconds), + /// when present. `Some` means the ledger is in dual-stamp mode: once any + /// commit dual-stamps, all subsequent commits must too, so `@recorded:` + /// resolution stays exact from the flip point onward. + pub received_time_ms: Option, +} + +impl HeadTemporal { + /// Whether the ledger is in sticky dual-stamp mode as of this commit. + pub fn dual_stamp(&self) -> bool { + self.received_time_ms.is_some() + } + + /// Extract from a commit record. `None` when the commit has no + /// parseable `time` (legacy or malformed — callers fall back to + /// unguarded behavior rather than failing the load). + pub fn from_commit(commit: &Commit) -> Option { + let event_time_ms = fluree_db_novelty::iso_to_epoch_ms_opt(commit.time.as_deref()?)?; + let received_time_ms = commit.txn_meta.iter().find_map(|e| { + if e.predicate_ns == fluree_vocab::namespaces::FLUREE_DB + && e.predicate_name == fluree_vocab::db::RECEIVED_AT + { + match e.value { + fluree_db_core::TxnMetaValue::Long(ms) => Some(ms), + _ => None, + } + } else { + None + } + }); + Some(Self { + event_time_ms, + received_time_ms, + }) + } +} + /// Ledger state combining indexed LedgerSnapshot with novelty overlay /// /// Provides a consistent view of the ledger by combining: @@ -133,6 +177,13 @@ pub struct LedgerState { /// Each entry is `Arc`. Set by `Fluree::ledger()` /// when spatial indexes are available in the binary index root. pub spatial_indexes: Option, + /// Temporal metadata of the HEAD commit (event time + dual-stamp flag). + /// + /// `None` means "not yet observed" — the ledger was loaded with no + /// novelty walk (index == head), so the commit build path lazily fetches + /// the head commit once and caches the result here. Kept current by + /// `apply_single_commit` and the commit build path. + pub head_temporal: Option, } impl LedgerState { @@ -219,7 +270,7 @@ impl LedgerState { // Load novelty from commits since index_t let head_commit_id = match &record.commit_head_id { Some(head_cid) if record.commit_t > snapshot.t => { - let (novelty_overlay, head_id) = Self::load_novelty( + let (novelty_overlay, head_id, head_temporal) = Self::load_novelty( store, head_cid, snapshot.t, @@ -247,6 +298,7 @@ impl LedgerState { ns_record: Some(record), binary_store: None, spatial_indexes: None, + head_temporal, }); } _ => record.commit_head_id.clone(), @@ -266,6 +318,10 @@ impl LedgerState { ns_record: Some(record), binary_store: None, spatial_indexes: None, + // Index == head: no commit was walked, so the head's temporal + // metadata is unknown. The commit build path resolves it lazily + // (one head-commit fetch per loaded ledger, only when writing). + head_temporal: None, }) } @@ -277,7 +333,9 @@ impl LedgerState { /// Envelope deltas (namespace codes, graph IRIs) are accumulated and applied /// to the snapshot via `apply_envelope_deltas()` after the walk completes. /// - /// Returns the novelty overlay and the head commit's ContentId. + /// Returns the novelty overlay, the head commit's ContentId, and the + /// head commit's temporal metadata (captured from the first streamed + /// commit — the walk is HEAD → oldest). async fn load_novelty( store: C, head_cid: &ContentId, @@ -285,7 +343,7 @@ impl LedgerState { ledger_id: &str, snapshot: &mut LedgerSnapshot, dict_novelty: &mut DictNovelty, - ) -> Result<(Novelty, Option)> { + ) -> Result<(Novelty, Option, Option)> { use std::collections::{HashMap, HashSet}; let mut novelty = Novelty::new(index_t); @@ -305,9 +363,17 @@ impl LedgerState { let stream = trace_commits_by_id(store, head_cid.clone(), index_t); futures::pin_mut!(stream); + let mut head_temporal: Option = None; + let mut first_commit = true; while let Some(result) = stream.next().await { let commit = result?; + // The stream is HEAD → oldest, so the first commit is the head. + if first_commit { + head_temporal = HeadTemporal::from_commit(&commit); + first_commit = false; + } + // Collect flakes for deferred replay let meta_flakes = generate_commit_flakes(&commit, ledger_id, commit.t); let mut all_flakes = commit.flakes; @@ -372,7 +438,7 @@ impl LedgerState { // sorts, totaling `O(N log N)` regardless of M. novelty.bulk_apply_commits(commit_batches, &reverse_graph)?; - Ok((novelty, Some(head_cid.clone()))) + Ok((novelty, Some(head_cid.clone()), head_temporal)) } /// Create a new ledger state from components @@ -396,9 +462,31 @@ impl LedgerState { ns_record: None, binary_store: None, spatial_indexes: None, + head_temporal: None, } } + /// Resolve the HEAD commit's temporal metadata, fetching the head commit + /// once if it wasn't observed during load (index == head, no novelty + /// walk). Called by the commit path before building a new commit so the + /// event-time monotonicity guard and dual-stamp decision are + /// authoritative; at most one storage read per loaded ledger, and only + /// when writing — pure readers never pay it. + pub async fn ensure_head_temporal( + &mut self, + store: &C, + ) -> Result> { + if self.head_temporal.is_none() { + if let Some(cid) = &self.head_commit_id { + let bytes = store.get(cid).await?; + let commit = fluree_db_core::commit::codec::read_commit(&bytes) + .map_err(|e| LedgerError::InvalidData(format!("head commit decode: {e}")))?; + self.head_temporal = HeadTemporal::from_commit(&commit); + } + } + Ok(self.head_temporal) + } + /// Get the current transaction time (max of index and novelty) pub fn t(&self) -> i64 { self.novelty.t.max(self.snapshot.t) @@ -670,6 +758,35 @@ impl LedgerState { ))); } + // Event time should be monotonically non-decreasing along the chain, so + // `@iso:`/`@recorded:` resolution stays exact. This is the *replay* path + // (incremental catch-up over already-durable commits): the build path + // (`resolve_commit_times`) rejects a *new* commit that violates this, but + // here the history is immutable and may predate the invariant (a pre-PR + // ledger with an NTP step-back or a raft leadership handoff between + // skewed clocks). Wedging catch-up can't fix history it can't change, so + // warn and continue — resolution across such a point may be approximate. + // (The full-reload path has no guard either; this keeps them consistent.) + let commit_temporal = HeadTemporal::from_commit(&commit); + if let (Some(prev), Some(new)) = (self.head_temporal, commit_temporal) { + if new.event_time_ms < prev.event_time_ms { + tracing::warn!( + commit_t, + new_event_time_ms = new.event_time_ms, + prev_event_time_ms = prev.event_time_ms, + "applying commit whose event time predates the head; wall-clock \ + (@iso:/@recorded:) resolution may be approximate across this point" + ); + } + if prev.dual_stamp() && !new.dual_stamp() { + tracing::warn!( + commit_t, + "applying a post-flip commit without db:receivedAt; @recorded: \ + resolution may be approximate across this point" + ); + } + } + // Collect graph IRIs from graph_delta let graph_iris: std::collections::HashSet = commit.graph_delta.values().cloned().collect(); @@ -737,6 +854,9 @@ impl LedgerState { // Update state self.head_commit_id = Some(commit_id.clone()); + if commit_temporal.is_some() { + self.head_temporal = commit_temporal; + } // Update ns_record if let Some(ref mut record) = self.ns_record { @@ -1368,6 +1488,29 @@ mod tests { assert_eq!(state.t(), 0); } + #[test] + fn test_apply_single_commit_tolerates_backwards_event_time() { + // Replay/catch-up over already-durable commits must not wedge on a + // non-monotonic event time (clock skew / pre-invariant history). The + // build path rejects new violations; this path warns and continues. + let snapshot = LedgerSnapshot::genesis("test:main"); + let mut state = LedgerState::new(snapshot, Novelty::new(0)); + + let c1 = Commit::new(1, vec![make_flake(10, 1, 100, 1)]) + .with_id(make_test_commit_id("commit:1")) + .with_time("2026-01-02T00:00:00Z"); + state.apply_single_commit(c1, "test:main").unwrap(); + + // c2's event time steps backwards relative to c1 — must still apply. + let c2 = Commit::new(2, vec![make_flake(11, 1, 200, 2)]) + .with_id(make_test_commit_id("commit:2")) + .with_time("2026-01-01T00:00:00Z"); + state + .apply_single_commit(c2, "test:main") + .expect("replay must tolerate a backwards event time"); + assert_eq!(state.t(), 2); + } + #[test] fn test_apply_single_commit_rejects_non_monotonic_skip() { let snapshot = LedgerSnapshot::genesis("test:main"); diff --git a/fluree-db-ledger/src/staged.rs b/fluree-db-ledger/src/staged.rs index f73b461cb5..b6d22a7911 100644 --- a/fluree-db-ledger/src/staged.rs +++ b/fluree-db-ledger/src/staged.rs @@ -207,6 +207,14 @@ impl StagedLedger { &self.base } + /// Get the base ledger state mutably. + /// + /// Used by the commit path to lazily resolve head temporal metadata + /// (`LedgerState::ensure_head_temporal`) before building a commit. + pub fn base_mut(&mut self) -> &mut LedgerState { + &mut self.base + } + /// Consume the view and return the owned base ledger state /// /// Use this when the staged changes should be discarded (e.g., no-op updates). diff --git a/fluree-db-novelty/src/commit_flakes.rs b/fluree-db-novelty/src/commit_flakes.rs index eb9ee7d3e0..23d3cb359b 100644 --- a/fluree-db-novelty/src/commit_flakes.rs +++ b/fluree-db-novelty/src/commit_flakes.rs @@ -41,13 +41,18 @@ pub fn stamp_graph_on_commit_flakes(flakes: &mut [Flake], graph_sid: &Sid) { } } +/// Parse ISO-8601 timestamp to epoch milliseconds. +pub fn iso_to_epoch_ms_opt(iso: &str) -> Option { + DateTime::parse_from_rfc3339(iso) + .ok() + .map(|dt| dt.timestamp_millis()) +} + /// Parse ISO-8601 timestamp to epoch milliseconds /// /// Falls back to 0 if parsing fails. fn iso_to_epoch_ms(iso: &str) -> i64 { - DateTime::parse_from_rfc3339(iso) - .map(|dt| dt.timestamp_millis()) - .unwrap_or(0) + iso_to_epoch_ms_opt(iso).unwrap_or(0) } /// Generate commit metadata flakes for a commit. diff --git a/fluree-db-novelty/src/lib.rs b/fluree-db-novelty/src/lib.rs index bcaf87cc74..169a6cfe7d 100644 --- a/fluree-db-novelty/src/lib.rs +++ b/fluree-db-novelty/src/lib.rs @@ -49,7 +49,9 @@ pub use commit::{ CommitEnvelope, CommonAncestor, TxnMetaEntry, TxnMetaValue, TxnSignature, MAX_TXN_META_BYTES, MAX_TXN_META_ENTRIES, }; -pub use commit_flakes::{generate_commit_flakes, stamp_graph_on_commit_flakes}; +pub use commit_flakes::{ + generate_commit_flakes, iso_to_epoch_ms_opt, stamp_graph_on_commit_flakes, +}; pub use delta::compute_delta_keys; pub use error::{NoveltyError, Result}; pub use fluree_db_core::commit::codec::envelope::{MAX_GRAPH_DELTA_ENTRIES, MAX_GRAPH_IRI_LENGTH}; diff --git a/fluree-db-server/src/routes/query.rs b/fluree-db-server/src/routes/query.rs index ba7ccd97a5..ef02e9ef98 100644 --- a/fluree-db-server/src/routes/query.rs +++ b/fluree-db-server/src/routes/query.rs @@ -2305,6 +2305,9 @@ pub(crate) fn ledger_scoped_sparql_dataset_spec( fluree_db_core::ledger_id::LedgerIdTimeSpec::AtT(t) => TimeSpec::AtT(t), fluree_db_core::ledger_id::LedgerIdTimeSpec::AtIso(iso) => TimeSpec::AtTime(iso), fluree_db_core::ledger_id::LedgerIdTimeSpec::AtCommit(c) => TimeSpec::AtCommit(c), + fluree_db_core::ledger_id::LedgerIdTimeSpec::AtRecorded(r) => { + TimeSpec::AtRecorded(r) + } }); let selector = frag .map(GraphSelector::from_str) @@ -2341,6 +2344,9 @@ pub(crate) fn ledger_scoped_sparql_dataset_spec( fluree_db_core::ledger_id::LedgerIdTimeSpec::AtT(t) => TimeSpec::AtT(t), fluree_db_core::ledger_id::LedgerIdTimeSpec::AtIso(iso) => TimeSpec::AtTime(iso), fluree_db_core::ledger_id::LedgerIdTimeSpec::AtCommit(c) => TimeSpec::AtCommit(c), + fluree_db_core::ledger_id::LedgerIdTimeSpec::AtRecorded(r) => { + TimeSpec::AtRecorded(r) + } }); let selector = frag .map(GraphSelector::from_str) diff --git a/fluree-db-server/src/routes/transact.rs b/fluree-db-server/src/routes/transact.rs index c592367cc4..97812db806 100644 --- a/fluree-db-server/src/routes/transact.rs +++ b/fluree-db-server/src/routes/transact.rs @@ -1491,7 +1491,35 @@ async fn execute_transaction( }; let did = effective_did(&prepared_transaction.governance, author); - let commit_opts = build_commit_opts(did, credential, &state.fluree, &handle); + let mut commit_opts = build_commit_opts(did, credential, &state.fluree, &handle); + + // `opts.eventTime`: caller-supplied event time for this commit + // (backdated historical loads). Validated for RFC 3339 shape at the + // boundary; monotonicity/future bounds are enforced by the commit + // build path against the ledger head. Recording the wall-clock + // receipt time alongside flips the ledger into dual-stamp mode so + // `@recorded:` (audit-axis) time travel stays exact. + if let Some(event_time_raw) = prepared_transaction + .body + .get("opts") + .and_then(|o| o.get("eventTime")) + { + let Some(event_time) = event_time_raw.as_str() else { + set_span_error_code(&span, "error:BadRequest"); + return Err(ServerError::bad_request( + "opts.eventTime must be an RFC 3339 timestamp string", + )); + }; + if chrono::DateTime::parse_from_rfc3339(event_time).is_err() { + set_span_error_code(&span, "error:BadRequest"); + return Err(ServerError::bad_request(format!( + "opts.eventTime is not a valid RFC 3339 timestamp: {event_time}" + ))); + } + commit_opts = commit_opts + .with_timestamp(event_time.to_string()) + .with_received_at(chrono::Utc::now().to_rfc3339()); + } // Pick up `opts.shapes` and `opts.uniqueProperties` from the body // so inline SHACL shapes and unique-property constraints reach the diff --git a/fluree-db-transact/src/commit.rs b/fluree-db-transact/src/commit.rs index 54e82f378f..10cd43c8ca 100644 --- a/fluree-db-transact/src/commit.rs +++ b/fluree-db-transact/src/commit.rs @@ -25,9 +25,11 @@ use crate::raw_txn_upload::PendingRawTxnUpload; use chrono::Utc; use fluree_db_binary_index::BinaryIndexStore; use fluree_db_core::{ContentId, ContentKind, ContentStore, DictNovelty, Flake, TXN_META_GRAPH_ID}; -use fluree_db_ledger::{IndexConfig, LedgerState, StagedLedger}; +use fluree_db_ledger::{HeadTemporal, IndexConfig, LedgerState, StagedLedger}; use fluree_db_nameservice::{CasResult, NameServiceLookup, RefKind, RefPublisher, RefValue}; -use fluree_db_novelty::{generate_commit_flakes, stamp_graph_on_commit_flakes}; +use fluree_db_novelty::{ + generate_commit_flakes, iso_to_epoch_ms_opt, stamp_graph_on_commit_flakes, +}; use fluree_db_novelty::{Commit, SigningKey, TxnMetaEntry, TxnMetaValue, TxnSignature}; use fluree_db_query::BinaryRangeProvider; use serde::{Deserialize, Serialize}; @@ -166,9 +168,23 @@ pub struct CommitOpts { pub merge_parents: Vec, /// ISO 8601 timestamp for the commit. /// - /// When `None`, defaults to `Utc::now().to_rfc3339()`. Provide a fixed - /// value for deterministic commit hashes (testing, replay). + /// When `None`, defaults to `Utc::now().to_rfc3339()` (clamped to the + /// head commit's event time so the chain stays monotonic under clock + /// skew). Provide a fixed value for deterministic commit hashes + /// (testing, replay). This is the commit's *event time* — the axis + /// `@iso:` time travel resolves against. Supplied values are validated: + /// monotonically non-decreasing along the chain, and at most a small + /// skew into the future. pub timestamp: Option, + /// ISO 8601 wall-clock time the commit was recorded (audit axis). + /// + /// Setting this (or committing onto a ledger whose head already carries + /// `db:receivedAt`) puts the ledger in sticky dual-stamp mode: this and + /// every subsequent commit records a system-controlled `db:receivedAt` + /// txn-meta entry, and `@recorded:` time travel resolves against it. + /// When `None` on a dual-stamp ledger, defaults to wall clock (clamped + /// monotonic). Normal ledgers (no event-time use) never emit it. + pub received_at: Option, } impl std::fmt::Debug for CommitOpts { @@ -194,6 +210,8 @@ impl std::fmt::Debug for CommitOpts { .field("skip_backpressure", &self.skip_backpressure) .field("skip_sequencing", &self.skip_sequencing) .field("merge_parents", &self.merge_parents.len()) + .field("timestamp", &self.timestamp) + .field("received_at", &self.received_at) .finish() } } @@ -217,6 +235,7 @@ impl Clone for CommitOpts { skip_sequencing: self.skip_sequencing, merge_parents: self.merge_parents.clone(), timestamp: self.timestamp.clone(), + received_at: self.received_at.clone(), } } } @@ -314,6 +333,17 @@ impl CommitOpts { self.timestamp = Some(ts.into()); self } + + /// Set the recorded-at wall-clock timestamp (ISO 8601, audit axis). + /// + /// Flips the ledger into sticky dual-stamp mode. Normally left unset: + /// the build path stamps wall clock automatically on dual-stamp + /// ledgers. Provide a fixed value only for deterministic commit hashes + /// (testing, replay). + pub fn with_received_at(mut self, ts: impl Into) -> Self { + self.received_at = Some(ts.into()); + self + } } /// Serializable subset of [`CommitOpts`] carrying only the fields a @@ -341,6 +371,9 @@ pub struct CommitOptsRequest { pub txn_signature: Option, pub txn_meta: Vec, pub timestamp: Option, + /// Recorded-at wall-clock timestamp (audit axis, dual-stamp mode). + #[serde(default)] + pub received_at: Option, /// Pre-resolved raw-txn CID. Set when the upstream caller's /// [`CommitOpts::raw_txn_upload`] was awaited before projection /// (e.g. by the Raft leader so the worker can reference the same @@ -372,6 +405,7 @@ impl CommitOptsRequest { skip_sequencing: false, merge_parents: Vec::new(), timestamp: self.timestamp, + received_at: self.received_at, } } } @@ -394,11 +428,112 @@ impl From<&CommitOpts> for CommitOptsRequest { txn_signature: opts.txn_signature.clone(), txn_meta: opts.txn_meta.clone(), timestamp: opts.timestamp.clone(), + received_at: opts.received_at.clone(), raw_txn_id: opts.raw_txn_id.clone(), } } } +/// Maximum allowed forward skew for a caller-supplied event time. +/// +/// Commits are immutable and event time is monotonically non-decreasing, so +/// a single future-dated event time would permanently pin the ledger's +/// timeline ahead of reality. Small allowance for client/server clock drift. +const MAX_EVENT_TIME_FUTURE_SKEW_MS: i64 = 5 * 60 * 1000; + +fn epoch_ms_to_rfc3339(ms: i64) -> Result { + chrono::DateTime::::from_timestamp_millis(ms) + .map(|dt| dt.to_rfc3339()) + .ok_or_else(|| { + TransactError::InvalidEventTime(format!("epoch milliseconds {ms} out of range")) + }) +} + +/// Resolve the commit's event time (`Commit.time`) and optional audit-axis +/// receivedAt stamp (epoch ms) from caller opts + head temporal metadata. +/// +/// Event time: caller-supplied values must be valid RFC 3339, at most +/// [`MAX_EVENT_TIME_FUTURE_SKEW_MS`] in the future, and not earlier than the +/// head commit's event time. Defaults to wall clock, clamped to the head's +/// event time so the chain stays monotonic under clock skew — `@iso:` +/// resolution silently mis-resolves on a non-monotonic chain. +/// +/// receivedAt: `Some` when the caller supplied one or the ledger is already +/// in dual-stamp mode (sticky — every post-flip commit must carry it for +/// `@recorded:` resolution to stay exact). Clamped monotonic likewise. +fn resolve_commit_times( + event: Option, + received: Option, + head: Option, +) -> Result<(String, Option)> { + let now = Utc::now(); + let now_ms = now.timestamp_millis(); + let head_event_ms = head.map(|h| h.event_time_ms); + + let timestamp = match event { + Some(ts) => { + let ms = iso_to_epoch_ms_opt(&ts).ok_or_else(|| { + TransactError::InvalidEventTime(format!("'{ts}' is not a valid RFC 3339 timestamp")) + })?; + if ms > now_ms + MAX_EVENT_TIME_FUTURE_SKEW_MS { + return Err(TransactError::InvalidEventTime(format!( + "event time '{ts}' is in the future; commits are immutable and event \ + time is monotonic, so a future event time would permanently pin the \ + ledger's timeline ahead of reality" + ))); + } + if let Some(head_ms) = head_event_ms { + if ms < head_ms { + return Err(TransactError::InvalidEventTime(format!( + "event time '{ts}' is earlier than the head commit's event time \ + ({}); event time must be monotonically non-decreasing", + epoch_ms_to_rfc3339(head_ms)? + ))); + } + } + ts + } + None => match head_event_ms { + // Clock went backwards (or the head is event-timed at/after our + // present): reuse the head's event time so the chain stays + // monotonic and `@iso:` resolution stays correct. + Some(head_ms) if now_ms < head_ms => epoch_ms_to_rfc3339(head_ms)?, + _ => now.to_rfc3339(), + }, + }; + + let dual_stamp = received.is_some() || head.is_some_and(|h| h.dual_stamp()); + let received_at_ms = if dual_stamp { + let ms = match received { + Some(ts) => iso_to_epoch_ms_opt(&ts).ok_or_else(|| { + TransactError::InvalidEventTime(format!( + "receivedAt '{ts}' is not a valid RFC 3339 timestamp" + )) + })?, + None => now_ms, + }; + // Symmetric with the event axis: a caller-supplied receivedAt (reachable + // from the Rust API / CLI — the HTTP route always passes `now`) must not + // be in the future. The audit axis is immutable and monotonic, so a + // future stamp would permanently pin the ledger's timeline ahead of + // reality. + if ms > now_ms + MAX_EVENT_TIME_FUTURE_SKEW_MS { + return Err(TransactError::InvalidEventTime(format!( + "receivedAt '{}' is in the future; the audit axis is immutable and \ + monotonic, so a future receivedAt would permanently pin the ledger's \ + timeline ahead of reality", + epoch_ms_to_rfc3339(ms)? + ))); + } + let prev = head.and_then(|h| h.received_time_ms); + Some(prev.map_or(ms, |p| ms.max(p))) + } else { + None + }; + + Ok((timestamp, received_at_ms)) +} + /// Commit a staged transaction /// /// This function: @@ -445,6 +580,7 @@ pub async fn build_commit( skip_backpressure, merge_parents, timestamp: opt_timestamp, + received_at: opt_received_at, .. } = opts; @@ -470,6 +606,14 @@ pub async fn build_commit( )); } + // db:receivedAt is system-controlled like f:identity: strip any + // caller-supplied txn-meta claim unconditionally; the resolved value + // (if the ledger dual-stamps) is injected below. + txn_meta.retain(|entry| { + !(entry.predicate_ns == fluree_vocab::namespaces::FLUREE_DB + && entry.predicate_name == fluree_vocab::db::RECEIVED_AT) + }); + // No wrapper span: the caller's ambient span (e.g. `txn_commit` // from `commit()`, or whatever the Raft path opens) carries the // build-time fields, so sub-spans below stay direct children of @@ -527,8 +671,18 @@ pub async fn build_commit( graph_delta.values().map(std::string::String::as_str), )?; - // Use caller-provided timestamp or default to wall clock. - let timestamp = opt_timestamp.unwrap_or_else(|| Utc::now().to_rfc3339()); + // Resolve the commit's event time (`Commit.time`, the `@iso:` axis) and + // the optional audit-axis receivedAt stamp. Validates monotonicity and + // future bounds against the in-memory head temporal metadata. + let (timestamp, received_at_ms) = + resolve_commit_times(opt_timestamp, opt_received_at, base.head_temporal)?; + if let Some(recv_ms) = received_at_ms { + txn_meta.push(TxnMetaEntry::new( + fluree_vocab::namespaces::FLUREE_DB, + fluree_vocab::db::RECEIVED_AT, + TxnMetaValue::Long(recv_ms), + )); + } // The caller is responsible for uploading the raw-txn JSON // (if any) before invoking this function — the result is @@ -782,6 +936,11 @@ fn finalize_state_with_base( flake_count: usize, base: LedgerState, ) -> Result<(CommitReceipt, LedgerState)> { + // Capture before `commit_record.flakes` is taken below; keeps the head + // temporal metadata current so the next commit's event-time guard and + // dual-stamp decision stay in-memory integer checks. + let head_temporal = HeadTemporal::from_commit(&commit_record).or(base.head_temporal); + // 10. Generate commit metadata flakes let commit_metadata_flakes = { let span = tracing::debug_span!("commit_generate_metadata_flakes"); @@ -915,6 +1074,7 @@ fn finalize_state_with_base( ns_record: base.ns_record, binary_store: base.binary_store, spatial_indexes: base.spatial_indexes, + head_temporal, }; let receipt = CommitReceipt { @@ -1616,4 +1776,109 @@ mod tests { "raw_txn blob must be released after EmptyTransaction error" ); } + + // ========================================================================= + // resolve_commit_times: event-time validation + dual-stamp resolution + // ========================================================================= + + fn head(event_ms: i64, received_ms: Option) -> Option { + Some(HeadTemporal { + event_time_ms: event_ms, + received_time_ms: received_ms, + }) + } + + #[test] + fn resolve_times_default_is_wall_clock_no_dual_stamp() { + let (ts, recv) = resolve_commit_times(None, None, None).unwrap(); + assert!(chrono::DateTime::parse_from_rfc3339(&ts).is_ok()); + assert!(recv.is_none(), "plain commit must not dual-stamp"); + } + + #[test] + fn resolve_times_clamps_clock_backwards_to_head() { + // Head event time is one hour in the future (clock skew / backdated + // present): the default stamp must reuse it, not go backwards. + let future_ms = Utc::now().timestamp_millis() + 3_600_000; + let (ts, _) = resolve_commit_times(None, None, head(future_ms, None)).unwrap(); + let ms = iso_to_epoch_ms_opt(&ts).unwrap(); + assert_eq!(ms, future_ms, "default stamp must clamp to head event time"); + } + + #[test] + fn resolve_times_rejects_event_time_before_head() { + let now_ms = Utc::now().timestamp_millis(); + let err = resolve_commit_times( + Some("2020-01-01T00:00:00Z".into()), + None, + head(now_ms, None), + ) + .unwrap_err(); + assert!(err.to_string().contains("monotonically non-decreasing")); + } + + #[test] + fn resolve_times_rejects_future_event_time() { + let tomorrow = (Utc::now() + chrono::Duration::days(1)).to_rfc3339(); + let err = resolve_commit_times(Some(tomorrow), None, None).unwrap_err(); + assert!(err.to_string().contains("future")); + } + + #[test] + fn resolve_times_rejects_unparseable_event_time() { + let err = resolve_commit_times(Some("garbage".into()), None, None).unwrap_err(); + assert!(err.to_string().contains("RFC 3339")); + } + + #[test] + fn resolve_times_supplied_past_event_time_ok_on_fresh_ledger() { + let (ts, recv) = + resolve_commit_times(Some("1969-07-20T20:17:00Z".into()), None, None).unwrap(); + // Pre-1970 (negative epoch ms) is fine — historical modeling. + assert_eq!(ts, "1969-07-20T20:17:00Z"); + assert!(recv.is_none(), "timestamp alone must not flip dual-stamp"); + } + + #[test] + fn resolve_times_explicit_received_flips_dual_stamp() { + let (_, recv) = resolve_commit_times( + Some("2020-01-01T00:00:00Z".into()), + Some("2026-01-01T00:00:00Z".into()), + None, + ) + .unwrap(); + assert_eq!(recv, iso_to_epoch_ms_opt("2026-01-01T00:00:00Z")); + } + + #[test] + fn resolve_times_sticky_dual_stamp_continues_without_opts() { + let now_ms = Utc::now().timestamp_millis(); + let (_, recv) = + resolve_commit_times(None, None, head(now_ms - 1000, Some(now_ms - 1000))).unwrap(); + let recv = recv.expect("dual-stamp ledger must keep stamping receivedAt"); + assert!( + recv >= now_ms - 1000, + "receivedAt must be clamped monotonic" + ); + } + + #[test] + fn resolve_times_rejects_future_received_at() { + // Symmetric with the event axis: a caller-supplied receivedAt in the + // future is rejected so the immutable audit axis can't be pinned ahead. + let tomorrow = (Utc::now() + chrono::Duration::days(1)).to_rfc3339(); + let err = resolve_commit_times(Some("2020-01-01T00:00:00Z".into()), Some(tomorrow), None) + .unwrap_err(); + assert!(err.to_string().contains("future")); + } + + #[test] + fn resolve_times_received_clamped_to_head_received() { + // Head recorded one hour ahead (clock skew): the new receivedAt must + // not go backwards, or @recorded: resolution breaks. + let future_ms = Utc::now().timestamp_millis() + 3_600_000; + let (_, recv) = + resolve_commit_times(None, None, head(future_ms - 1, Some(future_ms))).unwrap(); + assert_eq!(recv, Some(future_ms)); + } } diff --git a/fluree-db-transact/src/error.rs b/fluree-db-transact/src/error.rs index 662f0d9893..36923c590e 100644 --- a/fluree-db-transact/src/error.rs +++ b/fluree-db-transact/src/error.rs @@ -95,6 +95,11 @@ pub enum TransactError { #[error("Empty transaction: no flakes to commit")] EmptyTransaction, + /// Invalid commit event time (unparseable, future-dated, or earlier + /// than the head commit's event time) + #[error("Invalid event time: {0}")] + InvalidEventTime(String), + /// Novelty at maximum size (backpressure) #[error("Novelty at maximum size, reindexing required")] NoveltyAtMax, diff --git a/fluree-vocab/src/lib.rs b/fluree-vocab/src/lib.rs index 28f7bff5ad..93df4eb610 100644 --- a/fluree-vocab/src/lib.rs +++ b/fluree-vocab/src/lib.rs @@ -1718,9 +1718,18 @@ pub mod db { /// db:previous - reference to previous commit pub const PREVIOUS: &str = "previous"; - /// db:time - commit timestamp (epoch milliseconds) + /// db:time - commit timestamp (epoch milliseconds). This is the commit's + /// *event time*: user-suppliable (backdated historical loads) and the + /// axis `@iso:` time travel resolves against. Defaults to wall clock. pub const TIME: &str = "time"; + /// db:receivedAt - wall-clock time the commit was actually recorded + /// (epoch milliseconds). System-controlled: emitted only on ledgers that + /// have used a caller-supplied event time (sticky dual-stamp mode), so + /// normal ledgers carry no extra metadata. The `@recorded:` time-travel + /// selector resolves against this axis for audit queries. + pub const RECEIVED_AT: &str = "receivedAt"; + /// db:message - commit message (optional) pub const MESSAGE: &str = "message";