Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 20 additions & 4 deletions src/bin/canopy_proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,21 @@ struct StatsFile {
received_payload_bytes: u64,
}

/// Block until SIGTERM (kubelet, native-sidecar termination) or SIGINT
/// (interactive ctrl-c) arrives, whichever comes first.
async fn wait_for_shutdown() -> Result<(), String> {
use tokio::signal::unix::{SignalKind, signal};
let mut sigterm = signal(SignalKind::terminate())
.map_err(|err| format!("installing SIGTERM handler: {err}"))?;
let mut sigint = signal(SignalKind::interrupt())
.map_err(|err| format!("installing SIGINT handler: {err}"))?;
tokio::select! {
_ = sigterm.recv() => {}
_ = sigint.recv() => {}
}
Ok(())
}

/// Write `port` to `port_file` atomically (write `.tmp` then rename) so a
/// partial read by the kopia container's wait-loop can't observe a torn value.
fn write_port_atomic(port_file: &std::path::Path, port: u16) -> std::io::Result<()> {
Expand Down Expand Up @@ -208,10 +223,11 @@ async fn run(cfg: Config) -> Result<(), String> {
write_port_atomic(&cfg.port_file, port)
.map_err(|err| format!("writing port file {}: {err}", cfg.port_file.display()))?;

// Wait for SIGTERM (kopia container completion → pod termination).
tokio::signal::ctrl_c()
.await
.map_err(|err| format!("waiting for shutdown signal: {err}"))?;
// Wait for shutdown. As a native sidecar the kubelet sends SIGTERM once
// the main container exits; interactive runs get SIGINT. Handle both —
// ctrl_c() alone only catches SIGINT, so under k8s the proxy would hang
// until SIGKILL and lose its stats.
wait_for_shutdown().await?;
info!("shutdown signal received");

let traffic = proxy.traffic();
Expand Down
14 changes: 13 additions & 1 deletion src/controllers/replica/resources.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,11 @@ fi
}),
..Default::default()
}];
// The canopy-proxy runs as a native sidecar (init container with
// restartPolicy: Always) so the Pod completes once the main
// snapshot-list container exits; a plain sidecar container would keep
// the Pod Running and the Job would never succeed.
let mut init_containers: Vec<Container> = Vec::new();

if let KopiaSource::CanopyProxy {
group, backup_type, ..
Expand All @@ -193,9 +198,10 @@ fi
});
containers[0].volume_mounts = Some(kopia_volume_mounts);

containers.push(Container {
init_containers.push(Container {
name: "canopy-proxy".to_string(),
image: Some(proxy.image.to_string()),
restart_policy: Some("Always".to_string()),
command: Some(vec!["canopy-proxy".to_string()]),
env: Some(vec![
EnvVar {
Expand Down Expand Up @@ -280,6 +286,12 @@ fi
}),
spec: Some(PodSpec {
restart_policy: Some("Never".to_string()),
init_containers: if init_containers.is_empty() {
None
} else {
Some(init_containers)
},
termination_grace_period_seconds: Some(30),
containers,
volumes: if volumes.is_empty() {
None
Expand Down
23 changes: 21 additions & 2 deletions src/controllers/restore/builders.rs
Original file line number Diff line number Diff line change
Expand Up @@ -758,7 +758,13 @@ echo -n "$VERSION" > /dev/termination-log
("pgro.bes.au/restore".to_string(), restore.name_any()),
]);

let mut containers = Vec::with_capacity(2);
let mut containers = Vec::with_capacity(1);
// The canopy-proxy runs as a native sidecar (an init container with
// restartPolicy: Always) so that when the main `restore` container
// exits the kubelet SIGTERMs the proxy and the Pod completes on the
// main container's exit code. A plain sidecar container would keep the
// Pod Running forever and the Job would never succeed.
let mut init_containers: Vec<Container> = Vec::new();
containers.push(Container {
name: "restore".to_string(),
image: Some(kopia_image.to_string()),
Expand Down Expand Up @@ -799,9 +805,14 @@ echo -n "$VERSION" > /dev/termination-log
..Default::default()
});

containers.push(Container {
init_containers.push(Container {
name: "canopy-proxy".to_string(),
image: Some(proxy.image.to_string()),
// Native sidecar: an init container that never exits on its
// own. `restartPolicy: Always` tells the kubelet to keep it
// running alongside the main containers and to SIGTERM it once
// they complete.
restart_policy: Some("Always".to_string()),
// Same image as the operator; run the `canopy-proxy` binary
// instead of the default `operator` entrypoint.
command: Some(vec!["canopy-proxy".to_string()]),
Expand Down Expand Up @@ -902,6 +913,14 @@ echo -n "$VERSION" > /dev/termination-log
fs_group: Some(999),
..Default::default()
}),
init_containers: if init_containers.is_empty() {
None
} else {
Some(init_containers)
},
// Give the native sidecar time to flush its final stats
// on SIGTERM before the kubelet SIGKILLs it.
termination_grace_period_seconds: Some(30),
containers,
volumes: Some(volumes),
..Default::default()
Expand Down
55 changes: 55 additions & 0 deletions src/controllers/restore/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,61 @@ fn test_restore_and_replica() -> (PostgresPhysicalRestore, PostgresPhysicalRepli
(restore, replica)
}

#[test]
fn canopy_restore_job_proxy_is_native_sidecar() {
// Regression: the canopy-proxy must be an init container with
// restartPolicy=Always (a native sidecar), NOT a plain container. As a
// plain container it keeps the Pod Running after the main `restore`
// container exits, so the Job never succeeds and eventually hits
// activeDeadlineSeconds → DeadlineExceeded.
let (restore, mut replica) = test_restore_and_replica();
replica.spec.kopia_secret_ref = None;
replica.spec.canopy_source = Some(CanopySource {
group: "11111111-1111-1111-1111-111111111111".to_string(),
r#type: "tamanu-postgres".to_string(),
});
let proxy = super::builders::CanopyProxyArgs {
image: "ghcr.io/beyondessential/postgres-restore-operator:latest",
broker_base_url: "http://operator.pgro-system.svc:9091",
stats_callback_url: "http://operator.pgro-system.svc:8080/api/v1/canopy-stats/ns/job",
};
let job = build_restore_job(
&restore,
"test-restore-restore",
"default",
&replica,
"kopia:latest",
"http://operator/api/v1/cache-pressure/default/test-restore",
Some(&proxy),
)
.unwrap();
let pod_spec = job.spec.unwrap().template.spec.unwrap();

// The proxy is NOT a regular container.
assert!(
!pod_spec.containers.iter().any(|c| c.name == "canopy-proxy"),
"canopy-proxy must not be a plain container (Pod would never complete)"
);
// It IS an init container with restartPolicy=Always.
let init = pod_spec
.init_containers
.as_ref()
.expect("canopy path must declare init containers");
let sidecar = init
.iter()
.find(|c| c.name == "canopy-proxy")
.expect("canopy-proxy must be a native sidecar (init container)");
assert_eq!(
sidecar.restart_policy.as_deref(),
Some("Always"),
"native sidecar must set restartPolicy=Always"
);
assert!(
pod_spec.termination_grace_period_seconds.unwrap_or(0) > 0,
"canopy Pod must allow grace time for the sidecar to flush stats on SIGTERM"
);
}

#[test]
fn restore_job_has_ttl_seconds_after_finished() {
let (restore, replica) = test_restore_and_replica();
Expand Down