Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/omnigraph-cluster/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,6 @@ tokio = { workspace = true }
ulid = { workspace = true }

[dev-dependencies]
serial_test = "3"
tempfile = { workspace = true }
tokio = { workspace = true }
53 changes: 19 additions & 34 deletions crates/omnigraph-cluster/src/failpoints.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
//! Fault-injection hooks for the cluster apply protocol, mirroring the
//! engine's `omnigraph::failpoints` pattern. With the `failpoints` feature
//! off, every call site compiles to `Ok(())`.
//!
//! Only `maybe_fail` lives here — it returns the cluster's [`Diagnostic`]
//! error type. The test-side configuration guard is shared: use
//! [`omnigraph::failpoints::ScopedFailPoint`], which is registry-only
//! (error-type agnostic) and reachable because the cluster's `failpoints`
//! feature enables `omnigraph/failpoints`. One `ScopedFailPoint`, in the
//! lowest crate, avoids a drifting duplicate.

use crate::Diagnostic;

Expand All @@ -19,38 +26,16 @@ pub(crate) fn maybe_fail(_name: &str) -> Result<(), Diagnostic> {
Ok(())
}

/// Guard for a single configured failpoint.
///
/// Stores the failpoint's name so the `Drop` impl in this file can call
/// `fail::remove` on it when the guard goes out of scope. Only compiled
/// when the `failpoints` feature is enabled.
///
/// NOTE(review): the module docs state that the shared guard is
/// `omnigraph::failpoints::ScopedFailPoint` and that only `maybe_fail`
/// lives in this module — this local definition looks like the pre-change
/// copy; confirm whether it should still exist.
#[cfg(feature = "failpoints")]
pub struct ScopedFailPoint {
// Name of the failpoint this guard configured.
name: String,
}

// Constructors for the failpoint guard. Both configure the point through the
// `fail` crate and keep only the name; removal happens in the `Drop` impl.
#[cfg(feature = "failpoints")]
impl ScopedFailPoint {
/// Configure failpoint `name` with the `fail` action string `action`
/// (the tests in this repository pass `"return"`) and return a guard
/// holding the name.
///
/// # Panics
/// Panics with "configure failpoint" if `fail::cfg` returns an error.
pub fn new(name: &str, action: &str) -> Self {
fail::cfg(name, action).expect("configure failpoint");
Self {
name: name.to_string(),
}
}

/// Register a callback failpoint with the same Drop-based cleanup as
/// `new`. Without the guard, a panic while the point is active would
/// leak the callback into the process-global registry and fire it under
/// later tests in the same binary.
///
/// # Panics
/// Panics with "configure callback failpoint" if `fail::cfg_callback`
/// returns an error.
pub fn with_callback<F>(name: &str, callback: F) -> Self
where
F: Fn() + Send + Sync + 'static,
{
fail::cfg_callback(name, callback).expect("configure callback failpoint");
Self {
name: name.to_string(),
}
}
}

