diff --git a/Cargo.lock b/Cargo.lock index 70331730..85846cfe 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4894,6 +4894,7 @@ dependencies = [ "serde", "serde_json", "serde_yaml", + "serial_test", "sha2 0.10.9", "tempfile", "thiserror", diff --git a/crates/omnigraph-cluster/Cargo.toml b/crates/omnigraph-cluster/Cargo.toml index 119545e2..19a325ba 100644 --- a/crates/omnigraph-cluster/Cargo.toml +++ b/crates/omnigraph-cluster/Cargo.toml @@ -30,5 +30,6 @@ tokio = { workspace = true } ulid = { workspace = true } [dev-dependencies] +serial_test = "3" tempfile = { workspace = true } tokio = { workspace = true } diff --git a/crates/omnigraph-cluster/src/failpoints.rs b/crates/omnigraph-cluster/src/failpoints.rs index f1799d79..f5b20230 100644 --- a/crates/omnigraph-cluster/src/failpoints.rs +++ b/crates/omnigraph-cluster/src/failpoints.rs @@ -1,6 +1,13 @@ //! Fault-injection hooks for the cluster apply protocol, mirroring the //! engine's `omnigraph::failpoints` pattern. With the `failpoints` feature //! off, every call site compiles to `Ok(())`. +//! +//! Only `maybe_fail` lives here — it returns the cluster's [`Diagnostic`] +//! error type. The test-side configuration guard is shared: use +//! [`omnigraph::failpoints::ScopedFailPoint`], which is registry-only +//! (error-type agnostic) and reachable because the cluster's `failpoints` +//! feature enables `omnigraph/failpoints`. One `ScopedFailPoint`, in the +//! lowest crate, avoids a drifting duplicate. use crate::Diagnostic; @@ -19,38 +26,16 @@ pub(crate) fn maybe_fail(_name: &str) -> Result<(), Diagnostic> { Ok(()) } -#[cfg(feature = "failpoints")] -pub struct ScopedFailPoint { - name: String, -} - -#[cfg(feature = "failpoints")] -impl ScopedFailPoint { - pub fn new(name: &str, action: &str) -> Self { - fail::cfg(name, action).expect("configure failpoint"); - Self { - name: name.to_string(), - } - } - - /// Register a callback failpoint with the same Drop-based cleanup as - /// `new`. Without the guard, a panic while the point is active would - /// leak the callback into the process-global registry and fire it under - /// later tests in the same binary. - pub fn with_callback(name: &str, callback: F) -> Self - where - F: Fn() + Send + Sync + 'static, - { - fail::cfg_callback(name, callback).expect("configure callback failpoint"); - Self { - name: name.to_string(), - } - } -} - -#[cfg(feature = "failpoints")] -impl Drop for ScopedFailPoint { - fn drop(&mut self) { - fail::remove(&self.name); - } +/// Compile-checked catalog of this crate's apply-protocol failpoint names. +/// Engine-scoped names referenced from cluster tests live in +/// [`omnigraph::failpoints::names`]. +pub mod names { + pub const CLUSTER_APPLY_AFTER_GRAPH_CREATE: &str = "cluster_apply.after_graph_create"; + pub const CLUSTER_APPLY_AFTER_GRAPH_DELETE: &str = "cluster_apply.after_graph_delete"; + pub const CLUSTER_APPLY_AFTER_PAYLOAD_PHASE: &str = "cluster_apply.after_payload_phase"; + pub const CLUSTER_APPLY_AFTER_SCHEMA_APPLY: &str = "cluster_apply.after_schema_apply"; + pub const CLUSTER_APPLY_BEFORE_GRAPH_CREATE: &str = "cluster_apply.before_graph_create"; + pub const CLUSTER_APPLY_BEFORE_GRAPH_DELETE: &str = "cluster_apply.before_graph_delete"; + pub const CLUSTER_APPLY_BEFORE_SCHEMA_APPLY: &str = "cluster_apply.before_schema_apply"; + pub const CLUSTER_APPLY_BEFORE_STATE_WRITE: &str = "cluster_apply.before_state_write"; } diff --git a/crates/omnigraph-cluster/src/lib.rs b/crates/omnigraph-cluster/src/lib.rs index bed27c88..a8db3d36 100644 --- a/crates/omnigraph-cluster/src/lib.rs +++ b/crates/omnigraph-cluster/src/lib.rs @@ -510,7 +510,7 @@ pub async fn apply_config_dir_with_options( continue; } }; - if let Err(diagnostic) = failpoints::maybe_fail("cluster_apply.before_graph_create") { + if let Err(diagnostic) = failpoints::maybe_fail(crate::failpoints::names::CLUSTER_APPLY_BEFORE_GRAPH_CREATE) { // Simulated crash before the init: the sidecar stays for the // sweep (row 1: root absent -> intent removed next run). diagnostics.push(diagnostic); @@ -587,7 +587,7 @@ pub async fn apply_config_dir_with_options( // Crash point: the graph exists, the cluster state does not record it // yet. A failure here must acknowledge nothing; the next run's sweep // rolls the ledger forward (row 4). - if let Err(diagnostic) = failpoints::maybe_fail("cluster_apply.after_graph_create") { + if let Err(diagnostic) = failpoints::maybe_fail(crate::failpoints::names::CLUSTER_APPLY_AFTER_GRAPH_CREATE) { diagnostics.push(diagnostic); return early_return( display_path(&desired.config_dir), @@ -727,7 +727,7 @@ pub async fn apply_config_dir_with_options( continue; } }; - if let Err(diagnostic) = failpoints::maybe_fail("cluster_apply.before_schema_apply") { + if let Err(diagnostic) = failpoints::maybe_fail(crate::failpoints::names::CLUSTER_APPLY_BEFORE_SCHEMA_APPLY) { // Simulated crash before the engine call: the sidecar stays; the // sweep retires it next run (ledger still consistent with live). diagnostics.push(diagnostic); @@ -787,7 +787,7 @@ pub async fn apply_config_dir_with_options( } // Crash point: the manifest moved, the ledger does not record it yet. // A failure here acknowledges nothing; the sweep rolls forward. - if let Err(diagnostic) = failpoints::maybe_fail("cluster_apply.after_schema_apply") { + if let Err(diagnostic) = failpoints::maybe_fail(crate::failpoints::names::CLUSTER_APPLY_AFTER_SCHEMA_APPLY) { diagnostics.push(diagnostic); return early_return( display_path(&desired.config_dir), @@ -872,7 +872,7 @@ pub async fn apply_config_dir_with_options( // Crash point: payloads are on disk, state has not moved. A failure here // must leave state.json byte-identical and acknowledge nothing; re-running // apply repairs via the skip-if-exists blob reuse. - if let Err(diagnostic) = failpoints::maybe_fail("cluster_apply.after_payload_phase") { + if let Err(diagnostic) = failpoints::maybe_fail(crate::failpoints::names::CLUSTER_APPLY_AFTER_PAYLOAD_PHASE) { diagnostics.push(diagnostic); return early_return( display_path(&desired.config_dir), @@ -949,7 +949,7 @@ pub async fn apply_config_dir_with_options( continue; } }; - if let Err(diagnostic) = failpoints::maybe_fail("cluster_apply.before_graph_delete") { + if let Err(diagnostic) = failpoints::maybe_fail(crate::failpoints::names::CLUSTER_APPLY_BEFORE_GRAPH_DELETE) { // Simulated crash before removal: row 8 retires the intent and // the still-valid approval lets a later run retry. diagnostics.push(diagnostic); @@ -974,7 +974,7 @@ pub async fn apply_config_dir_with_options( } // Crash point: the root is gone, the ledger does not record it yet. // The sweep rolls forward (row 7b) and consumes the approval. - if let Err(diagnostic) = failpoints::maybe_fail("cluster_apply.after_graph_delete") { + if let Err(diagnostic) = failpoints::maybe_fail(crate::failpoints::names::CLUSTER_APPLY_AFTER_GRAPH_DELETE) { diagnostics.push(diagnostic); return early_return( display_path(&desired.config_dir), @@ -1080,7 +1080,7 @@ pub async fn apply_config_dir_with_options( // persisted-statuses revert contract below is exercised; a cfg_callback // on this point can mutate state.json to simulate a concurrent writer, // making write_state's CAS check fail organically. - let write_result = match failpoints::maybe_fail("cluster_apply.before_state_write") { + let write_result = match failpoints::maybe_fail(crate::failpoints::names::CLUSTER_APPLY_BEFORE_STATE_WRITE) { Ok(()) => { backend .write_state(&new_state, expected_cas.as_deref(), &mut observations) diff --git a/crates/omnigraph-cluster/tests/failpoints.rs b/crates/omnigraph-cluster/tests/failpoints.rs index 51997ce0..6b6d3391 100644 --- a/crates/omnigraph-cluster/tests/failpoints.rs +++ b/crates/omnigraph-cluster/tests/failpoints.rs @@ -13,9 +13,11 @@ use std::fs; use std::path::{Path, PathBuf}; use fail::FailScenario; +use serial_test::serial; use omnigraph::db::Omnigraph; -use omnigraph::failpoints::ScopedFailPoint as EngineScopedFailPoint; -use omnigraph_cluster::failpoints::ScopedFailPoint; +// One ScopedFailPoint for both engine- and cluster-scoped failpoint names: +// it is registry-only (error-type agnostic) and lives in the lowest crate. +use omnigraph::failpoints::ScopedFailPoint; use omnigraph_cluster::{ ApplyOptions, apply_config_dir, apply_config_dir_with_options, approve_config_dir, validate_config_dir, @@ -105,12 +107,13 @@ fn query_blob(config_dir: &Path, digests: &BTreeMap) -> PathBuf } #[tokio::test] +#[serial] async fn failpoint_wiring_returns_injected_diagnostic() { let scenario = FailScenario::setup(); let dir = fixture(); seed_applyable_state(dir.path()); - let _failpoint = ScopedFailPoint::new("cluster_apply.after_payload_phase", "return"); + let _failpoint = ScopedFailPoint::new(omnigraph_cluster::failpoints::names::CLUSTER_APPLY_AFTER_PAYLOAD_PHASE, "return"); let out = apply_config_dir(dir.path()).await; assert!(!out.ok); assert!(out.diagnostics.iter().any(|diagnostic| { @@ -127,6 +130,7 @@ async fn failpoint_wiring_returns_injected_diagnostic() { /// state.json is byte-identical, nothing is acknowledged — and a plain re-run /// repairs by trusting the existing content-addressed blobs. #[tokio::test] +#[serial] async fn apply_crash_after_payload_phase_leaves_state_unmoved_then_recovers() { let scenario = FailScenario::setup(); let dir = fixture(); @@ -134,7 +138,7 @@ async fn apply_crash_after_payload_phase_leaves_state_unmoved_then_recovers() { let state_before = fs::read(state_path(dir.path())).unwrap(); { - let _failpoint = ScopedFailPoint::new("cluster_apply.after_payload_phase", "return"); + let _failpoint = ScopedFailPoint::new(omnigraph_cluster::failpoints::names::CLUSTER_APPLY_AFTER_PAYLOAD_PHASE, "return"); let out = apply_config_dir(dir.path()).await; assert!(!out.ok); assert!(!out.state_written); @@ -169,6 +173,7 @@ async fn apply_crash_after_payload_phase_leaves_state_unmoved_then_recovers() { /// (possible under `state.lock: false`) must surface `state_cas_mismatch`, /// acknowledge nothing, and leave the concurrent writer's state on disk. #[tokio::test] +#[serial] async fn apply_cas_race_surfaces_state_cas_mismatch() { let scenario = FailScenario::setup(); let dir = fixture(); @@ -179,7 +184,7 @@ async fn apply_cas_race_surfaces_state_cas_mismatch() { // after apply read it but before apply writes. RAII-guarded so a panic // inside apply cannot leak the callback into the global registry. let race_path = state_path(dir.path()); - let failpoint = ScopedFailPoint::with_callback("cluster_apply.before_state_write", move || { + let failpoint = ScopedFailPoint::with_callback(omnigraph_cluster::failpoints::names::CLUSTER_APPLY_BEFORE_STATE_WRITE, move || { let mut state: serde_json::Value = serde_json::from_str(&fs::read_to_string(&race_path).unwrap()).unwrap(); state["state_revision"] = serde_json::json!(99); @@ -256,13 +261,14 @@ fn recovery_sidecars(config_dir: &Path) -> Vec { /// The next run's sweep removes the intent (row 1) and the same run creates /// the graph and converges. #[tokio::test] +#[serial] async fn create_crash_before_init_recovers_via_sweep() { let scenario = FailScenario::setup(); let dir = fixture(); seed_empty_state(dir.path()); { - let _failpoint = ScopedFailPoint::new("cluster_apply.before_graph_create", "return"); + let _failpoint = ScopedFailPoint::new(omnigraph_cluster::failpoints::names::CLUSTER_APPLY_BEFORE_GRAPH_CREATE, "return"); let out = apply_config_dir(dir.path()).await; assert!(!out.ok); assert!(out.diagnostics.iter().any(|diagnostic| { @@ -298,6 +304,7 @@ async fn create_crash_before_init_recovers_via_sweep() { /// ledger is stale, nothing was acknowledged. The next run's sweep rolls the /// ledger forward (row 4) with an audit entry, and the run converges. #[tokio::test] +#[serial] async fn create_crash_after_init_rolls_state_forward() { let scenario = FailScenario::setup(); let dir = fixture(); @@ -305,7 +312,7 @@ async fn create_crash_after_init_rolls_state_forward() { let state_before = fs::read(dir.path().join("__cluster/state.json")).unwrap(); { - let _failpoint = ScopedFailPoint::new("cluster_apply.after_graph_create", "return"); + let _failpoint = ScopedFailPoint::new(omnigraph_cluster::failpoints::names::CLUSTER_APPLY_AFTER_GRAPH_CREATE, "return"); let out = apply_config_dir(dir.path()).await; assert!(!out.ok); assert!(!out.state_written); @@ -385,6 +392,7 @@ async fn live_schema_digest(dir: &Path) -> String { /// live schema and ledger are untouched; the next run's sweep retires the /// stale intent and the same run applies and converges. #[tokio::test] +#[serial] async fn schema_crash_before_apply_recovers_via_sweep() { let scenario = FailScenario::setup(); let dir = fixture(); @@ -393,7 +401,7 @@ async fn schema_crash_before_apply_recovers_via_sweep() { fs::write(dir.path().join("people.pg"), SCHEMA_V2).unwrap(); { - let _failpoint = ScopedFailPoint::new("cluster_apply.before_schema_apply", "return"); + let _failpoint = ScopedFailPoint::new(omnigraph_cluster::failpoints::names::CLUSTER_APPLY_BEFORE_SCHEMA_APPLY, "return"); let out = apply_config_dir_with_options( dir.path(), ApplyOptions { @@ -425,6 +433,7 @@ async fn schema_crash_before_apply_recovers_via_sweep() { /// the graph manifest moves. The defensive cleanup proof should remove the /// cluster sidecar immediately so a pre-movement error cannot brick boot. #[tokio::test] +#[serial] async fn schema_apply_error_before_graph_movement_removes_sidecar() { let scenario = FailScenario::setup(); let dir = fixture(); @@ -433,7 +442,7 @@ async fn schema_apply_error_before_graph_movement_removes_sidecar() { fs::write(dir.path().join("people.pg"), SCHEMA_V2).unwrap(); { - let _failpoint = EngineScopedFailPoint::new("schema_apply.before_staging_write", "return"); + let _failpoint = ScopedFailPoint::new(omnigraph::failpoints::names::SCHEMA_APPLY_BEFORE_STAGING_WRITE, "return"); let out = apply_config_dir(dir.path()).await; assert!(!out.ok); assert!( @@ -462,6 +471,7 @@ async fn schema_apply_error_before_graph_movement_removes_sidecar() { /// prove this is a pre-movement failure, so the sidecar must survive for /// explicit recovery/quarantine instead of being cleaned up defensively. #[tokio::test] +#[serial] async fn schema_apply_error_after_graph_movement_keeps_sidecar() { let scenario = FailScenario::setup(); let dir = fixture(); @@ -472,7 +482,7 @@ async fn schema_apply_error_after_graph_movement_keeps_sidecar() { let v2_digest = desired.resource_digests["schema.knowledge"].clone(); { - let _failpoint = EngineScopedFailPoint::new("schema_apply.after_manifest_commit", "return"); + let _failpoint = ScopedFailPoint::new(omnigraph::failpoints::names::SCHEMA_APPLY_AFTER_MANIFEST_COMMIT, "return"); let out = apply_config_dir(dir.path()).await; assert!(!out.ok); assert!( @@ -524,6 +534,7 @@ async fn schema_apply_error_after_graph_movement_keeps_sidecar() { /// moved, the ledger is stale, nothing acknowledged; the next run's sweep /// rolls the ledger forward with an audit entry and the run converges. #[tokio::test] +#[serial] async fn schema_crash_after_apply_rolls_state_forward() { let scenario = FailScenario::setup(); let dir = fixture(); @@ -534,7 +545,7 @@ async fn schema_crash_after_apply_rolls_state_forward() { let v2_digest = desired.resource_digests["schema.knowledge"].clone(); { - let _failpoint = ScopedFailPoint::new("cluster_apply.after_schema_apply", "return"); + let _failpoint = ScopedFailPoint::new(omnigraph_cluster::failpoints::names::CLUSTER_APPLY_AFTER_SCHEMA_APPLY, "return"); let out = apply_config_dir(dir.path()).await; assert!(!out.ok); assert!(!out.state_written); @@ -608,13 +619,14 @@ async fn seed_approved_delete(dir: &Path) -> String { /// next run retires the stale intent (row 8) and the still-approved delete /// completes in the same run. #[tokio::test] +#[serial] async fn delete_crash_before_removal_reproposes() { let scenario = FailScenario::setup(); let dir = fixture(); let approval_id = seed_approved_delete(dir.path()).await; { - let _failpoint = ScopedFailPoint::new("cluster_apply.before_graph_delete", "return"); + let _failpoint = ScopedFailPoint::new(omnigraph_cluster::failpoints::names::CLUSTER_APPLY_BEFORE_GRAPH_DELETE, "return"); let out = apply_config_dir(dir.path()).await; assert!(!out.ok); assert!(dir.path().join("graphs/old.omni").exists()); @@ -650,6 +662,7 @@ async fn delete_crash_before_removal_reproposes() { /// nothing acknowledged; the next run's sweep rolls the tombstone forward, /// consumes the approval the sidecar carries, and audits the recovery. #[tokio::test] +#[serial] async fn delete_crash_after_removal_rolls_forward() { let scenario = FailScenario::setup(); let dir = fixture(); @@ -657,7 +670,7 @@ async fn delete_crash_after_removal_rolls_forward() { let state_before = fs::read(state_path(dir.path())).unwrap(); { - let _failpoint = ScopedFailPoint::new("cluster_apply.after_graph_delete", "return"); + let _failpoint = ScopedFailPoint::new(omnigraph_cluster::failpoints::names::CLUSTER_APPLY_AFTER_GRAPH_DELETE, "return"); let out = apply_config_dir(dir.path()).await; assert!(!out.ok); assert!(!out.state_written); diff --git a/crates/omnigraph/src/db/graph_coordinator.rs b/crates/omnigraph/src/db/graph_coordinator.rs index b9bcb116..7a2c37c1 100644 --- a/crates/omnigraph/src/db/graph_coordinator.rs +++ b/crates/omnigraph/src/db/graph_coordinator.rs @@ -257,7 +257,7 @@ impl GraphCoordinator { /// fresh, so any existing commit-graph branch with this name is provably /// orphaned and is force-dropped before recreating. async fn create_commit_graph_branch(&mut self, branch: &str) -> Result<()> { - failpoints::maybe_fail("branch_create.after_manifest_branch_create")?; + failpoints::maybe_fail(crate::failpoints::names::BRANCH_CREATE_AFTER_MANIFEST_BRANCH_CREATE)?; let Some(commit_graph) = &mut self.commit_graph else { return Ok(()); }; @@ -306,7 +306,7 @@ impl GraphCoordinator { /// Best-effort, idempotent reclaim of the commit-graph branch `branch`. /// Tolerates an absent commit-graph dataset (a graph that never committed). async fn reclaim_commit_graph_branch(&mut self, branch: &str) -> Result<()> { - failpoints::maybe_fail("branch_delete.before_commit_graph_reclaim")?; + failpoints::maybe_fail(crate::failpoints::names::BRANCH_DELETE_BEFORE_COMMIT_GRAPH_RECLAIM)?; if let Some(commit_graph) = &mut self.commit_graph { commit_graph.force_delete_branch(branch).await } else if self @@ -486,7 +486,7 @@ impl GraphCoordinator { updates: &[SubTableUpdate], ) -> Result { let manifest_version = self.manifest.commit(updates).await?; - failpoints::maybe_fail("graph_publish.after_manifest_commit")?; + failpoints::maybe_fail(crate::failpoints::names::GRAPH_PUBLISH_AFTER_MANIFEST_COMMIT)?; Ok(manifest_version) } @@ -499,7 +499,7 @@ impl GraphCoordinator { .manifest .commit_with_expected(updates, expected_table_versions) .await?; - failpoints::maybe_fail("graph_publish.after_manifest_commit")?; + failpoints::maybe_fail(crate::failpoints::names::GRAPH_PUBLISH_AFTER_MANIFEST_COMMIT)?; Ok(manifest_version) } @@ -508,7 +508,7 @@ impl GraphCoordinator { changes: &[ManifestChange], ) -> Result { let manifest_version = self.manifest.commit_changes(changes).await?; - failpoints::maybe_fail("graph_publish.after_manifest_commit")?; + failpoints::maybe_fail(crate::failpoints::names::GRAPH_PUBLISH_AFTER_MANIFEST_COMMIT)?; Ok(manifest_version) } @@ -539,7 +539,7 @@ impl GraphCoordinator { self.manifest_incarnation().e_tag.as_deref(), )); }; - failpoints::maybe_fail("graph_publish.before_commit_append")?; + failpoints::maybe_fail(crate::failpoints::names::GRAPH_PUBLISH_BEFORE_COMMIT_APPEND)?; // Refresh the commit-graph head from storage before selecting the // parent. `append_commit` parents the new commit on the IN-MEMORY head // (`head_commit_id`, zero storage read), but the manifest was just @@ -571,7 +571,7 @@ impl GraphCoordinator { let commit_graph = self.commit_graph.as_mut().ok_or_else(|| { OmniError::manifest("branch merge requires _graph_commits.lance".to_string()) })?; - failpoints::maybe_fail("graph_publish.before_commit_append")?; + failpoints::maybe_fail(crate::failpoints::names::GRAPH_PUBLISH_BEFORE_COMMIT_APPEND)?; let graph_commit_id = commit_graph .append_merge_commit( current_branch.as_deref(), diff --git a/crates/omnigraph/src/db/manifest/recovery.rs b/crates/omnigraph/src/db/manifest/recovery.rs index d21e0fd1..48e1746c 100644 --- a/crates/omnigraph/src/db/manifest/recovery.rs +++ b/crates/omnigraph/src/db/manifest/recovery.rs @@ -416,7 +416,7 @@ pub(crate) async fn write_sidecar( ) -> Result { // Failpoint: models a storage put failure (S3 PutObject / fs write) // in Phase A — every writer must abort before any HEAD advance. - crate::failpoints::maybe_fail("recovery.sidecar_write")?; + crate::failpoints::maybe_fail(crate::failpoints::names::RECOVERY_SIDECAR_WRITE)?; debug_assert_eq!(sidecar.schema_version, SIDECAR_SCHEMA_VERSION); let uri = sidecar_uri(root_uri, &sidecar.operation_id); let json = serde_json::to_string_pretty(sidecar).map_err(|err| { @@ -457,7 +457,7 @@ pub(crate) async fn confirm_sidecar_phase_b( ) -> Result<()> { // Failpoint: models a storage failure on the confirmation write — the // pre-confirm sidecar stays on disk, so recovery rolls the operation back. - crate::failpoints::maybe_fail("recovery.sidecar_confirm")?; + crate::failpoints::maybe_fail(crate::failpoints::names::RECOVERY_SIDECAR_CONFIRM)?; for pin in &mut sidecar.tables { // Every pinned table MUST have an achieved version. A miss means the // pin set and the publish `updates` diverged — fail loudly at the @@ -489,7 +489,7 @@ pub(crate) async fn delete_sidecar( // Failpoint: models a storage delete failure (S3 DeleteObject) in // Phase D — callers swallow it (the write already published) and the // stale sidecar is healed by the next write or open. - crate::failpoints::maybe_fail("recovery.sidecar_delete")?; + crate::failpoints::maybe_fail(crate::failpoints::names::RECOVERY_SIDECAR_DELETE)?; storage.delete(&handle.sidecar_uri).await } @@ -507,7 +507,7 @@ pub(crate) async fn list_sidecars( // Failpoint: models a storage list failure (S3 ListObjectsV2) — every // consumer (open-time sweep, write-entry heal) must fail loudly // rather than silently skipping recovery. - crate::failpoints::maybe_fail("recovery.sidecar_list")?; + crate::failpoints::maybe_fail(crate::failpoints::names::RECOVERY_SIDECAR_LIST)?; let dir = recovery_dir_uri(root_uri); let mut uris = storage.list_dir(&dir).await?; // Sort by URI so the sweep processes sidecars deterministically. @@ -862,7 +862,7 @@ pub(crate) async fn heal_pending_sidecars_roll_forward( }; if process_sidecar( root_uri, - storage.as_ref(), + &storage, &branch_snapshot, &sidecar, RecoveryMode::RollForwardOnly, @@ -928,7 +928,7 @@ async fn discard_orphaned_branch_sidecar( .await?; // Failpoint: the residual window above — commit appended, audit // not yet durable. - crate::failpoints::maybe_fail("recovery.orphan_discard_audit_append")?; + crate::failpoints::maybe_fail(crate::failpoints::names::RECOVERY_ORPHAN_DISCARD_AUDIT_APPEND)?; audit .append(RecoveryAuditRecord { graph_commit_id, @@ -1036,7 +1036,7 @@ pub(crate) async fn recover_manifest_drift( }; process_sidecar( root_uri, - storage.as_ref(), + &storage, &branch_snapshot, &sidecar, mode, @@ -1051,7 +1051,7 @@ pub(crate) async fn recover_manifest_drift( async fn process_sidecar( root_uri: &str, - storage: &dyn StorageAdapter, + storage: &std::sync::Arc, snapshot: &Snapshot, sidecar: &RecoverySidecar, mode: RecoveryMode, @@ -1154,7 +1154,7 @@ async fn process_sidecar( ); } return record_audit_recovery_rollforward( - root_uri, storage, snapshot, sidecar, &states, + root_uri, storage.as_ref(), snapshot, sidecar, &states, ) .await .map(|()| true); @@ -1176,7 +1176,7 @@ async fn process_sidecar( writer_kind = ?sidecar.writer_kind, "recovery: rolling back sidecar (mixed or unexpected state)" ); - roll_back_sidecar(root_uri, storage, snapshot, sidecar, &states) + roll_back_sidecar(root_uri, storage.as_ref(), snapshot, sidecar, &states) .await .map(|()| true) } @@ -1191,7 +1191,7 @@ async fn process_sidecar( "recovery: rolling back SchemaApply sidecar because schema staging \ files were not promoted in this recovery pass" ); - roll_back_sidecar(root_uri, storage, snapshot, sidecar, &states) + roll_back_sidecar(root_uri, storage.as_ref(), snapshot, sidecar, &states) .await .map(|()| true) } @@ -1211,8 +1211,32 @@ async fn process_sidecar( "recovery: rolling forward sidecar (Phase B completed; \ Phase C did not land)" ); + // TOCTOU window: between `classify_table` (which read the manifest + // pin) and the publish CAS below, a concurrent live writer can + // advance the manifest past our expected version. The failpoint + // lets a test force that interleave deterministically. + crate::failpoints::maybe_fail( + crate::failpoints::names::RECOVERY_BEFORE_ROLL_FORWARD_PUBLISH, + )?; let (new_manifest_version, published_versions) = - roll_forward_all(root_uri, sidecar, &states, snapshot).await?; + match roll_forward_all(root_uri, sidecar, &states, snapshot).await { + Ok(published) => published, + // Convergence-idempotent (invariants 7 & 15): a roll-forward's + // postcondition is "the manifest reflects the sidecar's committed + // Lance state", NOT "this sweep personally won the CAS". A + // concurrent writer that advanced the manifest to/past that goal + // during the classify→publish window is convergence, not a logical + // conflict — so re-read and either record the already-achieved + // roll-forward or defer to the next pass; never fail the open. + // Any other error still propagates. + Err(err) if is_expected_version_mismatch(&err) => { + return converge_or_defer_roll_forward( + root_uri, storage, sidecar, &states, err, + ) + .await; + } + Err(err) => return Err(err), + }; // `to_version` records the ACTUAL Lance HEAD published for // each table (not pin.post_commit_pin, which is a lower bound // for loose-match writers like SchemaApply / EnsureIndices / @@ -1247,12 +1271,148 @@ async fn process_sidecar( outcomes, ) .await?; - delete_sidecar_by_operation_id(root_uri, storage, &sidecar.operation_id).await?; + delete_sidecar_by_operation_id(root_uri, storage.as_ref(), &sidecar.operation_id) + .await?; Ok(true) } } } +/// True if `err` is the publisher's per-table CAS precondition failure +/// (`ExpectedVersionMismatch`) — the signal that a concurrent writer advanced +/// the manifest past what this caller expected. +fn is_expected_version_mismatch(err: &OmniError) -> bool { + matches!( + err, + OmniError::Manifest(m) + if matches!( + m.details, + Some(crate::error::ManifestConflictDetails::ExpectedVersionMismatch { .. }) + ) + ) +} + +/// Whether the live manifest already reflects everything this sidecar intended +/// to publish. +/// +/// SOUNDNESS: the per-table test is `current_version >= observed lance_head`, a +/// *proxy* for "the sidecar's committed Lance commit is an ancestor of the +/// published HEAD" (so a higher version is a descendant that contains it). The +/// proxy is sound only because of the heal-first invariant: every writer that +/// can advance a table's manifest version first heals pending sidecars +/// (`heal_pending_recovery_sidecars` runs at the head of `load`/`mutate`/ +/// schema-apply/branch-merge) or refuses on an unrecovered graph (`optimize`). +/// So the only path past `expected_version` is one that first publishes THIS +/// sidecar's commit at `lance_head` — version ordering then implies lineage +/// containment. A future writer that advances a pinned table WITHOUT healing +/// first (e.g. a non-heal-first `Overwrite` that replaces rows) would void this +/// proxy and must be re-validated by row-id lineage, not version ordering. +/// Added tables must be registered; tombstoned tables must be gone. +fn sidecar_intent_satisfied( + snapshot: &Snapshot, + sidecar: &RecoverySidecar, + states: &[ClassifiedTable], +) -> bool { + for (pin, state) in sidecar.tables.iter().zip(states.iter()) { + let current = snapshot + .entry(&pin.table_key) + .map(|e| e.table_version) + .unwrap_or(0); + if current < state.lance_head { + return false; + } + } + for reg in &sidecar.additional_registrations { + if snapshot.entry(®.table_key).is_none() { + return false; + } + } + for tomb in &sidecar.tombstones { + if snapshot.entry(&tomb.table_key).is_some() { + return false; + } + } + true +} + +/// Re-read the live manifest snapshot for the sidecar's branch. +async fn fresh_snapshot_for_sidecar( + root_uri: &str, + storage: &std::sync::Arc, + sidecar: &RecoverySidecar, +) -> Result { + let mut coordinator = match sidecar.branch.as_deref() { + Some(branch) if branch != "main" => { + GraphCoordinator::open_branch(root_uri, branch, std::sync::Arc::clone(storage)).await? + } + _ => GraphCoordinator::open(root_uri, std::sync::Arc::clone(storage)).await?, + }; + coordinator.refresh().await?; + Ok(coordinator.snapshot()) +} + +/// Convergence-idempotent handling of a roll-forward publish CAS that lost to a +/// concurrent writer (`ExpectedVersionMismatch`). A roll-forward's postcondition +/// is "the manifest reflects the sidecar's committed Lance state", not "this +/// sweep won the CAS" (invariants 7 & 15). Re-read the live manifest: +/// +/// - if it already reached the sidecar's goal, the work is done (just not by us) +/// — record the `RolledForward` audit and delete the sidecar idempotently; +/// - otherwise the manifest is progressing but not yet at the goal — leave the +/// sidecar for the next open / the live writer's own Phase D. +/// +/// Either way the open does NOT fail. A genuine logical conflict (a table below +/// `expected_version`, i.e. data lost) is not satisfiable here and re-surfaces +/// loudly via the classifier's `InvariantViolation` on the next pass. +/// See iss-schema-apply-reopen-recovery-race. +async fn converge_or_defer_roll_forward( + root_uri: &str, + storage: &std::sync::Arc, + sidecar: &RecoverySidecar, + states: &[ClassifiedTable], + conflict: OmniError, +) -> Result { + let fresh = fresh_snapshot_for_sidecar(root_uri, storage, sidecar).await?; + if !sidecar_intent_satisfied(&fresh, sidecar, states) { + warn!( + operation_id = sidecar.operation_id.as_str(), + writer_kind = ?sidecar.writer_kind, + "recovery: roll-forward publish lost a CAS and the manifest has not \ + yet reached the sidecar's goal; deferring to the next pass \ + (conflict: {conflict})" + ); + return Ok(false); + } + warn!( + operation_id = sidecar.operation_id.as_str(), + writer_kind = ?sidecar.writer_kind, + "recovery: roll-forward publish lost a CAS to a concurrent writer that \ + already reached the goal; converging (RolledForward audit + delete)" + ); + let outcomes: Vec = sidecar + .tables + .iter() + .map(|pin| TableOutcome { + table_key: pin.table_key.clone(), + from_version: pin.expected_version, + to_version: fresh + .entry(&pin.table_key) + .map(|e| e.table_version) + .unwrap_or(pin.post_commit_pin), + }) + .collect(); + record_audit( + root_uri, + sidecar, + fresh.version(), + RecoveryKind::RolledForward, + outcomes, + ) + .await?; + delete_sidecar_by_operation_id(root_uri, storage.as_ref(), &sidecar.operation_id).await?; + Ok(true) +} + #[derive(Debug, Clone, Copy)] struct ClassifiedTable { classification: TableClassification, @@ -1622,7 +1782,7 @@ async fn record_audit( // roll-back publish already landed — the sweep aborts, the sidecar // stays, and re-entry records the audit row (see the retry note in // the doc comment above). - crate::failpoints::maybe_fail("recovery.record_audit")?; + crate::failpoints::maybe_fail(crate::failpoints::names::RECOVERY_RECORD_AUDIT)?; // Non-main recovery commits must be appended on the sidecar branch's // commit graph, otherwise parent_commit_id comes from the global // main head. BranchMerge additionally records the source branch's diff --git a/crates/omnigraph/src/db/omnigraph.rs b/crates/omnigraph/src/db/omnigraph.rs index e1d7acf1..ac7a9630 100644 --- a/crates/omnigraph/src/db/omnigraph.rs +++ b/crates/omnigraph/src/db/omnigraph.rs @@ -287,7 +287,7 @@ impl Omnigraph { { return Err(OmniError::AlreadyInitialized { uri: root.clone() }); } - if let Err(err) = crate::failpoints::maybe_fail("init.after_schema_pg_written") { + if let Err(err) = crate::failpoints::maybe_fail(crate::failpoints::names::INIT_AFTER_SCHEMA_PG_WRITTEN) { best_effort_cleanup_init_artifacts(&root, storage.as_ref()).await; return Err(err); } @@ -1367,7 +1367,7 @@ impl Omnigraph { for (table_key, table_path) in cleanup_targets { let dataset_uri = self.storage().dataset_uri(&table_path); - let outcome = match crate::failpoints::maybe_fail("branch_delete.before_table_cleanup") + let outcome = match crate::failpoints::maybe_fail(crate::failpoints::names::BRANCH_DELETE_BEFORE_TABLE_CLEANUP) { Ok(()) => { self.storage() @@ -1939,14 +1939,14 @@ async fn init_storage_phase( if write_schema_pg { let schema_path = join_uri(root, SCHEMA_SOURCE_FILENAME); storage.write_text(&schema_path, schema_source).await?; - crate::failpoints::maybe_fail("init.after_schema_pg_written")?; + crate::failpoints::maybe_fail(crate::failpoints::names::INIT_AFTER_SCHEMA_PG_WRITTEN)?; } write_schema_contract(root, storage.as_ref(), schema_ir).await?; - crate::failpoints::maybe_fail("init.after_schema_contract_written")?; + crate::failpoints::maybe_fail(crate::failpoints::names::INIT_AFTER_SCHEMA_CONTRACT_WRITTEN)?; let coordinator = GraphCoordinator::init(root, catalog, Arc::clone(storage)).await?; - crate::failpoints::maybe_fail("init.after_coordinator_init")?; + crate::failpoints::maybe_fail(crate::failpoints::names::INIT_AFTER_COORDINATOR_INIT)?; Ok(coordinator) } diff --git a/crates/omnigraph/src/db/omnigraph/optimize.rs b/crates/omnigraph/src/db/omnigraph/optimize.rs index 9a0a17ff..27eed0bb 100644 --- a/crates/omnigraph/src/db/omnigraph/optimize.rs +++ b/crates/omnigraph/src/db/omnigraph/optimize.rs @@ -477,7 +477,7 @@ async fn optimize_one_table( // Pin the per-writer Phase B → Phase C residual for optimize: Lance HEAD has // advanced but the manifest publish below hasn't run. - crate::failpoints::maybe_fail("optimize.post_phase_b_pre_manifest_commit")?; + crate::failpoints::maybe_fail(crate::failpoints::names::OPTIMIZE_POST_PHASE_B_PRE_MANIFEST_COMMIT)?; // Phase C: publish the compacted version to the manifest (one CAS commit, // expected = the version observed under the queue). On failure the sidecar @@ -579,7 +579,7 @@ pub async fn cleanup_all_tables( let results: Vec = futures::stream::iter(table_tasks.into_iter()) .map(|(table_key, full_path)| async move { let outcome: Result = async { - crate::failpoints::maybe_fail("cleanup.table_gc")?; + crate::failpoints::maybe_fail(crate::failpoints::names::CLEANUP_TABLE_GC)?; // `cleanup_old_versions` is a Lance-only maintenance API not // surfaced through `TableStorage` — see the optimize path // above for the same rationale. Unwrap via `into_dataset()`. @@ -737,7 +737,7 @@ pub async fn reconcile_orphaned_branches(db: &Omnigraph) -> Result db.snapshot_for_branch(Some(&branch)).await, Err(injected) => Err(injected), }; @@ -816,7 +816,7 @@ pub async fn reconcile_orphaned_branches(db: &Omnigraph) -> Result storage.force_delete_branch(&full_path, &branch).await, Err(injected) => Err(injected), }; @@ -946,7 +946,10 @@ mod tests { ds.create_branch("feature", base, None).await.unwrap(); } - let _fp = ScopedFailPoint::new("cleanup.resolve_branch_snapshot", "return"); + let _fp = ScopedFailPoint::new( + crate::failpoints::names::CLEANUP_RESOLVE_BRANCH_SNAPSHOT, + "return", + ); let stats = reconcile_orphaned_branches(&db).await.unwrap(); assert_eq!( diff --git a/crates/omnigraph/src/db/omnigraph/schema_apply.rs b/crates/omnigraph/src/db/omnigraph/schema_apply.rs index 3089641d..364f5a43 100644 --- a/crates/omnigraph/src/db/omnigraph/schema_apply.rs +++ b/crates/omnigraph/src/db/omnigraph/schema_apply.rs @@ -648,7 +648,7 @@ where // `recover_schema_state_files`: // - crash before commit → manifest unchanged; staging deleted on open // - crash after commit → manifest advanced; staging renamed on open - crate::failpoints::maybe_fail("schema_apply.before_staging_write")?; + crate::failpoints::maybe_fail(crate::failpoints::names::SCHEMA_APPLY_BEFORE_STAGING_WRITE)?; let staging_pg_uri = schema_source_staging_uri(&db.root_uri); db.storage @@ -656,7 +656,7 @@ where .await?; write_schema_contract_staging(&db.root_uri, db.storage.as_ref(), &desired_ir).await?; - crate::failpoints::maybe_fail("schema_apply.after_staging_write")?; + crate::failpoints::maybe_fail(crate::failpoints::names::SCHEMA_APPLY_AFTER_STAGING_WRITE)?; // `apply_schema` doesn't currently take an actor; system-attributed. let PublishedSnapshot { @@ -669,7 +669,7 @@ where .commit_changes_with_actor(&manifest_changes, None) .await?; - crate::failpoints::maybe_fail("schema_apply.after_manifest_commit")?; + crate::failpoints::maybe_fail(crate::failpoints::names::SCHEMA_APPLY_AFTER_MANIFEST_COMMIT)?; db.storage .rename_text(&staging_pg_uri, &schema_source_uri(&db.root_uri)) diff --git a/crates/omnigraph/src/db/omnigraph/table_ops.rs b/crates/omnigraph/src/db/omnigraph/table_ops.rs index ed5d0824..220dac4c 100644 --- a/crates/omnigraph/src/db/omnigraph/table_ops.rs +++ b/crates/omnigraph/src/db/omnigraph/table_ops.rs @@ -296,7 +296,7 @@ pub(super) async fn ensure_indices_for_branch( // (one commit_staged per index built) but the manifest publish below // hasn't run. Used by // `tests/failpoints.rs::ensure_indices_phase_b_failure_recovered_on_next_open`. - crate::failpoints::maybe_fail("ensure_indices.post_phase_b_pre_manifest_commit")?; + crate::failpoints::maybe_fail(crate::failpoints::names::ENSURE_INDICES_POST_PHASE_B_PRE_MANIFEST_COMMIT)?; if !updates.is_empty() { commit_prepared_updates_on_branch(db, branch, &updates, None).await?; @@ -571,7 +571,7 @@ pub(super) async fn open_owned_dataset_for_branch_write( Ok((ds, Some(active_branch.to_string()))) } source_branch => { - crate::failpoints::maybe_fail("fork.before_classify")?; + crate::failpoints::maybe_fail(crate::failpoints::names::FORK_BEFORE_CLASSIFY)?; // Authority check before forking: re-read the live manifest. If this // table is already forked on active_branch, a concurrent first-write // won the race and our snapshot is stale — that is a retryable @@ -667,7 +667,7 @@ pub(crate) async fn classify_fork_ref( // fresh-authority read (no-op without the `failpoints` feature). Lets a // test exercise the Indeterminate path — a read failure on a live branch // must classify as Indeterminate (skip), never Orphan (destroy). - let fresh = match crate::failpoints::maybe_fail("classify.fresh_read") { + let fresh = match crate::failpoints::maybe_fail(crate::failpoints::names::CLASSIFY_FRESH_READ) { Ok(()) => db.fresh_snapshot_for_branch(Some(branch)).await, Err(injected) => Err(injected), }; @@ -751,7 +751,7 @@ pub(super) async fn reclaim_orphaned_fork_and_refork( } } - crate::failpoints::maybe_fail("fork.before_reclaim")?; + crate::failpoints::maybe_fail(crate::failpoints::names::FORK_BEFORE_RECLAIM)?; db.storage() .force_delete_branch(full_path, active_branch) .await @@ -1014,7 +1014,7 @@ async fn stage_and_commit_btree( // to demonstrate that a stage-step failure in the staged-index // path (`stage_create_btree_index` succeeded; `commit_staged` not // yet called) leaves no Lance-HEAD drift on the touched table. - crate::failpoints::maybe_fail("ensure_indices.post_stage_pre_commit_btree")?; + crate::failpoints::maybe_fail(crate::failpoints::names::ENSURE_INDICES_POST_STAGE_PRE_COMMIT_BTREE)?; let new_ds = db .storage() .commit_staged(ds.clone(), staged) diff --git a/crates/omnigraph/src/exec/merge.rs b/crates/omnigraph/src/exec/merge.rs index 600fdf11..fb5e33b0 100644 --- a/crates/omnigraph/src/exec/merge.rs +++ b/crates/omnigraph/src/exec/merge.rs @@ -1125,7 +1125,7 @@ async fn publish_rewritten_merge_table( // rows are on Lance HEAD but the delete has not committed and the // achieved-version intent has not been recorded, so recovery must roll BACK. // See tests/failpoints.rs::branch_merge_rewrite_partial_after_merge_rolls_back. - crate::failpoints::maybe_fail("branch_merge.rewrite_after_merge_pre_delete")?; + crate::failpoints::maybe_fail(crate::failpoints::names::BRANCH_MERGE_REWRITE_AFTER_MERGE_PRE_DELETE)?; // Phase 2: delete removed rows via deletion vectors. // @@ -1156,7 +1156,7 @@ async fn publish_rewritten_merge_table( // recorded, so recovery must roll BACK (the index is reconciler-owned derived // state, but the merge itself never reached its commit boundary). See // tests/failpoints.rs::branch_merge_rewrite_partial_after_delete_rolls_back. - crate::failpoints::maybe_fail("branch_merge.rewrite_after_delete_pre_index")?; + crate::failpoints::maybe_fail(crate::failpoints::names::BRANCH_MERGE_REWRITE_AFTER_DELETE_PRE_INDEX)?; // Phase 3: rebuild indices. // @@ -1270,7 +1270,7 @@ async fn publish_adopted_delta( // have not committed and the achieved-version intent has not been recorded, so // recovery must roll BACK (not publish the appends-only state). See // tests/failpoints.rs::branch_merge_adopt_partial_after_append_rolls_back. - crate::failpoints::maybe_fail("branch_merge.adopt_after_append_pre_upsert")?; + crate::failpoints::maybe_fail(crate::failpoints::names::BRANCH_MERGE_ADOPT_AFTER_APPEND_PRE_UPSERT)?; // Phase 1b: upsert the CHANGED rows. The merge_insert hash join is now // bounded to the genuinely-changed set, not the whole delta. It runs against @@ -1302,7 +1302,7 @@ async fn publish_adopted_delta( // has not committed and the achieved-version intent has not been recorded, so // recovery must roll BACK. See // tests/failpoints.rs::branch_merge_adopt_partial_after_upsert_rolls_back. - crate::failpoints::maybe_fail("branch_merge.adopt_after_upsert_pre_delete")?; + crate::failpoints::maybe_fail(crate::failpoints::names::BRANCH_MERGE_ADOPT_AFTER_UPSERT_PRE_DELETE)?; // Phase 2: delete removed rows via deletion vectors (inline-commit residual, // same as the three-way path until Lance ships a public two-phase delete). @@ -1787,7 +1787,7 @@ impl Omnigraph { // (publish_*) AND the sidecar is confirmed, but the manifest publish // below hasn't run — so recovery rolls FORWARD. Used by // `tests/failpoints.rs::branch_merge_phase_b_failure_recovered_on_next_open`. - crate::failpoints::maybe_fail("branch_merge.post_phase_b_pre_manifest_commit")?; + crate::failpoints::maybe_fail(crate::failpoints::names::BRANCH_MERGE_POST_PHASE_B_PRE_MANIFEST_COMMIT)?; let manifest_version = if updates.is_empty() { self.version().await diff --git a/crates/omnigraph/src/exec/mutation.rs b/crates/omnigraph/src/exec/mutation.rs index fbd07516..48a6c31a 100644 --- a/crates/omnigraph/src/exec/mutation.rs +++ b/crates/omnigraph/src/exec/mutation.rs @@ -818,7 +818,7 @@ impl Omnigraph { // across this failure so the next `Omnigraph::open`'s // recovery sweep can roll forward — see // `tests/failpoints.rs::recovery_rolls_forward_after_finalize_publisher_failure`. - crate::failpoints::maybe_fail("mutation.post_finalize_pre_publisher")?; + crate::failpoints::maybe_fail(crate::failpoints::names::MUTATION_POST_FINALIZE_PRE_PUBLISHER)?; self.commit_updates_on_branch_with_expected( requested.as_deref(), &updates, @@ -1305,7 +1305,7 @@ impl Omnigraph { crate::db::MutationOpKind::Delete, ) .await?; - crate::failpoints::maybe_fail("mutation.delete_node_pre_primary_delete")?; + crate::failpoints::maybe_fail(crate::failpoints::names::MUTATION_DELETE_NODE_PRE_PRIMARY_DELETE)?; let (_new_ds, delete_state) = self .storage_inline_residual() .delete_where(&full_path, ds, &pred_sql) diff --git a/crates/omnigraph/src/failpoints.rs b/crates/omnigraph/src/failpoints.rs index 461b73ef..786887b3 100644 --- a/crates/omnigraph/src/failpoints.rs +++ b/crates/omnigraph/src/failpoints.rs @@ -14,6 +14,47 @@ pub(crate) fn maybe_fail(_name: &str) -> Result<()> { Ok(()) } +/// Compile-checked catalog of every failpoint name in this crate. Call sites +/// (`maybe_fail`) and tests (`ScopedFailPoint` / the test rendezvous helper) +/// reference these constants instead of bare string literals, so a typo is a +/// compile error rather than a silently-never-firing failpoint. +pub mod names { + pub const BRANCH_CREATE_AFTER_MANIFEST_BRANCH_CREATE: &str = "branch_create.after_manifest_branch_create"; + pub const BRANCH_DELETE_BEFORE_COMMIT_GRAPH_RECLAIM: &str = "branch_delete.before_commit_graph_reclaim"; + pub const BRANCH_DELETE_BEFORE_TABLE_CLEANUP: &str = "branch_delete.before_table_cleanup"; + pub const BRANCH_MERGE_ADOPT_AFTER_APPEND_PRE_UPSERT: &str = "branch_merge.adopt_after_append_pre_upsert"; + pub const BRANCH_MERGE_ADOPT_AFTER_UPSERT_PRE_DELETE: &str = "branch_merge.adopt_after_upsert_pre_delete"; + pub const BRANCH_MERGE_POST_PHASE_B_PRE_MANIFEST_COMMIT: &str = "branch_merge.post_phase_b_pre_manifest_commit"; + pub const BRANCH_MERGE_REWRITE_AFTER_DELETE_PRE_INDEX: &str = "branch_merge.rewrite_after_delete_pre_index"; + pub const BRANCH_MERGE_REWRITE_AFTER_MERGE_PRE_DELETE: &str = "branch_merge.rewrite_after_merge_pre_delete"; + pub const CLASSIFY_FRESH_READ: &str = "classify.fresh_read"; + pub const CLEANUP_RECONCILE_FORK: &str = "cleanup.reconcile_fork"; + pub const CLEANUP_RESOLVE_BRANCH_SNAPSHOT: &str = "cleanup.resolve_branch_snapshot"; + pub const CLEANUP_TABLE_GC: &str = "cleanup.table_gc"; + pub const ENSURE_INDICES_POST_PHASE_B_PRE_MANIFEST_COMMIT: &str = "ensure_indices.post_phase_b_pre_manifest_commit"; + pub const ENSURE_INDICES_POST_STAGE_PRE_COMMIT_BTREE: &str = "ensure_indices.post_stage_pre_commit_btree"; + pub const FORK_BEFORE_CLASSIFY: &str = "fork.before_classify"; + pub const FORK_BEFORE_RECLAIM: &str = "fork.before_reclaim"; + pub const GRAPH_PUBLISH_AFTER_MANIFEST_COMMIT: &str = "graph_publish.after_manifest_commit"; + pub const GRAPH_PUBLISH_BEFORE_COMMIT_APPEND: &str = "graph_publish.before_commit_append"; + pub const INIT_AFTER_COORDINATOR_INIT: &str = "init.after_coordinator_init"; + pub const INIT_AFTER_SCHEMA_CONTRACT_WRITTEN: &str = "init.after_schema_contract_written"; + pub const INIT_AFTER_SCHEMA_PG_WRITTEN: &str = "init.after_schema_pg_written"; + pub const MUTATION_DELETE_NODE_PRE_PRIMARY_DELETE: &str = "mutation.delete_node_pre_primary_delete"; + pub const MUTATION_POST_FINALIZE_PRE_PUBLISHER: &str = "mutation.post_finalize_pre_publisher"; + pub const OPTIMIZE_POST_PHASE_B_PRE_MANIFEST_COMMIT: &str = "optimize.post_phase_b_pre_manifest_commit"; + pub const RECOVERY_BEFORE_ROLL_FORWARD_PUBLISH: &str = "recovery.before_roll_forward_publish"; + pub const RECOVERY_ORPHAN_DISCARD_AUDIT_APPEND: &str = "recovery.orphan_discard_audit_append"; + pub const RECOVERY_RECORD_AUDIT: &str = "recovery.record_audit"; + pub const RECOVERY_SIDECAR_CONFIRM: &str = "recovery.sidecar_confirm"; + pub const RECOVERY_SIDECAR_DELETE: &str = "recovery.sidecar_delete"; + pub const RECOVERY_SIDECAR_LIST: &str = "recovery.sidecar_list"; + pub const RECOVERY_SIDECAR_WRITE: &str = "recovery.sidecar_write"; + pub const SCHEMA_APPLY_AFTER_MANIFEST_COMMIT: &str = "schema_apply.after_manifest_commit"; + pub const SCHEMA_APPLY_AFTER_STAGING_WRITE: &str = "schema_apply.after_staging_write"; + pub const SCHEMA_APPLY_BEFORE_STAGING_WRITE: &str = "schema_apply.before_staging_write"; +} + #[cfg(feature = "failpoints")] pub struct ScopedFailPoint { name: String, @@ -27,6 +68,20 @@ impl ScopedFailPoint { name: name.to_string(), } } + + /// Register a callback failpoint with the same Drop-based cleanup as + /// `new`. Without the guard, a panic while the point is active would + /// leak the callback into the process-global registry and fire it under + /// later tests in the same binary. + pub fn with_callback(name: &str, callback: F) -> Self + where + F: Fn() + Send + Sync + 'static, + { + fail::cfg_callback(name, callback).expect("configure callback failpoint"); + Self { + name: name.to_string(), + } + } } #[cfg(feature = "failpoints")] diff --git a/crates/omnigraph/src/loader/mod.rs b/crates/omnigraph/src/loader/mod.rs index 2365243d..1a96ec4c 100644 --- a/crates/omnigraph/src/loader/mod.rs +++ b/crates/omnigraph/src/loader/mod.rs @@ -602,7 +602,7 @@ async fn load_jsonl_reader( // staged commits have advanced Lance HEAD, but the manifest // publish has not run yet. Reuse the mutation failpoint name so // one failpoint pins the shared `MutationStaging` boundary. - crate::failpoints::maybe_fail("mutation.post_finalize_pre_publisher")?; + crate::failpoints::maybe_fail(crate::failpoints::names::MUTATION_POST_FINALIZE_PRE_PUBLISHER)?; db.commit_updates_on_branch_with_expected(branch, &updates, &expected_versions, actor_id) .await?; // The recovery sidecar protects the per-table commit_staged → diff --git a/crates/omnigraph/src/table_store.rs b/crates/omnigraph/src/table_store.rs index 96e61962..da318481 100644 --- a/crates/omnigraph/src/table_store.rs +++ b/crates/omnigraph/src/table_store.rs @@ -812,10 +812,12 @@ impl TableStore { /// Legacy inline-commit append: writes fragments AND commits in one /// call, advancing Lance HEAD as a side effect. Not on the /// `TableStorage` trait surface — the staged primitive `stage_append` - /// + `commit_staged` is the engine write path. This inherent - /// `pub(crate)` method survives only for recovery test setup. Do not - /// add new engine call sites — they re-introduce the multi-phase - /// commit drift the trait surface was designed to eliminate. + /// + `commit_staged` is the engine write path. This inherent method + /// survives only for in-source recovery test setup, so it is + /// `#[cfg(test)]`-gated: engine code physically cannot call it (which + /// enforces "no new call sites" by construction and silences the + /// dead-code warning the non-test lib build would otherwise emit). + #[cfg(test)] pub(crate) async fn append_batch( &self, dataset_uri: &str, diff --git a/crates/omnigraph/tests/failpoint_names_guard.rs b/crates/omnigraph/tests/failpoint_names_guard.rs new file mode 100644 index 00000000..b784f477 --- /dev/null +++ b/crates/omnigraph/tests/failpoint_names_guard.rs @@ -0,0 +1,77 @@ +//! Guard: failpoint names must come from the compile-checked `names` catalog +//! (`omnigraph::failpoints::names` / `omnigraph_cluster::failpoints::names`), +//! never bare string literals. +//! +//! The `names` consts give compile-time typo protection only if every call +//! site uses them. A bare `maybe_fail("typo.literal")` still compiles (the +//! arg is `&str`), so a typo there would silently never fire. This +//! source-walk closes that gap by construction — the same defense-in-depth +//! shape as `forbidden_apis.rs`. Add a new failpoint by adding its const to +//! the catalog first; this guard then forces every call site to reference it. + +use std::path::{Path, PathBuf}; + +/// Call-site prefixes whose first argument must be a `names::` constant. A +/// literal first argument makes the prefix immediately precede a `"`. +const LITERAL_CALL_PATTERNS: &[&str] = &[ + "maybe_fail(\"", + "ScopedFailPoint::new(\"", + "ScopedFailPoint::with_callback(\"", + "park_first(\"", +]; + +fn manifest_dir() -> PathBuf { + PathBuf::from(env!("CARGO_MANIFEST_DIR")) +} + +/// Production call sites live under each crate's `src`; test call sites live +/// in the two failpoint integration binaries. This guard file is deliberately +/// not in the set (it names the patterns as literals itself). +fn files_to_scan() -> Vec { + let engine = manifest_dir(); + let cluster = engine.join("../omnigraph-cluster"); + let mut out = Vec::new(); + collect_rs(&engine.join("src"), &mut out); + collect_rs(&cluster.join("src"), &mut out); + out.push(engine.join("tests/failpoints.rs")); + out.push(cluster.join("tests/failpoints.rs")); + out +} + +fn collect_rs(dir: &Path, out: &mut Vec) { + let Ok(entries) = std::fs::read_dir(dir) else { + return; + }; + for entry in entries.flatten() { + let path = entry.path(); + if path.is_dir() { + collect_rs(&path, out); + } else if path.extension().is_some_and(|e| e == "rs") { + out.push(path); + } + } +} + +#[test] +fn failpoint_names_use_the_compile_checked_catalog() { + let mut violations = Vec::new(); + for file in files_to_scan() { + let Ok(contents) = std::fs::read_to_string(&file) else { + continue; + }; + for (idx, line) in contents.lines().enumerate() { + for pattern in LITERAL_CALL_PATTERNS { + if line.contains(pattern) { + violations.push(format!("{}:{}: {}", file.display(), idx + 1, line.trim())); + } + } + } + } + assert!( + violations.is_empty(), + "failpoint names must reference the compile-checked \ + `omnigraph::failpoints::names::*` (or `omnigraph_cluster::failpoints::names::*`) \ + constants, not string literals — a literal typo would silently never fire:\n{}", + violations.join("\n") + ); +} diff --git a/crates/omnigraph/tests/failpoints.rs b/crates/omnigraph/tests/failpoints.rs index 9d65bc1a..81ae4670 100644 --- a/crates/omnigraph/tests/failpoints.rs +++ b/crates/omnigraph/tests/failpoints.rs @@ -3,10 +3,10 @@ mod helpers; use fail::FailScenario; -use futures::FutureExt; use omnigraph::db::Omnigraph; use omnigraph::error::{ManifestErrorKind, OmniError}; use omnigraph::failpoints::ScopedFailPoint; +use omnigraph::failpoints::names; use omnigraph::loader::LoadMode; use serial_test::serial; @@ -32,12 +32,13 @@ fn node_table_uri(root: &str, type_name: &str) -> String { } #[tokio::test] +#[serial] async fn branch_create_failpoint_triggers() { let _scenario = FailScenario::setup(); let dir = tempfile::tempdir().unwrap(); let uri = dir.path().to_str().unwrap(); let db = Omnigraph::init(uri, helpers::TEST_SCHEMA).await.unwrap(); - let _failpoint = ScopedFailPoint::new("branch_create.after_manifest_branch_create", "return"); + let _failpoint = ScopedFailPoint::new(names::BRANCH_CREATE_AFTER_MANIFEST_BRANCH_CREATE, "return"); let err = db.branch_create("feature").await.unwrap_err(); assert!( @@ -52,6 +53,7 @@ async fn branch_create_failpoint_triggers() { // object-store error) must NOT fail the call: the branch is already gone, and // `cleanup` reconciles the stranded fork. The branch name is reusable after. #[tokio::test] +#[serial] async fn branch_delete_partial_failure_converges_via_cleanup() { let _scenario = FailScenario::setup(); let dir = tempfile::tempdir().unwrap(); @@ -83,7 +85,7 @@ async fn branch_delete_partial_failure_converges_via_cleanup() { // Inject a failure during per-table cleanup, AFTER the manifest authority // flip. branch_delete must still succeed (best-effort reclaim). { - let _fp = ScopedFailPoint::new("branch_delete.before_table_cleanup", "return"); + let _fp = ScopedFailPoint::new(names::BRANCH_DELETE_BEFORE_TABLE_CLEANUP, "return"); main.branch_delete("feature").await.expect( "branch_delete is best-effort after the manifest flip: a cleanup-step \ failure must not fail the call", @@ -137,11 +139,12 @@ async fn branch_delete_partial_failure_converges_via_cleanup() { // prior delete; run cleanup". (This test was the inverse before the fork-as- // idempotent-reconcile fix; its flip is the signal the bug class is closed.) #[tokio::test] +#[serial] async fn recreate_over_orphaned_fork_self_heals_without_cleanup() { let _scenario = FailScenario::setup(); let dir = tempfile::tempdir().unwrap(); let uri = dir.path().to_str().unwrap().to_string(); - let mut main = helpers::init_and_load(&dir).await; + let main = helpers::init_and_load(&dir).await; main.branch_create("feature").await.unwrap(); let mut feature = Omnigraph::open(&uri).await.unwrap(); @@ -158,7 +161,7 @@ async fn recreate_over_orphaned_fork_self_heals_without_cleanup() { // Partial delete: leaves the Person fork orphaned (cleanup not yet run). { - let _fp = ScopedFailPoint::new("branch_delete.before_table_cleanup", "return"); + let _fp = ScopedFailPoint::new(names::BRANCH_DELETE_BEFORE_TABLE_CLEANUP, "return"); main.branch_delete("feature").await.unwrap(); } @@ -195,6 +198,7 @@ async fn recreate_over_orphaned_fork_self_heals_without_cleanup() { // leave the ref in place. It must not squeeze the ambiguity through // ExpectedVersionMismatch with expected == actual, which lies about the cause. #[tokio::test] +#[serial] async fn recreate_over_orphaned_fork_reports_indeterminate_authority_read() { let _scenario = FailScenario::setup(); let dir = tempfile::tempdir().unwrap(); @@ -211,7 +215,7 @@ async fn recreate_over_orphaned_fork_reports_indeterminate_authority_read() { let row = r#"{"type":"Person","data":{"name":"Grace","age":37}}"#; { - let _fp = ScopedFailPoint::new("classify.fresh_read", "return"); + let _fp = ScopedFailPoint::new(names::CLASSIFY_FRESH_READ, "return"); let err = db .load_as("feature", None, row, LoadMode::Merge, None) .await @@ -257,6 +261,7 @@ async fn recreate_over_orphaned_fork_reports_indeterminate_authority_read() { // surfaced per-table in the returned stats, and the independent reconcile pass // still reclaimed an orphan. #[tokio::test] +#[serial] async fn cleanup_isolates_single_table_failure() { let _scenario = FailScenario::setup(); let dir = tempfile::tempdir().unwrap(); @@ -272,7 +277,7 @@ async fn cleanup_isolates_single_table_failure() { } // One table's version GC fails once; the sweep must isolate it. - let _fp = ScopedFailPoint::new("cleanup.table_gc", "1*return"); + let _fp = ScopedFailPoint::new(names::CLEANUP_TABLE_GC, "1*return"); let stats = db .cleanup(omnigraph::db::CleanupPolicyOptions { keep_versions: Some(1), @@ -306,6 +311,7 @@ async fn cleanup_isolates_single_table_failure() { // isolated (logged, not propagated) so the sweep continues, and a later // cleanup converges. This is the loop the Devin finding was about. #[tokio::test] +#[serial] async fn cleanup_isolates_reconcile_failure() { let _scenario = FailScenario::setup(); let dir = tempfile::tempdir().unwrap(); @@ -323,7 +329,7 @@ async fn cleanup_isolates_reconcile_failure() { // Inject a one-shot failure into the reconcile force-delete. The sweep must // not abort. { - let _fp = ScopedFailPoint::new("cleanup.reconcile_fork", "1*return"); + let _fp = ScopedFailPoint::new(names::CLEANUP_RECONCILE_FORK, "1*return"); db.cleanup(omnigraph::db::CleanupPolicyOptions { keep_versions: Some(1), older_than: None, @@ -359,6 +365,7 @@ async fn cleanup_isolates_reconcile_failure() { // per-table forks. A delete whose best-effort commit-graph reclaim fails leaves // a commit-graph orphan; the next cleanup must drop it. #[tokio::test] +#[serial] async fn cleanup_reclaims_orphaned_commit_graph_branch() { let _scenario = FailScenario::setup(); let dir = tempfile::tempdir().unwrap(); @@ -369,7 +376,7 @@ async fn cleanup_reclaims_orphaned_commit_graph_branch() { // Delete, failing the commit-graph reclaim → commit-graph "feature" orphan // (manifest branch gone, commit-graph branch left behind). { - let _fp = ScopedFailPoint::new("branch_delete.before_commit_graph_reclaim", "return"); + let _fp = ScopedFailPoint::new(names::BRANCH_DELETE_BEFORE_COMMIT_GRAPH_RECLAIM, "return"); db.branch_delete("feature").await.unwrap(); } @@ -407,6 +414,7 @@ async fn cleanup_reclaims_orphaned_commit_graph_branch() { // the next run once the read succeeds. This pins the Indeterminate arm and the // don't-destroy-on-ambiguity rule end-to-end through cleanup. #[tokio::test] +#[serial] async fn reconcile_skips_fork_when_fresh_recheck_is_unavailable_then_converges() { let _scenario = FailScenario::setup(); let dir = tempfile::tempdir().unwrap(); @@ -430,7 +438,7 @@ async fn reconcile_skips_fork_when_fresh_recheck_is_unavailable_then_converges() // With the fresh re-check failing, the fork's status is Indeterminate (the // branch is live but unreadable) → cleanup must SKIP it, not delete. { - let _fp = ScopedFailPoint::new("classify.fresh_read", "return"); + let _fp = ScopedFailPoint::new(names::CLASSIFY_FRESH_READ, "return"); db.cleanup(omnigraph::db::CleanupPolicyOptions { keep_versions: Some(1), older_than: None, @@ -465,6 +473,7 @@ async fn reconcile_skips_fork_when_fresh_recheck_is_unavailable_then_converges() // succeed (branch_create force-deletes a stale commit-graph ref since the // manifest branch is created fresh), instead of dying on the leftover ref. #[tokio::test] +#[serial] async fn branch_create_recreates_over_commit_graph_zombie() { let _scenario = FailScenario::setup(); let dir = tempfile::tempdir().unwrap(); @@ -476,7 +485,7 @@ async fn branch_create_recreates_over_commit_graph_zombie() { { // Fail the best-effort commit-graph reclaim → commit-graph "feature" // zombie survives the delete (manifest authority still flips). - let _fp = ScopedFailPoint::new("branch_delete.before_commit_graph_reclaim", "return"); + let _fp = ScopedFailPoint::new(names::BRANCH_DELETE_BEFORE_COMMIT_GRAPH_RECLAIM, "return"); db.branch_delete("feature").await.unwrap(); } assert_eq!(db.branch_list().await.unwrap(), vec!["main".to_string()]); @@ -497,6 +506,7 @@ async fn branch_create_recreates_over_commit_graph_zombie() { // the branch does not half-exist. The existing failpoint fires right after the // manifest create, standing in for any post-authority failure. #[tokio::test] +#[serial] async fn branch_create_rolls_back_manifest_on_commit_graph_failure() { let _scenario = FailScenario::setup(); let dir = tempfile::tempdir().unwrap(); @@ -505,7 +515,7 @@ async fn branch_create_rolls_back_manifest_on_commit_graph_failure() { .unwrap(); let err = { - let _fp = ScopedFailPoint::new("branch_create.after_manifest_branch_create", "return"); + let _fp = ScopedFailPoint::new(names::BRANCH_CREATE_AFTER_MANIFEST_BRANCH_CREATE, "return"); db.branch_create("feature").await.unwrap_err() }; assert!( @@ -524,42 +534,21 @@ async fn branch_create_rolls_back_manifest_on_commit_graph_failure() { // an orphan, so it must be a retryable "refresh and retry", never a misleading // "run cleanup". // -// Ordering is made deterministic (no sleeps) via a callback at the fork point: -// `compare_exchange` lets only the FIRST arrival (writer A) record readiness and -// block until released; later arrivals (writer B) fall through. The test waits -// on the readiness flag, lets B win and commit the fork, then releases A. -static FORK_A_AT_POINT: std::sync::atomic::AtomicBool = std::sync::atomic::AtomicBool::new(false); -static FORK_RELEASE_A: std::sync::atomic::AtomicBool = std::sync::atomic::AtomicBool::new(false); - +// Ordering is made deterministic (no fixed sleeps) via the shared rendezvous: +// it parks the first arrival (writer A) at the fork point until released; later +// arrivals (writer B) fall through. The test waits on the reached condition, +// lets B win and commit the fork, then releases A. #[tokio::test(flavor = "multi_thread")] +#[serial] async fn fork_collision_with_live_concurrent_fork_is_retryable() { - use std::sync::atomic::Ordering::SeqCst; - let _scenario = FailScenario::setup(); - FORK_A_AT_POINT.store(false, SeqCst); - FORK_RELEASE_A.store(false, SeqCst); let dir = tempfile::tempdir().unwrap(); let uri = dir.path().to_str().unwrap().to_string(); let main = helpers::init_and_load(&dir).await; main.branch_create("feature").await.unwrap(); - // First arrival (A) records readiness and blocks until released; the rest - // (B) fall through immediately. Bounded spin so a mistake can't hang forever. - fail::cfg_callback("fork.before_classify", || { - if FORK_A_AT_POINT - .compare_exchange(false, true, SeqCst, SeqCst) - .is_ok() - { - for _ in 0..2000 { - if FORK_RELEASE_A.load(SeqCst) { - break; - } - std::thread::sleep(std::time::Duration::from_millis(5)); - } - } - }) - .unwrap(); + let rv = helpers::failpoint::Rendezvous::park_first(names::FORK_BEFORE_CLASSIFY); let uri_a = uri.clone(); let writer_a = tokio::spawn(async move { @@ -574,17 +563,8 @@ async fn fork_collision_with_live_concurrent_fork_is_retryable() { .await }); - // Wait (bounded) until A is parked at the fork point. - for _ in 0..600 { - if FORK_A_AT_POINT.load(SeqCst) { - break; - } - tokio::time::sleep(std::time::Duration::from_millis(5)).await; - } - assert!( - FORK_A_AT_POINT.load(SeqCst), - "writer A never reached the fork point" - ); + // Wait until A is parked at the fork point. + rv.wait_until_reached().await; // B wins the fork and commits it. let mut b = Omnigraph::open(&uri).await.unwrap(); @@ -599,12 +579,11 @@ async fn fork_collision_with_live_concurrent_fork_is_retryable() { .unwrap(); // Release A; it resumes, re-reads the manifest, and sees the fork is live. - FORK_RELEASE_A.store(true, SeqCst); + rv.release(); let err = writer_a .await .unwrap() .expect_err("A's stale-snapshot fork should be a retryable conflict"); - fail::remove("fork.before_classify"); let msg = err.to_string(); assert!( @@ -618,13 +597,14 @@ async fn fork_collision_with_live_concurrent_fork_is_retryable() { } #[tokio::test(flavor = "multi_thread")] +#[serial] async fn graph_publish_failpoint_triggers_before_commit_append() { let _scenario = FailScenario::setup(); let dir = tempfile::tempdir().unwrap(); let mut db = Omnigraph::init(dir.path().to_str().unwrap(), helpers::TEST_SCHEMA) .await .unwrap(); - let _failpoint = ScopedFailPoint::new("graph_publish.before_commit_append", "return"); + let _failpoint = ScopedFailPoint::new(names::GRAPH_PUBLISH_BEFORE_COMMIT_APPEND, "return"); let err = mutate_main( &mut db, @@ -646,6 +626,7 @@ async fn graph_publish_failpoint_triggers_before_commit_append() { // state. #[tokio::test] +#[serial] async fn schema_apply_pre_commit_crash_rolls_forward_via_sidecar() { let _scenario = FailScenario::setup(); let dir = tempfile::tempdir().unwrap(); @@ -653,7 +634,7 @@ async fn schema_apply_pre_commit_crash_rolls_forward_via_sidecar() { { let db = Omnigraph::init(&uri, SCHEMA_V1).await.unwrap(); - let _failpoint = ScopedFailPoint::new("schema_apply.after_staging_write", "return"); + let _failpoint = ScopedFailPoint::new(names::SCHEMA_APPLY_AFTER_STAGING_WRITE, "return"); let err = db.apply_schema(SCHEMA_V2_ADDED_TYPE).await.unwrap_err(); assert!( err.to_string() @@ -689,6 +670,7 @@ async fn schema_apply_pre_commit_crash_rolls_forward_via_sidecar() { } #[tokio::test] +#[serial] async fn schema_apply_recovers_post_commit_crash() { let _scenario = FailScenario::setup(); let dir = tempfile::tempdir().unwrap(); @@ -696,7 +678,7 @@ async fn schema_apply_recovers_post_commit_crash() { { let db = Omnigraph::init(&uri, SCHEMA_V1).await.unwrap(); - let _failpoint = ScopedFailPoint::new("schema_apply.after_manifest_commit", "return"); + let _failpoint = ScopedFailPoint::new(names::SCHEMA_APPLY_AFTER_MANIFEST_COMMIT, "return"); let err = db.apply_schema(SCHEMA_V2_ADDED_TYPE).await.unwrap_err(); assert!( err.to_string() @@ -714,6 +696,7 @@ async fn schema_apply_recovers_post_commit_crash() { } #[tokio::test] +#[serial] async fn schema_apply_recovers_partial_rename() { // Construct a partial-rename state: _schema.pg has been renamed in // (matching v2), but _schema.ir.json.staging and __schema_state.json.staging @@ -778,6 +761,7 @@ async fn schema_apply_recovers_partial_rename() { /// Continuous in-process recovery (no restart needed between failure /// and recovery) is the goal of a future background reconciler. #[tokio::test] +#[serial] async fn recovery_rolls_forward_after_finalize_publisher_failure() { let _scenario = FailScenario::setup(); let dir = tempfile::tempdir().unwrap(); @@ -787,7 +771,7 @@ async fn recovery_rolls_forward_after_finalize_publisher_failure() { // Setup: trigger the residual. { let mut db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap(); - let _failpoint = ScopedFailPoint::new("mutation.post_finalize_pre_publisher", "return"); + let _failpoint = ScopedFailPoint::new(names::MUTATION_POST_FINALIZE_PRE_PUBLISHER, "return"); // The mutation's finalize completes (commit_staged advances Lance // HEAD on node:Person AND writes a `__recovery/{ulid}.json` @@ -864,12 +848,103 @@ async fn recovery_rolls_forward_after_finalize_publisher_failure() { ); } -#[tokio::test] +/// Regression for iss-schema-apply-reopen-recovery-race: the open-time +/// recovery sweep's roll-forward must CONVERGE (not fatally error the open) +/// when a concurrent writer advances the manifest past the sidecar's pin +/// during the classify→publish window. +/// +/// Two concurrent `Omnigraph::open` sweeps race the same pending sidecar. +/// One is parked at `recovery.before_roll_forward_publish` (after it has +/// classified `RolledPastExpected`, before its publish CAS); the other falls +/// through, rolls the sidecar forward (manifest v → v+1), and deletes it. The +/// parked sweep then loses its publish CAS at the now-stale `expected = v`. +/// The manifest already reached the sidecar's goal, so this is convergence, +/// not a logical conflict — the open must succeed, not panic with +/// `ExpectedVersionMismatch`. +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +#[serial] +async fn open_sweep_roll_forward_converges_when_manifest_advances_concurrently() { + let _scenario = FailScenario::setup(); + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap().to_string(); + + // Setup: leave one pending sidecar (node:Person at Lance v+1, manifest v). + { + let mut db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap(); + let _failpoint = + ScopedFailPoint::new(names::MUTATION_POST_FINALIZE_PRE_PUBLISHER, "return"); + mutate_main( + &mut db, + MUTATION_QUERIES, + "insert_person", + &mixed_params(&[("$name", "Eve")], &[("$age", 22)]), + ) + .await + .unwrap_err(); + } + assert_eq!( + std::fs::read_dir(dir.path().join("__recovery")) + .unwrap() + .count(), + 1, + "exactly one pending sidecar must persist for the sweep to roll forward" + ); + + // Park the FIRST sweep to reach the publish window; later arrivals fall + // through. wait_until_reached gates the second open so it is guaranteed + // to be the one that converges the sidecar. + let rv = helpers::failpoint::Rendezvous::park_first( + names::RECOVERY_BEFORE_ROLL_FORWARD_PUBLISH, + ); + + let uri_parked = uri.clone(); + let parked_open = tokio::spawn(async move { Omnigraph::open(&uri_parked).await }); + rv.wait_until_reached().await; + + // A concurrent open rolls the sidecar forward (manifest v → v+1) and + // deletes it, advancing the manifest past the parked sweep's pin. + let converging_open = Omnigraph::open(&uri) + .await + .expect("the second open's sweep should roll the sidecar forward and succeed"); + assert_eq!( + helpers::count_rows(&converging_open, "node:Person").await, + 1, + "the converging open must publish the rolled-forward Person row" + ); + + // Release the parked sweep: its publish CAS finds the manifest already at + // the goal. It must converge, not fail the open. + rv.release(); + parked_open + .await + .expect("the parked open task must not panic") + .expect( + "the open-time sweep must converge when the manifest already reached \ + the sidecar's goal, not fail the open with ExpectedVersionMismatch", + ); + + // The sidecar is gone and the graph is readable and consistent. + let recovery_dir = dir.path().join("__recovery"); + if recovery_dir.exists() { + assert_eq!( + std::fs::read_dir(&recovery_dir).unwrap().count(), + 0, + "the sidecar must be gone after both sweeps converge" + ); + } + let db = Omnigraph::open(&uri).await.unwrap(); + assert_eq!(helpers::count_rows(&db, "node:Person").await, 1); +} + +#[tokio::test(flavor = "multi_thread")] +#[serial] async fn inline_delete_conflict_writes_sidecar_before_rejecting() { + use std::sync::Arc; + let _scenario = FailScenario::setup(); let dir = tempfile::tempdir().unwrap(); let uri = dir.path().to_str().unwrap().to_string(); - let db = helpers::init_and_load(&dir).await; + let db = Arc::new(helpers::init_and_load(&dir).await); let pre_snapshot = db .snapshot_of(omnigraph::db::ReadTarget::branch("main")) @@ -879,39 +954,37 @@ async fn inline_delete_conflict_writes_sidecar_before_rejecting() { let person_uri = node_table_uri(&uri, "Person"); { - let _pause_delete = - ScopedFailPoint::new("mutation.delete_node_pre_primary_delete", "pause"); - let delete_params = helpers::params(&[("$name", "Alice")]); - let delete = db.mutate("main", MUTATION_QUERIES, "remove_person", &delete_params); - tokio::pin!(delete); - - let mut concurrent_update_succeeded = false; - for _ in 0..50 { - if delete.as_mut().now_or_never().is_some() { - panic!("delete mutation completed before primary-delete failpoint was released"); - } - let mut concurrent = Omnigraph::open_read_only(&uri).await.unwrap(); - if mutate_main( - &mut concurrent, - MUTATION_QUERIES, - "set_age", - &mixed_params(&[("$name", "Bob")], &[("$age", 26)]), - ) - .await - .is_ok() - { - concurrent_update_succeeded = true; - break; - } - tokio::time::sleep(std::time::Duration::from_millis(20)).await; - } - assert!( - concurrent_update_succeeded, - "concurrent update must land while delete is paused" + // Park the delete at the primary-delete point. The concurrent update + // then lands deterministically before the delete resumes, so the + // delete's manifest CAS is guaranteed stale — no retry loop, no sleep. + let rv = helpers::failpoint::Rendezvous::park_first( + "mutation.delete_node_pre_primary_delete", ); - fail::remove("mutation.delete_node_pre_primary_delete"); - let err = delete.await.unwrap_err(); + let del_db = Arc::clone(&db); + let delete = tokio::spawn(async move { + let delete_params = helpers::params(&[("$name", "Alice")]); + del_db + .mutate("main", MUTATION_QUERIES, "remove_person", &delete_params) + .await + }); + + rv.wait_until_reached().await; + + // Concurrent update lands while the delete is parked. + let mut concurrent = Omnigraph::open_read_only(&uri).await.unwrap(); + mutate_main( + &mut concurrent, + MUTATION_QUERIES, + "set_age", + &mixed_params(&[("$name", "Bob")], &[("$age", 26)]), + ) + .await + .expect("concurrent update must land while delete is paused"); + + rv.release(); + + let err = delete.await.unwrap().unwrap_err(); assert!( err.to_string().contains("stale view of 'node:Person'") || err.to_string().contains("ExpectedVersionMismatch") @@ -943,6 +1016,7 @@ async fn inline_delete_conflict_writes_sidecar_before_rejecting() { } #[tokio::test] +#[serial] async fn recovery_rolls_forward_load_on_feature_branch() { use omnigraph::loader::LoadMode; @@ -973,7 +1047,7 @@ async fn recovery_rolls_forward_load_on_feature_branch() { .table_version; feature_parent_commit_id = branch_head_commit_id(dir.path(), "feature").await.unwrap(); - let _failpoint = ScopedFailPoint::new("mutation.post_finalize_pre_publisher", "return"); + let _failpoint = ScopedFailPoint::new(names::MUTATION_POST_FINALIZE_PRE_PUBLISHER, "return"); let err = db .load( "feature", @@ -1038,6 +1112,7 @@ async fn recovery_rolls_forward_load_on_feature_branch() { } #[tokio::test] +#[serial] async fn recovery_rolls_forward_load_overwrite() { use omnigraph::loader::{LoadMode, load_jsonl}; @@ -1054,7 +1129,7 @@ async fn recovery_rolls_forward_load_overwrite() { .unwrap(); parent_commit_id = branch_head_commit_id(dir.path(), "main").await.unwrap(); - let _failpoint = ScopedFailPoint::new("mutation.post_finalize_pre_publisher", "return"); + let _failpoint = ScopedFailPoint::new(names::MUTATION_POST_FINALIZE_PRE_PUBLISHER, "return"); let err = db .load( "main", @@ -1108,6 +1183,7 @@ async fn recovery_rolls_forward_load_overwrite() { } #[tokio::test] +#[serial] async fn recovery_rolls_forward_ensure_indices_on_feature_branch() { use lance::index::DatasetIndexExt; use omnigraph::loader::{LoadMode, load_jsonl}; @@ -1182,7 +1258,7 @@ async fn recovery_rolls_forward_ensure_indices_on_feature_branch() { { let _failpoint = - ScopedFailPoint::new("ensure_indices.post_phase_b_pre_manifest_commit", "return"); + ScopedFailPoint::new(names::ENSURE_INDICES_POST_PHASE_B_PRE_MANIFEST_COMMIT, "return"); let err = db.ensure_indices_on("feature").await.unwrap_err(); assert!( err.to_string().contains( @@ -1250,6 +1326,7 @@ async fn recovery_rolls_forward_ensure_indices_on_feature_branch() { /// on the same handle succeeds without restart and without /// ExpectedVersionMismatch. #[tokio::test] +#[serial] async fn refresh_runs_roll_forward_recovery_in_process() { let _scenario = FailScenario::setup(); let dir = tempfile::tempdir().unwrap(); @@ -1259,7 +1336,7 @@ async fn refresh_runs_roll_forward_recovery_in_process() { // Setup: trigger the residual (sidecar persists; manifest unchanged). { - let _failpoint = ScopedFailPoint::new("mutation.post_finalize_pre_publisher", "return"); + let _failpoint = ScopedFailPoint::new(names::MUTATION_POST_FINALIZE_PRE_PUBLISHER, "return"); let err = mutate_main( &mut db, MUTATION_QUERIES, @@ -1329,6 +1406,7 @@ async fn refresh_runs_roll_forward_recovery_in_process() { /// drift in-process: no restart, no explicit `refresh()`, no /// `omnigraph repair`. #[tokio::test] +#[serial] async fn load_after_finalize_publisher_failure_heals_without_reopen() { use omnigraph::loader::{LoadMode, load_jsonl}; @@ -1342,7 +1420,7 @@ async fn load_after_finalize_publisher_failure_heals_without_reopen() { // commit_staged (Lance HEAD advances on three tables), then the // publisher is wedged before the manifest commit. { - let _failpoint = ScopedFailPoint::new("mutation.post_finalize_pre_publisher", "return"); + let _failpoint = ScopedFailPoint::new(names::MUTATION_POST_FINALIZE_PRE_PUBLISHER, "return"); let err = load_jsonl( &mut db, r#"{"type":"Person","data":{"name":"Alice","age":30}} @@ -1405,6 +1483,7 @@ async fn load_after_finalize_publisher_failure_heals_without_reopen() { /// recover — and the same handle must write normally once the fault /// clears (a transient storage error never wedges the graph). #[tokio::test] +#[serial] async fn sidecar_write_failure_aborts_load_with_no_head_advance() { use omnigraph::loader::{LoadMode, load_jsonl}; @@ -1422,7 +1501,7 @@ async fn sidecar_write_failure_aborts_load_with_no_head_advance() { .version; { - let _failpoint = ScopedFailPoint::new("recovery.sidecar_write", "return"); + let _failpoint = ScopedFailPoint::new(names::RECOVERY_SIDECAR_WRITE, "return"); let err = load_jsonl( &mut db, r#"{"type":"Person","data":{"name":"Alice","age":30}} @@ -1490,6 +1569,7 @@ async fn sidecar_write_failure_aborts_load_with_no_head_advance() { /// local filesystem backend. Skips unless `OMNIGRAPH_S3_TEST_BUCKET` is set /// (same gate as `s3_storage.rs`); CI runs it against RustFS. #[tokio::test] +#[serial] async fn s3_load_recovers_after_publisher_failure_without_reopen() { use omnigraph::loader::{LoadMode, load_jsonl}; @@ -1507,7 +1587,7 @@ async fn s3_load_recovers_after_publisher_failure_without_reopen() { // Failed load: commit_staged lands on S3, manifest publish does not; // the sidecar PUT went through the S3 adapter. { - let _failpoint = ScopedFailPoint::new("mutation.post_finalize_pre_publisher", "return"); + let _failpoint = ScopedFailPoint::new(names::MUTATION_POST_FINALIZE_PRE_PUBLISHER, "return"); let err = load_jsonl( &mut db, r#"{"type":"Person","data":{"name":"Alice","age":30}} @@ -1554,6 +1634,7 @@ async fn s3_load_recovers_after_publisher_failure_without_reopen() { /// documented retry tolerance in `record_audit`'s contract, exercised /// end-to-end through a real injected failure. #[tokio::test] +#[serial] async fn record_audit_failure_after_roll_forward_converges_on_next_write() { use omnigraph::loader::{LoadMode, load_jsonl}; @@ -1565,7 +1646,7 @@ async fn record_audit_failure_after_roll_forward_converges_on_next_write() { // Pending sidecar with real drift. { - let _failpoint = ScopedFailPoint::new("mutation.post_finalize_pre_publisher", "return"); + let _failpoint = ScopedFailPoint::new(names::MUTATION_POST_FINALIZE_PRE_PUBLISHER, "return"); load_jsonl( &mut db, r#"{"type":"Person","data":{"name":"Alice","age":30}} @@ -1581,7 +1662,7 @@ async fn record_audit_failure_after_roll_forward_converges_on_next_write() { // the audit write fails — the write must fail loudly and the sidecar // must survive for the retry. { - let _failpoint = ScopedFailPoint::new("recovery.record_audit", "return"); + let _failpoint = ScopedFailPoint::new(names::RECOVERY_RECORD_AUDIT, "return"); let err = load_jsonl( &mut db, r#"{"type":"Person","data":{"name":"Bob","age":25}} @@ -1645,6 +1726,7 @@ async fn record_audit_failure_after_roll_forward_converges_on_next_write() { /// (which would be consumer tolerance of drift). Once the fault clears, /// open recovers normally. #[tokio::test] +#[serial] async fn sidecar_list_failure_fails_write_and_open_loudly_then_clears() { use omnigraph::loader::{LoadMode, load_jsonl}; @@ -1656,7 +1738,7 @@ async fn sidecar_list_failure_fails_write_and_open_loudly_then_clears() { // Pending sidecar via the usual finalize → publisher failure. { - let _failpoint = ScopedFailPoint::new("mutation.post_finalize_pre_publisher", "return"); + let _failpoint = ScopedFailPoint::new(names::MUTATION_POST_FINALIZE_PRE_PUBLISHER, "return"); let err = load_jsonl( &mut db, r#"{"type":"Person","data":{"name":"Alice","age":30}} @@ -1674,7 +1756,7 @@ async fn sidecar_list_failure_fails_write_and_open_loudly_then_clears() { assert_eq!(std::fs::read_dir(&recovery_dir).unwrap().count(), 1); } - let _failpoint = ScopedFailPoint::new("recovery.sidecar_list", "return"); + let _failpoint = ScopedFailPoint::new(names::RECOVERY_SIDECAR_LIST, "return"); // Write-entry heal: the list failure surfaces as the write's error — // no silent skip that would proceed over the pending sidecar. @@ -1725,6 +1807,7 @@ async fn sidecar_list_failure_fails_write_and_open_loudly_then_clears() { /// consumed by the next write's entry heal (attributed `RolledForward` /// audit row), not by an operator. #[tokio::test] +#[serial] async fn sidecar_delete_failure_keeps_write_success_and_next_write_heals() { use omnigraph::loader::{LoadMode, load_jsonl}; @@ -1735,7 +1818,7 @@ async fn sidecar_delete_failure_keeps_write_success_and_next_write_heals() { let mut db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap(); { - let _failpoint = ScopedFailPoint::new("recovery.sidecar_delete", "return"); + let _failpoint = ScopedFailPoint::new(names::RECOVERY_SIDECAR_DELETE, "return"); // The load itself must succeed: commit_staged + manifest publish // landed; only the Phase D cleanup failed (swallowed + logged). load_jsonl( @@ -1783,6 +1866,7 @@ async fn sidecar_delete_failure_keeps_write_success_and_next_write_heals() { /// PUT failure must abort the merge before any target-table HEAD moves; /// retrying after the fault clears merges cleanly. #[tokio::test] +#[serial] async fn sidecar_write_failure_aborts_branch_merge_with_no_head_advance() { use omnigraph::loader::{LoadMode, load_jsonl}; @@ -1830,7 +1914,7 @@ async fn sidecar_write_failure_aborts_branch_merge_with_no_head_advance() { .version; { - let _failpoint = ScopedFailPoint::new("recovery.sidecar_write", "return"); + let _failpoint = ScopedFailPoint::new(names::RECOVERY_SIDECAR_WRITE, "return"); let err = db.branch_merge("feature", "main").await.unwrap_err(); assert!( err.to_string() @@ -1873,6 +1957,7 @@ async fn sidecar_write_failure_aborts_branch_merge_with_no_head_advance() { /// `refresh()` (which `refresh_runs_roll_forward_recovery_in_process` /// covers), no reopen. #[tokio::test] +#[serial] async fn mutation_after_finalize_publisher_failure_heals_without_reopen() { let _scenario = FailScenario::setup(); let dir = tempfile::tempdir().unwrap(); @@ -1881,7 +1966,7 @@ async fn mutation_after_finalize_publisher_failure_heals_without_reopen() { let mut db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap(); { - let _failpoint = ScopedFailPoint::new("mutation.post_finalize_pre_publisher", "return"); + let _failpoint = ScopedFailPoint::new(names::MUTATION_POST_FINALIZE_PRE_PUBLISHER, "return"); let err = mutate_main( &mut db, MUTATION_QUERIES, @@ -1936,6 +2021,7 @@ async fn mutation_after_finalize_publisher_failure_heals_without_reopen() { /// runs, so a long-lived handle can evolve the schema without a /// restart after a Phase B → Phase C failure. #[tokio::test] +#[serial] async fn schema_apply_after_finalize_publisher_failure_heals_without_reopen() { use omnigraph::loader::{LoadMode, load_jsonl}; @@ -1946,7 +2032,7 @@ async fn schema_apply_after_finalize_publisher_failure_heals_without_reopen() { let mut db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap(); { - let _failpoint = ScopedFailPoint::new("mutation.post_finalize_pre_publisher", "return"); + let _failpoint = ScopedFailPoint::new(names::MUTATION_POST_FINALIZE_PRE_PUBLISHER, "return"); let err = load_jsonl( &mut db, r#"{"type":"Person","data":{"name":"Alice","age":30}} @@ -1995,6 +2081,7 @@ async fn schema_apply_after_finalize_publisher_failure_heals_without_reopen() { /// (with its recovery audit row) before the merge reads its target /// snapshot — not silently folded into the merge's publish. #[tokio::test] +#[serial] async fn branch_merge_after_finalize_publisher_failure_heals_without_reopen() { use omnigraph::loader::{LoadMode, load_jsonl}; @@ -2027,7 +2114,7 @@ async fn branch_merge_after_finalize_publisher_failure_heals_without_reopen() { // Failed load on MAIN: Person drifts ahead of the manifest with a // sidecar covering it. { - let _failpoint = ScopedFailPoint::new("mutation.post_finalize_pre_publisher", "return"); + let _failpoint = ScopedFailPoint::new(names::MUTATION_POST_FINALIZE_PRE_PUBLISHER, "return"); let err = load_jsonl( &mut db, r#"{"type":"Person","data":{"name":"Bob","age":25}} @@ -2079,6 +2166,7 @@ async fn branch_merge_after_finalize_publisher_failure_heals_without_reopen() { /// the audit already written — the retry must NOT append a second /// audit row for the same operation, only finish the delete. #[tokio::test] +#[serial] async fn orphaned_branch_discard_is_idempotent_across_delete_failure() { use omnigraph::loader::{LoadMode, load_jsonl}; @@ -2141,7 +2229,7 @@ async fn orphaned_branch_discard_is_idempotent_across_delete_failure() { // First write: the discard path writes its audit row, then the // sidecar delete fails (injected). The write fails loudly. { - let _failpoint = ScopedFailPoint::new("recovery.sidecar_delete", "return"); + let _failpoint = ScopedFailPoint::new(names::RECOVERY_SIDECAR_DELETE, "return"); let err = load_jsonl( &mut db, "{\"type\":\"Person\",\"data\":{\"name\":\"Bob\",\"age\":25}}\n", @@ -2185,6 +2273,7 @@ async fn orphaned_branch_discard_is_idempotent_across_delete_failure() { /// while a sidecar is pending. Sequenced failpoint: first list (entry /// heal) passes, second list (the guard) fails. #[tokio::test] +#[serial] async fn drift_guard_names_both_paths_when_sidecar_list_fails() { use omnigraph::loader::{LoadMode, load_jsonl}; @@ -2244,7 +2333,7 @@ async fn drift_guard_names_both_paths_when_sidecar_list_fails() { // First list (entry heal) passes and defers the sidecar; second // list (the guard's classification) fails. - let _failpoint = ScopedFailPoint::new("recovery.sidecar_list", "1*off->1*return"); + let _failpoint = ScopedFailPoint::new(names::RECOVERY_SIDECAR_LIST, "1*off->1*return"); let err = load_jsonl( &mut db, "{\"type\":\"Person\",\"data\":{\"name\":\"bob\",\"age\":25}}\n", @@ -2272,6 +2361,7 @@ async fn drift_guard_names_both_paths_when_sidecar_list_fails() { /// Gap): bounded commit-graph noise, never a lost or duplicated audit /// record under clean failures. #[tokio::test] +#[serial] async fn orphaned_branch_discard_converges_across_audit_append_failure() { use omnigraph::loader::{LoadMode, load_jsonl}; @@ -2332,7 +2422,7 @@ async fn orphaned_branch_discard_converges_across_audit_append_failure() { // fails (injected). The write fails loudly; the sidecar survives so // the discard is retried with the audit still owed. { - let _failpoint = ScopedFailPoint::new("recovery.orphan_discard_audit_append", "return"); + let _failpoint = ScopedFailPoint::new(names::RECOVERY_ORPHAN_DISCARD_AUDIT_APPEND, "return"); let err = load_jsonl( &mut db, "{\"type\":\"Person\",\"data\":{\"name\":\"Bob\",\"age\":25}}\n", @@ -2386,6 +2476,7 @@ async fn orphaned_branch_discard_converges_across_audit_append_failure() { /// writes against the stale catalog rejects rows of types the graph /// already has. #[tokio::test] +#[serial] async fn load_after_schema_apply_phase_b_failure_uses_recovered_catalog() { use omnigraph::loader::{LoadMode, load_jsonl}; @@ -2425,7 +2516,7 @@ edge Knows: Person -> Person { edge WorksAt: Person -> Company "#; { - let _failpoint = ScopedFailPoint::new("schema_apply.after_staging_write", "return"); + let _failpoint = ScopedFailPoint::new(names::SCHEMA_APPLY_AFTER_STAGING_WRITE, "return"); let err = db.apply_schema(v2_schema).await.unwrap_err(); assert!( err.to_string() @@ -2467,6 +2558,7 @@ edge WorksAt: Person -> Company /// the resumed apply's own renames then fail on the missing sources: /// an error (and a corrupted catalog) for an otherwise-healthy apply. #[tokio::test(flavor = "multi_thread", worker_threads = 4)] +#[serial] async fn heal_does_not_promote_live_schema_apply_staging() { use omnigraph::loader::LoadMode; use std::sync::Arc; @@ -2477,22 +2569,17 @@ async fn heal_does_not_promote_live_schema_apply_staging() { let db = Arc::new(Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap()); - // Pause the apply right after its staging files land (its sidecar is + // Park the apply right after its staging files land (its sidecar is // already on disk from Phase A; the manifest commit has not run). - let failpoint = ScopedFailPoint::new("schema_apply.after_staging_write", "pause"); + let rv = helpers::failpoint::Rendezvous::park_first(names::SCHEMA_APPLY_AFTER_STAGING_WRITE); let apply_db = Arc::clone(&db); let desired = format!("{}\nnode Tag {{ name: String @key }}\n", helpers::TEST_SCHEMA); let apply = tokio::spawn(async move { apply_db.apply_schema(&desired).await }); - // Wait until the apply is parked in the window: staging on disk. + // Wait until the apply is parked in the window (staging files written). + rv.wait_until_reached().await; let staging_pg = dir.path().join("_schema.pg.staging"); - for _ in 0..500 { - if staging_pg.exists() { - break; - } - tokio::time::sleep(std::time::Duration::from_millis(10)).await; - } assert!(staging_pg.exists(), "schema apply never reached the paused window"); // Concurrent load on the same handle: its entry heal runs while the @@ -2516,7 +2603,7 @@ async fn heal_does_not_promote_live_schema_apply_staging() { // stole the apply's commit); fixed code leaves the load blocked on // the schema-apply serialization key until the apply finishes. tokio::time::sleep(std::time::Duration::from_millis(500)).await; - drop(failpoint); + rv.release(); let apply_result = apply.await.unwrap(); let _ = tokio::time::timeout(std::time::Duration::from_secs(30), load) @@ -2546,6 +2633,7 @@ async fn heal_does_not_promote_live_schema_apply_staging() { /// sidecar still on disk, Lance HEAD unchanged (no restore commit). /// Then drop + open: full sweep handles it. #[tokio::test] +#[serial] async fn refresh_defers_rollback_eligible_sidecar_to_next_open() { use omnigraph::loader::{LoadMode, load_jsonl}; @@ -2718,6 +2806,7 @@ async fn refresh_defers_rollback_eligible_sidecar_to_next_open() { /// on one table leaves OTHER tables untouched. Subsequent writes to /// non-drifted tables proceed normally; the drift is contained. #[tokio::test] +#[serial] async fn finalize_publisher_residual_does_not_drift_untouched_tables() { let _scenario = FailScenario::setup(); let dir = tempfile::tempdir().unwrap(); @@ -2726,7 +2815,7 @@ async fn finalize_publisher_residual_does_not_drift_untouched_tables() { .unwrap(); { - let _failpoint = ScopedFailPoint::new("mutation.post_finalize_pre_publisher", "return"); + let _failpoint = ScopedFailPoint::new(names::MUTATION_POST_FINALIZE_PRE_PUBLISHER, "return"); let _ = mutate_main( &mut db, MUTATION_QUERIES, @@ -2764,6 +2853,7 @@ async fn finalize_publisher_residual_does_not_drift_untouched_tables() { /// a roll-forward-only heal and proceed — they do not refuse on a pending /// sidecar the way `optimize`/`repair` do — so the write succeeds with no drift. #[tokio::test] +#[serial] async fn ensure_indices_stage_btree_failure_leaves_existing_tables_writable() { let _scenario = FailScenario::setup(); let dir = tempfile::tempdir().unwrap(); @@ -2791,7 +2881,7 @@ async fn ensure_indices_stage_btree_failure_leaves_existing_tables_writable() { // ensure_indices builds the deferred `age` BTREE on Person; the failpoint // fires between stage and commit, so Person's Lance HEAD does not move. let _failpoint = - ScopedFailPoint::new("ensure_indices.post_stage_pre_commit_btree", "return"); + ScopedFailPoint::new(names::ENSURE_INDICES_POST_STAGE_PRE_COMMIT_BTREE, "return"); let err = db.ensure_indices().await.unwrap_err(); assert!( err.to_string() @@ -2844,6 +2934,7 @@ fn assert_no_staging_files(graph: &std::path::Path) { // ExpectedVersionMismatch. #[tokio::test] +#[serial] async fn schema_apply_without_schema_staging_rolls_back_on_next_open() { use omnigraph::loader::{LoadMode, load_jsonl}; @@ -2871,7 +2962,7 @@ async fn schema_apply_without_schema_staging_rolls_back_on_next_open() { { let db = Omnigraph::open(&uri).await.unwrap(); - let _failpoint = ScopedFailPoint::new("schema_apply.before_staging_write", "return"); + let _failpoint = ScopedFailPoint::new(names::SCHEMA_APPLY_BEFORE_STAGING_WRITE, "return"); let v2_schema = r#"node Person { name: String @key age: I32? @@ -2941,6 +3032,7 @@ edge WorksAt: Person -> Company } #[tokio::test] +#[serial] async fn schema_apply_phase_b_failure_recovered_on_next_open() { use omnigraph::loader::{LoadMode, load_jsonl}; @@ -2976,7 +3068,7 @@ async fn schema_apply_phase_b_failure_recovered_on_next_open() { // written, but BEFORE the manifest publish. The recovery sidecar persists. { let db = Omnigraph::open(&uri).await.unwrap(); - let _failpoint = ScopedFailPoint::new("schema_apply.after_staging_write", "return"); + let _failpoint = ScopedFailPoint::new(names::SCHEMA_APPLY_AFTER_STAGING_WRITE, "return"); // v2 schema: add a `city` property to Person AND add a new // `Tag` node type. The new property triggers the rewritten_tables // path (Phase B sidecar coverage). The new type changes the @@ -3089,6 +3181,7 @@ edge WorksAt: Person -> Company /// forward on next open so the manifest tracks the Lance HEAD — and the healed /// table must then accept a schema apply (the original bug's victim). #[tokio::test] +#[serial] async fn optimize_phase_b_failure_recovered_on_next_open() { let _scenario = FailScenario::setup(); let dir = tempfile::tempdir().unwrap(); @@ -3122,7 +3215,7 @@ async fn optimize_phase_b_failure_recovered_on_next_open() { { let db = Omnigraph::open(&uri).await.unwrap(); let _failpoint = - ScopedFailPoint::new("optimize.post_phase_b_pre_manifest_commit", "return"); + ScopedFailPoint::new(names::OPTIMIZE_POST_PHASE_B_PRE_MANIFEST_COMMIT, "return"); let err = db.optimize().await.unwrap_err(); assert!( err.to_string().contains( @@ -3179,6 +3272,7 @@ async fn optimize_phase_b_failure_recovered_on_next_open() { } #[tokio::test] +#[serial] #[serial(branch_merge_phase_b)] async fn branch_merge_phase_b_failure_recovered_on_next_open() { use omnigraph::loader::{LoadMode, load_jsonl}; @@ -3235,7 +3329,7 @@ async fn branch_merge_phase_b_failure_recovered_on_next_open() { { let db = Omnigraph::open(&uri).await.unwrap(); let _failpoint = - ScopedFailPoint::new("branch_merge.post_phase_b_pre_manifest_commit", "return"); + ScopedFailPoint::new(names::BRANCH_MERGE_POST_PHASE_B_PRE_MANIFEST_COMMIT, "return"); let err = db.branch_merge("feature", "main").await.unwrap_err(); assert!( err.to_string().contains( @@ -3348,6 +3442,7 @@ async fn branch_merge_phase_b_failure_recovered_on_next_open() { /// silently: the adopt path advanced Lance HEAD but was unpinned, so the sweep /// found no sidecar and the merge was lost. #[tokio::test] +#[serial] #[serial(branch_merge_phase_b)] async fn branch_merge_adopt_with_delta_phase_b_failure_recovered_on_next_open() { use omnigraph::loader::{LoadMode, load_jsonl}; @@ -3390,7 +3485,7 @@ async fn branch_merge_adopt_with_delta_phase_b_failure_recovered_on_next_open() { let db = Omnigraph::open(&uri).await.unwrap(); let _failpoint = - ScopedFailPoint::new("branch_merge.post_phase_b_pre_manifest_commit", "return"); + ScopedFailPoint::new(names::BRANCH_MERGE_POST_PHASE_B_PRE_MANIFEST_COMMIT, "return"); let err = db.branch_merge("feature", "main").await.unwrap_err(); assert!( err.to_string().contains( @@ -3562,6 +3657,7 @@ async fn assert_partial_merge_rolls_back(scenario: MergeScenario, failpoint: &st } #[tokio::test] +#[serial] #[serial(branch_merge_phase_b)] async fn branch_merge_adopt_partial_after_append_rolls_back() { assert_partial_merge_rolls_back( @@ -3572,6 +3668,7 @@ async fn branch_merge_adopt_partial_after_append_rolls_back() { } #[tokio::test] +#[serial] #[serial(branch_merge_phase_b)] async fn branch_merge_adopt_partial_after_upsert_rolls_back() { assert_partial_merge_rolls_back( @@ -3582,6 +3679,7 @@ async fn branch_merge_adopt_partial_after_upsert_rolls_back() { } #[tokio::test] +#[serial] #[serial(branch_merge_phase_b)] async fn branch_merge_rewrite_partial_after_merge_rolls_back() { assert_partial_merge_rolls_back( @@ -3592,6 +3690,7 @@ async fn branch_merge_rewrite_partial_after_merge_rolls_back() { } #[tokio::test] +#[serial] #[serial(branch_merge_phase_b)] async fn branch_merge_rewrite_partial_after_delete_rolls_back() { assert_partial_merge_rolls_back( @@ -3618,6 +3717,7 @@ async fn branch_merge_rewrite_partial_after_delete_rolls_back() { /// the version-aware classifier reads v1 as the old loose generation → rolls /// forward → `bob` preserved. #[tokio::test] +#[serial] #[serial(branch_merge_phase_b)] async fn pre_upgrade_v1_branch_merge_sidecar_rolls_forward_not_back() { use omnigraph::loader::{LoadMode, load_jsonl}; @@ -3652,7 +3752,7 @@ async fn pre_upgrade_v1_branch_merge_sidecar_rolls_forward_not_back() { // sidecar lands on disk. { let db = Omnigraph::open(&uri).await.unwrap(); - let _fp = ScopedFailPoint::new("branch_merge.post_phase_b_pre_manifest_commit", "return"); + let _fp = ScopedFailPoint::new(names::BRANCH_MERGE_POST_PHASE_B_PRE_MANIFEST_COMMIT, "return"); db.branch_merge("feature", "main").await.unwrap_err(); } @@ -3694,6 +3794,7 @@ async fn pre_upgrade_v1_branch_merge_sidecar_rolls_forward_not_back() { /// target, and future merges between the same pair would lose /// already-up-to-date detection. #[tokio::test] +#[serial] #[serial(branch_merge_phase_b)] async fn branch_merge_phase_b_failure_recovered_on_non_main_target() { use omnigraph::loader::{LoadMode, load_jsonl}; @@ -3758,7 +3859,7 @@ async fn branch_merge_phase_b_failure_recovered_on_non_main_target() { { let db = Omnigraph::open(&uri).await.unwrap(); let _failpoint = - ScopedFailPoint::new("branch_merge.post_phase_b_pre_manifest_commit", "return"); + ScopedFailPoint::new(names::BRANCH_MERGE_POST_PHASE_B_PRE_MANIFEST_COMMIT, "return"); let err = db .branch_merge("source_branch", "target_branch") .await @@ -3819,6 +3920,7 @@ async fn branch_merge_phase_b_failure_recovered_on_non_main_target() { /// keeps RewriteMerged tables on active_branch), the contract assertion /// catches a regression that reverts to `entry.table_branch.clone()`. #[tokio::test] +#[serial] #[serial(branch_merge_phase_b)] async fn branch_merge_sidecar_pins_table_branch_to_active_branch() { use omnigraph::loader::{LoadMode, load_jsonl}; @@ -3860,7 +3962,7 @@ async fn branch_merge_sidecar_pins_table_branch_to_active_branch() { { let db = Omnigraph::open(&uri).await.unwrap(); let _failpoint = - ScopedFailPoint::new("branch_merge.post_phase_b_pre_manifest_commit", "return"); + ScopedFailPoint::new(names::BRANCH_MERGE_POST_PHASE_B_PRE_MANIFEST_COMMIT, "return"); let _ = db .branch_merge("source_branch", "target_branch") .await @@ -3921,6 +4023,7 @@ async fn branch_merge_sidecar_pins_table_branch_to_active_branch() { /// `needs_index_work_*` code path and the /// `recovery_ensure_indices_handles_empty_tables` integration test. #[tokio::test] +#[serial] async fn ensure_indices_phase_b_failure_does_not_leak_sidecar_when_no_work_needed() { use omnigraph::loader::{LoadMode, load_jsonl}; @@ -3951,7 +4054,7 @@ async fn ensure_indices_phase_b_failure_does_not_leak_sidecar_when_no_work_neede { let db = Omnigraph::open(&uri).await.unwrap(); let _failpoint = - ScopedFailPoint::new("ensure_indices.post_phase_b_pre_manifest_commit", "return"); + ScopedFailPoint::new(names::ENSURE_INDICES_POST_PHASE_B_PRE_MANIFEST_COMMIT, "return"); let err = db.ensure_indices().await.unwrap_err(); assert!( err.to_string().contains( @@ -4021,11 +4124,12 @@ async fn ensure_indices_phase_b_failure_does_not_leak_sidecar_when_no_work_neede // limitation. #[tokio::test] +#[serial] async fn init_failpoint_after_schema_pg_written_cleans_up_schema_file() { let _scenario = FailScenario::setup(); let dir = tempfile::tempdir().unwrap(); let uri = dir.path().to_str().unwrap(); - let _failpoint = ScopedFailPoint::new("init.after_schema_pg_written", "return"); + let _failpoint = ScopedFailPoint::new(names::INIT_AFTER_SCHEMA_PG_WRITTEN, "return"); let err = match Omnigraph::init(uri, helpers::TEST_SCHEMA).await { Ok(_) => panic!("expected Omnigraph::init to fail at the configured failpoint"), @@ -4047,11 +4151,12 @@ async fn init_failpoint_after_schema_pg_written_cleans_up_schema_file() { } #[tokio::test] +#[serial] async fn init_failpoint_after_schema_contract_written_cleans_up_all_schema_files() { let _scenario = FailScenario::setup(); let dir = tempfile::tempdir().unwrap(); let uri = dir.path().to_str().unwrap(); - let _failpoint = ScopedFailPoint::new("init.after_schema_contract_written", "return"); + let _failpoint = ScopedFailPoint::new(names::INIT_AFTER_SCHEMA_CONTRACT_WRITTEN, "return"); let err = match Omnigraph::init(uri, helpers::TEST_SCHEMA).await { Ok(_) => panic!("expected Omnigraph::init to fail at the configured failpoint"), @@ -4078,11 +4183,12 @@ async fn init_failpoint_after_schema_contract_written_cleans_up_all_schema_files } #[tokio::test] +#[serial] async fn init_failpoint_after_coordinator_init_cleans_up_schema_files() { let _scenario = FailScenario::setup(); let dir = tempfile::tempdir().unwrap(); let uri = dir.path().to_str().unwrap(); - let _failpoint = ScopedFailPoint::new("init.after_coordinator_init", "return"); + let _failpoint = ScopedFailPoint::new(names::INIT_AFTER_COORDINATOR_INIT, "return"); let err = match Omnigraph::init(uri, helpers::TEST_SCHEMA).await { Ok(_) => panic!("expected Omnigraph::init to fail at the configured failpoint"), @@ -4118,6 +4224,7 @@ async fn init_failpoint_after_coordinator_init_cleans_up_schema_files() { } #[tokio::test] +#[serial] async fn init_failpoint_returns_original_error_not_cleanup_error() { // The cleanup is best-effort. If `storage.delete` fails (e.g. transient // network blip on S3), the original init failpoint error must still @@ -4129,7 +4236,7 @@ async fn init_failpoint_returns_original_error_not_cleanup_error() { let _scenario = FailScenario::setup(); let dir = tempfile::tempdir().unwrap(); let uri = dir.path().to_str().unwrap(); - let _failpoint = ScopedFailPoint::new("init.after_schema_pg_written", "return"); + let _failpoint = ScopedFailPoint::new(names::INIT_AFTER_SCHEMA_PG_WRITTEN, "return"); let err = match Omnigraph::init(uri, helpers::TEST_SCHEMA).await { Ok(_) => panic!("expected Omnigraph::init to fail at the configured failpoint"), diff --git a/crates/omnigraph/tests/helpers/failpoint.rs b/crates/omnigraph/tests/helpers/failpoint.rs new file mode 100644 index 00000000..0c93670a --- /dev/null +++ b/crates/omnigraph/tests/helpers/failpoint.rs @@ -0,0 +1,84 @@ +//! Deterministic rendezvous for concurrent failpoint tests. +//! +//! The pattern: park the FIRST thread that hits a failpoint until the test +//! explicitly releases it, while later arrivals fall through. This replaces +//! fixed "guess" `sleep`s for cross-thread coordination — the test waits on +//! the *condition* (the point was reached) with a bounded timeout that fails +//! loudly, instead of betting a fixed duration is long enough. +//! +//! Extracted from the open-coded `AtomicBool` + callback pattern that +//! `fork_collision_with_live_concurrent_fork_is_retryable` proved out. +//! +//! The `reached` flag also doubles as a fired-assertion: a point that is +//! never hit makes [`Rendezvous::wait_until_reached`] panic, so a typo'd or +//! misplaced failpoint cannot pass silently. + +use std::sync::Arc; +use std::sync::atomic::{AtomicBool, Ordering::SeqCst}; +use std::time::Duration; + +use omnigraph::failpoints::ScopedFailPoint; + +/// A parked-on-first-arrival rendezvous bound to a failpoint name. The +/// underlying callback is RAII-cleaned when this guard drops. +pub struct Rendezvous { + name: String, + reached: Arc, + release: Arc, + _failpoint: ScopedFailPoint, +} + +impl Rendezvous { + /// Register `name` so the FIRST thread to hit it records readiness and + /// blocks until [`release`](Self::release); later arrivals fall through + /// immediately. The park is bounded (~30s) so a test bug cannot hang the + /// suite forever. + pub fn park_first(name: &str) -> Self { + let reached = Arc::new(AtomicBool::new(false)); + let release = Arc::new(AtomicBool::new(false)); + let (cb_reached, cb_release) = (Arc::clone(&reached), Arc::clone(&release)); + let _failpoint = ScopedFailPoint::with_callback(name, move || { + if cb_reached + .compare_exchange(false, true, SeqCst, SeqCst) + .is_ok() + { + // ~30s bound (6000 * 5ms); released earlier on the common path. + for _ in 0..6000 { + if cb_release.load(SeqCst) { + return; + } + std::thread::sleep(Duration::from_millis(5)); + } + } + }); + Self { + name: name.to_string(), + reached, + release, + _failpoint, + } + } + + /// Async-wait until the parked thread has reached the failpoint, polling + /// the readiness condition with a bounded (~12s) timeout. Panics if the + /// point is never hit — the fired-assertion. + pub async fn wait_until_reached(&self) { + for _ in 0..2400 { + if self.reached.load(SeqCst) { + return; + } + tokio::time::sleep(Duration::from_millis(5)).await; + } + panic!("rendezvous: failpoint '{}' was never reached", self.name); + } + + /// Whether the parked thread has reached the failpoint yet. + pub fn reached(&self) -> bool { + self.reached.load(SeqCst) + } + + /// Release the parked thread so it resumes past the failpoint. + pub fn release(&self) { + self.release.store(true, SeqCst); + } +} diff --git a/crates/omnigraph/tests/helpers/mod.rs b/crates/omnigraph/tests/helpers/mod.rs index 131f91b8..7fc4f61d 100644 --- a/crates/omnigraph/tests/helpers/mod.rs +++ b/crates/omnigraph/tests/helpers/mod.rs @@ -1,6 +1,8 @@ #![allow(dead_code)] pub mod cost; +#[cfg(feature = "failpoints")] +pub mod failpoint; pub mod recovery; use arrow_array::{Array, RecordBatch, StringArray}; diff --git a/docs/dev/invariants.md b/docs/dev/invariants.md index eb6821a5..09a3061f 100644 --- a/docs/dev/invariants.md +++ b/docs/dev/invariants.md @@ -211,10 +211,21 @@ them explicit. sweep has the same exposure, and always has): it may roll a live foreign writer's sidecar forward, which degrades to publisher-CAS contention for data writes but can race the schema-staging promotion for a foreign live - schema apply. Multi-process writers on one graph are already documented - one-winner-CAS territory; closing this fully needs a cross-process - serialization primitive (e.g. lease-based use of the schema-apply lock - branch) — design it before promoting multi-process write topologies. + schema apply. The roll-**forward** CAS contention is now + convergence-idempotent: when the publish loses the CAS to a concurrent + writer that already reached the sidecar's goal, the sweep treats it as + convergence (record the `RolledForward` audit + delete) rather than a fatal + `ExpectedVersionMismatch`, and defers when the manifest is only partway + (`converge_or_defer_roll_forward` in `db/manifest/recovery.rs`; + iss-schema-apply-reopen-recovery-race). So a concurrent advance no longer + fails the open. The schema-staging promotion race and the destructive + roll-**back** path (Lance `Restore` "trumps" a concurrent commit, so it + cannot be made idempotent — iss-recovery-sweep-live-writer-rollback) still + need the cross-process primitive. Multi-process writers on one graph are + already documented one-winner-CAS territory; closing this fully needs a + cross-process serialization primitive (e.g. lease-based use of the + schema-apply lock branch) — design it before promoting multi-process write + topologies. - **Fork reclaim is in-process-safe only:** the first write to a table on a branch forks it (a Lance `create_branch` that advances state before the manifest publish). An interrupted fork (crash, or a cancelled request diff --git a/docs/dev/testing.md b/docs/dev/testing.md index 941cec6e..c1527a89 100644 --- a/docs/dev/testing.md +++ b/docs/dev/testing.md @@ -46,7 +46,7 @@ The engine's `tests/` is the principal coverage surface; most graph-shaped behav | `validators.rs` | Schema constraint enforcement (enum, range, unique, cardinality) across JSONL, insert, update paths | | `policy_engine_chassis.rs` | Engine-layer Cedar enforcement (MR-722): allow + deny through every `_as` writer via the SDK directly — no HTTP — proving embedded and CLI callers hit the same gate as the server, with action × scope shapes matching `authorize_request` | | `maintenance.rs` | `optimize` (compaction), `repair` (explicit uncovered-drift publish), and `cleanup` (version GC): empty/idempotent/no-op edges, policy validation, head preservation; `optimize` publishes its own compaction (`optimize_publishes_compaction_to_manifest_so_schema_apply_succeeds`), skips pre-existing uncovered drift (`optimize_skips_preexisting_manifest_head_drift`), and refuses to run while a `__recovery` sidecar is pending (`optimize_defers_when_recovery_sidecar_is_pending`); `repair` previews/heals verified maintenance drift, refuses raw semantic drift without `--force`, and forced repair publishes only by explicit operator choice; the index reconciler (iss-848): `index_build_tolerates_null_vector_rows` (an untrainable Vector column defers instead of aborting the build, sibling indexes still build) and `optimize_materializes_index_declared_but_unbuilt` (optimize creates a declared-but-deferred index) | -| `failpoints.rs` | Failure-injection coverage (gated on `failpoints` feature). Includes the five per-writer Phase B → recovery integration tests (`recovery_rolls_forward_after_finalize_publisher_failure`, `schema_apply_phase_b_failure_recovered_on_next_open`, `branch_merge_phase_b_failure_recovered_on_next_open`, `ensure_indices_phase_b_failure_recovered_on_next_open`, `optimize_phase_b_failure_recovered_on_next_open`) and the write-entry in-process heal contract (the four `*_after_finalize_publisher_failure_heals_without_reopen` tests — load, mutation, schema apply, branch merge: a follow-up write on the same handle rolls a sidecar-covered residual forward without reopen/refresh) and the storage-fault matrix for the sidecar lifecycle (`recovery.sidecar_{write,delete,list}` / `recovery.record_audit` failpoints: Phase A put failure aborts with zero drift, Phase D delete failure is swallowed and healed by the next write, list failures are loud at heal and open, audit-append failures are retried to exactly one audit row; plus the bucket-gated `s3_load_recovers_after_publisher_failure_without_reopen`). | +| `failpoints.rs` | Failure-injection coverage (gated on `failpoints` feature). Includes the five per-writer Phase B → recovery integration tests (`recovery_rolls_forward_after_finalize_publisher_failure`, `schema_apply_phase_b_failure_recovered_on_next_open`, `branch_merge_phase_b_failure_recovered_on_next_open`, `ensure_indices_phase_b_failure_recovered_on_next_open`, `optimize_phase_b_failure_recovered_on_next_open`) and the write-entry in-process heal contract (the four `*_after_finalize_publisher_failure_heals_without_reopen` tests — load, mutation, schema apply, branch merge: a follow-up write on the same handle rolls a sidecar-covered residual forward without reopen/refresh) and the storage-fault matrix for the sidecar lifecycle (`recovery.sidecar_{write,delete,list}` / `recovery.record_audit` failpoints: Phase A put failure aborts with zero drift, Phase D delete failure is swallowed and healed by the next write, list failures are loud at heal and open, audit-append failures are retried to exactly one audit row; plus the bucket-gated `s3_load_recovers_after_publisher_failure_without_reopen`) and the convergence-idempotent roll-forward regression (`open_sweep_roll_forward_converges_when_manifest_advances_concurrently`: two concurrent open-sweeps race one sidecar at the `recovery.before_roll_forward_publish` rendezvous; the CAS loser must converge, not fail the open — iss-schema-apply-reopen-recovery-race). | | `recovery.rs` | Open-time recovery sweep — sidecar I/O, classifier dispatch (NoMovement / RolledPastExpected / UnexpectedAtP1 / UnexpectedMultistep / InvariantViolation), all-or-nothing decision, roll-forward via `ManifestBatchPublisher::publish`, roll-back via `Dataset::restore`, audit row in `_graph_commit_recoveries.lance`, `OpenMode::ReadOnly` skip path | | `composite_flow.rs` | Compositional/narrative end-to-end stories — multi-step flows that compose mechanics covered by other test files. Catches integration regressions where individual operations all pass their unit tests but their composition breaks (sequential merges, post-merge main writes, time-travel through merge DAG, reopen consistency over multi-merge histories, post-optimize and post-cleanup strict writes). | @@ -64,10 +64,12 @@ The engine's `tests/` is the principal coverage surface; most graph-shaped behav ## Failpoints (fault injection) -- Cargo feature: `failpoints = ["dep:fail", "fail/failpoints"]` (in `crates/omnigraph/Cargo.toml` **and** `crates/omnigraph-cluster/Cargo.toml`; the cluster feature does not enable the engine's). -- Wrappers: `crates/omnigraph/src/failpoints.rs` and `crates/omnigraph-cluster/src/failpoints.rs` expose `maybe_fail("name")` and `ScopedFailPoint` for tests. -- Call sites are inserted at sensitive transaction boundaries (branch create, graph publish commit, cluster apply's payload→state-write window, etc.). -- Activated tests: `crates/omnigraph/tests/failpoints.rs` and `crates/omnigraph-cluster/tests/failpoints.rs` (crash-mid-apply + state CAS race via `fail::cfg_callback`; integration binaries, never in-source — the fail registry is process-global). Run with `cargo test -p omnigraph-engine --features failpoints --test failpoints` / `cargo test -p omnigraph-cluster --features failpoints --test failpoints`. +- Cargo feature: `failpoints = ["dep:fail", "fail/failpoints"]` in `crates/omnigraph/Cargo.toml`; the cluster's `failpoints` feature additionally enables `omnigraph/failpoints` (`crates/omnigraph-cluster/Cargo.toml`), so the shared test guard is available to cluster tests. +- Wrappers: `crates/omnigraph/src/failpoints.rs` and `crates/omnigraph-cluster/src/failpoints.rs` each expose `maybe_fail("name")` (per-crate error type). The test-side config guard `ScopedFailPoint` (`new` for action strings, `with_callback` for callbacks; RAII `Drop` removes the point) lives **once** in the engine and is reused by both test binaries. +- **Names are compile-checked.** Every failpoint name is a `pub const` in `omnigraph::failpoints::names` (engine) / `omnigraph_cluster::failpoints::names` (cluster). Call sites and tests reference the constant, never a bare literal — a typo is a compile error, not a silently-never-firing point. Add a new failpoint by adding its const first. +- Call sites are inserted at sensitive transaction boundaries (branch create, graph publish commit, the recovery sweep's classify→roll-forward-publish window, cluster apply's payload→state-write window, etc.). +- **Serialize and rendezvous, never sleep.** The `fail` registry is process-global, so every failpoint test carries `#[serial]` (`serial_test`). For concurrent tests, use `helpers::failpoint::Rendezvous` (`tests/helpers/failpoint.rs`): `park_first(name)` parks the first thread to hit the point until `release()`, and `wait_until_reached().await` blocks on that condition (it doubles as a fired-assertion). Do not coordinate threads with fixed `sleep`s. +- Activated tests: `crates/omnigraph/tests/failpoints.rs` and `crates/omnigraph-cluster/tests/failpoints.rs` (integration binaries, never in-source — the fail registry is process-global). Run with `cargo test -p omnigraph-engine --features failpoints --test failpoints` / `cargo test -p omnigraph-cluster --features failpoints --test failpoints`. ## RustFS / S3 integration