Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ publish = false
[dependencies]
anyhow = "1.0.102"
axum = "0.8.9"
bestool-canopy = "0.4.3"
bestool-canopy = "0.4.4"
bestool-kopia = { version = "0.3.4", features = ["proxy"] }
cronexpr = "1.5.0"
futures = "0.3.31"
Expand Down
30 changes: 30 additions & 0 deletions src/canopy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,36 @@ impl Client {
.map_err(|err| Error::Canopy(format!("restore_verification: {err:?}")))
}

/// Report a restore outcome with an arbitrary JSON body — used to include
/// the `health_details` field, which the typed [`RestoreVerification`]
/// struct doesn't carry. `body` should be the serialized verification
/// plus any extra fields. Goes to the same `POST /restore-verification`
/// endpoint via the generic request escape hatch; a non-2xx response is
/// an error carrying the status + body.
pub async fn restore_verification_json(&self, body: &serde_json::Value) -> Result<()> {
let resp = self
.inner
.request(
bestool_canopy::reqwest::Method::POST,
&self.base_url,
"/restore-verification",
)
.await
.map_err(|err| Error::Canopy(format!("restore_verification request: {err:?}")))?
.json(body)
.send()
.await
.map_err(|err| Error::Canopy(format!("restore_verification send: {err:?}")))?;
let status = resp.status();
if !status.is_success() {
let text = resp.text().await.unwrap_or_default();
return Err(Error::Canopy(format!(
"restore_verification returned {status}: {text}"
)));
}
Ok(())
}

/// Direct access to the public-mTLS base URL the client is configured
/// against. The tailnet path uses its own hardcoded URL inside
/// `bestool-canopy`.
Expand Down
166 changes: 163 additions & 3 deletions src/controllers/canopy/verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,16 @@

use bestool_canopy::{Outcome, RestoreVerification};
use jiff::Timestamp;
use kube::ResourceExt;
use k8s_openapi::api::core::v1::Secret;
use kube::{Api, ResourceExt};
use serde::Deserialize;
use serde_json::{Value, json};
use tracing::{info, warn};
use uuid::Uuid;

use crate::{
context::Context,
controllers::canopy::labels,
controllers::{canopy::labels, postgres},
types::{PostgresPhysicalReplica, PostgresPhysicalRestore},
};

Expand Down Expand Up @@ -125,11 +127,32 @@ pub async fn report(
s3_received_payload_bytes: Some(stats.received_payload_bytes as i64),
};

