Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 40 additions & 4 deletions src/controllers/replica.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,36 @@ pub fn persistent_schemas_migration_settled(replica: &PostgresPhysicalReplica) -
.is_none_or(SchemaMigrationPhase::is_settled)
}

/// True when a snapshot is already covered by an existing restore and must
/// not be restored again.
///
/// A snapshot is covered when we've already recorded it as verified
/// (`status.verifiedSnapshotId`, the ephemeral marker that outlives the torn
/// down restore) or when any non-failed restore for this replica is already
/// working on it (Pending/Restoring/Ready/Switching/Active). Failed restores
/// don't count — the failure backoff path is allowed to retry the snapshot.
///
/// Without this an ephemeral `verify` replica whose restore has been torn
/// down would re-create a restore for the same snapshot — the snapshot-list
/// handler otherwise only checks the *active* restore, which is gone after
/// teardown — and double-report the verification to canopy.
fn snapshot_already_covered(
snapshot_id: &str,
verified_snapshot_id: Option<&str>,
restores: &[PostgresPhysicalRestore],
) -> bool {
if verified_snapshot_id == Some(snapshot_id) {
return true;
}
restores.iter().any(|r| {
r.spec.snapshot == snapshot_id
&& !matches!(
r.status.as_ref().and_then(|s| s.phase.as_ref()),
Some(&RestorePhase::Failed)
)
})
}

/// Generate a random password for analytics credentials.
pub(crate) fn generate_password() -> String {
let mut rng = rand::rng();
Expand Down Expand Up @@ -691,14 +721,20 @@ pub async fn reconcile(replica: Arc<PostgresPhysicalReplica>, ctx: Arc<Context>)
)
.await?;

let current_snapshot_id =
active_restore.map(|r| r.spec.snapshot.as_str());
let verified_snapshot_id = replica
.status
.as_ref()
.and_then(|s| s.verified_snapshot_id.as_deref());

if current_snapshot_id == Some(&snap.id) {
if snapshot_already_covered(
&snap.id,
verified_snapshot_id,
&restore_list.items,
) {
debug!(
replica = name,
snapshot = snap.id,
"latest snapshot already active, skipping"
"snapshot already covered by an existing or verified restore, skipping"
);
} else {
info!(
Expand Down
68 changes: 68 additions & 0 deletions src/controllers/replica/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::{kopia::Snapshot, types::*, util::TimeSpan};

use super::{
generate_password, persistent_schemas_migration_settled, resources::build_snapshot_list_job,
snapshot_already_covered,
};

fn make_replica(
Expand Down Expand Up @@ -233,3 +234,70 @@ fn snapshot_list_job_rotates_kopia_logs() {
"snapshot-list script must rotate kopia logs by age"
);
}

fn make_restore(snapshot: &str, phase: Option<RestorePhase>) -> PostgresPhysicalRestore {
PostgresPhysicalRestore {
metadata: ObjectMeta {
name: Some(format!("test-{snapshot}")),
namespace: Some("default".into()),
..Default::default()
},
spec: PostgresPhysicalRestoreSpec {
replica: k8s_openapi::api::core::v1::LocalObjectReference {
name: "test".into(),
},
snapshot: snapshot.into(),
snapshot_size: k8s_openapi::apimachinery::pkg::api::resource::Quantity("1Gi".into()),
snapshot_time: None,
storage_size: k8s_openapi::apimachinery::pkg::api::resource::Quantity("2Gi".into()),
},
status: phase.map(|p| PostgresPhysicalRestoreStatus {
phase: Some(p),
..Default::default()
}),
}
}

#[test]
fn snapshot_covered_by_verified_marker() {
// Ephemeral `verify` replica: the restore has been torn down (no live
// restores) but the marker records that we already verified the
// snapshot, so we must not restore it again and double-report.
assert!(snapshot_already_covered("snapA", Some("snapA"), &[]));
assert!(!snapshot_already_covered("snapB", Some("snapA"), &[]));
}

#[test]
fn snapshot_covered_by_live_restore() {
for phase in [
RestorePhase::Pending,
RestorePhase::Restoring,
RestorePhase::Ready,
RestorePhase::Switching,
RestorePhase::Active,
] {
let restores = [make_restore("snapA", Some(phase.clone()))];
assert!(
snapshot_already_covered("snapA", None, &restores),
"a {phase:?} restore on the snapshot must count as covered"
);
}
}

#[test]
fn failed_restore_does_not_cover_snapshot() {
// A Failed restore is allowed to be retried via the failure backoff
// path, so it must not block a fresh restore of the same snapshot.
let restores = [make_restore("snapA", Some(RestorePhase::Failed))];
assert!(!snapshot_already_covered("snapA", None, &restores));
}

#[test]
fn uncovered_snapshot_is_created() {
// No marker, and the only live restore is for a different snapshot.
let restores = [make_restore("snapOld", Some(RestorePhase::Active))];
assert!(!snapshot_already_covered("snapNew", None, &restores));
// A restore with no status yet (phase None) still counts as live.
let pending = [make_restore("snapNew", None)];
assert!(snapshot_already_covered("snapNew", None, &pending));
}