Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 4 additions & 38 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ publish = false
[dependencies]
anyhow = "1.0.102"
axum = "0.8.9"
bestool-canopy = "0.4.5"
bestool-canopy = "0.6.1"
bestool-kopia = { version = "0.3.4", features = ["proxy"] }
cronexpr = "1.5.0"
futures = "0.3.31"
Expand Down
25 changes: 25 additions & 0 deletions operator.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,13 @@ spec:
# operators can pin to a specific tag.
# - name: CANOPY_PROXY_IMAGE
# value: ghcr.io/beyondessential/pgro-canopy-proxy:latest
# Path to the tailscale sidecar's LocalAPI socket
# (shared below via the tailscale-socket emptyDir),
# used to resolve the tailnet MagicDNS suffix for the
# `url` intent semantic. Defaults to the value below;
# set empty to disable replica-URL reporting.
# - name: PGRO_TAILSCALED_SOCKET
# value: /var/run/tailscale/tailscaled.sock
ports:
- name: http
containerPort: 8080
Expand Down Expand Up @@ -259,6 +266,13 @@ spec:
readOnlyRootFilesystem: true
capabilities:
drop: ["ALL"]
volumeMounts:
# Read the tailscale sidecar's LocalAPI socket to
# resolve the tailnet MagicDNS suffix (see the `url`
# intent semantic).
- name: tailscale-socket
mountPath: /var/run/tailscale
readOnly: true
# Tailscale sidecar β€” the primary production network path
# to canopy. bestool-canopy's CanopyClient auto-probes
# canopy.tail53aef.ts.net; the SOCKS5 proxy on [::1]:1055
Expand Down Expand Up @@ -299,6 +313,17 @@ spec:
limits:
cpu: 200m
memory: 256Mi
volumeMounts:
# Expose the LocalAPI socket to the controller container.
# containerboot symlinks the socket to this path.
- name: tailscale-socket
mountPath: /var/run/tailscale
volumes:
# Shares the tailscaled LocalAPI socket between the sidecar
# and the controller so the operator can query the tailnet
# MagicDNS suffix for the `url` semantic.
- name: tailscale-socket
emptyDir: {}
---
# NetworkPolicy restricting the credential-broker port (9091) to Pods
# labelled `pgro.bes.au/proxy-sidecar=true` β€” the label the canopy Job
Expand Down
2 changes: 1 addition & 1 deletion src/bin/canopy_proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

use std::{path::PathBuf, pin::Pin, process::ExitCode, sync::Arc, time::Duration};

use bestool_canopy::BackupCredentials;
use bestool_canopy::schema::CredentialProcessOutput as BackupCredentials;
use bestool_kopia::proxy::{self, BoxError, CredentialProvider, Credentials, S3ProxyConfig};
use jiff::{Timestamp, ToSpan};
use serde::Serialize;
Expand Down
16 changes: 13 additions & 3 deletions src/bin/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use postgres_restore_operator::{
Context, DEFAULT_CANOPY_PROXY_IMAGE, DEFAULT_DEPLOYMENT_READY_TIMEOUT_SECS,
DEFAULT_KOPIA_IMAGE,
},
controllers::{self, canopy::intent::SUPPORTED as PGRO_SUPPORTED_INTENTS},
controllers::{self, canopy::intent},
types::{PostgresPhysicalReplica, PostgresPhysicalRestore},
};

