From c784ed503d2d4ea9866962568d8f84b5cb9d386a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?F=C3=A9lix=20Saparelli?= Date: Fri, 3 Jul 2026 19:46:19 +1200 Subject: [PATCH] feat(canopy): capability descriptors with semantics and typed params Bump bestool-canopy to 0.6.1 and advertise intents as descriptors (name, description, canopy semantics, typed parameter schema) instead of bare names, via the generated restore-capabilities client method. Collapse analytics-dev and analytics-dbt into a single parametrised analytics intent (semantics check + url): persistent_schemas, minimum_ttl, switchover_grace, storage_size_maximum and expose are now per-declaration params resolved by canopy and mapped onto the replica CR. verify keeps check + once. For the url semantic, an exposed replica's tailnet URL is looked up from the tailscale sidecar LocalAPI (MagicDNS suffix, cached) and attached to the verification health_details. The 0.6.x client bakes the base URL in at construction and exposes generated endpoint methods; several wire types moved under bestool_canopy::schema. Co-Authored-By: Claude Opus 4.8 (1M context) --- Cargo.lock | 42 +- Cargo.toml | 2 +- operator.yaml | 25 ++ src/bin/canopy_proxy.rs | 2 +- src/bin/operator.rs | 16 +- src/canopy.rs | 123 +++--- src/context.rs | 22 + src/controllers/canopy.rs | 9 +- src/controllers/canopy/intent.rs | 561 ++++++++++++++++++------- src/controllers/canopy/verification.rs | 35 +- src/controllers/replica.rs | 2 +- src/controllers/restore.rs | 2 +- src/lib.rs | 1 + src/tailscale.rs | 110 +++++ 14 files changed, 668 insertions(+), 284 deletions(-) create mode 100644 src/tailscale.rs diff --git a/Cargo.lock b/Cargo.lock index e04058c..4a11752 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -464,9 +464,9 @@ checksum = "d86b93f97252c47b41663388e6d155714a9d0c398b99f1005cbc5f978b29f445" [[package]] name = "bestool-canopy" -version = "0.4.5" +version = "0.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4a9a70b6aea63cd674f184bb45b07cb0b42cfc36449e99e2e15c7860267ad7fd" +checksum = "1de6ff6a223ff6d3b1d721b68cfdb3e77ebfa3ef3372364d0d1df0fac7017b79" dependencies = [ "algae-cli", "base64 0.22.1", @@ -490,7 +490,6 @@ dependencies = [ "tokio", "tracing", "typify", - "ureq", "uuid", ] @@ -3633,7 +3632,9 @@ dependencies = [ "base64 0.22.1", "bytes", "encoding_rs", + "futures-channel", "futures-core", + "futures-util", "h2", "http", "http-body", @@ -3822,7 +3823,6 @@ dependencies = [ "aws-lc-rs", "log", "once_cell", - "ring", "rustls-pki-types", "rustls-webpki", "subtle", @@ -5058,34 +5058,6 @@ version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" -[[package]] -name = "ureq" -version = "3.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dea7109cdcd5864d4eeb1b58a1648dc9bf520360d7af16ec26d0a9354bafcfc0" -dependencies = [ - "base64 0.22.1", - "log", - "percent-encoding", - "rustls", - "rustls-pki-types", - "ureq-proto", - "utf8-zero", - "webpki-roots", -] - -[[package]] -name = "ureq-proto" -version = "0.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e994ba84b0bd1b1b0cf92878b7ef898a5c1760108fe7b6010327e274917a808c" -dependencies = [ - "base64 0.22.1", - "http", - "httparse", - "log", -] - [[package]] name = "url" version = "2.5.8" @@ -5098,12 +5070,6 @@ dependencies = [ "serde", ] -[[package]] -name = "utf8-zero" -version = "0.8.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b8c0a043c9540bae7c578c88f91dda8bd82e59ae27c21baca69c8b191aaf5a6e" - [[package]] name = "utf8_iter" version = "1.0.4" diff --git a/Cargo.toml b/Cargo.toml index 1a3a438..3c2b3d5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,7 @@ publish = false [dependencies] anyhow = "1.0.102" axum = "0.8.9" -bestool-canopy = "0.4.5" +bestool-canopy = "0.6.1" bestool-kopia = { version = "0.3.4", features = ["proxy"] } cronexpr = "1.5.0" futures = "0.3.31" diff --git a/operator.yaml b/operator.yaml index 24203a0..2636577 100644 --- a/operator.yaml +++ b/operator.yaml @@ -228,6 +228,13 @@ spec: # operators can pin to a specific tag. # - name: CANOPY_PROXY_IMAGE # value: ghcr.io/beyondessential/pgro-canopy-proxy:latest + # Path to the tailscale sidecar's LocalAPI socket + # (shared below via the tailscale-socket emptyDir), + # used to resolve the tailnet MagicDNS suffix for the + # `url` intent semantic. Defaults to the value below; + # set empty to disable replica-URL reporting. + # - name: PGRO_TAILSCALED_SOCKET + # value: /var/run/tailscale/tailscaled.sock ports: - name: http containerPort: 8080 @@ -259,6 +266,13 @@ spec: readOnlyRootFilesystem: true capabilities: drop: ["ALL"] + volumeMounts: + # Read the tailscale sidecar's LocalAPI socket to + # resolve the tailnet MagicDNS suffix (see the `url` + # intent semantic). + - name: tailscale-socket + mountPath: /var/run/tailscale + readOnly: true # Tailscale sidecar — the primary production network path # to canopy. bestool-canopy's CanopyClient auto-probes # canopy.tail53aef.ts.net; the SOCKS5 proxy on [::1]:1055 @@ -299,6 +313,17 @@ spec: limits: cpu: 200m memory: 256Mi + volumeMounts: + # Expose the LocalAPI socket to the controller container. + # containerboot symlinks the socket to this path. + - name: tailscale-socket + mountPath: /var/run/tailscale + volumes: + # Shares the tailscaled LocalAPI socket between the sidecar + # and the controller so the operator can query the tailnet + # MagicDNS suffix for the `url` semantic. + - name: tailscale-socket + emptyDir: {} --- # NetworkPolicy restricting the credential-broker port (9091) to Pods # labelled `pgro.bes.au/proxy-sidecar=true` — the label the canopy Job diff --git a/src/bin/canopy_proxy.rs b/src/bin/canopy_proxy.rs index acdf4de..31299b1 100644 --- a/src/bin/canopy_proxy.rs +++ b/src/bin/canopy_proxy.rs @@ -18,7 +18,7 @@ use std::{path::PathBuf, pin::Pin, process::ExitCode, sync::Arc, time::Duration}; -use bestool_canopy::BackupCredentials; +use bestool_canopy::schema::CredentialProcessOutput as BackupCredentials; use bestool_kopia::proxy::{self, BoxError, CredentialProvider, Credentials, S3ProxyConfig}; use jiff::{Timestamp, ToSpan}; use serde::Serialize; diff --git a/src/bin/operator.rs b/src/bin/operator.rs index b0f1fc0..4f52999 100644 --- a/src/bin/operator.rs +++ b/src/bin/operator.rs @@ -25,7 +25,7 @@ use postgres_restore_operator::{ Context, DEFAULT_CANOPY_PROXY_IMAGE, DEFAULT_DEPLOYMENT_READY_TIMEOUT_SECS, DEFAULT_KOPIA_IMAGE, }, - controllers::{self, canopy::intent::SUPPORTED as PGRO_SUPPORTED_INTENTS}, + controllers::{self, canopy::intent}, types::{PostgresPhysicalReplica, PostgresPhysicalRestore}, }; @@ -216,6 +216,14 @@ async fn main() -> anyhow::Result<()> { }); ctx.canopy_proxy_image = std::env::var("CANOPY_PROXY_IMAGE") .unwrap_or_else(|_| DEFAULT_CANOPY_PROXY_IMAGE.to_string()); + // Path to the tailscale sidecar's LocalAPI socket (shared via an emptyDir), + // used to resolve the tailnet MagicDNS suffix for the `url` semantic. + // Defaults to containerboot's fixed location; set empty to disable. + ctx.tailscaled_socket = match std::env::var("PGRO_TAILSCALED_SOCKET") { + Ok(s) if s.is_empty() => None, + Ok(s) => Some(s), + Err(_) => Some("/var/run/tailscale/tailscaled.sock".to_string()), + }; ctx.canopy_broker_base_url = if let Ok(url) = std::env::var("CANOPY_BROKER_BASE_URL") { url } else if let Ok(svc) = std::env::var("OPERATOR_SERVICE_NAME") { @@ -660,13 +668,15 @@ async fn register_capabilities(ctx: Arc) { let Some(canopy) = ctx.canopy.as_ref() else { return; }; + let descriptors = intent::descriptors(); + let intent_names: Vec<&str> = descriptors.iter().map(|d| d.intent.as_str()).collect(); let mut delay = Duration::from_secs(1); let max_delay = Duration::from_secs(300); for attempt in 1..=8u32 { - match canopy.restore_capabilities(PGRO_SUPPORTED_INTENTS).await { + match canopy.restore_capabilities(&descriptors).await { Ok(_) => { info!( - intents = ?PGRO_SUPPORTED_INTENTS, + intents = ?intent_names, "registered supported intents with canopy" ); return; diff --git a/src/canopy.rs b/src/canopy.rs index 8c85d37..857631b 100644 --- a/src/canopy.rs +++ b/src/canopy.rs @@ -7,7 +7,13 @@ //! the integration seam tests inject a stub at, and as the place to hang //! pgro-specific logging / retry / cache concerns later. -use bestool_canopy::{CanopyClient, RestoreCredentials, WorklistEntry, client_builder}; +use bestool_canopy::{ + CanopyClient, TAILSCALE_URL, + schema::{ + IntentDescriptor, RestoreCapabilitiesArgs, RestoreCredentials, RestoreCredentialsArgs, + VerificationArgs, WorklistEntry, + }, +}; use reqwest::Url; use uuid::Uuid; @@ -41,9 +47,8 @@ pub struct CanopyConfig { /// SOCKS proxy takes down both paths at once. async fn build_inner(cfg: &CanopyConfig) -> Result { let socks5 = cfg.socks5_proxy.clone(); - let version = env!("CARGO_PKG_VERSION").to_string(); let make_builder = move || { - let mut b = client_builder(&version); + let mut b = reqwest::Client::builder(); if !socks5.is_empty() { let socks5 = socks5.clone(); let proxy = reqwest::Proxy::custom(move |url| { @@ -58,8 +63,12 @@ async fn build_inner(cfg: &CanopyConfig) -> Result { b }; - let inner = CanopyClient::new( - env!("CARGO_PKG_VERSION"), + let tailscale_url: Url = TAILSCALE_URL + .parse() + .expect("bestool-canopy TAILSCALE_URL is a valid URL"); + let inner = CanopyClient::with_urls( + cfg.base_url.clone(), + tailscale_url, cfg.device_key_pem.as_deref(), make_builder, ) @@ -73,51 +82,49 @@ async fn build_inner(cfg: &CanopyConfig) -> Result { Ok(inner) } -/// pgro's canopy client wrapper. Holds the live `bestool_canopy::CanopyClient` -/// plus the public-mTLS base URL — the bestool client uses its own hardcoded -/// tailnet URL on the tailnet path; the base URL is the mTLS-leg fallback. +/// pgro's canopy client wrapper around the live `bestool_canopy::CanopyClient` +/// (which bakes in the mTLS base URL and the tailnet endpoint at construction). pub struct Client { inner: CanopyClient, - base_url: Url, } impl std::fmt::Debug for Client { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("canopy::Client") - .field("base_url", &self.base_url.as_str()) - .finish_non_exhaustive() + f.debug_struct("canopy::Client").finish_non_exhaustive() } } impl Client { /// Build a client from operator-level config. Returns `Ok(None)` if no - /// canopy integration is configured (no base URL set) — the operator + /// canopy integration is configured (no config provided) — the operator /// then runs in legacy-only mode. pub async fn from_config(cfg: Option) -> Result> { let Some(cfg) = cfg else { return Ok(None) }; let inner = build_inner(&cfg).await?; - Ok(Some(Self { - inner, - base_url: cfg.base_url, - })) + Ok(Some(Self { inner })) } - /// Register the intents this consumer supports. Replaces the registered - /// set wholesale (per canopy's semantics). - pub async fn restore_capabilities(&self, intents: &[&str]) -> Result<()> { + /// Register the intent descriptors this consumer supports. Replaces the + /// registered set wholesale (per canopy's semantics). Each descriptor + /// carries the intent name, the canopy semantics it opts into, and its + /// typed parameter schema. + pub async fn restore_capabilities(&self, intents: &[IntentDescriptor]) -> Result<()> { + let body = RestoreCapabilitiesArgs { + intents: intents.to_vec(), + }; self.inner - .restore_capabilities(&self.base_url, intents) + .restore_capabilities(&body) .await - .map_err(|err| Error::Canopy(format!("restore_capabilities: {err:?}"))) + .map_err(|err| Error::Canopy(format!("restore_capabilities: {err}"))) } /// Fetch the consumer's desired-state worklist. Each entry is one /// concrete replica to maintain. pub async fn worklist(&self) -> Result> { self.inner - .restore_worklist(&self.base_url) + .restore_worklist() .await - .map_err(|err| Error::Canopy(format!("restore_worklist: {err:?}"))) + .map_err(|err| Error::Canopy(format!("restore_worklist: {err}"))) } /// Fetch short-lived read-only STS creds plus the repo password for a @@ -127,64 +134,28 @@ impl Client { backup_type: &str, group: Uuid, ) -> Result { - self.inner - .restore_credentials(&self.base_url, backup_type, group) - .await - .map_err(|err| { - Error::Canopy(format!( - "restore_credentials({backup_type}, {group}): {err:?}" - )) - }) + let body = RestoreCredentialsArgs { + group, + type_: backup_type.to_string(), + }; + self.inner.restore_credentials(&body).await.map_err(|err| { + Error::Canopy(format!( + "restore_credentials({backup_type}, {group}): {err}" + )) + }) } - /// Report a restore outcome (signal 3, restore-verification). - /// - /// `body` is the typed [`bestool_canopy::schema::VerificationArgs`] - /// (generated from canopy's OpenAPI) — including the free-form - /// `health_details` the hand-written wire type doesn't carry. Sent to - /// `POST /restore-verification` via the generic request escape hatch; a - /// non-2xx response is an error carrying the status + body. - pub async fn restore_verification_typed( - &self, - body: &(impl serde::Serialize + ?Sized), - ) -> Result<()> { - let resp = self - .inner - .request( - bestool_canopy::reqwest::Method::POST, - &self.base_url, - "/restore-verification", - ) - .await - .map_err(|err| Error::Canopy(format!("restore_verification request: {err:?}")))? - .json(body) - .send() + /// Report a restore outcome (signal 3, restore-verification). `args` is + /// the typed [`bestool_canopy::schema::VerificationArgs`], including the + /// free-form `health_details`. + pub async fn restore_verification_typed(&self, args: &VerificationArgs) -> Result<()> { + self.inner + .restore_verification(args) .await - .map_err(|err| Error::Canopy(format!("restore_verification send: {err:?}")))?; - let status = resp.status(); - if !status.is_success() { - let text = resp.text().await.unwrap_or_default(); - return Err(Error::Canopy(format!( - "restore_verification returned {status}: {text}" - ))); - } - Ok(()) - } - - /// Direct access to the public-mTLS base URL the client is configured - /// against. The tailnet path uses its own hardcoded URL inside - /// `bestool-canopy`. - pub fn base_url(&self) -> &Url { - &self.base_url + .map_err(|err| Error::Canopy(format!("restore_verification: {err}"))) } } -/// Re-export the wire types pgro consumes verbatim from `bestool-canopy`. -pub use bestool_canopy::{ - BackupCredentials as Credentials, Outcome, RestoreCredentials as CredsResponse, - RestoreVerification as Verification, WorklistEntry as Entry, -}; - #[cfg(test)] mod tests { use super::*; diff --git a/src/context.rs b/src/context.rs index 8568f14..a8c7bbf 100644 --- a/src/context.rs +++ b/src/context.rs @@ -59,6 +59,13 @@ pub struct Context { /// Unix timestamp of the last successful entry into a reconcile function. /// Used by `/livez` to detect a stuck reconciliation loop. pub last_reconcile: Arc, + /// Filesystem path to the tailscale sidecar's LocalAPI unix socket, used + /// to look up the tailnet MagicDNS suffix for the `url` semantic. `None` + /// disables URL reporting. Set at startup from `PGRO_TAILSCALED_SOCKET`. + pub tailscaled_socket: Option, + /// Cached tailnet MagicDNS suffix. The suffix is constant per tailnet, so + /// it's fetched once from the LocalAPI and reused. + pub magic_dns_suffix: Arc>>, } impl Context { @@ -91,9 +98,24 @@ impl Context { callback_base_url, deployment_ready_timeout_secs, last_reconcile: Arc::new(AtomicI64::new(Timestamp::now().as_second())), + tailscaled_socket: None, + magic_dns_suffix: Arc::new(tokio::sync::RwLock::new(None)), } } + /// The tailnet MagicDNS suffix, fetched from the tailscale sidecar's + /// LocalAPI socket and cached. `None` when no socket is configured or the + /// lookup fails — callers then omit the replica URL. + pub async fn magic_dns_suffix(&self) -> Option { + if let Some(cached) = self.magic_dns_suffix.read().await.clone() { + return Some(cached); + } + let socket = self.tailscaled_socket.as_deref()?; + let suffix = crate::tailscale::magic_dns_suffix(socket).await?; + *self.magic_dns_suffix.write().await = Some(suffix.clone()); + Some(suffix) + } + pub fn max_concurrent_restores(&self) -> usize { self.max_concurrent_restores.load(Ordering::Relaxed) } diff --git a/src/controllers/canopy.rs b/src/controllers/canopy.rs index cc1cb0c..a226de5 100644 --- a/src/controllers/canopy.rs +++ b/src/controllers/canopy.rs @@ -17,7 +17,7 @@ use std::{ time::Duration, }; -use bestool_canopy::WorklistEntry; +use bestool_canopy::schema::WorklistEntry; use futures::stream::{self, StreamExt}; use k8s_openapi::{ ByteString, @@ -260,7 +260,7 @@ async fn reconcile_entry(ctx: &Context, ns_name: &str, entry: &WorklistEntry) -> return Ok(()); }; let creds = canopy - .restore_credentials(&entry.r#type.to_string(), entry.group_id) + .restore_credentials(&entry.type_, entry.group_id) .await?; ensure_canopy_creds_secret(ctx, ns_name, entry, &creds.repo_password.0).await?; @@ -276,7 +276,7 @@ async fn ensure_namespace(ctx: &Context, ns_name: &str, entry: &WorklistEntry) - labels_map.insert(labels::DECLARATION_ID.into(), entry.replica_id.to_string()); labels_map.insert(labels::GROUP.into(), entry.group_id.to_string()); labels_map.insert(labels::SERVER.into(), entry.server_id.to_string()); - labels_map.insert(labels::TYPE.into(), entry.r#type.to_string()); + labels_map.insert(labels::TYPE.into(), entry.type_.clone()); labels_map.insert(labels::INTENT.into(), entry.intent.to_string()); let ns = Namespace { @@ -383,7 +383,7 @@ async fn ensure_replica_cr( labels_map.insert(labels::DECLARATION_ID.into(), entry.replica_id.to_string()); labels_map.insert(labels::GROUP.into(), entry.group_id.to_string()); labels_map.insert(labels::SERVER.into(), entry.server_id.to_string()); - labels_map.insert(labels::TYPE.into(), entry.r#type.to_string()); + labels_map.insert(labels::TYPE.into(), entry.type_.clone()); labels_map.insert(labels::INTENT.into(), entry.intent.to_string()); let cr = replica_cr(CR_NAME, ns_name, labels_map, spec); @@ -478,6 +478,7 @@ mod tests { "type": "tamanu-postgres", "intent": "verify", "name": name, + "params": {}, "snapshot_id": null, "snapshot_at": null, "storage": "s3", diff --git a/src/controllers/canopy/intent.rs b/src/controllers/canopy/intent.rs index 9fb13a6..fea7ffe 100644 --- a/src/controllers/canopy/intent.rs +++ b/src/controllers/canopy/intent.rs @@ -1,57 +1,162 @@ //! Intent-driven configuration for canopy-backed replicas. //! -//! Each canopy `WorklistEntry` carries an `intent` string. pgro registers -//! its supported intents at startup ([`SUPPORTED`]) so canopy only -//! dispatches entries it can handle. On each tick, the syncer looks up the -//! entry's intent in [`config_for`] and calls -//! [`IntentConfig::to_replica_spec`] to materialise a +//! Each canopy `WorklistEntry` carries an `intent` string plus resolved +//! `params`. pgro advertises its intents as [`descriptors`] at startup — +//! each a name, human description, the canopy `semantics` it opts into, and +//! a typed parameter schema — so canopy only dispatches entries pgro can +//! handle and collects the right parameters when an operator declares one. +//! +//! On each tick the syncer looks up the entry's intent in [`config_for`] for +//! the fixed per-intent bits, then [`IntentConfig::to_replica_spec`] merges +//! the entry's resolved params on top to materialise a //! `PostgresPhysicalReplicaSpec` — the CR path takes over from there. //! -//! Adding an intent is a two-step edit: -//! 1. Add a new [`IntentConfig`] entry to [`config_for`]. -//! 2. Extend [`SUPPORTED`] with the new intent name. +//! The single parametrised `analytics` intent covers what used to be +//! `analytics-dev` (no persistent schemas) and `analytics-dbt` +//! (`persistent_schemas=dbt`, exposed): the difference is now operator-set +//! parameters, not distinct intents. -use std::collections::BTreeMap; +use std::collections::{BTreeMap, HashMap}; -use bestool_canopy::WorklistEntry; +use bestool_canopy::schema::{ + BTreeMap as ParamSchema, BTreeMapValue as ParamSpec, IntentDescriptor, ParamType, WorklistEntry, +}; use jiff::Span; use k8s_openapi::{ api::core::v1::{ResourceRequirements, SecretReference}, apimachinery::pkg::api::resource::Quantity, }; +use serde_json::{Map, Value, json}; use crate::{ types::{CanopySource, PostgresPhysicalReplicaSpec}, util::TimeSpan, }; -/// pgro-supported intent names, registered with canopy at operator startup. -/// Canopy only dispatches worklist entries whose intent appears here. -pub const SUPPORTED: &[&str] = &["verify", "analytics-dev", "analytics-dbt"]; +/// Canopy semantics pgro's intents opt into (see the RST spec). Carried as +/// plain strings; canopy acts only on the ones it recognises. +mod semantics { + /// The intent produces restore-health feedback (canopy expects a report + /// and holds it to the overdue bound). + pub const CHECK: &str = "check"; + /// A given snapshot is dispatched at most once; canopy self-suppresses + /// the worklist entry once the snapshot has a healthy report. + pub const ONCE: &str = "once"; + /// The health report carries a link to the running replica, which canopy + /// surfaces to operators. + pub const URL: &str = "url"; +} + +/// Names of the parameters the `analytics` intent advertises. Shared between +/// the descriptor (what canopy collects) and [`IntentConfig::to_replica_spec`] +/// (what pgro reads back) so the two can't drift. +pub mod params { + /// `duration` — minimum time between restores of this replica. + pub const MINIMUM_TTL: &str = "minimum_ttl"; + /// `duration` — grace period before tearing down the old restore on + /// switchover. + pub const SWITCHOVER_GRACE: &str = "switchover_grace"; + /// `bytes` — cap on the restore PVC size. + pub const STORAGE_SIZE_MAXIMUM: &str = "storage_size_maximum"; + /// `text` — comma-separated schemas migrated into the restore and kept + /// across restores (the dbt workload). Empty/unset = a plain replica. + pub const PERSISTENT_SCHEMAS: &str = "persistent_schemas"; + /// `boolean` — expose the replica on the tailnet and report its URL. + pub const EXPOSE: &str = "expose"; +} + +/// Default minimum TTL for `analytics` replicas when the operator leaves the +/// param unset (2 hours, in seconds). +const DEFAULT_ANALYTICS_MINIMUM_TTL_SECS: i64 = 7200; +/// Default switchover grace for `analytics` replicas (2 minutes, in seconds). +const DEFAULT_ANALYTICS_SWITCHOVER_GRACE_SECS: i64 = 120; + +fn param(type_: ParamType, default: Option) -> ParamSpec { + ParamSpec { type_, default } +} + +/// The `analytics` intent's parameter schema (name → typed spec + default). +fn analytics_param_schema() -> ParamSchema { + ParamSchema(HashMap::from([ + ( + params::MINIMUM_TTL.to_string(), + param( + ParamType::Duration, + Some(json!(DEFAULT_ANALYTICS_MINIMUM_TTL_SECS)), + ), + ), + ( + params::SWITCHOVER_GRACE.to_string(), + param( + ParamType::Duration, + Some(json!(DEFAULT_ANALYTICS_SWITCHOVER_GRACE_SECS)), + ), + ), + ( + params::STORAGE_SIZE_MAXIMUM.to_string(), + param(ParamType::Bytes, None), + ), + ( + params::PERSISTENT_SCHEMAS.to_string(), + param(ParamType::Text, None), + ), + ( + params::EXPOSE.to_string(), + param(ParamType::Boolean, Some(json!(false))), + ), + ])) +} -/// Intent-derived spec fragments merged onto the base replica spec. +/// The intent descriptors pgro advertises to canopy at startup. Canopy stores +/// them and offers the intents (with descriptions and parameter fields) to +/// operators, dispatches only these, and applies the semantics' behaviours. +pub fn descriptors() -> Vec { + vec![ + IntentDescriptor { + intent: "verify".to_string(), + description: Some( + "Restore the snapshot to prove it is restorable, then discard it.".to_string(), + ), + semantics: vec![semantics::CHECK.to_string(), semantics::ONCE.to_string()], + params: None, + }, + IntentDescriptor { + intent: "analytics".to_string(), + description: Some( + "Keep a long-lived read-only query replica restored from the latest snapshot." + .to_string(), + ), + semantics: vec![semantics::CHECK.to_string(), semantics::URL.to_string()], + params: Some(analytics_param_schema()), + }, + ] +} + +/// Fixed (non-parametrised) spec fragments for a supported intent, plus the +/// defaults used when a parametrised field is left unset by the operator. #[derive(Debug, Clone)] pub struct IntentConfig { pub resources: Option, pub read_only: bool, + /// Default when the `minimum_ttl` param is unset. pub minimum_ttl: Option, - pub persistent_schemas: Option>, - /// Service annotations. `{name}` in a value is substituted with the - /// worklist entry's `name` at materialisation time. - pub service_annotations: Option>, + /// Default when the `switchover_grace` param is unset. pub switchover_grace_period: TimeSpan, + /// Default when the `persistent_schemas` param is unset. + pub persistent_schemas: Option>, pub storage_size_override: Quantity, + /// Default when the `storage_size_maximum` param is unset. + pub storage_size_maximum: Quantity, /// Tear the restore down once it's verified healthy rather than keeping /// it running. Materialised into `PostgresPhysicalReplicaSpec.ephemeral`. - /// True for `verify` (throwaway snapshot check), false for the - /// analytics intents (long-lived query replicas). + /// True for `verify` (throwaway snapshot check), false for `analytics` + /// (long-lived query replica). pub ephemeral: bool, /// Floor on the postgres pod's `/dev/shm` sizing. Materialised into - /// `PostgresPhysicalReplicaSpec.shm_size_floor` so the shared - /// Deployment builder picks `max(computed_from_resources, floor)`. - /// Analytics workloads (`analytics-dev` / `analytics-dbt`) want a - /// higher shm than what a 2 GiB memory request would derive, without - /// paying the k8s scheduling cost of bumping the request. + /// `PostgresPhysicalReplicaSpec.shm_size_floor` so the shared Deployment + /// builder picks `max(computed_from_resources, floor)`. Analytics + /// workloads want a higher shm than a 2 GiB memory request derives, + /// without paying the k8s scheduling cost of bumping the request. pub shm_size_floor: Quantity, } @@ -70,48 +175,35 @@ fn resources(cpu_req: &str, mem_req: &str, cpu_lim: &str, mem_lim: &str) -> Reso } /// Look up the fixed configuration for a supported intent name. Returns -/// `None` for unsupported intents — canopy shouldn't dispatch these -/// because they aren't in [`SUPPORTED`], but callers must still handle -/// the possibility (e.g. a worklist entry sneaking through during an -/// operator downgrade). +/// `None` for unsupported intents — canopy shouldn't dispatch these because +/// they aren't advertised in [`descriptors`], but callers must still handle +/// the possibility (e.g. a worklist entry sneaking through during an operator +/// downgrade). pub fn config_for(intent: &str) -> Option { match intent { "verify" => Some(IntentConfig { resources: Some(resources("250m", "512Mi", "2", "2Gi")), read_only: true, minimum_ttl: None, - persistent_schemas: None, - service_annotations: None, switchover_grace_period: TimeSpan(Span::new().minutes(5)), + persistent_schemas: None, storage_size_override: Quantity("20Gi".to_string()), + storage_size_maximum: Quantity("2Ti".to_string()), ephemeral: true, shm_size_floor: Quantity("512Mi".to_string()), }), - "analytics-dev" => Some(IntentConfig { + "analytics" => Some(IntentConfig { resources: Some(resources("500m", "2Gi", "4", "8Gi")), read_only: true, - minimum_ttl: None, + minimum_ttl: Some(TimeSpan( + Span::new().seconds(DEFAULT_ANALYTICS_MINIMUM_TTL_SECS), + )), + switchover_grace_period: TimeSpan( + Span::new().seconds(DEFAULT_ANALYTICS_SWITCHOVER_GRACE_SECS), + ), persistent_schemas: None, - service_annotations: None, - switchover_grace_period: TimeSpan(Span::new().minutes(5)), - storage_size_override: Quantity("50Gi".to_string()), - ephemeral: false, - shm_size_floor: Quantity("2Gi".to_string()), - }), - "analytics-dbt" => Some(IntentConfig { - resources: Some(resources("500m", "2Gi", "4", "8Gi")), - read_only: true, - minimum_ttl: Some(TimeSpan(Span::new().hours(2))), - persistent_schemas: Some(vec!["dbt".to_string()]), - service_annotations: Some(BTreeMap::from([ - ("tailscale.com/expose".to_string(), "true".to_string()), - ( - "tailscale.com/hostname".to_string(), - "infra-replica-{name}".to_string(), - ), - ])), - switchover_grace_period: TimeSpan(Span::new().minutes(2)), storage_size_override: Quantity("50Gi".to_string()), + storage_size_maximum: Quantity("2Ti".to_string()), ephemeral: false, shm_size_floor: Quantity("2Gi".to_string()), }), @@ -119,51 +211,90 @@ pub fn config_for(intent: &str) -> Option { } } -/// Substitute `{name}` in each value with `entry_name`. Other braces are -/// left alone; there's only one placeholder in current use. -fn substitute_annotations( - base: BTreeMap, - entry_name: &str, -) -> BTreeMap { - base.into_iter() - .map(|(k, v)| (k, v.replace("{name}", entry_name))) - .collect() +/// Tailscale Service annotations exposing the replica on the tailnet under +/// the deterministic hostname `infra-replica-{entry_name}`. +fn expose_annotations(entry_name: &str) -> BTreeMap { + BTreeMap::from([ + ("tailscale.com/expose".to_string(), "true".to_string()), + ( + "tailscale.com/hostname".to_string(), + exposed_hostname(entry_name), + ), + ]) +} + +/// The tailnet hostname a replica is exposed under (without the MagicDNS +/// suffix). Shared with the verification reporter so the reported URL matches +/// what the Service annotation requests. +pub fn exposed_hostname(entry_name: &str) -> String { + format!("infra-replica-{entry_name}") +} + +fn param_i64(params: &Map, key: &str) -> Option { + params.get(key).and_then(Value::as_i64) +} + +fn param_bool(params: &Map, key: &str) -> Option { + params.get(key).and_then(Value::as_bool) +} + +fn param_str<'a>(params: &'a Map, key: &str) -> Option<&'a str> { + params.get(key).and_then(Value::as_str) +} + +/// Whether the entry's `expose` param is set true. +pub fn is_exposed(entry: &WorklistEntry) -> bool { + param_bool(&entry.params, params::EXPOSE).unwrap_or(false) } impl IntentConfig { /// Materialise a `PostgresPhysicalReplicaSpec` for a canopy-managed - /// replica. The syncer patches the CR with this spec on Provision / - /// re-asserts it on subsequent ticks so drift from manual edits is - /// self-healing. + /// replica, merging the entry's resolved params over the intent defaults. + /// The syncer patches the CR with this spec on Provision / re-asserts it + /// on subsequent ticks so drift from manual edits is self-healing. pub fn to_replica_spec( &self, entry: &WorklistEntry, notifications: Vec, ) -> PostgresPhysicalReplicaSpec { + let p = &entry.params; + + let minimum_ttl = param_i64(p, params::MINIMUM_TTL) + .map(|secs| TimeSpan(Span::new().seconds(secs))) + .or(self.minimum_ttl); + let switchover_grace_period = param_i64(p, params::SWITCHOVER_GRACE) + .map(|secs| TimeSpan(Span::new().seconds(secs))) + .unwrap_or(self.switchover_grace_period); + let storage_size_maximum = param_i64(p, params::STORAGE_SIZE_MAXIMUM) + .map(|bytes| Quantity(bytes.to_string())) + .unwrap_or_else(|| self.storage_size_maximum.clone()); + let persistent_schemas = param_str(p, params::PERSISTENT_SCHEMAS) + .map(parse_persistent_schemas) + .filter(|schemas| !schemas.is_empty()) + .or_else(|| self.persistent_schemas.clone()); + let service_annotations = is_exposed(entry).then(|| expose_annotations(&entry.name)); + PostgresPhysicalReplicaSpec { kopia_secret_ref: None, canopy_source: Some(CanopySource { group: entry.group_id.to_string(), - r#type: entry.r#type.to_string(), + r#type: entry.type_.to_string(), }), snapshot_filter: None, - // Long cadence — the actual restore trigger on the canopy - // path is a change to `status.canopyDesiredSnapshotId` - // written by the worklist syncer. The cron is a - // belt-and-braces fallback (e.g. missed status watch). + // Long cadence — the actual restore trigger on the canopy path is + // a change to `status.canopyDesiredSnapshotId` written by the + // worklist syncer. The cron is a belt-and-braces fallback (e.g. + // missed status watch). schedule: "H * * * *".to_string(), schedule_jitter: TimeSpan(Span::new().minutes(10)), - minimum_ttl: self.minimum_ttl, - switchover_grace_period: self.switchover_grace_period, + minimum_ttl, + switchover_grace_period, analytics_username: "analytics".to_string(), storage_class: None, storage_size_override: Some(self.storage_size_override.clone()), resources: self.resources.clone(), shm_size_floor: Some(self.shm_size_floor.clone()), - service_annotations: self - .service_annotations - .clone() - .map(|a| substitute_annotations(a, &entry.name)), + service_annotations, pod_annotations: None, affinity: None, tolerations: Vec::new(), @@ -171,14 +302,14 @@ impl IntentConfig { ephemeral: self.ephemeral, postgres_extra_config: None, notifications, - persistent_schemas: self.persistent_schemas.clone(), - storage_size_maximum: Quantity("2Ti".to_string()), + persistent_schemas, + storage_size_maximum, } } - /// Name of the namespace-local Secret the canopy syncer materialises - /// with the worklist entry's bucket / region / prefix / repo password - /// + dummy AWS keys. `build_restore_job` mounts it via env_from_secret. + /// Name of the namespace-local Secret the canopy syncer materialises with + /// the worklist entry's bucket / region / prefix / repo password + dummy + /// AWS keys. `build_restore_job` mounts it via env_from_secret. pub fn canopy_creds_secret_name(replica_name: &str) -> String { format!("{replica_name}-canopy-creds") } @@ -193,21 +324,32 @@ impl IntentConfig { } } +/// Split a comma-separated `persistent_schemas` param into trimmed, +/// non-empty schema names. +fn parse_persistent_schemas(raw: &str) -> Vec { + raw.split(',') + .map(str::trim) + .filter(|s| !s.is_empty()) + .map(str::to_string) + .collect() +} + #[cfg(test)] mod tests { use super::*; - use uuid::Uuid; - fn entry(intent: &str, name: &str) -> WorklistEntry { - serde_json::from_value(serde_json::json!({ - "replica_id": Uuid::new_v4().to_string(), - "group_id": Uuid::new_v4().to_string(), - "server_id": Uuid::new_v4().to_string(), + fn entry(intent: &str, name: &str, params: Value) -> WorklistEntry { + serde_json::from_value(json!({ + "replica_id": "11111111-1111-1111-1111-111111111111", + "group_id": "22222222-2222-2222-2222-222222222222", + "server_id": "33333333-3333-3333-3333-333333333333", "type": "tamanu-postgres", "intent": intent, "name": name, + "params": params, "snapshot_id": "abc123", "snapshot_at": "2026-07-01T00:00:00Z", + "overdue_after_seconds": null, "storage": "s3", "bucket": "canopy-test", "prefix": "", @@ -217,78 +359,156 @@ mod tests { } #[test] - fn config_for_verify() { - let cfg = config_for("verify").expect("verify is supported"); - assert!(cfg.read_only); - assert!(cfg.minimum_ttl.is_none()); - assert!(cfg.persistent_schemas.is_none()); - assert!(cfg.ephemeral, "verify replicas are torn down after verify"); - } + fn descriptors_advertise_expected_intents_and_semantics() { + let ds = descriptors(); + let names: Vec<&str> = ds.iter().map(|d| d.intent.as_str()).collect(); + assert_eq!(names, ["verify", "analytics"]); - #[test] - fn only_verify_is_ephemeral() { - assert!(config_for("verify").unwrap().ephemeral); - assert!( - !config_for("analytics-dev").unwrap().ephemeral, - "analytics-dev is a long-lived query replica" + let verify = &ds[0]; + assert_eq!(verify.semantics, ["check", "once"]); + assert!(verify.params.is_none(), "verify takes no params"); + assert!(verify.description.is_some()); + + let analytics = &ds[1]; + assert_eq!(analytics.semantics, ["check", "url"]); + let params = analytics + .params + .as_ref() + .expect("analytics is parametrised"); + // Every advertised param is present with its declared type. + assert_eq!( + params.get(params::MINIMUM_TTL).unwrap().type_, + ParamType::Duration ); - assert!( - !config_for("analytics-dbt").unwrap().ephemeral, - "analytics-dbt is a long-lived query replica" + assert_eq!( + params.get(params::SWITCHOVER_GRACE).unwrap().type_, + ParamType::Duration + ); + assert_eq!( + params.get(params::STORAGE_SIZE_MAXIMUM).unwrap().type_, + ParamType::Bytes + ); + assert_eq!( + params.get(params::PERSISTENT_SCHEMAS).unwrap().type_, + ParamType::Text + ); + assert_eq!( + params.get(params::EXPOSE).unwrap().type_, + ParamType::Boolean + ); + // Only the params pgro actually acts on are advertised. + assert_eq!(params.len(), 5); + assert!(params.get("anonymise").is_none()); + // Defaults the sketch specified. + assert_eq!( + params.get(params::MINIMUM_TTL).unwrap().default, + Some(json!(7200)) + ); + assert_eq!( + params.get(params::SWITCHOVER_GRACE).unwrap().default, + Some(json!(120)) + ); + assert_eq!( + params.get(params::EXPOSE).unwrap().default, + Some(json!(false)) + ); + // A `bytes` cap with no default is sent as null when unset. + assert_eq!( + params.get(params::STORAGE_SIZE_MAXIMUM).unwrap().default, + None ); } #[test] - fn to_replica_spec_carries_ephemeral() { - let e = entry("verify", "test"); - assert!( - config_for("verify") - .unwrap() - .to_replica_spec(&e, vec![]) - .ephemeral - ); - let e = entry("analytics-dev", "test"); + fn descriptors_serialise_to_canopy_wire_shape() { + // This is the actual `POST /restore-capabilities` body contract, so + // assert the on-the-wire JSON, not just the Rust field values (catches + // serde renames / a non-transparent params map). + let ds = descriptors(); + let v = serde_json::to_value(&ds).unwrap(); + assert_eq!(v[0]["intent"], "verify"); + assert_eq!(v[0]["semantics"], json!(["check", "once"])); + assert!(v[0]["params"].is_null(), "verify has no params"); + + assert_eq!(v[1]["intent"], "analytics"); + assert_eq!(v[1]["semantics"], json!(["check", "url"])); + // params is a flat object of name -> { type, default? }. + assert_eq!(v[1]["params"]["minimum_ttl"]["type"], "duration"); + assert_eq!(v[1]["params"]["minimum_ttl"]["default"], 7200); + assert_eq!(v[1]["params"]["storage_size_maximum"]["type"], "bytes"); + assert_eq!(v[1]["params"]["persistent_schemas"]["type"], "text"); + assert_eq!(v[1]["params"]["expose"]["type"], "boolean"); + // a `bytes` param with no default omits the key entirely. assert!( - !config_for("analytics-dev") - .unwrap() - .to_replica_spec(&e, vec![]) - .ephemeral + v[1]["params"]["storage_size_maximum"] + .get("default") + .is_none() ); } #[test] - fn config_for_analytics_dbt_has_all_extras() { - let cfg = config_for("analytics-dbt").expect("analytics-dbt is supported"); - assert!(cfg.minimum_ttl.is_some()); - assert_eq!( - cfg.persistent_schemas.as_deref(), - Some(&["dbt".to_string()][..]) + fn config_for_known_and_unknown() { + assert!(config_for("verify").unwrap().ephemeral); + assert!(!config_for("analytics").unwrap().ephemeral); + assert!( + config_for("analytics-dev").is_none(), + "old split names retired" ); - assert!(cfg.service_annotations.is_some()); + assert!(config_for("analytics-dbt").is_none()); + assert!(config_for("").is_none()); } #[test] - fn config_for_unknown_intent() { - assert!(config_for("disaster-recovery").is_none()); - assert!(config_for("").is_none()); + fn verify_spec_uses_defaults_and_is_ephemeral() { + let spec = config_for("verify") + .unwrap() + .to_replica_spec(&entry("verify", "site", json!({})), vec![]); + assert!(spec.ephemeral); + assert!(spec.minimum_ttl.is_none()); + assert!(spec.persistent_schemas.is_none()); + assert!(spec.service_annotations.is_none()); + assert_eq!(spec.storage_size_maximum.0, "2Ti"); } #[test] - fn supported_names_all_resolve() { - for name in SUPPORTED { - assert!( - config_for(name).is_some(), - "SUPPORTED lists {name} but config_for returned None" - ); - } + fn analytics_spec_defaults_when_params_unset() { + let spec = config_for("analytics") + .unwrap() + .to_replica_spec(&entry("analytics", "site", json!({})), vec![]); + assert!(!spec.ephemeral); + // Defaults from the intent config (2h TTL, 2m grace). + assert!(spec.minimum_ttl.is_some()); + assert!(spec.persistent_schemas.is_none()); + assert!( + spec.service_annotations.is_none(), + "unset expose = not exposed" + ); + assert_eq!(spec.storage_size_maximum.0, "2Ti"); } #[test] - fn to_replica_spec_substitutes_name_in_service_annotations() { - let cfg = config_for("analytics-dbt").unwrap(); - let e = entry("analytics-dbt", "example-site"); - let spec = cfg.to_replica_spec(&e, vec![]); - let annos = spec.service_annotations.expect("dbt has annotations"); + fn analytics_dbt_via_params() { + // The old analytics-dbt: persistent schema + exposed + size cap. + let spec = config_for("analytics").unwrap().to_replica_spec( + &entry( + "analytics", + "example-site", + json!({ + "persistent_schemas": "dbt", + "storage_size_maximum": 107374182400i64, + "expose": true, + }), + ), + vec![], + ); + assert_eq!( + spec.persistent_schemas.as_deref(), + Some(&["dbt".to_string()][..]) + ); + assert_eq!(spec.storage_size_maximum.0, "107374182400"); + let annos = spec + .service_annotations + .expect("expose=true sets annotations"); assert_eq!( annos.get("tailscale.com/hostname").map(String::as_str), Some("infra-replica-example-site") @@ -300,25 +520,62 @@ mod tests { } #[test] - fn to_replica_spec_sets_canopy_source_from_entry() { - let cfg = config_for("verify").unwrap(); - let e = entry("verify", "test"); - let spec = cfg.to_replica_spec(&e, vec![]); - assert!(spec.kopia_secret_ref.is_none()); - let cs = spec.canopy_source.expect("canopy_source must be set"); - assert_eq!(cs.r#type, "tamanu-postgres"); - assert_eq!(cs.group, e.group_id.to_string()); + fn duration_params_map_to_seconds() { + let spec = config_for("analytics").unwrap().to_replica_spec( + &entry( + "analytics", + "site", + json!({ "minimum_ttl": 3600, "switchover_grace": 30 }), + ), + vec![], + ); + let ttl = spec.minimum_ttl.expect("minimum_ttl set from param"); + assert_eq!(ttl.0.get_seconds(), 3600); + assert_eq!(spec.switchover_grace_period.0.get_seconds(), 30); } #[test] - fn to_replica_spec_dbt_carries_migration_settings() { - let cfg = config_for("analytics-dbt").unwrap(); - let e = entry("analytics-dbt", "test"); - let spec = cfg.to_replica_spec(&e, vec![]); - assert!(spec.minimum_ttl.is_some()); + fn persistent_schemas_parsing() { + assert_eq!(parse_persistent_schemas("dbt"), vec!["dbt"]); assert_eq!( - spec.persistent_schemas.as_deref(), - Some(&["dbt".to_string()][..]) + parse_persistent_schemas("dbt, reporting , analytics"), + vec!["dbt", "reporting", "analytics"] + ); + assert!(parse_persistent_schemas("").is_empty()); + assert!(parse_persistent_schemas(" , ").is_empty()); + } + + #[test] + fn empty_persistent_schemas_param_is_treated_as_unset() { + let spec = config_for("analytics").unwrap().to_replica_spec( + &entry("analytics", "site", json!({ "persistent_schemas": "" })), + vec![], ); + assert!(spec.persistent_schemas.is_none()); + } + + #[test] + fn is_exposed_reads_param() { + assert!(is_exposed(&entry( + "analytics", + "s", + json!({ "expose": true }) + ))); + assert!(!is_exposed(&entry( + "analytics", + "s", + json!({ "expose": false }) + ))); + assert!(!is_exposed(&entry("analytics", "s", json!({})))); + } + + #[test] + fn to_replica_spec_sets_canopy_source_from_entry() { + let spec = config_for("verify") + .unwrap() + .to_replica_spec(&entry("verify", "site", json!({})), vec![]); + assert!(spec.kopia_secret_ref.is_none()); + let cs = spec.canopy_source.expect("canopy_source must be set"); + assert_eq!(cs.r#type, "tamanu-postgres"); } } diff --git a/src/controllers/canopy/verification.rs b/src/controllers/canopy/verification.rs index 71c67c0..6120a32 100644 --- a/src/controllers/canopy/verification.rs +++ b/src/controllers/canopy/verification.rs @@ -5,7 +5,7 @@ //! loop. This module owns signal 3 — one function called at each terminal //! transition (switchover success, restore failure). -use bestool_canopy::{Outcome, schema::VerificationArgs}; +use bestool_canopy::schema::{RunOutcome, VerificationArgs}; use jiff::Timestamp; use k8s_openapi::api::core::v1::Secret; use kube::{Api, ResourceExt}; @@ -38,7 +38,7 @@ pub async fn report( ctx: &Context, replica: &PostgresPhysicalReplica, restore: &PostgresPhysicalRestore, - outcome: Outcome, + outcome: RunOutcome, error: Option<&str>, ) { if replica.spec.canopy_source.is_none() { @@ -107,11 +107,18 @@ pub async fn report( .as_ref() .and_then(|s| s.postgres_version.clone()); - let replica_healthy = matches!(outcome, Outcome::Success); + let replica_healthy = matches!(outcome, RunOutcome::Success); // Gather health details; send None rather than an empty object when // nothing was gathered (e.g. failure path, postgres unreachable). - let health = gather_health_details(ctx, replica, restore).await; + let mut health = gather_health_details(ctx, replica, restore).await; + // `url` semantic: for a replica exposed on the tailnet, attach a link to + // it so canopy can surface it to operators alongside the report. + if let Some(url) = exposed_replica_url(ctx, replica).await + && let Some(obj) = health.as_object_mut() + { + obj.insert("url".to_string(), json!(url)); + } let health_details = health .as_object() .is_some_and(|m| !m.is_empty()) @@ -158,11 +165,25 @@ pub async fn report( /// Wire string canopy expects for the `outcome` field (matches the /// lowercase serialization of `bestool_canopy::Outcome`). -fn outcome_wire(outcome: Outcome) -> &'static str { +fn outcome_wire(outcome: RunOutcome) -> &'static str { match outcome { - Outcome::Success => "success", - Outcome::Failure => "failure", + RunOutcome::Success => "success", + RunOutcome::Failure => "failure", + } +} + +/// The tailnet URL of a replica exposed via the `url` semantic, or `None` if +/// it isn't exposed or the MagicDNS suffix can't be resolved. The hostname is +/// read from the Service's `tailscale.com/hostname` annotation the intent set, +/// so the reported URL matches exactly what tailscale publishes. +async fn exposed_replica_url(ctx: &Context, replica: &PostgresPhysicalReplica) -> Option { + let annotations = replica.spec.service_annotations.as_ref()?; + if annotations.get("tailscale.com/expose").map(String::as_str) != Some("true") { + return None; } + let hostname = annotations.get("tailscale.com/hostname")?; + let suffix = ctx.magic_dns_suffix().await?; + Some(crate::tailscale::replica_url(hostname, &suffix)) } /// Best-effort gather of the `health_details` map (snake_case keys): diff --git a/src/controllers/replica.rs b/src/controllers/replica.rs index 379b553..30b1364 100644 --- a/src/controllers/replica.rs +++ b/src/controllers/replica.rs @@ -349,7 +349,7 @@ pub async fn reconcile(replica: Arc, ctx: Arc) &ctx, &replica, switching, - bestool_canopy::Outcome::Success, + bestool_canopy::schema::RunOutcome::Success, None, ) .await; diff --git a/src/controllers/restore.rs b/src/controllers/restore.rs index 7ce2daf..322bc42 100644 --- a/src/controllers/restore.rs +++ b/src/controllers/restore.rs @@ -307,7 +307,7 @@ async fn fail_restore( ctx, &replica, &restore, - bestool_canopy::Outcome::Failure, + bestool_canopy::schema::RunOutcome::Failure, Some(error), ) .await; diff --git a/src/lib.rs b/src/lib.rs index 3552c2e..19fb32e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -7,5 +7,6 @@ pub mod kopia; pub mod metrics; pub mod notifications; pub mod quantity; +pub mod tailscale; pub mod types; pub mod util; diff --git a/src/tailscale.rs b/src/tailscale.rs new file mode 100644 index 0000000..e9a8120 --- /dev/null +++ b/src/tailscale.rs @@ -0,0 +1,110 @@ +//! Minimal tailscale LocalAPI client. +//! +//! Used by the canopy verification reporter to build the URL of a replica +//! exposed on the tailnet (the `url` semantic): the tailnet's MagicDNS suffix +//! (e.g. `tailnet-abc.ts.net`) plus the Service's `tailscale.com/hostname` +//! give `https://.`. The suffix is constant per tailnet, so +//! callers cache it after the first successful fetch. +//! +//! tailscaled's LocalAPI is served on a unix socket, not TCP: the userspace +//! sidecar only exposes SOCKS5 (and optional metrics/health) over TCP. So we +//! reach it over the socket the sidecar shares via an `emptyDir`, which +//! containerboot always symlinks to `/var/run/tailscale/tailscaled.sock`. +//! `GET /localapi/v0/status` is a read endpoint, and reads over the unix +//! socket are permitted regardless of peer uid (tailscale +//! `ipnserver`: `IsUnixSock()` ⇒ read always allowed), so the operator +//! container needs no special identity — only access to the socket file. +//! LocalAPI expects the `Host: local-tailscaled.sock` header, which the +//! request URL's host supplies. + +use serde::Deserialize; + +/// Host LocalAPI expects (the connection is over the socket; the host is only +/// the `Host` header and is never resolved). +const LOCALAPI_HOST: &str = "local-tailscaled.sock"; + +/// Subset of the tailscale LocalAPI `/localapi/v0/status` response we read. +#[derive(Debug, Deserialize)] +struct Status { + #[serde(rename = "CurrentTailnet")] + current_tailnet: Option, +} + +#[derive(Debug, Deserialize)] +struct CurrentTailnet { + #[serde(rename = "MagicDNSSuffix")] + magic_dns_suffix: String, +} + +/// Fetch the tailnet's MagicDNS suffix from the tailscale sidecar's LocalAPI +/// over the given unix socket path. +/// +/// Best-effort: an unbuildable client, a network / decode error, a non-2xx +/// response, or an empty suffix all yield `None`, so a missing socket just +/// omits the URL from the health report rather than failing the report. +pub async fn magic_dns_suffix(socket_path: &str) -> Option { + let client = reqwest::Client::builder() + .unix_socket(socket_path) + .build() + .ok()?; + let resp = client + .get(format!("http://{LOCALAPI_HOST}/localapi/v0/status")) + .send() + .await + .ok()?; + if !resp.status().is_success() { + return None; + } + let status: Status = resp.json().await.ok()?; + let suffix = status.current_tailnet?.magic_dns_suffix; + (!suffix.is_empty()).then_some(suffix) +} + +/// Build the HTTPS URL of an exposed replica from its tailnet hostname and the +/// MagicDNS suffix. Pure; tolerates a leading dot on the suffix. +pub fn replica_url(hostname: &str, magic_dns_suffix: &str) -> String { + format!( + "https://{hostname}.{}", + magic_dns_suffix.trim_start_matches('.') + ) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn builds_https_url() { + assert_eq!( + replica_url("infra-replica-site", "tailnet-abc.ts.net"), + "https://infra-replica-site.tailnet-abc.ts.net" + ); + } + + #[test] + fn tolerates_leading_dot_on_suffix() { + assert_eq!( + replica_url("host", ".tailnet-abc.ts.net"), + "https://host.tailnet-abc.ts.net" + ); + } + + #[test] + fn parses_magic_dns_suffix_from_status_json() { + let json = serde_json::json!({ + "CurrentTailnet": { "Name": "example", "MagicDNSSuffix": "tailnet-abc.ts.net" }, + "Self": { "DNSName": "operator.tailnet-abc.ts.net." } + }); + let status: Status = serde_json::from_value(json).unwrap(); + assert_eq!( + status.current_tailnet.unwrap().magic_dns_suffix, + "tailnet-abc.ts.net" + ); + } + + #[test] + fn missing_current_tailnet_is_none() { + let status: Status = serde_json::from_value(serde_json::json!({})).unwrap(); + assert!(status.current_tailnet.is_none()); + } +}