diff --git a/src/bin/canopy_proxy.rs b/src/bin/canopy_proxy.rs index f2e8ffc..acdf4de 100644 --- a/src/bin/canopy_proxy.rs +++ b/src/bin/canopy_proxy.rs @@ -154,6 +154,21 @@ struct StatsFile { received_payload_bytes: u64, } +/// Block until SIGTERM (kubelet, native-sidecar termination) or SIGINT +/// (interactive ctrl-c) arrives, whichever comes first. +async fn wait_for_shutdown() -> Result<(), String> { + use tokio::signal::unix::{SignalKind, signal}; + let mut sigterm = signal(SignalKind::terminate()) + .map_err(|err| format!("installing SIGTERM handler: {err}"))?; + let mut sigint = signal(SignalKind::interrupt()) + .map_err(|err| format!("installing SIGINT handler: {err}"))?; + tokio::select! { + _ = sigterm.recv() => {} + _ = sigint.recv() => {} + } + Ok(()) +} + /// Write `port` to `port_file` atomically (write `.tmp` then rename) so a /// partial read by the kopia container's wait-loop can't observe a torn value. fn write_port_atomic(port_file: &std::path::Path, port: u16) -> std::io::Result<()> { @@ -208,10 +223,11 @@ async fn run(cfg: Config) -> Result<(), String> { write_port_atomic(&cfg.port_file, port) .map_err(|err| format!("writing port file {}: {err}", cfg.port_file.display()))?; - // Wait for SIGTERM (kopia container completion → pod termination). - tokio::signal::ctrl_c() - .await - .map_err(|err| format!("waiting for shutdown signal: {err}"))?; + // Wait for shutdown. As a native sidecar the kubelet sends SIGTERM once + // the main container exits; interactive runs get SIGINT. Handle both — + // ctrl_c() alone only catches SIGINT, so under k8s the proxy would hang + // until SIGKILL and lose its stats. + wait_for_shutdown().await?; info!("shutdown signal received"); let traffic = proxy.traffic(); diff --git a/src/controllers/replica/resources.rs b/src/controllers/replica/resources.rs index fea4198..b1b5f56 100644 --- a/src/controllers/replica/resources.rs +++ b/src/controllers/replica/resources.rs @@ -176,6 +176,11 @@ fi }), ..Default::default() }]; + // The canopy-proxy runs as a native sidecar (init container with + // restartPolicy: Always) so the Pod completes once the main + // snapshot-list container exits; a plain sidecar container would keep + // the Pod Running and the Job would never succeed. + let mut init_containers: Vec = Vec::new(); if let KopiaSource::CanopyProxy { group, backup_type, .. @@ -193,9 +198,10 @@ fi }); containers[0].volume_mounts = Some(kopia_volume_mounts); - containers.push(Container { + init_containers.push(Container { name: "canopy-proxy".to_string(), image: Some(proxy.image.to_string()), + restart_policy: Some("Always".to_string()), command: Some(vec!["canopy-proxy".to_string()]), env: Some(vec![ EnvVar { @@ -280,6 +286,12 @@ fi }), spec: Some(PodSpec { restart_policy: Some("Never".to_string()), + init_containers: if init_containers.is_empty() { + None + } else { + Some(init_containers) + }, + termination_grace_period_seconds: Some(30), containers, volumes: if volumes.is_empty() { None diff --git a/src/controllers/restore/builders.rs b/src/controllers/restore/builders.rs index 2d0a944..3e9383d 100644 --- a/src/controllers/restore/builders.rs +++ b/src/controllers/restore/builders.rs @@ -758,7 +758,13 @@ echo -n "$VERSION" > /dev/termination-log ("pgro.bes.au/restore".to_string(), restore.name_any()), ]); - let mut containers = Vec::with_capacity(2); + let mut containers = Vec::with_capacity(1); + // The canopy-proxy runs as a native sidecar (an init container with + // restartPolicy: Always) so that when the main `restore` container + // exits the kubelet SIGTERMs the proxy and the Pod completes on the + // main container's exit code. A plain sidecar container would keep the + // Pod Running forever and the Job would never succeed. + let mut init_containers: Vec = Vec::new(); containers.push(Container { name: "restore".to_string(), image: Some(kopia_image.to_string()), @@ -799,9 +805,14 @@ echo -n "$VERSION" > /dev/termination-log ..Default::default() }); - containers.push(Container { + init_containers.push(Container { name: "canopy-proxy".to_string(), image: Some(proxy.image.to_string()), + // Native sidecar: an init container that never exits on its + // own. `restartPolicy: Always` tells the kubelet to keep it + // running alongside the main containers and to SIGTERM it once + // they complete. + restart_policy: Some("Always".to_string()), // Same image as the operator; run the `canopy-proxy` binary // instead of the default `operator` entrypoint. command: Some(vec!["canopy-proxy".to_string()]), @@ -902,6 +913,14 @@ echo -n "$VERSION" > /dev/termination-log fs_group: Some(999), ..Default::default() }), + init_containers: if init_containers.is_empty() { + None + } else { + Some(init_containers) + }, + // Give the native sidecar time to flush its final stats + // on SIGTERM before the kubelet SIGKILLs it. + termination_grace_period_seconds: Some(30), containers, volumes: Some(volumes), ..Default::default() diff --git a/src/controllers/restore/tests.rs b/src/controllers/restore/tests.rs index 37ba527..676e2e2 100644 --- a/src/controllers/restore/tests.rs +++ b/src/controllers/restore/tests.rs @@ -148,6 +148,61 @@ fn test_restore_and_replica() -> (PostgresPhysicalRestore, PostgresPhysicalRepli (restore, replica) } +#[test] +fn canopy_restore_job_proxy_is_native_sidecar() { + // Regression: the canopy-proxy must be an init container with + // restartPolicy=Always (a native sidecar), NOT a plain container. As a + // plain container it keeps the Pod Running after the main `restore` + // container exits, so the Job never succeeds and eventually hits + // activeDeadlineSeconds → DeadlineExceeded. + let (restore, mut replica) = test_restore_and_replica(); + replica.spec.kopia_secret_ref = None; + replica.spec.canopy_source = Some(CanopySource { + group: "11111111-1111-1111-1111-111111111111".to_string(), + r#type: "tamanu-postgres".to_string(), + }); + let proxy = super::builders::CanopyProxyArgs { + image: "ghcr.io/beyondessential/postgres-restore-operator:latest", + broker_base_url: "http://operator.pgro-system.svc:9091", + stats_callback_url: "http://operator.pgro-system.svc:8080/api/v1/canopy-stats/ns/job", + }; + let job = build_restore_job( + &restore, + "test-restore-restore", + "default", + &replica, + "kopia:latest", + "http://operator/api/v1/cache-pressure/default/test-restore", + Some(&proxy), + ) + .unwrap(); + let pod_spec = job.spec.unwrap().template.spec.unwrap(); + + // The proxy is NOT a regular container. + assert!( + !pod_spec.containers.iter().any(|c| c.name == "canopy-proxy"), + "canopy-proxy must not be a plain container (Pod would never complete)" + ); + // It IS an init container with restartPolicy=Always. + let init = pod_spec + .init_containers + .as_ref() + .expect("canopy path must declare init containers"); + let sidecar = init + .iter() + .find(|c| c.name == "canopy-proxy") + .expect("canopy-proxy must be a native sidecar (init container)"); + assert_eq!( + sidecar.restart_policy.as_deref(), + Some("Always"), + "native sidecar must set restartPolicy=Always" + ); + assert!( + pod_spec.termination_grace_period_seconds.unwrap_or(0) > 0, + "canopy Pod must allow grace time for the sidecar to flush stats on SIGTERM" + ); +} + #[test] fn restore_job_has_ttl_seconds_after_finished() { let (restore, replica) = test_restore_and_replica();