#[cfg(feature = "failpoints")]
impl Drop for ScopedFailPoint {
fn drop(&mut self) {
fail::remove(&self.name);
}
/// Compile-checked catalog of this crate's apply-protocol failpoint names.
/// Engine-scoped names referenced from cluster tests live in
/// [`omnigraph::failpoints::names`].
///
/// Each constant is the string passed to `maybe_fail` at the matching crash
/// point in `apply_config_dir_with_options`; tests pass the same constant
/// when configuring the failpoint, so a typo is a compile error instead of
/// a failpoint that silently never fires.
pub mod names {
/// After graph creation: the graph exists but the cluster state does not
/// record it yet.
pub const CLUSTER_APPLY_AFTER_GRAPH_CREATE: &str = "cluster_apply.after_graph_create";
/// After graph deletion: the root is gone but the ledger does not record
/// it yet.
pub const CLUSTER_APPLY_AFTER_GRAPH_DELETE: &str = "cluster_apply.after_graph_delete";
/// After the payload phase: payloads are on disk, state has not moved.
pub const CLUSTER_APPLY_AFTER_PAYLOAD_PHASE: &str = "cluster_apply.after_payload_phase";
/// After schema apply: the manifest moved but the ledger does not record
/// it yet.
pub const CLUSTER_APPLY_AFTER_SCHEMA_APPLY: &str = "cluster_apply.after_schema_apply";
/// Before graph init: simulates a crash that leaves the recovery sidecar
/// in place for the next run's sweep.
pub const CLUSTER_APPLY_BEFORE_GRAPH_CREATE: &str = "cluster_apply.before_graph_create";
/// Before graph removal: simulates a crash while the graph root still
/// exists.
pub const CLUSTER_APPLY_BEFORE_GRAPH_DELETE: &str = "cluster_apply.before_graph_delete";
/// Before the engine's schema-apply call: simulates a crash that leaves
/// the sidecar in place.
pub const CLUSTER_APPLY_BEFORE_SCHEMA_APPLY: &str = "cluster_apply.before_schema_apply";
/// Immediately before `write_state`: a callback registered here can mutate
/// `state.json` to simulate a concurrent writer.
pub const CLUSTER_APPLY_BEFORE_STATE_WRITE: &str = "cluster_apply.before_state_write";
}
16 changes: 8 additions & 8 deletions crates/omnigraph-cluster/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,7 @@ pub async fn apply_config_dir_with_options(
continue;
}
};
if let Err(diagnostic) = failpoints::maybe_fail("cluster_apply.before_graph_create") {
if let Err(diagnostic) = failpoints::maybe_fail(crate::failpoints::names::CLUSTER_APPLY_BEFORE_GRAPH_CREATE) {
// Simulated crash before the init: the sidecar stays for the
// sweep (row 1: root absent -> intent removed next run).
diagnostics.push(diagnostic);
Expand Down Expand Up @@ -587,7 +587,7 @@ pub async fn apply_config_dir_with_options(
// Crash point: the graph exists, the cluster state does not record it
// yet. A failure here must acknowledge nothing; the next run's sweep
// rolls the ledger forward (row 4).
if let Err(diagnostic) = failpoints::maybe_fail("cluster_apply.after_graph_create") {
if let Err(diagnostic) = failpoints::maybe_fail(crate::failpoints::names::CLUSTER_APPLY_AFTER_GRAPH_CREATE) {
diagnostics.push(diagnostic);
return early_return(
display_path(&desired.config_dir),
Expand Down Expand Up @@ -727,7 +727,7 @@ pub async fn apply_config_dir_with_options(
continue;
}
};
if let Err(diagnostic) = failpoints::maybe_fail("cluster_apply.before_schema_apply") {
if let Err(diagnostic) = failpoints::maybe_fail(crate::failpoints::names::CLUSTER_APPLY_BEFORE_SCHEMA_APPLY) {
// Simulated crash before the engine call: the sidecar stays; the
// sweep retires it next run (ledger still consistent with live).
diagnostics.push(diagnostic);
Expand Down Expand Up @@ -787,7 +787,7 @@ pub async fn apply_config_dir_with_options(
}
// Crash point: the manifest moved, the ledger does not record it yet.
// A failure here acknowledges nothing; the sweep rolls forward.
if let Err(diagnostic) = failpoints::maybe_fail("cluster_apply.after_schema_apply") {
if let Err(diagnostic) = failpoints::maybe_fail(crate::failpoints::names::CLUSTER_APPLY_AFTER_SCHEMA_APPLY) {
diagnostics.push(diagnostic);
return early_return(
display_path(&desired.config_dir),
Expand Down Expand Up @@ -872,7 +872,7 @@ pub async fn apply_config_dir_with_options(
// Crash point: payloads are on disk, state has not moved. A failure here
// must leave state.json byte-identical and acknowledge nothing; re-running
// apply repairs via the skip-if-exists blob reuse.
if let Err(diagnostic) = failpoints::maybe_fail("cluster_apply.after_payload_phase") {
if let Err(diagnostic) = failpoints::maybe_fail(crate::failpoints::names::CLUSTER_APPLY_AFTER_PAYLOAD_PHASE) {
diagnostics.push(diagnostic);
return early_return(
display_path(&desired.config_dir),
Expand Down Expand Up @@ -949,7 +949,7 @@ pub async fn apply_config_dir_with_options(
continue;
}
};
if let Err(diagnostic) = failpoints::maybe_fail("cluster_apply.before_graph_delete") {
if let Err(diagnostic) = failpoints::maybe_fail(crate::failpoints::names::CLUSTER_APPLY_BEFORE_GRAPH_DELETE) {
// Simulated crash before removal: row 8 retires the intent and
// the still-valid approval lets a later run retry.
diagnostics.push(diagnostic);
Expand All @@ -974,7 +974,7 @@ pub async fn apply_config_dir_with_options(
}
// Crash point: the root is gone, the ledger does not record it yet.
// The sweep rolls forward (row 7b) and consumes the approval.
if let Err(diagnostic) = failpoints::maybe_fail("cluster_apply.after_graph_delete") {
if let Err(diagnostic) = failpoints::maybe_fail(crate::failpoints::names::CLUSTER_APPLY_AFTER_GRAPH_DELETE) {
diagnostics.push(diagnostic);
return early_return(
display_path(&desired.config_dir),
Expand Down Expand Up @@ -1080,7 +1080,7 @@ pub async fn apply_config_dir_with_options(
// persisted-statuses revert contract below is exercised; a cfg_callback
// on this point can mutate state.json to simulate a concurrent writer,
// making write_state's CAS check fail organically.
let write_result = match failpoints::maybe_fail("cluster_apply.before_state_write") {
let write_result = match failpoints::maybe_fail(crate::failpoints::names::CLUSTER_APPLY_BEFORE_STATE_WRITE) {
Ok(()) => {
backend
.write_state(&new_state, expected_cas.as_deref(), &mut observations)
Expand Down
39 changes: 26 additions & 13 deletions crates/omnigraph-cluster/tests/failpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@ use std::fs;
use std::path::{Path, PathBuf};

use fail::FailScenario;
use serial_test::serial;
use omnigraph::db::Omnigraph;
use omnigraph::failpoints::ScopedFailPoint as EngineScopedFailPoint;
use omnigraph_cluster::failpoints::ScopedFailPoint;
// One ScopedFailPoint for both engine- and cluster-scoped failpoint names:
// it is registry-only (error-type agnostic) and lives in the lowest crate.
use omnigraph::failpoints::ScopedFailPoint;
use omnigraph_cluster::{
ApplyOptions, apply_config_dir, apply_config_dir_with_options, approve_config_dir,
validate_config_dir,
Expand Down Expand Up @@ -105,12 +107,13 @@ fn query_blob(config_dir: &Path, digests: &BTreeMap<String, String>) -> PathBuf
}

#[tokio::test]
#[serial]
async fn failpoint_wiring_returns_injected_diagnostic() {
let scenario = FailScenario::setup();
let dir = fixture();
seed_applyable_state(dir.path());

let _failpoint = ScopedFailPoint::new("cluster_apply.after_payload_phase", "return");
let _failpoint = ScopedFailPoint::new(omnigraph_cluster::failpoints::names::CLUSTER_APPLY_AFTER_PAYLOAD_PHASE, "return");
let out = apply_config_dir(dir.path()).await;
assert!(!out.ok);
assert!(out.diagnostics.iter().any(|diagnostic| {
Expand All @@ -127,14 +130,15 @@ async fn failpoint_wiring_returns_injected_diagnostic() {
/// state.json is byte-identical, nothing is acknowledged — and a plain re-run
/// repairs by trusting the existing content-addressed blobs.
#[tokio::test]
#[serial]
async fn apply_crash_after_payload_phase_leaves_state_unmoved_then_recovers() {
let scenario = FailScenario::setup();
let dir = fixture();
let digests = seed_applyable_state(dir.path());
let state_before = fs::read(state_path(dir.path())).unwrap();

{
let _failpoint = ScopedFailPoint::new("cluster_apply.after_payload_phase", "return");
let _failpoint = ScopedFailPoint::new(omnigraph_cluster::failpoints::names::CLUSTER_APPLY_AFTER_PAYLOAD_PHASE, "return");
let out = apply_config_dir(dir.path()).await;
assert!(!out.ok);
assert!(!out.state_written);
Expand Down Expand Up @@ -169,6 +173,7 @@ async fn apply_crash_after_payload_phase_leaves_state_unmoved_then_recovers() {
/// (possible under `state.lock: false`) must surface `state_cas_mismatch`,
/// acknowledge nothing, and leave the concurrent writer's state on disk.
#[tokio::test]
#[serial]
async fn apply_cas_race_surfaces_state_cas_mismatch() {
let scenario = FailScenario::setup();
let dir = fixture();
Expand All @@ -179,7 +184,7 @@ async fn apply_cas_race_surfaces_state_cas_mismatch() {
// after apply read it but before apply writes. RAII-guarded so a panic
// inside apply cannot leak the callback into the global registry.
let race_path = state_path(dir.path());
let failpoint = ScopedFailPoint::with_callback("cluster_apply.before_state_write", move || {
let failpoint = ScopedFailPoint::with_callback(omnigraph_cluster::failpoints::names::CLUSTER_APPLY_BEFORE_STATE_WRITE, move || {
let mut state: serde_json::Value =
serde_json::from_str(&fs::read_to_string(&race_path).unwrap()).unwrap();
state["state_revision"] = serde_json::json!(99);
Expand Down Expand Up @@ -256,13 +261,14 @@ fn recovery_sidecars(config_dir: &Path) -> Vec<PathBuf> {
/// The next run's sweep removes the intent (row 1) and the same run creates
/// the graph and converges.
#[tokio::test]
#[serial]
async fn create_crash_before_init_recovers_via_sweep() {
let scenario = FailScenario::setup();
let dir = fixture();
seed_empty_state(dir.path());

{
let _failpoint = ScopedFailPoint::new("cluster_apply.before_graph_create", "return");
let _failpoint = ScopedFailPoint::new(omnigraph_cluster::failpoints::names::CLUSTER_APPLY_BEFORE_GRAPH_CREATE, "return");
let out = apply_config_dir(dir.path()).await;
assert!(!out.ok);
assert!(out.diagnostics.iter().any(|diagnostic| {
Expand Down Expand Up @@ -298,14 +304,15 @@ async fn create_crash_before_init_recovers_via_sweep() {
/// ledger is stale, nothing was acknowledged. The next run's sweep rolls the
/// ledger forward (row 4) with an audit entry, and the run converges.
#[tokio::test]
#[serial]
async fn create_crash_after_init_rolls_state_forward() {
let scenario = FailScenario::setup();
let dir = fixture();
seed_empty_state(dir.path());
let state_before = fs::read(dir.path().join("__cluster/state.json")).unwrap();

{
let _failpoint = ScopedFailPoint::new("cluster_apply.after_graph_create", "return");
let _failpoint = ScopedFailPoint::new(omnigraph_cluster::failpoints::names::CLUSTER_APPLY_AFTER_GRAPH_CREATE, "return");
let out = apply_config_dir(dir.path()).await;
assert!(!out.ok);
assert!(!out.state_written);
Expand Down Expand Up @@ -385,6 +392,7 @@ async fn live_schema_digest(dir: &Path) -> String {
/// live schema and ledger are untouched; the next run's sweep retires the
/// stale intent and the same run applies and converges.
#[tokio::test]
#[serial]
async fn schema_crash_before_apply_recovers_via_sweep() {
let scenario = FailScenario::setup();
let dir = fixture();
Expand All @@ -393,7 +401,7 @@ async fn schema_crash_before_apply_recovers_via_sweep() {
fs::write(dir.path().join("people.pg"), SCHEMA_V2).unwrap();

{
let _failpoint = ScopedFailPoint::new("cluster_apply.before_schema_apply", "return");
let _failpoint = ScopedFailPoint::new(omnigraph_cluster::failpoints::names::CLUSTER_APPLY_BEFORE_SCHEMA_APPLY, "return");
let out = apply_config_dir_with_options(
dir.path(),
ApplyOptions {
Expand Down Expand Up @@ -425,6 +433,7 @@ async fn schema_crash_before_apply_recovers_via_sweep() {
/// the graph manifest moves. The defensive cleanup proof should remove the
/// cluster sidecar immediately so a pre-movement error cannot brick boot.
#[tokio::test]
#[serial]
async fn schema_apply_error_before_graph_movement_removes_sidecar() {
let scenario = FailScenario::setup();
let dir = fixture();
Expand All @@ -433,7 +442,7 @@ async fn schema_apply_error_before_graph_movement_removes_sidecar() {
fs::write(dir.path().join("people.pg"), SCHEMA_V2).unwrap();

{
let _failpoint = EngineScopedFailPoint::new("schema_apply.before_staging_write", "return");
let _failpoint = ScopedFailPoint::new(omnigraph::failpoints::names::SCHEMA_APPLY_BEFORE_STAGING_WRITE, "return");
let out = apply_config_dir(dir.path()).await;
assert!(!out.ok);
assert!(
Expand Down Expand Up @@ -462,6 +471,7 @@ async fn schema_apply_error_before_graph_movement_removes_sidecar() {
/// prove this is a pre-movement failure, so the sidecar must survive for
/// explicit recovery/quarantine instead of being cleaned up defensively.
#[tokio::test]
#[serial]
async fn schema_apply_error_after_graph_movement_keeps_sidecar() {
let scenario = FailScenario::setup();
let dir = fixture();
Expand All @@ -472,7 +482,7 @@ async fn schema_apply_error_after_graph_movement_keeps_sidecar() {
let v2_digest = desired.resource_digests["schema.knowledge"].clone();

{
let _failpoint = EngineScopedFailPoint::new("schema_apply.after_manifest_commit", "return");
let _failpoint = ScopedFailPoint::new(omnigraph::failpoints::names::SCHEMA_APPLY_AFTER_MANIFEST_COMMIT, "return");
let out = apply_config_dir(dir.path()).await;
assert!(!out.ok);
assert!(
Expand Down Expand Up @@ -524,6 +534,7 @@ async fn schema_apply_error_after_graph_movement_keeps_sidecar() {
/// moved, the ledger is stale, nothing acknowledged; the next run's sweep
/// rolls the ledger forward with an audit entry and the run converges.
#[tokio::test]
#[serial]
async fn schema_crash_after_apply_rolls_state_forward() {
let scenario = FailScenario::setup();
let dir = fixture();
Expand All @@ -534,7 +545,7 @@ async fn schema_crash_after_apply_rolls_state_forward() {
let v2_digest = desired.resource_digests["schema.knowledge"].clone();

{
let _failpoint = ScopedFailPoint::new("cluster_apply.after_schema_apply", "return");
let _failpoint = ScopedFailPoint::new(omnigraph_cluster::failpoints::names::CLUSTER_APPLY_AFTER_SCHEMA_APPLY, "return");
let out = apply_config_dir(dir.path()).await;
assert!(!out.ok);
assert!(!out.state_written);
Expand Down Expand Up @@ -608,13 +619,14 @@ async fn seed_approved_delete(dir: &Path) -> String {
/// next run retires the stale intent (row 8) and the still-approved delete
/// completes in the same run.
#[tokio::test]
#[serial]
async fn delete_crash_before_removal_reproposes() {
let scenario = FailScenario::setup();
let dir = fixture();
let approval_id = seed_approved_delete(dir.path()).await;

{
let _failpoint = ScopedFailPoint::new("cluster_apply.before_graph_delete", "return");
let _failpoint = ScopedFailPoint::new(omnigraph_cluster::failpoints::names::CLUSTER_APPLY_BEFORE_GRAPH_DELETE, "return");
let out = apply_config_dir(dir.path()).await;
assert!(!out.ok);
assert!(dir.path().join("graphs/old.omni").exists());
Expand Down Expand Up @@ -650,14 +662,15 @@ async fn delete_crash_before_removal_reproposes() {
/// nothing acknowledged; the next run's sweep rolls the tombstone forward,
/// consumes the approval the sidecar carries, and audits the recovery.
#[tokio::test]
#[serial]
async fn delete_crash_after_removal_rolls_forward() {
let scenario = FailScenario::setup();
let dir = fixture();
let approval_id = seed_approved_delete(dir.path()).await;
let state_before = fs::read(state_path(dir.path())).unwrap();

{
let _failpoint = ScopedFailPoint::new("cluster_apply.after_graph_delete", "return");
let _failpoint = ScopedFailPoint::new(omnigraph_cluster::failpoints::names::CLUSTER_APPLY_AFTER_GRAPH_DELETE, "return");
let out = apply_config_dir(dir.path()).await;
assert!(!out.ok);
assert!(!out.state_written);
Expand Down
Loading