Expand Down Expand Up @@ -216,6 +216,14 @@ async fn main() -> anyhow::Result<()> {
});
ctx.canopy_proxy_image = std::env::var("CANOPY_PROXY_IMAGE")
.unwrap_or_else(|_| DEFAULT_CANOPY_PROXY_IMAGE.to_string());
// Path to the tailscale sidecar's LocalAPI socket (shared via an emptyDir),
// used to resolve the tailnet MagicDNS suffix for the `url` semantic.
// Defaults to containerboot's fixed location; set empty to disable.
ctx.tailscaled_socket = match std::env::var("PGRO_TAILSCALED_SOCKET") {
Ok(s) if s.is_empty() => None,
Ok(s) => Some(s),
Err(_) => Some("/var/run/tailscale/tailscaled.sock".to_string()),
};
ctx.canopy_broker_base_url = if let Ok(url) = std::env::var("CANOPY_BROKER_BASE_URL") {
url
} else if let Ok(svc) = std::env::var("OPERATOR_SERVICE_NAME") {
Expand Down Expand Up @@ -660,13 +668,15 @@ async fn register_capabilities(ctx: Arc<Context>) {
let Some(canopy) = ctx.canopy.as_ref() else {
return;
};
let descriptors = intent::descriptors();
let intent_names: Vec<&str> = descriptors.iter().map(|d| d.intent.as_str()).collect();
let mut delay = Duration::from_secs(1);
let max_delay = Duration::from_secs(300);
for attempt in 1..=8u32 {
match canopy.restore_capabilities(PGRO_SUPPORTED_INTENTS).await {
match canopy.restore_capabilities(&descriptors).await {
Ok(_) => {
info!(
intents = ?PGRO_SUPPORTED_INTENTS,
intents = ?intent_names,
"registered supported intents with canopy"
);
return;
Expand Down
123 changes: 47 additions & 76 deletions src/canopy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,13 @@
//! the integration seam tests inject a stub at, and as the place to hang
//! pgro-specific logging / retry / cache concerns later.

use bestool_canopy::{CanopyClient, RestoreCredentials, WorklistEntry, client_builder};
use bestool_canopy::{
CanopyClient, TAILSCALE_URL,
schema::{
IntentDescriptor, RestoreCapabilitiesArgs, RestoreCredentials, RestoreCredentialsArgs,
VerificationArgs, WorklistEntry,
},
};
use reqwest::Url;
use uuid::Uuid;

Expand Down Expand Up @@ -41,9 +47,8 @@ pub struct CanopyConfig {
/// SOCKS proxy takes down both paths at once.
async fn build_inner(cfg: &CanopyConfig) -> Result<CanopyClient> {
let socks5 = cfg.socks5_proxy.clone();
let version = env!("CARGO_PKG_VERSION").to_string();
let make_builder = move || {
let mut b = client_builder(&version);
let mut b = reqwest::Client::builder();
if !socks5.is_empty() {
let socks5 = socks5.clone();
let proxy = reqwest::Proxy::custom(move |url| {
Expand All @@ -58,8 +63,12 @@ async fn build_inner(cfg: &CanopyConfig) -> Result<CanopyClient> {
b
};

let inner = CanopyClient::new(
env!("CARGO_PKG_VERSION"),
let tailscale_url: Url = TAILSCALE_URL
.parse()
.expect("bestool-canopy TAILSCALE_URL is a valid URL");
let inner = CanopyClient::with_urls(
cfg.base_url.clone(),
tailscale_url,
cfg.device_key_pem.as_deref(),
make_builder,
)
Expand All @@ -73,51 +82,49 @@ async fn build_inner(cfg: &CanopyConfig) -> Result<CanopyClient> {
Ok(inner)
}

/// pgro's canopy client wrapper. Holds the live `bestool_canopy::CanopyClient`
/// plus the public-mTLS base URL β€” the bestool client uses its own hardcoded
/// tailnet URL on the tailnet path; the base URL is the mTLS-leg fallback.
/// pgro's canopy client wrapper around the live `bestool_canopy::CanopyClient`
/// (which bakes in the mTLS base URL and the tailnet endpoint at construction).
pub struct Client {
inner: CanopyClient,
base_url: Url,
}

impl std::fmt::Debug for Client {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("canopy::Client")
.field("base_url", &self.base_url.as_str())
.finish_non_exhaustive()
f.debug_struct("canopy::Client").finish_non_exhaustive()
}
}

impl Client {
/// Build a client from operator-level config. Returns `Ok(None)` if no
/// canopy integration is configured (no base URL set) β€” the operator
/// canopy integration is configured (no config provided) β€” the operator
/// then runs in legacy-only mode.
pub async fn from_config(cfg: Option<CanopyConfig>) -> Result<Option<Self>> {
let Some(cfg) = cfg else { return Ok(None) };
let inner = build_inner(&cfg).await?;
Ok(Some(Self {
inner,
base_url: cfg.base_url,
}))
Ok(Some(Self { inner }))
}

/// Register the intents this consumer supports. Replaces the registered
/// set wholesale (per canopy's semantics).
pub async fn restore_capabilities(&self, intents: &[&str]) -> Result<()> {
/// Register the intent descriptors this consumer supports. Replaces the
/// registered set wholesale (per canopy's semantics). Each descriptor
/// carries the intent name, the canopy semantics it opts into, and its
/// typed parameter schema.
pub async fn restore_capabilities(&self, intents: &[IntentDescriptor]) -> Result<()> {
let body = RestoreCapabilitiesArgs {
intents: intents.to_vec(),
};
self.inner
.restore_capabilities(&self.base_url, intents)
.restore_capabilities(&body)
.await
.map_err(|err| Error::Canopy(format!("restore_capabilities: {err:?}")))
.map_err(|err| Error::Canopy(format!("restore_capabilities: {err}")))
}

/// Fetch the consumer's desired-state worklist. Each entry is one
/// concrete replica to maintain.
pub async fn worklist(&self) -> Result<Vec<WorklistEntry>> {
self.inner
.restore_worklist(&self.base_url)
.restore_worklist()
.await
.map_err(|err| Error::Canopy(format!("restore_worklist: {err:?}")))
.map_err(|err| Error::Canopy(format!("restore_worklist: {err}")))
}

/// Fetch short-lived read-only STS creds plus the repo password for a
Expand All @@ -127,64 +134,28 @@ impl Client {
backup_type: &str,
group: Uuid,
) -> Result<RestoreCredentials> {
self.inner
.restore_credentials(&self.base_url, backup_type, group)
.await
.map_err(|err| {
Error::Canopy(format!(
"restore_credentials({backup_type}, {group}): {err:?}"
))
})
let body = RestoreCredentialsArgs {
group,
type_: backup_type.to_string(),
};
self.inner.restore_credentials(&body).await.map_err(|err| {
Error::Canopy(format!(
"restore_credentials({backup_type}, {group}): {err}"
))
})
}

/// Report a restore outcome (signal 3, restore-verification).
///
/// `body` is the typed [`bestool_canopy::schema::VerificationArgs`]
/// (generated from canopy's OpenAPI) β€” including the free-form
/// `health_details` the hand-written wire type doesn't carry. Sent to
/// `POST /restore-verification` via the generic request escape hatch; a
/// non-2xx response is an error carrying the status + body.
pub async fn restore_verification_typed(
&self,
body: &(impl serde::Serialize + ?Sized),
) -> Result<()> {
let resp = self
.inner
.request(
bestool_canopy::reqwest::Method::POST,
&self.base_url,
"/restore-verification",
)
.await
.map_err(|err| Error::Canopy(format!("restore_verification request: {err:?}")))?
.json(body)
.send()
/// Report a restore outcome (signal 3, restore-verification). `args` is
/// the typed [`bestool_canopy::schema::VerificationArgs`], including the
/// free-form `health_details`.
pub async fn restore_verification_typed(&self, args: &VerificationArgs) -> Result<()> {
self.inner
.restore_verification(args)
.await
.map_err(|err| Error::Canopy(format!("restore_verification send: {err:?}")))?;
let status = resp.status();
if !status.is_success() {
let text = resp.text().await.unwrap_or_default();
return Err(Error::Canopy(format!(
"restore_verification returned {status}: {text}"
)));
}
Ok(())
}

/// Direct access to the public-mTLS base URL the client is configured
/// against. The tailnet path uses its own hardcoded URL inside
/// `bestool-canopy`.
pub fn base_url(&self) -> &Url {
&self.base_url
.map_err(|err| Error::Canopy(format!("restore_verification: {err}")))
}
}

/// Re-export the wire types pgro consumes verbatim from `bestool-canopy`.
pub use bestool_canopy::{
BackupCredentials as Credentials, Outcome, RestoreCredentials as CredsResponse,
RestoreVerification as Verification, WorklistEntry as Entry,
};

#[cfg(test)]
mod tests {
use super::*;
Expand Down
Loading