From 7cc83b877201c2f6ecf9dbb0e1793783b8f3354b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?F=C3=A9lix=20Saparelli?= Date: Thu, 2 Jul 2026 16:53:01 +1200 Subject: [PATCH] feat(canopy): send health_details (sizes, fixes, duration) with verification MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Canopy now accepts an arbitrary `health_details` map on /restore-verification, and bestool-canopy 0.4.4 adds a generic request escape hatch. The typed RestoreVerification struct has no such field, so pgro serializes the typed report, splices in `health_details`, and POSTs the merged body via CanopyClient::request to the same endpoint. health_details (snake_case) carries: - sizes: per-database on-disk bytes (pg_database_size), keyed by db name. - fixes: an arbitrary jsonb map of the fix steps the restore applied (locale, reindex, reset_wal, recreated_pg_wal). Stored in _pgro.restore_info.fixes by the init script and forwarded verbatim, so adding a fix is one shell line + its flag — no schema or operator change. - restore_duration_sec: wall-clock from the restore CR's createdAt to report time. sizes + fixes come from one read-only connection to the restore's postgres (done in the switchover block, before any ephemeral teardown destroys the DB). Gathering is best-effort: on the failure path, or if postgres never came up, those pieces are omitted and the verification still sends. Duration is independent of postgres. Records pg_resetwal / pg_wal-recreation via flag files so they surface in fixes alongside the existing locale/reindex flags. --- Cargo.lock | 6 +- Cargo.toml | 2 +- src/canopy.rs | 30 +++++ src/controllers/canopy/verification.rs | 166 ++++++++++++++++++++++++- src/controllers/postgres.rs | 43 +++++++ src/controllers/restore/builders.rs | 29 ++++- 6 files changed, 265 insertions(+), 11 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9549a76..e00bd0a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -447,9 +447,9 @@ checksum = "d86b93f97252c47b41663388e6d155714a9d0c398b99f1005cbc5f978b29f445" [[package]] name = "bestool-canopy" -version = "0.4.3" +version = "0.4.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2cb064d9c74aedc67f722d2bf7cac5a9214525ef6bf12b70c09d5428abe020e2" +checksum = "322f836e08276d95b49e4b1188470f24af768e3e504b39a14661f9a882a42fcf" dependencies = [ "algae-cli", "base64 0.22.1", @@ -4333,7 +4333,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32497e9a4c7b38532efcdebeef879707aa9f794296a4f0244f6f69e9bc8574bd" dependencies = [ "fastrand", - "getrandom 0.3.4", + "getrandom 0.4.3", "once_cell", "rustix", "windows-sys 0.61.2", diff --git a/Cargo.toml b/Cargo.toml index d8ae622..894a12e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,7 @@ publish = false [dependencies] anyhow = "1.0.102" axum = "0.8.9" -bestool-canopy = "0.4.3" +bestool-canopy = "0.4.4" bestool-kopia = { version = "0.3.4", features = ["proxy"] } cronexpr = "1.5.0" futures = "0.3.31" diff --git a/src/canopy.rs b/src/canopy.rs index 62da773..42c6109 100644 --- a/src/canopy.rs +++ b/src/canopy.rs @@ -147,6 +147,36 @@ impl Client { .map_err(|err| Error::Canopy(format!("restore_verification: {err:?}"))) } + /// Report a restore outcome with an arbitrary JSON body — used to include + /// the `health_details` field, which the typed [`RestoreVerification`] + /// struct doesn't carry. `body` should be the serialized verification + /// plus any extra fields. Goes to the same `POST /restore-verification` + /// endpoint via the generic request escape hatch; a non-2xx response is + /// an error carrying the status + body. + pub async fn restore_verification_json(&self, body: &serde_json::Value) -> Result<()> { + let resp = self + .inner + .request( + bestool_canopy::reqwest::Method::POST, + &self.base_url, + "/restore-verification", + ) + .await + .map_err(|err| Error::Canopy(format!("restore_verification request: {err:?}")))? + .json(body) + .send() + .await + .map_err(|err| Error::Canopy(format!("restore_verification send: {err:?}")))?; + let status = resp.status(); + if !status.is_success() { + let text = resp.text().await.unwrap_or_default(); + return Err(Error::Canopy(format!( + "restore_verification returned {status}: {text}" + ))); + } + Ok(()) + } + /// Direct access to the public-mTLS base URL the client is configured /// against. The tailnet path uses its own hardcoded URL inside /// `bestool-canopy`. diff --git a/src/controllers/canopy/verification.rs b/src/controllers/canopy/verification.rs index 29caf3d..614251f 100644 --- a/src/controllers/canopy/verification.rs +++ b/src/controllers/canopy/verification.rs @@ -7,14 +7,16 @@ use bestool_canopy::{Outcome, RestoreVerification}; use jiff::Timestamp; -use kube::ResourceExt; +use k8s_openapi::api::core::v1::Secret; +use kube::{Api, ResourceExt}; use serde::Deserialize; +use serde_json::{Value, json}; use tracing::{info, warn}; use uuid::Uuid; use crate::{ context::Context, - controllers::canopy::labels, + controllers::{canopy::labels, postgres}, types::{PostgresPhysicalReplica, PostgresPhysicalRestore}, }; @@ -125,11 +127,32 @@ pub async fn report( s3_received_payload_bytes: Some(stats.received_payload_bytes as i64), }; - match canopy.restore_verification(&report).await { + // Serialize the typed report, then splice in `health_details` — the + // typed struct doesn't carry it, so we send via the arbitrary-JSON + // path. If serialization somehow fails, fall back to the typed call so + // the outcome still reaches canopy. + let mut body = match serde_json::to_value(&report) { + Ok(v) => v, + Err(err) => { + warn!( + restore = %restore.name_any(), + error = %err, + "canopy verification: failed to serialize report; sending without health_details" + ); + if let Err(err) = canopy.restore_verification(&report).await { + warn!(restore = %restore.name_any(), error = %err, "canopy verification report failed"); + } + return; + } + }; + body["health_details"] = gather_health_details(ctx, replica, restore).await; + + match canopy.restore_verification_json(&body).await { Ok(()) => info!( replica = %replica.name_any(), restore = %restore.name_any(), ?outcome, + health_details = %body["health_details"], "canopy verification reported" ), Err(err) => warn!( @@ -140,3 +163,140 @@ pub async fn report( ), } } + +/// Best-effort gather of the `health_details` map (snake_case keys): +/// `{ sizes: {: bytes}, fixes: {reindex, locale}, restore_duration_sec }`. +/// +/// `sizes` and `fixes` come from a single read-only connection to the +/// restore's postgres (`sizes` from `pg_database_size`, `fixes` from the +/// `_pgro.restore_info` flags the init recorded). Any connection or query +/// failure — expected on the failure path, where postgres may never have +/// come up — just omits that piece; the verification still sends. +/// `restore_duration_sec` is the wall-clock from the restore CR's +/// `createdAt` to now (≈ activation), independent of postgres. +async fn gather_health_details( + ctx: &Context, + replica: &PostgresPhysicalReplica, + restore: &PostgresPhysicalRestore, +) -> Value { + let duration_sec = restore + .status + .as_ref() + .and_then(|s| s.created_at.as_ref()) + .map(|created| Timestamp::now().duration_since(created.0).as_secs().max(0) as u64); + + let pg = match gather_from_postgres(ctx, replica, restore).await { + Ok(parts) => Some(parts), + // Expected on the failure path (postgres may never have come up) and + // on ephemeral replicas racing teardown; the verification still sends. + Err(err) => { + warn!( + replica = %replica.name_any(), + restore = %restore.name_any(), + error = %err, + "canopy verification: could not gather sizes/fixes; reporting without them" + ); + None + } + }; + + build_health_details(duration_sec, pg) +} + +/// Postgres-derived pieces of the health details: per-database sizes and +/// the `fixes` map the restore init recorded. `fixes` is an arbitrary JSON +/// object (`{locale, reindex, reset_wal, ...}`) forwarded verbatim, so new +/// fix flags flow through without operator changes. +struct PostgresHealth { + sizes: Vec<(String, u64)>, + fixes: Value, +} + +/// Assemble the `health_details` JSON from already-gathered parts. Pure, so +/// the snake_case wire shape is unit-testable without a database. Omits any +/// piece that wasn't gathered. +fn build_health_details(duration_sec: Option, pg: Option) -> Value { + let mut details = serde_json::Map::new(); + if let Some(secs) = duration_sec { + details.insert("restore_duration_sec".into(), json!(secs)); + } + if let Some(pg) = pg { + let sizes_obj: serde_json::Map = pg + .sizes + .into_iter() + .map(|(name, bytes)| (name, json!(bytes))) + .collect(); + details.insert("sizes".into(), Value::Object(sizes_obj)); + details.insert("fixes".into(), pg.fixes); + } + Value::Object(details) +} + +/// Connect to the restore's postgres (as the replica's reader user) and +/// read the per-database sizes + fix flags. Returns +/// `(sizes, (locale_fixed, reindex_done))`. +async fn gather_from_postgres( + ctx: &Context, + replica: &PostgresPhysicalReplica, + restore: &PostgresPhysicalRestore, +) -> crate::error::Result { + let namespace = replica.namespace().unwrap_or_default(); + let restore_name = restore.name_any(); + + let secrets: Api = Api::namespaced(ctx.client.clone(), &namespace); + let reader_secret = secrets.get(&replica.creds_secret_name()).await?; + let reader_user = postgres::read_secret_field(&reader_secret, "username")?; + let reader_password = postgres::read_secret_field(&reader_secret, "password")?; + + let conn = postgres::connect_to_restore( + &ctx.client, + &namespace, + &restore_name, + "postgres", + &reader_user, + &reader_password, + ctx.use_port_forward(), + ) + .await?; + + let sizes = postgres::list_database_sizes(&conn.client).await?; + let fixes = postgres::read_restore_fixes(&conn.client) + .await + .unwrap_or_else(|_| json!({})); + Ok(PostgresHealth { sizes, fixes }) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn health_details_shape_is_snake_case() { + let v = build_health_details( + Some(700), + Some(PostgresHealth { + sizes: vec![("tamanu".into(), 1_872_782), ("postgres".into(), 12_829)], + fixes: json!({ "locale": true, "reindex": false, "reset_wal": false }), + }), + ); + assert_eq!(v["restore_duration_sec"], json!(700)); + assert_eq!(v["sizes"]["tamanu"], json!(1_872_782u64)); + assert_eq!(v["sizes"]["postgres"], json!(12_829u64)); + // fixes is forwarded verbatim, so new flags pass through untouched. + assert_eq!(v["fixes"]["locale"], json!(true)); + assert_eq!(v["fixes"]["reindex"], json!(false)); + assert_eq!(v["fixes"]["reset_wal"], json!(false)); + } + + #[test] + fn health_details_omits_ungathered_parts() { + // Failure path: no postgres connection, no createdAt. + let v = build_health_details(None, None); + assert_eq!(v, json!({})); + // Duration known but postgres unreachable: sizes/fixes omitted. + let v = build_health_details(Some(42), None); + assert_eq!(v, json!({ "restore_duration_sec": 42 })); + assert!(v.get("sizes").is_none()); + assert!(v.get("fixes").is_none()); + } +} diff --git a/src/controllers/postgres.rs b/src/controllers/postgres.rs index 86f9cf8..0b1089f 100644 --- a/src/controllers/postgres.rs +++ b/src/controllers/postgres.rs @@ -258,6 +258,49 @@ pub async fn database_size_on(pg: &tokio_postgres::Client) -> Result { Ok(size as u64) } +/// Per-database on-disk sizes in bytes, keyed by database name, excluding +/// the `template0`/`template1` templates. Runs on an already-open +/// connection (typically to the `postgres` database). Used to build the +/// `sizes` map in the canopy restore-verification `health_details`. +pub async fn list_database_sizes(pg: &tokio_postgres::Client) -> Result> { + let rows = pg + .query( + "SELECT datname, pg_database_size(datname) FROM pg_database \ + WHERE datname NOT IN ('template0', 'template1') AND datallowconn \ + ORDER BY pg_database_size(datname) DESC", + &[], + ) + .await?; + Ok(rows + .iter() + .map(|r| { + let name: String = r.get(0); + let size: i64 = r.get(1); + (name, size.max(0) as u64) + }) + .collect()) +} + +/// Read the `fixes` map the restore's init recorded into +/// `_pgro.restore_info` — an arbitrary JSON object like +/// `{"locale": true, "reindex": false, "reset_wal": false, ...}`. Read as +/// text and parsed here so we don't need tokio_postgres's serde_json +/// feature. Absent row/column (older restores) reads as an empty object. +/// Runs on an already-open connection to the `postgres` database. +pub async fn read_restore_fixes(pg: &tokio_postgres::Client) -> Result { + let row = pg + .query_opt( + "SELECT coalesce(fixes::text, '{}') FROM _pgro.restore_info WHERE id = 1", + &[], + ) + .await?; + let text: String = match row { + Some(r) => r.get(0), + None => return Ok(serde_json::json!({})), + }; + Ok(serde_json::from_str(&text).unwrap_or_else(|_| serde_json::json!({}))) +} + /// Query the on-disk size of the given database (bytes) via `pg_database_size()`. pub async fn measure_database_size( client: &Client, diff --git a/src/controllers/restore/builders.rs b/src/controllers/restore/builders.rs index 3e9383d..1e57154 100644 --- a/src/controllers/restore/builders.rs +++ b/src/controllers/restore/builders.rs @@ -1209,6 +1209,7 @@ HBAEOF if [ ! -d "$PGDATA/pg_wal" ]; then echo "pg_wal directory missing (snapshot may be from a Windows host with WAL on a separate path), creating empty pg_wal..." mkdir -p "$PGDATA/pg_wal" + touch /pgdata/fix-recreated-pg-wal fi # Run a postgres --single SQL command with a two-stage pg_resetwal -f @@ -1252,6 +1253,7 @@ postgres_single_or_resetwal() {{ rm -f "$logfile" pg_resetwal -f "$PGDATA" touch /pgdata/needs-reindex-all + touch /pgdata/fix-reset-wal echo "$sql_input" | postgres --single -D "$PGDATA" postgres return $? fi @@ -1273,6 +1275,7 @@ postgres_single_or_resetwal() {{ rm -f "$logfile" pg_resetwal -f "$PGDATA" touch /pgdata/needs-reindex-all + touch /pgdata/fix-reset-wal echo "$sql_input" | postgres --single -D "$PGDATA" postgres }} @@ -1368,13 +1371,29 @@ ALTER ROLE ${{ANALYTICS_USERNAME}} WITH SUPERUSER; SQLEOF fi +# Record which "fix" steps this restore had to apply, so the operator can +# read them back (SELECT from _pgro.restore_info) and forward them to +# canopy in the restore-verification health_details. Stored as a jsonb map +# so adding a new fix is one shell line here plus recording its flag — no +# schema change, no operator change (the operator forwards the map as-is). +# Each fix is keyed by a flag file the fix step touches: +# locale — the post-startup locale rewrite actually changed rows +# reindex — REINDEX ran (after pg_resetwal, or a locale rewrite) +# reset_wal — pg_resetwal -f ran (snapshot's trailing WAL was unusable) +# recreated_pg_wal — an empty pg_wal was created (Windows-host snapshot) if [ -f /pgdata/needs-reindex ] || [ -f /pgdata/needs-reindex-all ]; then PGRO_STAGE=restored + PGRO_REINDEX=true else PGRO_STAGE=ready + PGRO_REINDEX=false fi +if [ "${{LOCALE_CHANGED:-0}}" != "0" ]; then PGRO_LOCALE=true; else PGRO_LOCALE=false; fi +if [ -f /pgdata/fix-reset-wal ]; then PGRO_RESET_WAL=true; else PGRO_RESET_WAL=false; fi +if [ -f /pgdata/fix-recreated-pg-wal ]; then PGRO_RECREATED_WAL=true; else PGRO_RECREATED_WAL=false; fi +PGRO_FIXES="{{\"locale\": ${{PGRO_LOCALE}}, \"reindex\": ${{PGRO_REINDEX}}, \"reset_wal\": ${{PGRO_RESET_WAL}}, \"recreated_pg_wal\": ${{PGRO_RECREATED_WAL}}}}" -echo "Writing restore metadata (stage=${{PGRO_STAGE}})..." +echo "Writing restore metadata (stage=${{PGRO_STAGE}} fixes=${{PGRO_FIXES}})..." psql -U postgres -d postgres << SQLEOF CREATE SCHEMA IF NOT EXISTS _pgro; CREATE TABLE IF NOT EXISTS _pgro.restore_info ( @@ -1387,14 +1406,16 @@ CREATE TABLE IF NOT EXISTS _pgro.restore_info ( ); ALTER TABLE _pgro.restore_info ADD COLUMN IF NOT EXISTS stage text NOT NULL DEFAULT 'restored'; ALTER TABLE _pgro.restore_info ADD COLUMN IF NOT EXISTS last_transition_time timestamptz NOT NULL DEFAULT now(); -INSERT INTO _pgro.restore_info (id, snapshot_id, snapshot_time, stage, last_transition_time) -VALUES (1, '${{PGRO_SNAPSHOT_ID}}', CASE WHEN '${{PGRO_SNAPSHOT_TIME}}' = '' THEN NULL ELSE '${{PGRO_SNAPSHOT_TIME}}'::timestamptz END, '${{PGRO_STAGE}}', now()) +ALTER TABLE _pgro.restore_info ADD COLUMN IF NOT EXISTS fixes jsonb; +INSERT INTO _pgro.restore_info (id, snapshot_id, snapshot_time, stage, last_transition_time, fixes) +VALUES (1, '${{PGRO_SNAPSHOT_ID}}', CASE WHEN '${{PGRO_SNAPSHOT_TIME}}' = '' THEN NULL ELSE '${{PGRO_SNAPSHOT_TIME}}'::timestamptz END, '${{PGRO_STAGE}}', now(), '${{PGRO_FIXES}}'::jsonb) ON CONFLICT (id) DO UPDATE SET snapshot_id = EXCLUDED.snapshot_id, snapshot_time = EXCLUDED.snapshot_time, restored_at = now(), stage = EXCLUDED.stage, - last_transition_time = now(); + last_transition_time = now(), + fixes = EXCLUDED.fixes; SQLEOF echo "Stopping temporary postgres..."