match canopy.restore_verification(&report).await {
// Serialize the typed report, then splice in `health_details` — the
// typed struct doesn't carry it, so we send via the arbitrary-JSON
// path. If serialization somehow fails, fall back to the typed call so
// the outcome still reaches canopy.
let mut body = match serde_json::to_value(&report) {
Ok(v) => v,
Err(err) => {
warn!(
restore = %restore.name_any(),
error = %err,
"canopy verification: failed to serialize report; sending without health_details"
);
if let Err(err) = canopy.restore_verification(&report).await {
warn!(restore = %restore.name_any(), error = %err, "canopy verification report failed");
}
return;
}
};
body["health_details"] = gather_health_details(ctx, replica, restore).await;

match canopy.restore_verification_json(&body).await {
Ok(()) => info!(
replica = %replica.name_any(),
restore = %restore.name_any(),
?outcome,
health_details = %body["health_details"],
"canopy verification reported"
),
Err(err) => warn!(
Expand All @@ -140,3 +163,140 @@ pub async fn report(
),
}
}

/// Collect the `health_details` object attached to a verification report.
///
/// Best-effort: every piece is optional, and a missing piece never prevents
/// the verification from being sent. The resulting object uses snake_case
/// keys and may contain:
///
/// - `restore_duration_sec`: wall-clock seconds from the restore status's
///   `created_at` until now, clamped at zero. Needs no postgres connection.
/// - `sizes` and `fixes`: read from the restore's postgres by
///   [`gather_from_postgres`]. When that fails (expected on the failure
///   path, where postgres may never have come up) a warning is logged and
///   both keys are left out.
async fn gather_health_details(
    ctx: &Context,
    replica: &PostgresPhysicalReplica,
    restore: &PostgresPhysicalRestore,
) -> Value {
    let created_at = restore
        .status
        .as_ref()
        .and_then(|status| status.created_at.as_ref());
    let duration_sec = created_at.map(|created| {
        let elapsed = Timestamp::now().duration_since(created.0);
        // A creation time in the future yields a negative span; report 0.
        elapsed.as_secs().max(0) as u64
    });

    let gathered = gather_from_postgres(ctx, replica, restore).await;
    if let Err(err) = &gathered {
        // Expected on the failure path (postgres may never have come up) and
        // on ephemeral replicas racing teardown; the verification still sends.
        warn!(
            replica = %replica.name_any(),
            restore = %restore.name_any(),
            error = %err,
            "canopy verification: could not gather sizes/fixes; reporting without them"
        );
    }

    build_health_details(duration_sec, gathered.ok())
}

/// Postgres-derived pieces of the health details, as produced by
/// `gather_from_postgres` and consumed by `build_health_details`.
///
/// `fixes` is an arbitrary JSON object (`{locale, reindex, reset_wal, ...}`)
/// forwarded verbatim, so new fix flags flow through without operator changes.
struct PostgresHealth {
/// `(database name, size in bytes)` pairs; emitted as the `sizes` object.
sizes: Vec<(String, u64)>,
/// JSON value inserted unchanged under the `fixes` key.
fixes: Value,
}

/// Assemble the `health_details` JSON from already-gathered parts. Pure, so
/// the snake_case wire shape is unit-testable without a database. Omits any
/// piece that wasn't gathered.
fn build_health_details(duration_sec: Option<u64>, pg: Option<PostgresHealth>) -> Value {
let mut details = serde_json::Map::new();
if let Some(secs) = duration_sec {
details.insert("restore_duration_sec".into(), json!(secs));
}
if let Some(pg) = pg {
let sizes_obj: serde_json::Map<String, Value> = pg
.sizes
.into_iter()
.map(|(name, bytes)| (name, json!(bytes)))
.collect();
details.insert("sizes".into(), Value::Object(sizes_obj));
details.insert("fixes".into(), pg.fixes);
}
Value::Object(details)
}

/// Connect to the restore's postgres (as the replica's reader user) and
/// read the per-database sizes + fix flags. Returns
/// `(sizes, (locale_fixed, reindex_done))`.
async fn gather_from_postgres(
ctx: &Context,
replica: &PostgresPhysicalReplica,
restore: &PostgresPhysicalRestore,
) -> crate::error::Result<PostgresHealth> {
let namespace = replica.namespace().unwrap_or_default();
let restore_name = restore.name_any();

let secrets: Api<Secret> = Api::namespaced(ctx.client.clone(), &namespace);
let reader_secret = secrets.get(&replica.creds_secret_name()).await?;
let reader_user = postgres::read_secret_field(&reader_secret, "username")?;
let reader_password = postgres::read_secret_field(&reader_secret, "password")?;

let conn = postgres::connect_to_restore(
&ctx.client,
&namespace,
&restore_name,
"postgres",
&reader_user,
&reader_password,
ctx.use_port_forward(),
)
.await?;

let sizes = postgres::list_database_sizes(&conn.client).await?;
let fixes = postgres::read_restore_fixes(&conn.client)
.await
.unwrap_or_else(|_| json!({}));
Ok(PostgresHealth { sizes, fixes })
}

#[cfg(test)]
mod tests {
    use super::*;

    /// All parts gathered: keys are snake_case, sizes are keyed by database
    /// name, and every fix flag is passed through unchanged.
    #[test]
    fn health_details_shape_is_snake_case() {
        let pg = PostgresHealth {
            sizes: vec![("tamanu".into(), 1_872_782), ("postgres".into(), 12_829)],
            fixes: json!({ "locale": true, "reindex": false, "reset_wal": false }),
        };
        let details = build_health_details(Some(700), Some(pg));

        assert_eq!(details["restore_duration_sec"], json!(700));

        for (database, bytes) in [("tamanu", 1_872_782u64), ("postgres", 12_829u64)] {
            assert_eq!(details["sizes"][database], json!(bytes));
        }

        // fixes is forwarded verbatim, so new flags pass through untouched.
        for (flag, expected) in [("locale", true), ("reindex", false), ("reset_wal", false)] {
            assert_eq!(details["fixes"][flag], json!(expected));
        }
    }

    /// Parts that were not gathered must be absent rather than null.
    #[test]
    fn health_details_omits_ungathered_parts() {
        // Failure path: no postgres connection, no createdAt.
        let nothing = build_health_details(None, None);
        assert_eq!(nothing, json!({}));

        // Duration known but postgres unreachable: sizes/fixes omitted.
        let duration_only = build_health_details(Some(42), None);
        assert_eq!(duration_only, json!({ "restore_duration_sec": 42 }));
        assert!(duration_only.get("sizes").is_none());
        assert!(duration_only.get("fixes").is_none());
    }
}
43 changes: 43 additions & 0 deletions src/controllers/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,49 @@ pub async fn database_size_on(pg: &tokio_postgres::Client) -> Result<u64> {
Ok(size as u64)
}

/// List the on-disk size, in bytes, of each database in the cluster.
///
/// Returns `(database name, bytes)` pairs ordered largest first. The
/// `template0`/`template1` templates and any database that does not allow
/// connections are excluded. Negative sizes are clamped to zero.
///
/// Runs on an already-open connection (typically to the `postgres`
/// database). Used to build the `sizes` map in the canopy
/// restore-verification `health_details`.
pub async fn list_database_sizes(pg: &tokio_postgres::Client) -> Result<Vec<(String, u64)>> {
    const QUERY: &str = "SELECT datname, pg_database_size(datname) FROM pg_database \
        WHERE datname NOT IN ('template0', 'template1') AND datallowconn \
        ORDER BY pg_database_size(datname) DESC";

    let rows = pg.query(QUERY, &[]).await?;

    let mut sizes = Vec::with_capacity(rows.len());
    for row in &rows {
        let datname: String = row.get(0);
        let bytes: i64 = row.get(1);
        sizes.push((datname, bytes.max(0) as u64));
    }
    Ok(sizes)
}

/// Read the `fixes` map the restore's init recorded into
/// `_pgro.restore_info` — an arbitrary JSON object like
/// `{"locale": true, "reindex": false, "reset_wal": false, ...}`. Read as
/// text and parsed here so we don't need tokio_postgres's serde_json
/// feature. Absent row/column (older restores) reads as an empty object.
/// Runs on an already-open connection to the `postgres` database.
pub async fn read_restore_fixes(pg: &tokio_postgres::Client) -> Result<serde_json::Value> {
let row = pg
.query_opt(
"SELECT coalesce(fixes::text, '{}') FROM _pgro.restore_info WHERE id = 1",
&[],
)
.await?;
let text: String = match row {
Some(r) => r.get(0),
None => return Ok(serde_json::json!({})),
};
Ok(serde_json::from_str(&text).unwrap_or_else(|_| serde_json::json!({})))
}

/// Query the on-disk size of the given database (bytes) via `pg_database_size()`.
pub async fn measure_database_size(
client: &Client,
Expand Down
29 changes: 25 additions & 4 deletions src/controllers/restore/builders.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1209,6 +1209,7 @@ HBAEOF
if [ ! -d "$PGDATA/pg_wal" ]; then
echo "pg_wal directory missing (snapshot may be from a Windows host with WAL on a separate path), creating empty pg_wal..."
mkdir -p "$PGDATA/pg_wal"
touch /pgdata/fix-recreated-pg-wal
fi

# Run a postgres --single SQL command with a two-stage pg_resetwal -f
Expand Down Expand Up @@ -1252,6 +1253,7 @@ postgres_single_or_resetwal() {{
rm -f "$logfile"
pg_resetwal -f "$PGDATA"
touch /pgdata/needs-reindex-all
touch /pgdata/fix-reset-wal
echo "$sql_input" | postgres --single -D "$PGDATA" postgres
return $?
fi
Expand All @@ -1273,6 +1275,7 @@ postgres_single_or_resetwal() {{
rm -f "$logfile"
pg_resetwal -f "$PGDATA"
touch /pgdata/needs-reindex-all
touch /pgdata/fix-reset-wal
echo "$sql_input" | postgres --single -D "$PGDATA" postgres
}}

Expand Down Expand Up @@ -1368,13 +1371,29 @@ ALTER ROLE ${{ANALYTICS_USERNAME}} WITH SUPERUSER;
SQLEOF
fi

# Record which "fix" steps this restore had to apply, so the operator can
# read them back (SELECT from _pgro.restore_info) and forward them to
# canopy in the restore-verification health_details. Stored as a jsonb map
# so adding a new fix is one shell line here plus recording its flag — no
# schema change, no operator change (the operator forwards the map as-is).
# Each fix is derived from a marker its fix step leaves behind: a flag file
# under /pgdata, or the LOCALE_CHANGED variable in the case of locale:
# locale — the post-startup locale rewrite actually changed rows
# reindex — REINDEX ran (after pg_resetwal, or a locale rewrite)
# reset_wal — pg_resetwal -f ran (snapshot's trailing WAL was unusable)
# recreated_pg_wal — an empty pg_wal was created (Windows-host snapshot)
if [ -f /pgdata/needs-reindex ] || [ -f /pgdata/needs-reindex-all ]; then
PGRO_STAGE=restored
PGRO_REINDEX=true
else
PGRO_STAGE=ready
PGRO_REINDEX=false
fi
if [ "${{LOCALE_CHANGED:-0}}" != "0" ]; then PGRO_LOCALE=true; else PGRO_LOCALE=false; fi
if [ -f /pgdata/fix-reset-wal ]; then PGRO_RESET_WAL=true; else PGRO_RESET_WAL=false; fi
if [ -f /pgdata/fix-recreated-pg-wal ]; then PGRO_RECREATED_WAL=true; else PGRO_RECREATED_WAL=false; fi
PGRO_FIXES="{{\"locale\": ${{PGRO_LOCALE}}, \"reindex\": ${{PGRO_REINDEX}}, \"reset_wal\": ${{PGRO_RESET_WAL}}, \"recreated_pg_wal\": ${{PGRO_RECREATED_WAL}}}}"

echo "Writing restore metadata (stage=${{PGRO_STAGE}})..."
echo "Writing restore metadata (stage=${{PGRO_STAGE}} fixes=${{PGRO_FIXES}})..."
psql -U postgres -d postgres << SQLEOF
CREATE SCHEMA IF NOT EXISTS _pgro;
CREATE TABLE IF NOT EXISTS _pgro.restore_info (
Expand All @@ -1387,14 +1406,16 @@ CREATE TABLE IF NOT EXISTS _pgro.restore_info (
);
ALTER TABLE _pgro.restore_info ADD COLUMN IF NOT EXISTS stage text NOT NULL DEFAULT 'restored';
ALTER TABLE _pgro.restore_info ADD COLUMN IF NOT EXISTS last_transition_time timestamptz NOT NULL DEFAULT now();
INSERT INTO _pgro.restore_info (id, snapshot_id, snapshot_time, stage, last_transition_time)
VALUES (1, '${{PGRO_SNAPSHOT_ID}}', CASE WHEN '${{PGRO_SNAPSHOT_TIME}}' = '' THEN NULL ELSE '${{PGRO_SNAPSHOT_TIME}}'::timestamptz END, '${{PGRO_STAGE}}', now())
ALTER TABLE _pgro.restore_info ADD COLUMN IF NOT EXISTS fixes jsonb;
INSERT INTO _pgro.restore_info (id, snapshot_id, snapshot_time, stage, last_transition_time, fixes)
VALUES (1, '${{PGRO_SNAPSHOT_ID}}', CASE WHEN '${{PGRO_SNAPSHOT_TIME}}' = '' THEN NULL ELSE '${{PGRO_SNAPSHOT_TIME}}'::timestamptz END, '${{PGRO_STAGE}}', now(), '${{PGRO_FIXES}}'::jsonb)
ON CONFLICT (id) DO UPDATE
SET snapshot_id = EXCLUDED.snapshot_id,
snapshot_time = EXCLUDED.snapshot_time,
restored_at = now(),
stage = EXCLUDED.stage,
last_transition_time = now();
last_transition_time = now(),
fixes = EXCLUDED.fixes;
SQLEOF

echo "Stopping temporary postgres..."
Expand Down
Loading