diff --git a/Cargo.lock b/Cargo.lock index 61d3c91..13a3490 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1512,6 +1512,33 @@ version = "0.8.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" +[[package]] +name = "crossterm" +version = "0.29.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d8b9f2e4c67f833b660cdb0a3523065869fb35570177239812ed4c905aeff87b" +dependencies = [ + "bitflags 2.11.0", + "crossterm_winapi", + "derive_more 2.1.1", + "document-features", + "mio", + "parking_lot", + "rustix", + "signal-hook", + "signal-hook-mio", + "winapi", +] + +[[package]] +name = "crossterm_winapi" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "acdd7c62a3665c7f6830a51635d9ac9b23ed385797f70a83bb8bafe9c572ab2b" +dependencies = [ + "winapi", +] + [[package]] name = "crunchy" version = "0.2.4" @@ -1731,18 +1758,23 @@ checksum = "d7a1e2f27636f116493b8b860f5546edb47c8d8f8ea73e1d2a20be88e28d1fea" [[package]] name = "datum-connect" -version = "0.1.0" +version = "0.2.1" dependencies = [ "async-trait", "clap", "dotenv", + "hex", "hickory-proto", "hickory-server", "humantime", + "inquire", "iroh-base", "lib", "n0-error", + "openssl", "rand 0.9.2", + "reqwest 0.12.28", + "rustls", "sentry", "serde", "serde_yml", @@ -3164,6 +3196,15 @@ dependencies = [ "slab", ] +[[package]] +name = "fuzzy-matcher" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "54614a3312934d066701a80f20f15fa3b56d67ac7722b39eea5b4c9dd1d66c94" +dependencies = [ + "thread_local", +] + [[package]] name = "fxhash" version = "0.2.1" @@ -4309,6 +4350,20 @@ dependencies = [ "smallvec", ] +[[package]] +name = "inquire" +version = "0.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6654738b8024300cf062d04a1c13c10c8e2cea598ec1c47dc9b6641159429756" +dependencies = [ + "bitflags 2.11.0", + "crossterm", + "dyn-clone", + "fuzzy-matcher", + "unicode-segmentation", + "unicode-width", +] + [[package]] name = "instant" version = "0.1.13" @@ -6454,6 +6509,15 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7c87def4c32ab89d880effc9e097653c8da5d6ef28e6b539d313baaacfbafcbe" +[[package]] +name = "openssl-src" +version = "300.6.0+3.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8e8cbfd3a4a8c8f089147fd7aaa33cf8c7450c4d09f8f80698a0cf093abeff4" +dependencies = [ + "cc", +] + [[package]] name = "openssl-sys" version = "0.9.112" @@ -6462,6 +6526,7 @@ checksum = "57d55af3b3e226502be1526dfdba67ab0e9c96fc293004e79576b2b9edb0dbdb" dependencies = [ "cc", "libc", + "openssl-src", "pkg-config", "vcpkg", ] @@ -7869,9 +7934,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.23.37" +version = "0.23.40" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "758025cb5fccfd3bc2fd74708fd4682be41d99e5dff73c377c0646c6012c73a4" +checksum = "ef86cd5876211988985292b91c96a8f2d298df24e75989a43a3c73f2d4d8168b" dependencies = [ "aws-lc-rs", "log", @@ -8562,6 +8627,17 @@ dependencies = [ "signal-hook-registry", ] +[[package]] +name = "signal-hook-mio" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b75a19a7a740b25bc7944bdee6172368f988763b744e3d4dfe753f6b4ece40cc" +dependencies = [ + "libc", + "mio", + "signal-hook", +] + [[package]] name = "signal-hook-registry" version = "1.4.8" @@ -9657,6 +9733,12 @@ version = "1.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9629274872b2bfaf8d66f5f15725007f635594914870f65218920345aa11aa8c" +[[package]] +name = "unicode-width" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4ac048d71ede7ee76d585517add45da530660ef4390e49b098733c6e897f254" + [[package]] name = "unicode-xid" version = "0.2.6" diff --git a/README.md b/README.md index 5c5805f..db00dee 100644 --- a/README.md +++ b/README.md @@ -10,6 +10,17 @@ CLI, GUI app, and shared library for exposing local environments to the internet brew install datum-cloud/tap/desktop ``` +**nix** + +``` +# GUI app +nix run github:datum-cloud/app#desktop + +# CLI +nix run github:datum-cloud/app#cli -- auth login +nix run github:datum-cloud/app#cli -- tunnel list +``` + **Direct download:** [![Download for macOS](https://img.shields.io/badge/Download-macOS-000000?logo=apple&logoColor=white)](https://github.com/datum-cloud/datum-connect/releases/latest/download/Datum.dmg) diff --git a/cli/Cargo.toml b/cli/Cargo.toml index 51d3ccd..903a9f6 100644 --- a/cli/Cargo.toml +++ b/cli/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "datum-connect" -version = "0.1.0" +version = "0.2.1" edition = "2024" [dependencies] @@ -21,4 +21,15 @@ hickory-proto = "0.25.2" iroh-base.workspace = true z32 = "1.0.3" rand.workspace = true -sentry.workspace = true \ No newline at end of file +hex.workspace = true +sentry.workspace = true +rustls = { workspace = true, features = ["ring"] } +inquire = "0.9.4" +reqwest.workspace = true +# Bring in openssl with the `vendored` feature so cross-compiles (e.g. +# cargo-zigbuild for aarch64-linux/apple) build openssl from source +# instead of needing target-arch system headers. The dep is transitively +# pulled by iroh's pkarr -> reqwest 0.13 (default features include +# native-tls); we can't disable that without patching iroh, so we +# vendor instead. +openssl = { version = "0.10", features = ["vendored"] } \ No newline at end of file diff --git a/cli/src/main.rs b/cli/src/main.rs index c27eef3..752a4fb 100644 --- a/cli/src/main.rs +++ b/cli/src/main.rs @@ -3,20 +3,36 @@ use clap::{Parser, Subcommand, ValueEnum}; mod dns_dev; mod tunnel_dev; +// Ensure rustls crypto provider is installed +use rustls::crypto::ring as rustls_ring; + use lib::{ - Advertisment, AdvertismentTicket, ConnectNode, DiscoveryMode, ListenNode, ProxyState, Repo, - TcpProxyData, - datum_cloud::{ApiEnv, DatumCloudClient}, + Advertisment, AdvertismentTicket, ConnectNode, DiscoveryMode, HeartbeatAgent, ListenNode, + ProgressStepKind, ProxyState, Repo, SelectedContext, StepStatus, TcpProxyData, TunnelService, + datum_cloud::{ApiEnv, DatumCloudClient, DeviceCodeInfo, LoginState}, }; +use n0_error::StdResultExt; use std::{ net::{IpAddr, SocketAddr}, path::PathBuf, }; +use std::sync::OnceLock; use tracing::info; use tracing_subscriber::prelude::*; +/// Global handle to the EnvFilter so the picker can temporarily silence +/// tracing output (the iroh relay actor logs continuously and corrupts +/// the inquire redraw if left on). +static FILTER_RELOAD: OnceLock< + tracing_subscriber::reload::Handle< + tracing_subscriber::EnvFilter, + tracing_subscriber::Registry, + >, +> = OnceLock::new(); + /// Datum Connect Agent #[derive(Parser, Debug)] +#[command(version)] struct Args { #[clap(short, long, env = "DATUM_CONNECT_REPO")] repo: Option, @@ -48,6 +64,17 @@ enum Commands { /// Add proxies. #[clap(subcommand, alias = "ls")] Add(AddCommands), + + /// Authenticate with Datum Cloud (login, logout, status). + #[clap(subcommand)] + Auth(AuthCommands), + + /// Manage tunnels (create, list, update, delete) that expose local services to public hostnames. + Tunnel(TunnelArgs), + + /// Manage Datum Cloud projects. + #[clap(subcommand)] + Projects(ProjectsCommands), } #[derive(Debug, clap::Parser)] @@ -179,9 +206,127 @@ pub enum DiscoveryModeArg { Hybrid, } +#[derive(Subcommand, Debug)] +pub enum ProjectsCommands { + /// List all available projects across your organizations. + List, + + /// Switch the active project. + Switch, +} + +#[derive(Subcommand, Debug)] +pub enum AuthCommands { + /// Show current authentication status. + Status, + + /// Log in to Datum Cloud (opens browser for OAuth). + Login { + /// Skip the localhost-redirect flow and use the OAuth2 device + /// authorization grant instead. Required when running on a + /// remote machine over SSH, in CI, or in a container — anywhere + /// the localhost-redirect can't reach a browser. The CLI prints + /// a verification URL + user code; complete authorization on + /// another device. + #[clap(long)] + no_browser: bool, + }, + + /// Log out and clear stored credentials. + Logout, + + /// List all locally authenticated users. + List, + + /// Switch to a different authenticated user (clears current and prompts for new login). + Switch { + /// Same as `auth login --no-browser`: use the OAuth2 device + /// authorization grant for the fresh login after logout. + #[clap(long)] + no_browser: bool, + }, +} + +#[derive(Parser, Debug)] +pub struct TunnelArgs { + /// Project ID to use for this command (overrides the currently selected project). + #[clap(long)] + project: Option, + #[clap(subcommand)] + command: TunnelCommands, +} + +#[derive(Subcommand, Debug)] +pub enum TunnelCommands { + /// List all tunnels in the current project. + List, + + /// Start a tunnel that exposes a local service to a public hostname. + Listen { + /// Display name for the tunnel (auto-generated if not provided). + #[clap(long)] + label: Option, + /// Local address to expose (host:port, e.g. 127.0.0.1:8080). + /// Required unless --id is given (in which case the existing + /// tunnel's stored endpoint is reused). If both --id and --endpoint + /// are given they must match the stored endpoint exactly — a + /// mismatch fails hard instead of silently re-pointing the tunnel. + #[clap(long)] + endpoint: Option, + /// Adopt an existing tunnel by its resource ID and keep its hostname. + /// Use this when you have already shared the tunnel's URL and need + /// it to survive across restarts even if the connector identity has + /// changed in between (e.g. after rotating listen_key). With --id + /// alone (no --endpoint) the existing tunnel's stored endpoint is + /// reused — useful for resuming a tunnel verbatim. + #[clap(long)] + id: Option, + /// Maximum wall-clock budget for the whole setup including the + /// end-to-end connectivity verification (origin probe + proxy URL + /// probe). When the controller conditions all flip Ready, the CLI + /// keeps polling both URLs every 10s until they respond non-5xx. + /// If this elapses with probes still failing, the command exits + /// non-zero with a summary so you don't think it's healthy when + /// it isn't. Accepts humantime values like "5m" or "30s". + #[clap(long, default_value = "10m")] + timeout: humantime::Duration, + /// Skip confirmation prompt if tunnel already exists. + #[clap(long, default_value = "false")] + yes: bool, + }, + + /// Update an existing tunnel. + Update { + /// Tunnel ID (resource name). + #[clap(long)] + id: String, + /// New display name for the tunnel. + #[clap(long)] + label: Option, + /// New local address to expose (host:port, e.g. 127.0.0.1:8080). + #[clap(long)] + endpoint: Option, + }, + + /// Delete a tunnel. + Delete { + /// Tunnel ID (resource name) to delete. + #[clap(long)] + id: String, + }, +} + #[tokio::main] async fn main() -> n0_error::Result<()> { + // Install the ring-based crypto provider for rustls + let _ = rustls_ring::default_provider().install_default(); + + let env_filter = tracing_subscriber::EnvFilter::try_from_default_env() + .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("warn")); + let (filter_layer, filter_reload) = tracing_subscriber::reload::Layer::new(env_filter); + let _ = FILTER_RELOAD.set(filter_reload); tracing_subscriber::registry() + .with(filter_layer) .with(tracing_subscriber::fmt::layer()) .with(sentry::integrations::tracing::layer()) .init(); @@ -249,6 +394,99 @@ async fn main() -> n0_error::Result<()> { .await?; println!("OK."); } + Commands::Auth(args) => { + let datum = DatumCloudClient::with_repo(ApiEnv::default(), repo.clone()).await?; + match args { + AuthCommands::Status => { + if datum.is_authenticated().await? { + println!("Authenticated"); + if let Some(ctx) = datum.selected_context() { + println!(" org: {}", ctx.org_id); + println!(" project: {}", ctx.project_id); + } + } else { + println!("Not authenticated"); + } + } + AuthCommands::Login { no_browser } => { + if no_browser { + datum.login_device_code(display_device_code).await?; + } else { + datum.login().await?; + } + if let Ok(state) = datum.auth_state().get() { + println!( + "Logged in as {} ({})", + state.profile.display_name(), + state.profile.email + ); + } else { + println!("Login successful"); + } + select_project_interactive(&datum).await?; + } + AuthCommands::Logout => { + datum.logout().await?; + println!("Logged out"); + } + AuthCommands::List => { + let is_auth = datum.is_authenticated().await?; + if is_auth { + println!("Current user (active):"); + if let Some(ctx) = datum.selected_context() { + println!(" org: {}", ctx.org_id); + println!(" project: {}", ctx.project_id); + } + } else { + println!("No authenticated users"); + } + println!(); + println!("Note: Multi-user storage not yet implemented. Use 'auth switch' to log in as a different user."); + } + AuthCommands::Switch { no_browser } => { + datum.logout().await?; + println!("Switching users..."); + if no_browser { + datum.login_device_code(display_device_code).await?; + } else { + datum.login().await?; + } + if let Ok(state) = datum.auth_state().get() { + println!( + "Switched to {} ({})", + state.profile.display_name(), + state.profile.email + ); + } else { + println!("Switched to new user"); + } + select_project_interactive(&datum).await?; + } + } + } + Commands::Projects(args) => { + let datum = DatumCloudClient::with_repo(ApiEnv::default(), repo.clone()).await?; + match args { + ProjectsCommands::List => { + let orgs = datum.orgs_and_projects().await?; + let selected = datum.selected_context(); + for org in &orgs { + println!("{} ({})", org.org.display_name, org.org.resource_id); + for project in &org.projects { + let active = selected + .as_ref() + .map(|ctx| ctx.project_id == project.resource_id) + .unwrap_or(false); + let marker = if active { " *" } else { "" }; + println!(" {} ({}){}", project.display_name, project.resource_id, marker); + } + } + } + ProjectsCommands::Switch => { + select_project_interactive(&datum).await?; + } + } + } Commands::Serve => { let node = ListenNode::new(repo).await?; let endpoint_id = node.endpoint_id(); @@ -374,6 +612,840 @@ async fn main() -> n0_error::Result<()> { Commands::TunnelDev(args) => { tunnel_dev::serve(args).await?; } + Commands::Tunnel(TunnelArgs { project, command: args }) => { + let datum = DatumCloudClient::with_repo(ApiEnv::default(), repo.clone()).await?; + + if let Some(project_id) = project { + let orgs = datum.orgs_and_projects().await?; + let ctx = resolve_project_context(&orgs, &project_id) + .ok_or_else(|| n0_error::anyerr!("project '{}' not found", project_id))?; + datum.set_selected_context(Some(ctx)).await?; + } + + let project_id = datum + .selected_context() + .map(|ctx| ctx.project_id) + .ok_or_else(|| { + n0_error::anyerr!( + "no project selected — pass --project or run 'datumctl ctx use --project '" + ) + })?; + let node = ListenNode::new_for_project(repo.clone(), &project_id).await?; + let service = TunnelService::new(datum.clone(), node.clone()); + let heartbeat = HeartbeatAgent::new(datum.clone(), node.clone()); + + match args { + TunnelCommands::List => { + let tunnels = service.list_active().await?; + if tunnels.is_empty() { + println!("No tunnels found in current project."); + } else { + for t in tunnels { + let status = if t.accepted && t.programmed { + "ready" + } else if t.accepted { + "accepted" + } else { + "pending" + }; + let enabled = if t.enabled { "enabled" } else { "disabled" }; + println!("{} [{}] {} -> {}", t.id, status, t.label, t.endpoint); + if !t.hostnames.is_empty() { + for h in &t.hostnames { + println!(" hostname: {}", h); + } + } + println!(" status: {}, {}", enabled, status); + } + } + } + TunnelCommands::Listen { label, endpoint, id, timeout, yes } => { + let endpoint_id = node.endpoint_id(); + + // Resolve (existing_tunnel, effective_endpoint): + // --id alone → reuse stored endpoint, re-point backend + // --id + --endpoint → both must agree exactly; mismatch fails + // --endpoint alone → endpoint-match adoption (best-effort) + // neither → error + let (existing, endpoint) = if let Some(id_str) = id.as_deref() { + let Some(t) = service.get_active(id_str).await? else { + n0_error::bail_any!( + "Tunnel with id '{}' not found in this project. \ + Run 'datum-connect tunnel list' to see available ids.", + id_str + ); + }; + if let Some(user_endpoint) = endpoint.as_deref() { + let user_normalized = lib::normalize_endpoint(user_endpoint); + if user_normalized != t.endpoint { + n0_error::bail_any!( + "--id '{}' references a tunnel with endpoint '{}', \ + but --endpoint was given as '{}' (normalized: '{}'). \ + Omit --endpoint to reuse the stored endpoint, or run \ + 'datum-connect tunnel update --id {} --endpoint {}' \ + first to change it.", + id_str, t.endpoint, user_endpoint, user_normalized, + id_str, user_endpoint, + ); + } + } + let stored_endpoint = t.endpoint.clone(); + (Some(t), stored_endpoint) + } else if let Some(endpoint) = endpoint { + let existing = service.get_active_by_endpoint(&endpoint).await?; + (existing, endpoint) + } else { + // Neither flag was given. If we're on a TTY and the + // project has tunnels with hostnames, pop a picker. + // Otherwise fall back to the usual error so the + // failure mode in scripts/CI stays explicit. + match pick_tunnel_interactive(&service).await? { + Some(t) => { + let endpoint = t.endpoint.clone(); + (Some(t), endpoint) + } + None => { + n0_error::bail_any!( + "Provide --endpoint (or --id to resume \ + an existing tunnel). See \ + 'datum-connect tunnel listen --help'." + ); + } + } + }; + let tunnel_id = if let Some(t) = existing { + println!("Found existing tunnel for {}:", endpoint); + println!(" id: {}", t.id); + println!(" label: {}", t.label); + println!(" endpoint: {}", t.endpoint); + println!(); + + let label_changed = label.as_ref().is_some_and(|l| l != &t.label); + let resolved_label = label.clone().unwrap_or_else(|| t.label.clone()); + // Two reasons to call update_active here: + // 1. --id was given: always re-point the backend at + // this agent's current connector. That's the + // whole point of --id — survive connector + // identity changes. + // 2. The label differs (endpoint-match path only). + let must_update = id.is_some() || label_changed; + if must_update { + if label_changed && !yes { + print!("Update tunnel label to '{}'? [y/N] ", resolved_label); + std::io::Write::flush(&mut std::io::stdout())?; + let mut input = String::new(); + std::io::stdin().read_line(&mut input)?; + if !input.trim().eq_ignore_ascii_case("y") { + println!("Aborted."); + return Ok(()); + } + } else if label_changed && yes { + println!("Updating tunnel (--yes specified)"); + } + let updated = service + .update_active(&t.id, &resolved_label, &endpoint) + .await?; + if id.is_some() { + println!("Adopted tunnel {} (hostname preserved)", updated.id); + } else { + println!("Updated tunnel:"); + println!(" id: {}", updated.id); + } + updated.id + } else { + println!("Tunnel already configured correctly."); + t.id + } + } else { + let label = label.unwrap_or_else(|| { + let bytes: [u8; 6] = rand::random(); + hex::encode(bytes) + }); + let tunnel = service.create_active(&label, &endpoint).await?; + println!("Created tunnel:"); + println!(" id: {}", tunnel.id); + println!(" label: {}", tunnel.label); + tunnel.id + }; + + // Manual mode: only heartbeat the project the tunnel + // lives in. Auto-enroll would silently keep presence in + // every other project the user can see, which is both + // wasteful and confusing in logs. + heartbeat.start_manual().await; + heartbeat.register_project(project_id.clone()).await; + + service.set_enabled_active(&tunnel_id, true).await?; + println!(); + println!("Your endpoint ID: {}", endpoint_id); + println!("Setting up tunnel..."); + let progress = await_tunnel_progress(&service, &tunnel_id).await?; + + let hostname = progress + .hostnames + .first() + .cloned() + .ok_or_else(|| n0_error::anyerr!("tunnel has no hostname after setup"))?; + println!("Verifying connectivity..."); + let verify_start = std::time::Instant::now(); + let budget = (*timeout).saturating_sub(progress.elapsed); + verify_endpoints(&endpoint, &hostname, budget).await?; + let total = progress.elapsed + verify_start.elapsed(); + println!( + "Tunnel ready after {} sec: https://{}", + total.as_secs(), + hostname, + ); + println!("Press Ctrl+C to stop..."); + + // Watch login state so a permanent auth loss mid-session + // (refresh token expired or revoked at the IdP) surfaces to + // the operator immediately, with reconnection guidance — + // not just buried in tracing output. + let mut login_rx = datum.auth().login_state_watch(); + let mut last_state = *login_rx.borrow(); + // Also poll the server-side progress every 10s. Setup-time + // checks aren't enough: conditions can flip back to a + // terminal failure later (e.g. the iroh DNS controller + // re-reconciling and re-emerging a stale owner), and the + // data plane silently drops in the meantime. When that + // happens, surface it and break out so the operator sees + // the same actionable message they'd have seen at setup. + let mut runtime_poll = tokio::time::interval(RUNTIME_POLL_INTERVAL); + runtime_poll.set_missed_tick_behavior( + tokio::time::MissedTickBehavior::Delay, + ); + runtime_poll.tick().await; // consume the immediate first tick + loop { + tokio::select! { + res = tokio::signal::ctrl_c() => { + res?; + break; + } + res = login_rx.changed() => { + if res.is_err() { break; } + let new_state = *login_rx.borrow(); + if new_state == LoginState::Missing + && last_state != LoginState::Missing + { + eprintln!(); + eprintln!("================================================================"); + eprintln!(" Datum login has expired or been revoked."); + eprintln!(" The tunnel will stop accepting new connections until you"); + eprintln!(" log in again. Stop this command (Ctrl+C) and run:"); + eprintln!(); + eprintln!(" datum-connect login"); + eprintln!(); + eprintln!(" Then restart the tunnel listener."); + eprintln!("================================================================"); + eprintln!(); + } + last_state = new_state; + } + _ = runtime_poll.tick() => { + match service.get_active_progress(&tunnel_id).await { + Ok(Some(progress)) => { + if let Some(fail) = progress.terminal_failure() { + eprintln!(); + eprintln!("================================================================"); + eprintln!(" Tunnel is no longer reachable from the edge."); + eprintln!(); + eprintln!(" {}", format_terminal_failure(fail)); + eprintln!("================================================================"); + eprintln!(); + break; + } + } + Ok(None) => { + eprintln!(); + eprintln!("Tunnel {} no longer exists on the server. Stopping.", tunnel_id); + break; + } + Err(err) => { + // Transient query failure (network blip, token mid-refresh, + // etc.) — log and keep going; the next tick will retry. + tracing::warn!("watch: progress poll failed: {err:#}"); + } + } + } + } + } + println!(); + println!("Disabling tunnel..."); + service.set_enabled_active(&tunnel_id, false).await?; + println!("Tunnel disabled."); + } + TunnelCommands::Update { id, label, endpoint } => { + let current = service.get_active(&id).await?; + let current = current.std_context("Tunnel not found")?; + let new_label = label.unwrap_or(current.label); + let new_endpoint = endpoint.unwrap_or(current.endpoint); + let tunnel = service.update_active(&id, &new_label, &new_endpoint).await?; + println!("Updated tunnel {}:", tunnel.id); + println!(" label: {}", tunnel.label); + println!(" endpoint: {}", tunnel.endpoint); + if !tunnel.hostnames.is_empty() { + println!(" hostnames:"); + for h in &tunnel.hostnames { + println!(" {}", h); + } + } + } + TunnelCommands::Delete { id } => { + service.delete_active(&id).await?; + println!("Deleted tunnel {}", id); + } + } + } + } + Ok(()) +} + +/// Prompt the user to select an org and project, then persist it as the active context. +async fn select_project_interactive(datum: &DatumCloudClient) -> n0_error::Result<()> { + use lib::datum_cloud::OrganizationWithProjects; + use std::io::{BufRead, Write}; + + let orgs = datum.orgs_and_projects().await?; + if orgs.is_empty() { + println!("No organizations found. Create a project at https://app.datum.net first."); + return Ok(()); } + + // Flatten to (org_ref, project_index) for a simple numbered list. + let mut entries: Vec<(&OrganizationWithProjects, usize)> = Vec::new(); + for org in &orgs { + for pi in 0..org.projects.len() { + entries.push((org, pi)); + } + } + + if entries.is_empty() { + println!("No projects found. Create a project at https://app.datum.net first."); + return Ok(()); + } + + if entries.len() == 1 { + let (org, pi) = entries[0]; + let project = &org.projects[pi]; + let ctx = SelectedContext { + org_id: org.org.resource_id.clone(), + org_name: org.org.display_name.clone(), + project_id: project.resource_id.clone(), + project_name: project.display_name.clone(), + org_type: org.org.r#type.clone(), + }; + println!("Selected project: {} / {}", ctx.org_name, ctx.project_name); + datum.set_selected_context(Some(ctx)).await?; + return Ok(()); + } + + println!("\nSelect a project:"); + for (i, (org, pi)) in entries.iter().enumerate() { + let project = &org.projects[*pi]; + println!(" [{}] {} / {}", i + 1, org.org.display_name, project.display_name); + } + print!("Enter number [1-{}]: ", entries.len()); + std::io::stdout().flush().ok(); + + let stdin = std::io::stdin(); + let line = stdin + .lock() + .lines() + .next() + .ok_or_else(|| n0_error::anyerr!("no input"))??; + let choice: usize = line + .trim() + .parse() + .map_err(|_| n0_error::anyerr!("invalid selection"))?; + if choice < 1 || choice > entries.len() { + return Err(n0_error::anyerr!("selection out of range")); + } + + let (org, pi) = entries[choice - 1]; + let project = &org.projects[pi]; + let ctx = SelectedContext { + org_id: org.org.resource_id.clone(), + org_name: org.org.display_name.clone(), + project_id: project.resource_id.clone(), + project_name: project.display_name.clone(), + org_type: org.org.r#type.clone(), + }; + println!("Selected project: {} / {}", ctx.org_name, ctx.project_name); + datum.set_selected_context(Some(ctx)).await?; Ok(()) } + +/// Guard that silences tracing output for its lifetime, restoring the +/// previous filter on drop. Used to keep the iroh relay actor (and +/// anything else chatty) from corrupting inquire's terminal redraw. +struct QuietTracing { + previous: Option, +} + +impl QuietTracing { + fn engage() -> Self { + // Capture the previous filter representation so we can rebuild it + // on restore. EnvFilter doesn't implement Clone; serialize+parse + // is the supported round-trip. + let previous = FILTER_RELOAD + .get() + .and_then(|h| h.with_current(|f| f.to_string()).ok()); + if let Some(handle) = FILTER_RELOAD.get() { + let _ = handle.reload(tracing_subscriber::EnvFilter::new("error")); + } + Self { previous } + } +} + +impl Drop for QuietTracing { + fn drop(&mut self) { + if let (Some(handle), Some(prev)) = (FILTER_RELOAD.get(), self.previous.take()) + && let Ok(filter) = tracing_subscriber::EnvFilter::try_new(&prev) + { + let _ = handle.reload(filter); + } + } +} + +/// Interactive picker for resuming an existing tunnel. Returns the +/// chosen tunnel, or `None` if the user cancelled, there are no +/// candidates, or stdin is not a TTY (in which case the caller should +/// fall back to its usual flag-missing error path). +async fn pick_tunnel_interactive( + service: &TunnelService, +) -> n0_error::Result> { + use std::io::IsTerminal; + + if !std::io::stdin().is_terminal() { + return Ok(None); + } + + let mut candidates: Vec = service + .list_active() + .await? + .into_iter() + .filter(|t| !t.hostnames.is_empty()) + .collect(); + if candidates.is_empty() { + return Ok(None); + } + + // Single candidate: no point prompting. The selection marker would sit + // on the only row and arrow keys would be no-ops, which looks like a + // wedge from the user's perspective (terminal cursor still on the + // prompt line). Just adopt it and tell the user what happened. + if candidates.len() == 1 { + let only = candidates.remove(0); + let host = only + .hostnames + .first() + .map(String::as_str) + .unwrap_or(only.id.as_str()); + println!("Resuming the only tunnel in this project: {host} (id: {})", only.id); + return Ok(Some(only)); + } + + // Most-likely-relevant first: tunnels you previously enabled (have an + // advertisement) bubble up before disabled ones. + candidates.sort_by(|a, b| { + b.enabled + .cmp(&a.enabled) + .then_with(|| a.hostnames[0].cmp(&b.hostnames[0])) + }); + + let max_host = candidates + .iter() + .map(|t| t.hostnames[0].len()) + .max() + .unwrap_or(0); + let max_endpoint = candidates + .iter() + .map(|t| t.endpoint.len()) + .max() + .unwrap_or(0); + let labels: Vec = candidates + .iter() + .map(|t| { + let host = &t.hostnames[0]; + let status = if t.enabled { " " } else { "○ " }; + format!( + "{status}{host: Option { + for org in orgs { + if let Some(project) = org.projects.iter().find(|p| p.resource_id == project_id) { + return Some(SelectedContext { + org_id: org.org.resource_id.clone(), + org_name: org.org.display_name.clone(), + project_id: project.resource_id.clone(), + project_name: project.display_name.clone(), + org_type: org.org.r#type.clone(), + }); + } + } + None +} + +/// Renders an OAuth2 device authorization grant prompt for `auth login +/// --no-browser` / `auth switch --no-browser`. Prints the verification +/// URL + user code prominently to stderr (so it doesn't get tangled +/// with the structured-output JSON future plugin modes may emit to +/// stdout), then returns so the polling loop in lib can proceed. +async fn display_device_code(info: DeviceCodeInfo) { + eprintln!(); + eprintln!("================================================================"); + eprintln!(" Open this URL on another device to authorize:"); + eprintln!(); + eprintln!(" {}", info.verification_uri); + eprintln!(); + eprintln!(" Enter this code when prompted:"); + eprintln!(); + eprintln!(" {}", info.user_code); + if let Some(complete) = &info.verification_uri_complete { + eprintln!(); + eprintln!(" (Or open this pre-filled URL to skip the code step:"); + eprintln!(" {})", complete); + } + eprintln!(); + eprintln!( + " Code expires in {} seconds. Waiting for authorization…", + info.expires_in.as_secs(), + ); + eprintln!("================================================================"); + eprintln!(); +} + +/// Result of streaming the tunnel-setup progress to stdout. All conditions +/// reached `Ready` (or we bailed before that for a terminal failure). +struct SetupResult { + elapsed: std::time::Duration, + hostnames: Vec, +} + +/// Stuck threshold: a step that stays pending this long without progressing +/// gets called out with a hint. Picked to cover normal slow paths (TLS cert +/// issuance, edge programming) while still flagging genuine wedges. +const PROGRESS_STUCK_WARN: std::time::Duration = std::time::Duration::from_secs(30); + +/// Poll cadence during setup. Fast enough that step transitions feel +/// responsive; the actual server-side reconcile latency dominates +/// wall-clock anyway. +const PROGRESS_POLL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(750); + +/// Poll cadence during the steady-state watch after setup is done. A live +/// terminal failure (e.g. a stale iroh DNS owner re-emerging on a +/// controller re-reconcile) needs to be surfaced within a minute or so, +/// but we don't need sub-second resolution — the tunnel either works or +/// it doesn't. +const RUNTIME_POLL_INTERVAL: std::time::Duration = std::time::Duration::from_secs(10); + +/// Setup-loop tolerance for transient apiserver errors. A single 503 from +/// the Datum API server front-end (kube apiserver behind Envoy briefly +/// dropping connections, etc.) should not kill an in-flight setup that +/// the next poll tick would have ridden over. After this many consecutive +/// errors we still bail so a genuinely unreachable control plane surfaces +/// fast. At 750ms cadence, 10 tries ≈ 7.5s of patience — enough for a +/// short-lived blip, short enough to not feel hung. +const MAX_CONSECUTIVE_POLL_ERRORS: u32 = 10; + +/// Format the user-facing message for a terminal progress failure. Shared +/// between setup-time (bail!) and runtime watch (eprintln!) so the user +/// sees the same diagnosis regardless of when the condition trips. +fn format_terminal_failure(fail: &lib::ProgressStep) -> String { + let owner = fail + .message + .as_deref() + .unwrap_or("(controller did not provide a message)"); + format!( + "✗ {}: iroh DNS record is owned by another Connector ({}). \ + Another project on this machine — or a stale Connector that was never \ + cleaned up — claimed the same iroh identity. Remove the listen_key for \ + this project (under /projects//listen_key with the \ + per-project layout, or the flat /listen_key otherwise) and rerun, \ + or delete the offending Connector.", + fail.kind.label(), + owner, + ) +} + +/// Drive a tunnel through its setup conditions, printing a checklist as +/// each one transitions to Ready. Bails fast on terminal failure +/// (e.g. iroh DNS deferred to another project's Connector — waiting won't +/// help, and the operator's message already names the conflict). +async fn await_tunnel_progress( + service: &TunnelService, + tunnel_id: &str, +) -> n0_error::Result { + use std::collections::HashMap; + + let start = std::time::Instant::now(); + let mut last_status: HashMap = HashMap::new(); + let mut pending_since: HashMap = HashMap::new(); + let mut warned_stuck: std::collections::HashSet = Default::default(); + let mut consecutive_errors: u32 = 0; + + loop { + // Tolerate transient apiserver errors mid-setup. A single 503 from + // the Datum API server's Envoy front (e.g. kube apiserver briefly + // dropping connections) shouldn't kill a setup the next tick would + // have recovered. We bail only after MAX_CONSECUTIVE_POLL_ERRORS + // so a genuinely unreachable control plane still surfaces fast. + let progress = match service.get_active_progress(tunnel_id).await { + Ok(Some(p)) => { + consecutive_errors = 0; + p + } + Ok(None) => n0_error::bail_any!("Tunnel {} not found", tunnel_id), + Err(err) => { + consecutive_errors += 1; + if consecutive_errors >= MAX_CONSECUTIVE_POLL_ERRORS { + return Err(err); + } + tracing::debug!( + consecutive_errors, + max = MAX_CONSECUTIVE_POLL_ERRORS, + "setup poll errored, retrying: {err:#}" + ); + tokio::time::sleep(PROGRESS_POLL_INTERVAL).await; + continue; + } + }; + + for step in &progress.steps { + let resource = step + .resource + .as_deref() + .map(|r| format!(" [{r}]")) + .unwrap_or_default(); + let prev = last_status.get(&step.kind).copied(); + if prev != Some(step.status) { + match step.status { + StepStatus::Ready => { + println!( + " ✓ {} ({:.1}s){resource}", + step.kind.label(), + start.elapsed().as_secs_f32(), + ); + pending_since.remove(&step.kind); + } + StepStatus::Pending => { + pending_since.entry(step.kind).or_insert_with(std::time::Instant::now); + } + StepStatus::Unknown => {} + } + last_status.insert(step.kind, step.status); + } + + // Generic stuck warning: if a step has been Pending past the + // threshold and we haven't already warned, surface the + // controller's reason/message so the user knows what's stalled. + if step.status == StepStatus::Pending + && !warned_stuck.contains(&step.kind) + && let Some(since) = pending_since.get(&step.kind) + && since.elapsed() >= PROGRESS_STUCK_WARN + { + warned_stuck.insert(step.kind); + let detail = step + .message + .as_deref() + .or(step.reason.as_deref()) + .unwrap_or("no detail from controller"); + eprintln!( + " … {} still pending after {}s{resource}: {detail}", + step.kind.label(), + since.elapsed().as_secs(), + ); + } + } + + if let Some(fail) = progress.terminal_failure() { + n0_error::bail_any!("{}", format_terminal_failure(fail)); + } + + if progress.all_ready() && !progress.hostnames.is_empty() { + return Ok(SetupResult { + elapsed: start.elapsed(), + hostnames: progress.hostnames, + }); + } + + tokio::time::sleep(PROGRESS_POLL_INTERVAL).await; + } +} + +/// How often to retry the connectivity probes during the verify phase. +const VERIFY_POLL_INTERVAL: std::time::Duration = std::time::Duration::from_secs(10); + +/// Outcome of one probe attempt. We split "reachable but error" from +/// "couldn't even connect" so the user gets a useful failure summary +/// (different remedies for "your server is down" vs "the edge is 503ing"). +#[derive(Debug, Clone)] +enum ProbeOutcome { + Ok { status: u16 }, + HttpStatus { status: u16 }, + NotReachable { reason: String }, +} + +impl ProbeOutcome { + fn ok(&self) -> bool { + matches!(self, ProbeOutcome::Ok { .. }) + } + + fn detail(&self) -> String { + match self { + ProbeOutcome::Ok { status } => format!("HTTP {status}"), + ProbeOutcome::HttpStatus { status } => format!("HTTP {status}"), + ProbeOutcome::NotReachable { reason } => reason.clone(), + } + } +} + +/// Probe one URL. Any response under 500 (including 4xx like 401/404) is +/// considered "reachable" — the data path is forwarding even if the +/// origin chose to reject the request. Only 5xx + transport errors are +/// counted as failures so we don't false-fail on authenticated origins. +async fn probe(client: &reqwest::Client, url: &str) -> ProbeOutcome { + match client.get(url).send().await { + Ok(resp) => { + let status = resp.status().as_u16(); + if status < 500 { + ProbeOutcome::Ok { status } + } else { + ProbeOutcome::HttpStatus { status } + } + } + Err(err) => ProbeOutcome::NotReachable { reason: err.to_string() }, + } +} + +/// After the controller conditions all flip Ready, the data plane still +/// needs a moment to actually carry traffic — Envoy programming a route +/// is not the same as Envoy serving it. Probe both the user's local +/// origin and the public proxy URL every 10 seconds until both respond +/// non-5xx, or the timeout budget runs out. +async fn verify_endpoints( + origin_url: &str, + proxy_hostname: &str, + budget: std::time::Duration, +) -> n0_error::Result<()> { + // The CLI accepts `--endpoint host:port` (no scheme) and uses the same + // normalize_endpoint that `TunnelSummary.endpoint` is canonicalized + // through. Without this, reqwest's request-builder rejects a bare + // host:port with "builder error" and the probe loops forever even + // though the actual origin is reachable. + let origin_url = lib::normalize_endpoint(origin_url); + let proxy_url = format!("https://{proxy_hostname}/"); + let client = reqwest::Client::builder() + // Per-request timeout shorter than the poll interval so a stuck + // request can't eat the whole 10s gap. + .timeout(std::time::Duration::from_secs(5)) + .build() + .map_err(|err| n0_error::anyerr!("failed to build HTTP client: {err}"))?; + + let start = std::time::Instant::now(); + let mut origin_ok = false; + let mut proxy_ok = false; + let mut last_origin: Option = None; + let mut last_proxy: Option = None; + + loop { + // Probe in parallel — skip the side that's already ready. + let origin_fut = async { + if origin_ok { None } else { Some(probe(&client, &origin_url).await) } + }; + let proxy_fut = async { + if proxy_ok { None } else { Some(probe(&client, &proxy_url).await) } + }; + let (now_origin, now_proxy) = tokio::join!(origin_fut, proxy_fut); + let elapsed = start.elapsed().as_secs_f32(); + + if let Some(o) = now_origin { + if o.ok() { + origin_ok = true; + println!( + " ✓ origin reachable ({elapsed:.1}s) [{origin_url}]: {}", + o.detail(), + ); + } else { + eprintln!( + " … origin not reachable ({elapsed:.0}s) [{origin_url}]: {}", + o.detail(), + ); + } + last_origin = Some(o); + } + if let Some(p) = now_proxy { + if p.ok() { + proxy_ok = true; + println!( + " ✓ proxy responding ({elapsed:.1}s) [https://{proxy_hostname}]: {}", + p.detail(), + ); + } else { + eprintln!( + " … proxy not responding ({elapsed:.0}s) [https://{proxy_hostname}]: {}", + p.detail(), + ); + } + last_proxy = Some(p); + } + + if origin_ok && proxy_ok { + return Ok(()); + } + if start.elapsed() >= budget { + break; + } + // Don't sleep past the budget — clamp the wait so an early + // success on one side doesn't make us waste 10s before bailing. + let remaining = budget.saturating_sub(start.elapsed()); + tokio::time::sleep(VERIFY_POLL_INTERVAL.min(remaining)).await; + } + + let mut parts: Vec = Vec::new(); + if !origin_ok { + let detail = last_origin + .as_ref() + .map(|o| o.detail()) + .unwrap_or_else(|| "never probed".into()); + parts.push(format!("origin {origin_url} never responded ({detail})")); + } + if !proxy_ok { + let detail = last_proxy + .as_ref() + .map(|p| p.detail()) + .unwrap_or_else(|| "never probed".into()); + parts.push(format!( + "proxy https://{proxy_hostname} never returned non-5xx ({detail})" + )); + } + n0_error::bail_any!( + "connectivity verification timed out after {}s: {}", + start.elapsed().as_secs(), + parts.join("; "), + ) +} diff --git a/flake.lock b/flake.lock index 9da080b..b4d0034 100644 --- a/flake.lock +++ b/flake.lock @@ -20,11 +20,11 @@ }, "nixpkgs": { "locked": { - "lastModified": 1770115704, - "narHash": "sha256-KHFT9UWOF2yRPlAnSXQJh6uVcgNcWlFqqiAZ7OVlHNc=", + "lastModified": 1780243769, + "narHash": "sha256-x5UQuRsH3MqI0U9afaXSNqzTPSeZlRLvFAav2Ux1pNw=", "owner": "NixOS", "repo": "nixpkgs", - "rev": "e6eae2ee2110f3d31110d5c222cd395303343b08", + "rev": "331800de5053fcebacf6813adb5db9c9dca22a0c", "type": "github" }, "original": { @@ -62,11 +62,11 @@ "nixpkgs": "nixpkgs_2" }, "locked": { - "lastModified": 1770088046, - "narHash": "sha256-4hfYDnUTvL1qSSZEA4CEThxfz+KlwSFQ30Z9jgDguO0=", + "lastModified": 1780456895, + "narHash": "sha256-CvRZn3Ut0scqLJ1xwQFkZwKGVBUUNBPrFVXRTMZpbfU=", "owner": "oxalica", "repo": "rust-overlay", - "rev": "71f9daa4e05e49c434d08627e755495ae222bc34", + "rev": "7cc96a6a3fd6613cafd633250a3934483479b9a1", "type": "github" }, "original": { diff --git a/flake.nix b/flake.nix index fb5869a..c2c3317 100644 --- a/flake.nix +++ b/flake.nix @@ -27,10 +27,11 @@ linuxPackages = with pkgs; lib.optionals stdenv.isLinux [ # For web/desktop rendering - webkitgtk + webkitgtk_4_1 gtk3 - libsoup + libsoup_3 # X11 dependencies + xdo xorg.libX11 xorg.libXcursor xorg.libXrandr @@ -38,7 +39,7 @@ ]; cargoOutputHashes = { - "iroh-proxy-utils-0.1.0" = "sha256-tI26vv7fvNR18KsUJvBTXZ0c7Wc/63Qq88NAWuWMoHs="; + "iroh-proxy-utils-0.1.0" = "sha256-ZV71q22zCWBqFdrc0jzkwyQdVc/H0r0BBB6dKrNARr8="; "dioxus-primitives-0.0.1" = "sha256-tI26vv7fvNR18KsUJvBTXZ0c7Wc/63Qq88NAWuWMoHs="; }; @@ -151,7 +152,7 @@ formatter = pkgs.nixpkgs-fmt; apps.desktop = let - script = pkgs.writeShellScriptBin "desktop-app" '' + script = pkgs.writeShellScriptBin "datum-desktop" '' cd "$PWD/ui" export DATUM_CONNECT_PUBLISH_TICKETS=1 export RUST_LOG=info,lib::heartbeat=debug,lib::tunnels=debug @@ -159,7 +160,16 @@ ''; in { type = "app"; - program = "${script}/bin/desktop-app"; + program = "${script}/bin/datum-desktop"; + }; + + apps.cli = let + script = pkgs.writeShellScriptBin "datum-connect-cli" '' + exec ${self.packages.${system}.cli}/bin/datum-connect "$@" + ''; + in { + type = "app"; + program = "${script}/bin/datum-connect-cli"; }; } ); diff --git a/lib/src/config.rs b/lib/src/config.rs index 37217c0..6953933 100644 --- a/lib/src/config.rs +++ b/lib/src/config.rs @@ -7,6 +7,8 @@ use std::{ use n0_error::{Result, StackResultExt, StdResultExt}; use serde::{Deserialize, Serialize}; +use crate::SelectedContext; + #[derive(Debug, Clone, Copy, Serialize, Deserialize, Default)] #[serde(rename_all = "snake_case")] pub enum DiscoveryMode { @@ -49,6 +51,10 @@ pub struct Config { /// Useful for local development (e.g. 127.0.0.1:53535). #[serde(default)] pub dns_resolver: Option, + + /// The currently selected org/project context. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub selected_context: Option, } #[derive(Debug, Clone, Default, Serialize, Deserialize)] diff --git a/lib/src/datum_apis/connector.rs b/lib/src/datum_apis/connector.rs index 3701801..ed53494 100644 --- a/lib/src/datum_apis/connector.rs +++ b/lib/src/datum_apis/connector.rs @@ -102,3 +102,11 @@ pub struct ConnectorStatus { pub connection_details: Option, pub lease_ref: Option, } + +pub const CONNECTOR_CONDITION_READY: &str = "Ready"; +pub const CONNECTOR_CONDITION_IROH_DNS_PUBLISHED: &str = "IrohDNSPublished"; +/// The iroh DNS record is already owned by another Connector with the same +/// public key — typically a Connector in a different project. The losing +/// Connector cannot publish DNS and its tunnel data plane is silently +/// unreachable. See network-services-operator iroh_dns_controller.go. +pub const CONNECTOR_REASON_DEFERRED_TO_OWNER: &str = "DeferredToOwner"; diff --git a/lib/src/datum_apis/http_proxy.rs b/lib/src/datum_apis/http_proxy.rs index 825c7f5..77a5a95 100644 --- a/lib/src/datum_apis/http_proxy.rs +++ b/lib/src/datum_apis/http_proxy.rs @@ -63,6 +63,8 @@ pub const HTTP_PROXY_CONDITION_ACCEPTED: &str = "Accepted"; pub const HTTP_PROXY_CONDITION_PROGRAMMED: &str = "Programmed"; pub const HTTP_PROXY_CONDITION_HOSTNAMES_VERIFIED: &str = "HostnamesVerified"; pub const HTTP_PROXY_CONDITION_HOSTNAMES_IN_USE: &str = "HostnamesInUse"; +pub const HTTP_PROXY_CONDITION_CERTIFICATES_READY: &str = "CertificatesReady"; +pub const HTTP_PROXY_CONDITION_CONNECTOR_METADATA_PROGRAMMED: &str = "ConnectorMetadataProgrammed"; pub const HTTP_PROXY_REASON_ACCEPTED: &str = "Accepted"; pub const HTTP_PROXY_REASON_PROGRAMMED: &str = "Programmed"; diff --git a/lib/src/datum_cloud.rs b/lib/src/datum_cloud.rs index 61a8a2f..7f543b3 100644 --- a/lib/src/datum_cloud.rs +++ b/lib/src/datum_cloud.rs @@ -14,7 +14,7 @@ use crate::http_user_agent::datum_http_user_agent; use crate::{ProjectControlPlaneClient, Repo, SelectedContext}; pub use self::{ - auth::{AuthClient, AuthState, LoginState, MaybeAuth, UserProfile}, + auth::{AuthClient, AuthState, DeviceCodeInfo, LoginState, MaybeAuth, UserProfile}, env::ApiEnv, }; @@ -96,6 +96,31 @@ impl DatumCloudClient { self.auth.load() } + pub async fn is_authenticated(&self) -> Result { + let state = self.auth.load_refreshed().await?; + Ok(state.get().is_ok()) + } + + pub async fn login(&self) -> Result<()> { + self.auth.login().await + } + + /// `--no-browser` login via the OAuth2 device authorization grant. + /// The CLI's `auth login --no-browser` flag routes here; the `display` + /// callback shows the verification URL + user code to the operator + /// who completes authorization on another device. + pub async fn login_device_code(&self, display: F) -> Result<()> + where + F: FnOnce(DeviceCodeInfo) -> Fut, + Fut: std::future::Future, + { + self.auth.login_device_code(display).await + } + + pub async fn logout(&self) -> Result<()> { + self.auth.logout().await + } + pub fn selected_context(&self) -> Option { self.session.selected_context() } diff --git a/lib/src/datum_cloud/auth.rs b/lib/src/datum_cloud/auth.rs index 66fbe7a..8065588 100644 --- a/lib/src/datum_cloud/auth.rs +++ b/lib/src/datum_cloud/auth.rs @@ -11,10 +11,12 @@ use arc_swap::ArcSwap; use chrono::Utc; use n0_error::{Result, StackResultExt, StdResultExt, anyerr, stack_error}; use openidconnect::{ - AccessToken, AccessTokenHash, AuthorizationCode, ClientId, ClientSecret, CsrfToken, IssuerUrl, - Nonce, NonceVerifier, OAuth2TokenResponse, PkceCodeChallenge, RefreshToken, Scope, - TokenResponse, - core::{CoreAuthenticationFlow, CoreClient, CoreProviderMetadata}, + AccessToken, AccessTokenHash, AuthorizationCode, ClientId, ClientSecret, CsrfToken, + DeviceAuthorizationUrl, IssuerUrl, Nonce, NonceVerifier, OAuth2TokenResponse, + PkceCodeChallenge, RefreshToken, Scope, TokenResponse, + core::{ + CoreAuthenticationFlow, CoreClient, CoreDeviceAuthorizationResponse, CoreProviderMetadata, + }, }; use serde::{Deserialize, Serialize}; use tokio::sync::watch; @@ -31,6 +33,25 @@ const LOGIN_TIMEOUT: Duration = Duration::from_secs(60); /// Refresh auth or relogin if access token is valid for less than 30min const REFRESH_AUTH_WHEN: Duration = Duration::from_secs(60 * 30); +/// Surface of an in-flight OAuth2 device authorization grant. The +/// `display` callback passed to [`StatelessClient::login_device_code`] +/// receives one of these and is responsible for showing the verification +/// URL + user code to the operator. +#[derive(Debug, Clone)] +pub struct DeviceCodeInfo { + /// URL the user opens on their other device to authorize. + pub verification_uri: String, + /// Short code the user enters at `verification_uri`. + pub user_code: String, + /// Optional URL that pre-fills the user code, so the user only has + /// to follow one link. + pub verification_uri_complete: Option, + /// How long the user has before the device code expires. + pub expires_in: Duration, + /// Operator-side polling interval recommended by the auth server. + pub interval: Duration, +} + pub struct AuthProvider { pub issuer_url: String, pub client_id: String, @@ -190,6 +211,7 @@ impl StatelessClient { .add_scope(Scope::new("profile".to_string())) .add_scope(Scope::new("email".to_string())) .add_scope(Scope::new("offline_access".to_string())) + .add_extra_param("prompt", "select_account") .set_pkce_challenge(pkce_challenge) .url(); debug!(auth_uri=%self.oidc.auth_uri(), "attempting login"); @@ -233,24 +255,167 @@ impl StatelessClient { None => Err("Missing nonce in ID token".to_string()), } }; - let state = self.parse_token_response(tokens, nonce_verifier).await?; + let state = self + .parse_token_response(tokens, nonce_verifier, None) + .await?; info!(email=%state.profile.email, expires_at=%state.tokens.expires_at(), "login succesfull"); Ok(state) } - pub async fn refresh(&self, tokens: &AuthTokens) -> Result { - let refresh_token = tokens.refresh_token.as_ref().context("No refresh token")?; + /// OAuth2 Device Authorization grant (RFC 8628). Used by `datum-connect + /// auth login --no-browser` and any other headless context where the + /// localhost-redirect flow can't reach back to a browser (SSH, CI, + /// containers). The caller receives a `DeviceCodeInfo` via the + /// `display` callback and is responsible for showing the verification + /// URL + user code to the operator; this method then polls the token + /// endpoint until the user completes authorization. + pub async fn login_device_code(&self, display: F) -> Result + where + F: FnOnce(DeviceCodeInfo) -> Fut, + Fut: Future, + { + // `openidconnect::CoreProviderMetadata` doesn't surface the + // `device_authorization_endpoint` from discovery, so refetch the + // raw JSON to find it. + let discovery_url = format!( + "{}/.well-known/openid-configuration", + self.env.auth_provider().issuer_url.trim_end_matches('/'), + ); + #[derive(Deserialize)] + struct DiscoveryDoc { + device_authorization_endpoint: Option, + } + let discovery: DiscoveryDoc = self + .http + .get(&discovery_url) + .send() + .await + .std_context("Failed to fetch OIDC discovery document")? + .error_for_status() + .std_context("OIDC discovery returned a non-success status")? + .json() + .await + .std_context("Failed to parse OIDC discovery document")?; + let device_endpoint = discovery.device_authorization_endpoint.context( + "Auth server does not advertise a device_authorization_endpoint; \ + --no-browser is unsupported against this provider", + )?; + + // Rebuild a CoreClient with the device URL set. The crate's + // typestate prevents mutating the cached `self.oidc` in place, + // and the discovery the constructor performs is cheap enough to + // accept the duplication. + // + // HACK: borrow datumctl's OIDC client_id for the device-flow + // path. Our own client (the `datum-desktop-app` Zitadel app — + // see datum-cloud/infra apps/datum-iam-system/.../zitadel-setup + // /pulumi/index.ts) is allow-listed only for AUTHORIZATION_CODE + // and REFRESH_TOKEN grants, and Zitadel responds with + // `unauthorized_client: grant_type "...device_code" not allowed` + // for it. `datumctl-cli` already has DEVICE_CODE in its + // grantTypes, so we reuse its client_id strictly for the device + // flow until the planned datumctl-connect plugin lands (which + // will own its own properly-scoped OIDC client). Token issued + // is still scoped to the same Zitadel project; downstream API + // calls don't care which client minted it. + let provider = self.env.auth_provider(); + let device_client_id = match self.env { + ApiEnv::Staging => "325848904128073754", + ApiEnv::Production => "328728232771788043", + }; + let issuer = IssuerUrl::new(provider.issuer_url.clone()) + .std_context("Invalid OIDC provider issuer URL")?; + let metadata = CoreProviderMetadata::discover_async(issuer, &self.http) + .await + .std_context("Failed to discover OIDC provider metadata")?; + let oidc = CoreClient::from_provider_metadata( + metadata, + ClientId::new(device_client_id.to_string()), + provider.client_secret.clone().map(ClientSecret::new), + ) + .set_device_authorization_url( + DeviceAuthorizationUrl::new(device_endpoint).std_context( + "Invalid device_authorization_endpoint in OIDC discovery", + )?, + ); + + let details: CoreDeviceAuthorizationResponse = oidc + .exchange_device_code() + .add_scope(Scope::new("openid".to_string())) + .add_scope(Scope::new("profile".to_string())) + .add_scope(Scope::new("email".to_string())) + .add_scope(Scope::new("offline_access".to_string())) + .request_async(&self.http) + .await + .std_context("Failed to start device authorization")?; + + let info = DeviceCodeInfo { + verification_uri: details.verification_uri().to_string(), + user_code: details.user_code().secret().to_string(), + verification_uri_complete: details + .verification_uri_complete() + .map(|u| u.secret().to_string()), + expires_in: Duration::from_secs(details.expires_in().as_secs()), + interval: Duration::from_secs(details.interval().as_secs()), + }; + display(info).await; + + let tokens = oidc + .exchange_device_access_token(&details) + .std_context("Device-flow client misconfigured")? + .request_async(&self.http, tokio::time::sleep, None) + .await + .std_context("Failed to exchange device access token")?; + + // Device flow doesn't bind a nonce (no user-agent round-trip carrying + // one), so accept an absent nonce in the ID token. + let nonce_verifier = + |_received: Option<&Nonce>| -> std::result::Result<(), String> { Ok(()) }; + let state = self.parse_token_response(tokens, nonce_verifier, None).await?; + info!( + email=%state.profile.email, + expires_at=%state.tokens.expires_at(), + "device-code login successful" + ); + Ok(state) + } + + pub async fn refresh( + &self, + tokens: &AuthTokens, + fallback_profile: Option, + ) -> std::result::Result { + let refresh_token = tokens.refresh_token.as_ref().ok_or_else(|| { + // No stored refresh token means we cannot exchange anything; only an + // interactive login can repopulate it. + RefreshError::Permanent(anyerr!("No refresh token available")) + })?; debug!("Refreshing access token"); - let tokens = self + let refresh_req = self .oidc .exchange_refresh_token(refresh_token) - .std_context("Missing OIDC provider metadata")? - .request_async(&self.http) - .await - .std_context("Failed to refresh tokens")?; + .std_context("Missing OIDC provider metadata") + .map_err(RefreshError::Transient)?; + let tokens = match refresh_req.request_async(&self.http).await { + Ok(tokens) => tokens, + Err(err) => { + let permanent = classify_oidc_refresh_error(&err); + let wrapped = anyerr!("OAuth refresh exchange failed: {err}"); + return Err(if permanent { + RefreshError::Permanent(wrapped) + } else { + RefreshError::Transient(wrapped) + }); + } + }; + // ID token verification failures (e.g. stale JWKs after key rotation) + // are transient: ensure_fresh_client() on the next attempt refetches + // provider metadata. Profile-fetch failures are already handled by + // fallback_profile inside parse_token_response. let state = self - .parse_token_response(tokens, refresh_nonce_verifier) - .await?; + .parse_token_response(tokens, refresh_nonce_verifier, fallback_profile) + .await + .map_err(RefreshError::Transient)?; debug!("Access token refreshed"); Ok(state) } @@ -259,6 +424,7 @@ impl StatelessClient { &self, tokens: OidcTokenResponse, nonce_verifier: impl NonceVerifier, + fallback_profile: Option, ) -> Result { // Extract the ID token claims after verifying its authenticity and nonce. let id_token = tokens @@ -311,8 +477,23 @@ impl StatelessClient { expires_in: tokens.expires_in().context("Missing expires_in claim")?, }; - // Fetch user profile from Datum Cloud API - let profile = self.fetch_user_profile(&auth_tokens, &user_id).await?; + // Fetch user profile from Datum Cloud API. If the fetch fails but we already + // have a profile from a prior login, keep the prior one rather than dropping + // the freshly minted tokens. This guards against transient API blips (e.g. a + // 401 on /users/{id} while the new access token is still propagating) causing + // a full logout. See datum-cloud/app#TBD. + let profile = match self.fetch_user_profile(&auth_tokens, &user_id).await { + Ok(profile) => profile, + Err(err) => match fallback_profile { + Some(profile) => { + warn!( + "Profile fetch after token refresh failed, keeping prior profile: {err:#}" + ); + profile + } + None => return Err(err), + }, + }; Ok(AuthState { tokens: auth_tokens, @@ -427,6 +608,68 @@ impl StatelessClient { #[error("Not logged in")] pub struct NotLoggedIn; +/// Outcome classification for a token refresh attempt. +/// +/// The distinction matters for the heartbeat loop and the listener: a +/// `Transient` failure should keep auth state intact and let the next retry +/// recover, while a `Permanent` failure means the OAuth provider has +/// definitively rejected our credentials and only a fresh interactive login +/// can recover. Treating every refresh failure as permanent — which the +/// previous implementation did — meant a 30-second IdP wobble would log a +/// long-running tunnel out. +#[derive(Debug)] +pub enum RefreshError { + /// The IdP definitively rejected the refresh (typically `invalid_grant`, + /// `invalid_client`, etc.). Auth state has been cleared; the operator + /// must log in again. + Permanent(n0_error::AnyError), + /// Transient failure (network, IdP 5xx, parse error, ID-token claim + /// verification). Auth state is preserved; the caller should retry with + /// backoff. + Transient(n0_error::AnyError), +} + +impl std::fmt::Display for RefreshError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Permanent(e) => write!(f, "refresh permanently rejected by IdP: {e:#}"), + Self::Transient(e) => write!(f, "transient refresh failure: {e:#}"), + } + } +} + +impl std::error::Error for RefreshError {} + +/// Decide whether a `RequestTokenError` from the OAuth refresh exchange is +/// permanent (re-login required) or transient (retry). +fn classify_oidc_refresh_error( + err: &openidconnect::RequestTokenError< + RE, + openidconnect::StandardErrorResponse, + >, +) -> bool +where + RE: std::error::Error, +{ + use openidconnect::RequestTokenError; + use openidconnect::core::CoreErrorResponseType; + match err { + // The IdP returned a structured OAuth error code. RFC 6749 §5.2 codes + // all describe client-side problems that retry cannot fix. + RequestTokenError::ServerResponse(resp) => !matches!( + resp.error(), + // Unknown extension code — conservatively treat as transient so a + // custom code like "rate_limit_exceeded" does not log the user out. + CoreErrorResponseType::Extension(_) + ), + // Network/transport, malformed response body, or some other oauth2 + // crate-level error. All retryable. + RequestTokenError::Request(_) + | RequestTokenError::Parse(_, _) + | RequestTokenError::Other(_) => false, + } +} + #[derive(Default, Debug)] pub struct MaybeAuth(Option); @@ -674,10 +917,16 @@ impl AuthClient { } Ok(auth) if auth.tokens.expires_in_less_than(REFRESH_AUTH_WHEN) => { let client = self.ensure_fresh_client().await?; - match client.refresh(&auth.tokens).await { + match client + .refresh(&auth.tokens, Some(auth.profile.clone())) + .await + { Ok(auth) => auth, Err(err) => { - warn!("Failed to refresh auth token: {err:#}"); + // Either Permanent or Transient — falling through to a + // fresh interactive login is the right move in either + // case because the operator just asked us to log in. + warn!("Refresh during login failed, falling back to fresh login: {err:#}"); let client = self.ensure_fresh_client().await?; client .login(|url, _cancel_token| async move { @@ -698,20 +947,64 @@ impl AuthClient { Ok(()) } + /// `--no-browser` analog of [`AuthClient::login`]. Skips the auth-code + /// localhost-redirect flow (which doesn't work over SSH because the + /// remote machine's bound port can't be reached by a browser running + /// on the operator's laptop) and uses the OAuth2 device authorization + /// grant instead. Always performs a fresh login — if a refresh-eligible + /// token exists the caller can use [`AuthClient::login`] without + /// `--no-browser` to use it. + pub async fn login_device_code(&self, display: F) -> Result<()> + where + F: FnOnce(DeviceCodeInfo) -> Fut, + Fut: Future, + { + let client = self.ensure_fresh_client().await?; + let auth = client.login_device_code(display).await?; + self.state.set(Some(auth)).await?; + Ok(()) + } + pub async fn refresh(&self) -> Result<()> { let auth = self.state.load(); let auth = auth.get()?; let client = self.ensure_fresh_client().await?; - let new_auth = match client.refresh(&auth.tokens).await { - Ok(auth) => auth, - Err(err) => { - warn!("Failed to refresh auth tokens, logging out: {err:#}"); + match client + .refresh(&auth.tokens, Some(auth.profile.clone())) + .await + { + Ok(new_auth) => { + self.state.set(Some(new_auth)).await?; + Ok(()) + } + Err(RefreshError::Permanent(err)) => { + // The IdP definitively rejected our refresh — only a fresh + // interactive login can recover. Clear state and surface the + // event prominently so a long-running session does not silently + // wedge. + error!( + "Datum login has expired or been revoked — the tunnel will \ + stop accepting new connections. Run the CLI's login \ + command (or restart the desktop app and sign in) to \ + reconnect. Cause: {err:#}" + ); + eprintln!( + "Datum login has expired or been revoked. \ + Please log in again to restore the tunnel." + ); self.state.set(None).await?; - Err(err).context("Failed to refresh auth tokens, needs login")? + Err(err).context("Refresh permanently rejected; re-login required")? } - }; - self.state.set(Some(new_auth)).await?; - Ok(()) + Err(RefreshError::Transient(err)) => { + // Network/IdP blip. Keep tokens; the next retry (proactive + // timer or 401-triggered) should recover. + warn!( + "Transient token refresh failure — keeping existing \ + credentials, will retry: {err:#}" + ); + Err(err).context("Transient token refresh failure")? + } + } } /// Refresh the user profile from the API without refreshing tokens @@ -815,8 +1108,6 @@ mod redirect_server { static LOGIN_SUCCESS_PNG: &[u8] = include_bytes!("../../../ui/assets/images/login-success.png"); static ALLIANCE_NO1_REGULAR_TTF: &[u8] = include_bytes!("../../../ui/assets/fonts/AllianceNo1-Regular.ttf"); - static FAVICON_LIGHT_32: &[u8] = - include_bytes!("../../../ui/assets/icons/favicon-light-32x32.png"); static FAVICON_DARK_32: &[u8] = include_bytes!("../../../ui/assets/icons/favicon-dark-32x32.png"); diff --git a/lib/src/heartbeat.rs b/lib/src/heartbeat.rs index 2a7ba1e..66515c8 100644 --- a/lib/src/heartbeat.rs +++ b/lib/src/heartbeat.rs @@ -14,7 +14,7 @@ use rand::Rng; use serde_json::json; use tokio::sync::Mutex; use tokio_util::sync::CancellationToken; -use tracing::{debug, warn}; +use tracing::{debug, info, warn}; use crate::ListenNode; use crate::datum_apis::connector::{ @@ -86,6 +86,13 @@ impl HeartbeatAgent { } } + /// Start in auto-enroll mode: watch login + projects state and keep + /// heartbeats running for every project the user has access to. + /// Intended for multi-project consumers like the UI. + /// + /// For the CLI tunnel use case where there is exactly one project of + /// interest, prefer [`Self::start_manual`] — auto-enroll silently + /// maintains presence in projects the user didn't ask about. pub async fn start(&self) { let mut guard = self.inner.login_task.lock().await; if guard.is_some() { @@ -134,6 +141,24 @@ impl HeartbeatAgent { *guard = Some(AbortOnDropHandle::new(task)); } + /// Start in manual mode: do not watch login state and do not auto-enroll + /// projects. Callers are responsible for [`Self::register_project`] / + /// [`Self::deregister_project`] for the projects they want heartbeats + /// for. Per-project loops still handle 401s via their own + /// force-refresh logic, so transient auth blips are tolerated; a + /// permanent logout is surfaced separately by the CLI's own login + /// watcher. + pub async fn start_manual(&self) { + let mut guard = self.inner.login_task.lock().await; + if guard.is_some() { + return; + } + // Park a completed task so future start() / start_manual() calls + // remain no-ops, matching start()'s "single-start" contract. + let task = tokio::spawn(async {}); + *guard = Some(AbortOnDropHandle::new(task)); + } + pub async fn register_project(&self, project_id: impl Into) { let project_id = project_id.into(); let mut projects = self.inner.projects.lock().await; @@ -239,6 +264,78 @@ struct ConnectorCache { last_home_relay: Option, } +/// Returns true if `err` is a kube API error with HTTP status 401. +/// Used to decide whether a heartbeat retry should force an OAuth token refresh +/// (the proactive refresh timer in `AuthClient` only fires when the access token +/// is within `REFRESH_AUTH_WHEN` of expiry, so a token rejected before that +/// would otherwise spin until the timer catches up). +fn is_unauthorized(err: &kube::Error) -> bool { + matches!(err, kube::Error::Api(e) if e.code == 401) +} + +fn is_not_found(err: &kube::Error) -> bool { + matches!(err, kube::Error::Api(e) if e.code == 404) +} + +/// What the heartbeat loop should do with its cache after a lease op fails. +#[derive(Debug, PartialEq, Eq)] +enum LeaseErrorAction { + /// Keep the cached connector/lease names; retry after backoff. + Retain, + /// Drop the cache so the next iteration re-resolves connector and lease + /// from scratch. Used when the lease no longer exists server-side. + Reset, + /// Force a token refresh, then retain the cache and retry. + RefreshAuth, +} + +fn classify_lease_error(err: &kube::Error) -> LeaseErrorAction { + if is_not_found(err) { + LeaseErrorAction::Reset + } else if is_unauthorized(err) { + LeaseErrorAction::RefreshAuth + } else { + LeaseErrorAction::Retain + } +} + +/// Force an OAuth token refresh after a 401. The proactive timer only refreshes +/// when the token is near expiry, so a server-side rejection that arrives early +/// (clock skew, revocation, etc.) would otherwise leave the heartbeat retrying +/// with the same dead token until the timer eventually fires. +/// +/// When auth is already in [`LoginState::Missing`] (e.g. after a previous +/// permanent refresh failure), this returns immediately without contacting the +/// IdP — the auth layer has already surfaced the loss to the operator and +/// there is nothing to refresh until they log in again. +async fn force_refresh_auth(project_id: &str, datum: &DatumCloudClient) { + if matches!(datum.auth().login_state(), LoginState::Missing) { + debug!( + %project_id, + "heartbeat: skipping forced refresh — auth state is missing, awaiting login" + ); + return; + } + match datum.auth().refresh().await { + Ok(()) => debug!(%project_id, "heartbeat: forced token refresh after 401"), + Err(err) => { + if matches!(datum.auth().login_state(), LoginState::Missing) { + // refresh classified the failure as permanent and cleared state; + // the auth layer has already error!/eprintln!'d the loss. + debug!( + %project_id, + "heartbeat: forced refresh permanently rejected, will resume after re-login" + ); + } else { + warn!( + %project_id, + "heartbeat: forced token refresh failed (will retry): {err:#}" + ); + } + } + } +} + async fn run_project( project_id: String, datum: DatumCloudClient, @@ -268,6 +365,7 @@ async fn run_project( if cache.is_none() { match find_connector(&connectors, provider.endpoint_id()).await { Ok(Some(connector)) => { + let connector_name = connector.name_any(); let lease_name = connector .status .as_ref() @@ -279,8 +377,14 @@ async fn run_project( .and_then(|status| status.connection_details.as_ref()) .and_then(|details| details.public_key.as_ref()) .map(|details| details.home_relay.clone()); + info!( + %project_id, + connector = %connector_name, + lease = lease_name.as_deref().unwrap_or(""), + "heartbeat: registered connector, starting lease renewals" + ); cache = Some(ConnectorCache { - name: connector.name_any(), + name: connector_name, lease_name, lease_duration_seconds: None, last_details: None, @@ -295,6 +399,9 @@ async fn run_project( } Err(err) => { warn!(%project_id, "heartbeat: connector lookup failed: {err:#}"); + if is_unauthorized(&err) { + force_refresh_auth(&project_id, &datum).await; + } sleep_with_cancel(backoff.next(), &cancel).await; continue; } @@ -331,6 +438,9 @@ async fn run_project( connector = %cached.name, "heartbeat: failed to fetch connector: {err:#}" ); + if is_unauthorized(&err) { + force_refresh_auth(&project_id, &datum).await; + } cache = None; sleep_with_cancel(backoff.next(), &cancel).await; continue; @@ -364,17 +474,26 @@ async fn run_project( if cached.last_details.as_ref() != Some(&details_value) { let patch = json!({ "status": { "connectionDetails": details_value } }); - if let Err(err) = connectors + match connectors .patch_status(&cached.name, &PatchParams::default(), &Patch::Merge(&patch)) .await { - warn!( - %project_id, - connector = %cached.name, - "heartbeat: failed to patch connection details: {err:#}" - ); - } else { - cached.last_details = Some(patch["status"]["connectionDetails"].clone()); + Ok(_) => { + cached.last_details = Some(patch["status"]["connectionDetails"].clone()); + } + Err(err) => { + warn!( + %project_id, + connector = %cached.name, + "heartbeat: failed to patch connection details: {err:#}" + ); + if is_unauthorized(&err) { + force_refresh_auth(&project_id, &datum).await; + cache = Some(cached); + sleep_with_cancel(backoff.next(), &cancel).await; + continue; + } + } } } @@ -397,7 +516,14 @@ async fn run_project( lease = %lease_name, "heartbeat: failed to fetch lease: {err:#}" ); - cache = Some(cached); + match classify_lease_error(&err) { + LeaseErrorAction::Reset => cache = None, + LeaseErrorAction::RefreshAuth => { + force_refresh_auth(&project_id, &datum).await; + cache = Some(cached); + } + LeaseErrorAction::Retain => cache = Some(cached), + } sleep_with_cancel(backoff.next(), &cancel).await; continue; } @@ -417,7 +543,14 @@ async fn run_project( .await { warn!(%project_id, lease = %lease_name, "heartbeat: lease renew failed: {err:#}"); - cache = Some(cached); + match classify_lease_error(&err) { + LeaseErrorAction::Reset => cache = None, + LeaseErrorAction::RefreshAuth => { + force_refresh_auth(&project_id, &datum).await; + cache = Some(cached); + } + LeaseErrorAction::Retain => cache = Some(cached), + } sleep_with_cancel(backoff.next(), &cancel).await; continue; } @@ -441,18 +574,20 @@ async fn probe_connector( let client = pcp.client(); let connectors: Api = Api::namespaced(client, DEFAULT_PCP_NAMESPACE); let selector = provider.endpoint_id(); - Ok(find_connector(&connectors, selector).await?.is_some()) + Ok(find_connector(&connectors, selector) + .await + .std_context("connector lookup failed")? + .is_some()) } async fn find_connector( connectors: &Api, endpoint_id: String, -) -> Result> { +) -> kube::Result> { let selector = format!("status.connectionDetails.publicKey.id={endpoint_id}"); let list = connectors .list(&ListParams::default().fields(&selector)) - .await - .std_context("failed to list connectors")?; + .await?; if list.items.is_empty() { return Ok(None); } @@ -616,6 +751,53 @@ mod tests { assert_eq!(count, 0); } + #[tokio::test] + async fn start_manual_does_not_auto_enroll() { + // Manual mode is the CLI tunnel-listen path: only the project the + // caller explicitly registers should get a heartbeat task. Auto- + // enroll would have probed `orgs_and_projects()` on bootstrap and + // registered every accessible project — we verify it didn't by + // checking the projects map stays empty until we register one. + let repo = crate::Repo::open_or_create(test_repo_path()).await.unwrap(); + let datum = crate::datum_cloud::DatumCloudClient::with_repo( + crate::datum_cloud::ApiEnv::Staging, + repo, + ) + .await + .unwrap(); + let provider = Arc::new(TestProvider { + endpoint_id: "test-endpoint".to_string(), + }); + let runner: ProjectRunner = Arc::new(|_project_id, _datum, _provider, cancel| { + tokio::spawn(async move { + cancel.cancelled().await; + }) + }); + let agent = HeartbeatAgent::new_with_runner(datum, provider, runner); + + agent.start_manual().await; + // Give any background bootstrap a chance to run; manual mode + // shouldn't have spawned one, but if it did this would expose it. + tokio::task::yield_now().await; + assert_eq!( + agent.inner.projects.lock().await.len(), + 0, + "manual mode must not auto-enroll any project" + ); + + agent.register_project("explicit-project").await; + assert_eq!( + agent.inner.projects.lock().await.len(), + 1, + "register_project still works in manual mode" + ); + + // start_manual is idempotent (matches start()'s contract): a + // second call is a no-op rather than tearing down and replacing. + agent.start_manual().await; + assert_eq!(agent.inner.projects.lock().await.len(), 1); + } + #[test] fn renewal_interval_in_range() { for lease_duration_seconds in [1, 2, 10, 60] { @@ -633,6 +815,45 @@ mod tests { } } + fn api_error(code: u16, reason: &str) -> kube::Error { + kube::Error::Api(kube::core::ErrorResponse { + status: "Failure".to_string(), + message: "test".to_string(), + reason: reason.to_string(), + code, + }) + } + + #[test] + fn classify_lease_error_resets_on_not_found() { + // Mirrors the production wedge: the Lease was deleted server-side and + // the renew loop kept patching the dead name. A 404 must clear the + // cache so the next iteration re-resolves the connector + lease. + assert_eq!( + classify_lease_error(&api_error(404, "NotFound")), + LeaseErrorAction::Reset + ); + } + + #[test] + fn classify_lease_error_refreshes_on_unauthorized() { + assert_eq!( + classify_lease_error(&api_error(401, "Unauthorized")), + LeaseErrorAction::RefreshAuth + ); + } + + #[test] + fn classify_lease_error_retains_on_transient() { + for code in [403, 409, 429, 500, 502, 503] { + assert_eq!( + classify_lease_error(&api_error(code, "Transient")), + LeaseErrorAction::Retain, + "code {code} should retain cache" + ); + } + } + #[test] fn backoff_doubles_and_resets() { let mut backoff = Backoff::new(); diff --git a/lib/src/lib.rs b/lib/src/lib.rs index 2a81243..a884929 100644 --- a/lib/src/lib.rs +++ b/lib/src/lib.rs @@ -19,7 +19,10 @@ pub use node::*; pub use project_control_plane::ProjectControlPlaneClient; pub use repo::Repo; pub use state::*; -pub use tunnels::{TunnelDeleteOutcome, TunnelService, TunnelSummary}; +pub use tunnels::{ + ProgressStep, ProgressStepKind, StepStatus, TunnelDeleteOutcome, TunnelProgress, TunnelService, + TunnelSummary, normalize_endpoint, +}; pub use update::{UpdateChannel, UpdateChecker, UpdateInfo, UpdateSettings}; /// The root domain for datum connect urls to subdomain from. A proxy URL will diff --git a/lib/src/node.rs b/lib/src/node.rs index 92e21d7..fa806b1 100644 --- a/lib/src/node.rs +++ b/lib/src/node.rs @@ -58,16 +58,37 @@ pub struct ListenNode { impl ListenNode { pub async fn new(repo: Repo) -> Result { let n0des_api_secret = n0des_api_secret_from_env()?; - Self::with_n0des_api_secret(repo, n0des_api_secret).await + Self::build(repo, n0des_api_secret, None).await + } + + /// Construct a listen node using a project-scoped iroh identity. The CLI + /// Tunnel command takes this path so each project's Connector has a + /// distinct iroh public key — see [`Repo::listen_key_for_project`] for + /// why that matters. + pub async fn new_for_project(repo: Repo, project_id: &str) -> Result { + let n0des_api_secret = n0des_api_secret_from_env()?; + Self::build(repo, n0des_api_secret, Some(project_id)).await } #[instrument("listen-node", skip_all)] pub async fn with_n0des_api_secret( repo: Repo, n0des_api_secret: Option, + ) -> Result { + Self::build(repo, n0des_api_secret, None).await + } + + #[instrument("listen-node", skip(repo, n0des_api_secret))] + async fn build( + repo: Repo, + n0des_api_secret: Option, + project_id: Option<&str>, ) -> Result { let config = repo.config().await?; - let secret_key = repo.listen_key().await?; + let secret_key = match project_id { + Some(pid) => repo.listen_key_for_project(pid).await?, + None => repo.listen_key().await?, + }; let endpoint = build_endpoint(secret_key, &config).await?; let n0des = build_n0des_client_opt(&endpoint, n0des_api_secret).await; let state = repo.load_state().await?; @@ -161,9 +182,11 @@ impl StateWrapper { fn tcp_proxy_exists(&self, host: &str, port: u16) -> bool { // Strip scheme from incoming host (e.g., "http://127.0.0.1" -> "127.0.0.1") // The gateway may send the host with scheme, but local state stores without - let normalized_host = strip_host_scheme(host); + let normalized_host = normalize_loopback(strip_host_scheme(host)); let exists = self.get().proxies.iter().any(|a| { - a.enabled && a.info.service().host == normalized_host && a.info.service().port == port + a.enabled + && normalize_loopback(&a.info.service().host) == normalized_host + && a.info.service().port == port }); if !exists { debug!( @@ -182,6 +205,15 @@ fn strip_host_scheme(host: &str) -> &str { .unwrap_or(host) } +/// Normalize loopback hostnames so that "localhost", "127.0.0.1", and "::1" compare equal. +/// The gateway may use either form regardless of how the tunnel was registered. +fn normalize_loopback(host: &str) -> &str { + match host { + "localhost" | "::1" => "127.0.0.1", + _ => host, + } +} + impl AuthHandler for StateWrapper { async fn authorize<'a>( &'a self, @@ -348,6 +380,21 @@ const BUILD_DATUM_CONNECT_RELAY_URLS: &str = "BUILD_DATUM_CONNECT_RELAY_URLS"; const STARTUP_RELAY_SELECTION_MAX: usize = 5; const STARTUP_RELAY_PROBE_TIMEOUT: Duration = Duration::from_millis(800); +/// Built-in Datum relay shortlist. Used when neither the runtime env +/// `DATUM_CONNECT_RELAY_URLS` nor the compile-time env +/// `BUILD_DATUM_CONNECT_RELAY_URLS` is set. Ensures stock `cargo build` / +/// `nix run` / IDE builds reach a Datum-routable relay network instead of +/// silently falling through to the n0 public relays (which the Datum +/// gateway cannot route through). +const DEFAULT_DATUM_RELAY_URLS: &str = + "iroh-relay.us-east-1.datumconnect.net,iroh-relay.us-west-1.datumconnect.net"; + +/// Resolve the iroh relay set with explicit precedence: +/// 1. runtime env `DATUM_CONNECT_RELAY_URLS` (operator override) +/// 2. compile-time env `BUILD_DATUM_CONNECT_RELAY_URLS` (CI-injected list) +/// 3. built-in `DEFAULT_DATUM_RELAY_URLS` shortlist +/// 4. iroh's `default_relay_mode()` — n0 public/canary relays. Reaching this +/// branch means the Datum gateway will not be able to dial this endpoint. async fn relay_mode_from_env_or_build() -> Result { if let Ok(raw_urls) = std::env::var(DATUM_CONNECT_RELAY_URLS) { match parse_relay_urls(&raw_urls) { @@ -385,6 +432,27 @@ async fn relay_mode_from_env_or_build() -> Result { } } + match parse_relay_urls(DEFAULT_DATUM_RELAY_URLS) { + Ok(relays) => { + let relays = select_best_relays_for_startup(relays, STARTUP_RELAY_SELECTION_MAX).await; + info!( + source = "built-in", + count = relays.len(), + "using built-in Datum relay shortlist" + ); + return Ok(iroh::endpoint::RelayMode::Custom(relays_to_map(relays))); + } + Err(err) => { + warn!("invalid built-in DEFAULT_DATUM_RELAY_URLS, this is a bug: {err:#}"); + } + } + + warn!( + "Falling back to iroh's default public relays (n0). The Datum gateway \ + cannot route through this relay network — inbound connections to this \ + endpoint will fail. Set DATUM_CONNECT_RELAY_URLS or fix \ + DEFAULT_DATUM_RELAY_URLS." + ); Ok(default_relay_mode()) } @@ -643,4 +711,17 @@ mod tests { assert_eq!(parsed[0].host_str(), Some("relay-a.example.com.")); assert_eq!(parsed[1].host_str(), Some("relay-b.example.com.")); } + + #[test] + fn built_in_default_relay_list_parses() { + let parsed = parse_relay_urls(DEFAULT_DATUM_RELAY_URLS) + .expect("DEFAULT_DATUM_RELAY_URLS must parse — guards the runtime fallback path"); + assert!( + !parsed.is_empty(), + "DEFAULT_DATUM_RELAY_URLS must yield at least one relay" + ); + for relay in &parsed { + assert_eq!(relay.scheme(), "https"); + } + } } diff --git a/lib/src/repo.rs b/lib/src/repo.rs index 9f7dd88..593b660 100644 --- a/lib/src/repo.rs +++ b/lib/src/repo.rs @@ -28,7 +28,7 @@ impl Repo { const OAUTH_FILE: &str = "oauth.yml"; const AUTH_FILE: &str = "auth.yml"; const STATE_FILE: &str = "state.yml"; - const SELECTED_CONTEXT_FILE: &str = "selected_context.yml"; + const PROJECTS_DIR: &str = "projects"; pub fn default_location() -> PathBuf { match std::env::var("DATUM_CONNECT_REPO") { @@ -94,21 +94,28 @@ impl Repo { &self, selected: Option<&crate::SelectedContext>, ) -> Result<()> { - let path = self.0.join(Self::SELECTED_CONTEXT_FILE); - let data = serde_yml::to_string(&selected).anyerr()?; - tokio::fs::write(path, data).await?; - Ok(()) + let path = self.0.join(Self::CONFIG_FILE); + let mut config = if path.exists() { + let data = tokio::fs::read_to_string(&path) + .await + .context("reading config file")?; + serde_yml::from_str(&data).std_context("parsing config file")? + } else { + crate::config::Config::default() + }; + config.selected_context = selected.cloned(); + config.write(path).await } pub async fn read_selected_context(&self) -> Result> { - let path = self.0.join(Self::SELECTED_CONTEXT_FILE); + let path = self.0.join(Self::CONFIG_FILE); if path.exists() { let data = tokio::fs::read_to_string(path) .await - .context("failed to read selected context file")?; - let selected: Option = - serde_yml::from_str(&data).std_context("failed to parse selected context file")?; - return Ok(selected); + .context("reading config file")?; + let config: crate::config::Config = + serde_yml::from_str(&data).std_context("parsing config file")?; + return Ok(config.selected_context); } Ok(None) } @@ -130,6 +137,34 @@ impl Repo { self.secret_key(key_file_path).await } + /// Project-scoped listen key. Each project gets its own iroh identity so + /// Connectors registered in different projects don't collide on the iroh + /// DNS record (the controller assigns ownership to one and leaves the + /// others with `IrohDNSPublished=False; DeferredToOwner`, which manifests + /// as a tunnel that reports ready but silently drops data). + /// + /// On first access for any project, if the legacy flat `listen_key` exists + /// it is moved into this project's directory so the user keeps continuity + /// with whatever Connector that key was registered as. Subsequent projects + /// (no legacy file left) get freshly generated keys. + pub async fn listen_key_for_project(&self, project_id: &str) -> Result { + let project_dir = self.0.join(Self::PROJECTS_DIR).join(project_id); + let key_file_path = project_dir.join(Self::LISTEN_KEY_FILE); + if !key_file_path.exists() { + let legacy = self.0.join(Self::LISTEN_KEY_FILE); + if legacy.exists() { + tokio::fs::create_dir_all(&project_dir).await?; + info!( + "migrating legacy listen_key {} -> {} for project {project_id}", + legacy.display(), + key_file_path.display(), + ); + tokio::fs::rename(&legacy, &key_file_path).await?; + } + } + self.secret_key(key_file_path).await + } + pub async fn gateway_key(&self) -> Result { let key_file_path = self.0.join(Self::GATEWAY_KEY_FILE); self.secret_key(key_file_path).await @@ -143,7 +178,9 @@ impl Repo { async fn secret_key(&self, key_file_path: PathBuf) -> Result { if !key_file_path.exists() { warn!("secret key does not exist. creating new key"); - tokio::fs::create_dir_all(&self.0).await?; + if let Some(parent) = key_file_path.parent() { + tokio::fs::create_dir_all(parent).await?; + } return self.create_key(&key_file_path).await; }; @@ -209,3 +246,75 @@ impl Repo { &self.0 } } + +#[cfg(test)] +mod tests { + use super::*; + + fn temp_repo_dir() -> PathBuf { + let mut path = std::env::temp_dir(); + path.push(format!("datum-repo-test-{}", uuid::Uuid::new_v4())); + path + } + + #[tokio::test] + async fn listen_key_for_project_migrates_legacy_into_first_project() { + // The legacy `listen_key` lived at the repo root and was reused for + // every project the CLI talked to. The migration must move (not copy) + // it into the first project that requests it, so the second project + // gets a fresh identity instead of joining the cross-project DNS race. + let repo = Repo::open_or_create(temp_repo_dir()).await.unwrap(); + let legacy = repo.listen_key().await.unwrap(); + let legacy_bytes = legacy.to_bytes(); + let legacy_path = repo.0.join(Repo::LISTEN_KEY_FILE); + assert!(legacy_path.exists(), "precondition: legacy key exists"); + + let p1 = repo.listen_key_for_project("project-a").await.unwrap(); + assert_eq!( + p1.to_bytes(), + legacy_bytes, + "first project must adopt the legacy key" + ); + assert!(!legacy_path.exists(), "legacy file must be gone after migration"); + let p1_path = repo + .0 + .join(Repo::PROJECTS_DIR) + .join("project-a") + .join(Repo::LISTEN_KEY_FILE); + assert!(p1_path.exists(), "key must now live under the project dir"); + + let p2 = repo.listen_key_for_project("project-b").await.unwrap(); + assert_ne!( + p2.to_bytes(), + legacy_bytes, + "second project must get a fresh key, not the legacy one" + ); + } + + #[tokio::test] + async fn listen_key_for_project_is_stable_across_calls() { + let repo = Repo::open_or_create(temp_repo_dir()).await.unwrap(); + let first = repo.listen_key_for_project("project-x").await.unwrap(); + let second = repo.listen_key_for_project("project-x").await.unwrap(); + assert_eq!( + first.to_bytes(), + second.to_bytes(), + "repeat calls must return the same persisted key" + ); + } + + #[tokio::test] + async fn listen_key_for_project_generates_fresh_without_legacy() { + let repo = Repo::open_or_create(temp_repo_dir()).await.unwrap(); + let key = repo.listen_key_for_project("only-project").await.unwrap(); + let legacy_path = repo.0.join(Repo::LISTEN_KEY_FILE); + assert!(!legacy_path.exists(), "no legacy must be created"); + let project_path = repo + .0 + .join(Repo::PROJECTS_DIR) + .join("only-project") + .join(Repo::LISTEN_KEY_FILE); + assert!(project_path.exists()); + assert_eq!(tokio::fs::read(&project_path).await.unwrap(), key.to_bytes()); + } +} diff --git a/lib/src/tunnels.rs b/lib/src/tunnels.rs index 209bcfb..84914f4 100644 --- a/lib/src/tunnels.rs +++ b/lib/src/tunnels.rs @@ -9,15 +9,18 @@ use serde_json::json; use tracing::{debug, warn}; use crate::datum_apis::connector::{ - Connector, ConnectorConnectionDetails, ConnectorConnectionDetailsPublicKey, - ConnectorConnectionType, ConnectorSpec, PublicKeyConnectorAddress, PublicKeyDiscoveryMode, + CONNECTOR_CONDITION_IROH_DNS_PUBLISHED, CONNECTOR_CONDITION_READY, + CONNECTOR_REASON_DEFERRED_TO_OWNER, Connector, ConnectorConnectionDetails, + ConnectorConnectionDetailsPublicKey, ConnectorConnectionType, ConnectorSpec, + PublicKeyConnectorAddress, PublicKeyDiscoveryMode, }; use crate::datum_apis::connector_advertisement::{ ConnectorAdvertisement, ConnectorAdvertisementLayer4, ConnectorAdvertisementLayer4Service, ConnectorAdvertisementSpec, Layer4ServiceAddress, Layer4ServicePort, Protocol, }; use crate::datum_apis::http_proxy::{ - ConnectorReference, HTTP_PROXY_CONDITION_ACCEPTED, HTTP_PROXY_CONDITION_PROGRAMMED, HTTPProxy, + ConnectorReference, HTTP_PROXY_CONDITION_ACCEPTED, HTTP_PROXY_CONDITION_CERTIFICATES_READY, + HTTP_PROXY_CONDITION_CONNECTOR_METADATA_PROGRAMMED, HTTP_PROXY_CONDITION_PROGRAMMED, HTTPProxy, HTTPProxyRule, HTTPProxyRuleBackend, HTTPProxySpec, }; use crate::datum_apis::traffic_protection_policy::{ @@ -116,6 +119,225 @@ fn condition_is_true( .unwrap_or(false) } +fn find_condition<'a>( + conditions: Option<&'a [k8s_openapi::apimachinery::pkg::apis::meta::v1::Condition]>, + kind: &str, +) -> Option<&'a k8s_openapi::apimachinery::pkg::apis::meta::v1::Condition> { + conditions.unwrap_or_default().iter().find(|c| c.type_ == kind) +} + +/// One checkpoint in the tunnel setup pipeline. Maps 1:1 to a controller +/// condition; the order roughly tracks how a healthy setup progresses. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum ProgressStepKind { + /// HTTPProxy `Accepted` — control plane accepted the resource. + ProxyAccepted, + /// HTTPProxy `CertificatesReady` — TLS certs issued for the hostname. + CertificatesReady, + /// Connector `Ready` — agent is online and renewing its lease. + ConnectorReady, + /// Connector `IrohDNSPublished` — iroh DNS record published. The + /// failure-with-`DeferredToOwner` case is the silent-tunnel failure + /// that signals cross-project iroh-key collision. + IrohDnsPublished, + /// HTTPProxy `Programmed` — edge actually programmed the route. + ProxyProgrammed, + /// HTTPProxy `ConnectorMetadataProgrammed` — Envoy has the iroh metadata + /// it needs to dial the connector. + ConnectorMetadataProgrammed, +} + +impl ProgressStepKind { + pub fn label(&self) -> &'static str { + match self { + Self::ProxyAccepted => "tunnel accepted", + Self::CertificatesReady => "TLS certificate issued", + Self::ConnectorReady => "connector ready", + Self::IrohDnsPublished => "iroh DNS published", + Self::ProxyProgrammed => "route programmed", + Self::ConnectorMetadataProgrammed => "envoy metadata propagated", + } + } + + pub fn all() -> &'static [ProgressStepKind] { + &[ + Self::ProxyAccepted, + Self::CertificatesReady, + Self::ConnectorReady, + Self::IrohDnsPublished, + Self::ProxyProgrammed, + Self::ConnectorMetadataProgrammed, + ] + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum StepStatus { + /// Controller hasn't reported on this condition yet. + Unknown, + /// Condition exists with status False — still waiting (or failing). + Pending, + /// Condition is True. + Ready, +} + +#[derive(Debug, Clone)] +pub struct ProgressStep { + pub kind: ProgressStepKind, + pub status: StepStatus, + pub reason: Option, + pub message: Option, + /// Pre-formatted "Kind/name" of the underlying Kubernetes resource + /// (`HTTPProxy/` or `Connector/`). The CLI + /// renders this alongside each step so the user can pivot to + /// `datumctl describe ...` on the exact resource that's stuck or + /// reporting a stale Ready. `None` only when the resource doesn't + /// exist server-side (e.g. probing for a tunnel id that's not there). + pub resource: Option, +} + +impl ProgressStepKind { + /// The Kubernetes resource kind whose conditions back this step. + pub fn resource_kind(&self) -> &'static str { + match self { + Self::ConnectorReady | Self::IrohDnsPublished => "Connector", + Self::ProxyAccepted + | Self::CertificatesReady + | Self::ProxyProgrammed + | Self::ConnectorMetadataProgrammed => "HTTPProxy", + } + } +} + +impl ProgressStep { + /// True if this step is in a terminal failure mode that won't self-heal + /// without user action. The canonical case is the iroh DNS owner + /// collision: another Connector with the same iroh key owns the record, + /// and waiting longer won't change that. + pub fn is_terminal_failure(&self) -> bool { + matches!(self.kind, ProgressStepKind::IrohDnsPublished) + && self.status == StepStatus::Pending + && self.reason.as_deref() == Some(CONNECTOR_REASON_DEFERRED_TO_OWNER) + } +} + +#[derive(Debug, Clone)] +pub struct TunnelProgress { + pub hostnames: Vec, + pub steps: Vec, +} + +impl TunnelProgress { + pub fn all_ready(&self) -> bool { + self.steps.iter().all(|s| s.status == StepStatus::Ready) + } + + pub fn step(&self, kind: ProgressStepKind) -> Option<&ProgressStep> { + self.steps.iter().find(|s| s.kind == kind) + } + + pub fn terminal_failure(&self) -> Option<&ProgressStep> { + self.steps.iter().find(|s| s.is_terminal_failure()) + } + + fn from_resources(proxy: &HTTPProxy, connector: Option<&Connector>) -> Self { + let proxy_conds = proxy.status.as_ref().and_then(|s| s.conditions.as_deref()); + let proxy_gen = proxy.metadata.generation.unwrap_or(0); + let proxy_resource = proxy + .metadata + .name + .as_deref() + .map(|n| format!("HTTPProxy/{n}")); + let conn_conds = connector + .and_then(|c| c.status.as_ref()) + .and_then(|s| s.conditions.as_deref()); + let conn_gen = connector.and_then(|c| c.metadata.generation).unwrap_or(0); + let connector_resource = connector + .and_then(|c| c.metadata.name.as_deref()) + .map(|n| format!("Connector/{n}")); + + // A condition is Ready only if its observedGeneration has caught up + // with the resource's current generation. After we PATCH the spec + // (e.g. `tunnel listen --id` re-points the backend, bumping + // generation 1→2), the controller's prior True conditions still + // show observedGeneration=1 until it re-reconciles. Treating those + // as Ready makes the CLI claim "Tunnel ready" while the data plane + // is still serving 503s from stale Envoy config. + let make_step = |kind: ProgressStepKind, + conds: Option<&[k8s_openapi::apimachinery::pkg::apis::meta::v1::Condition]>, + type_: &str, + current_gen: i64, + resource: Option| + -> ProgressStep { + let cond = find_condition(conds, type_); + let observed = cond.and_then(|c| c.observed_generation).unwrap_or(0); + let fresh = observed >= current_gen; + let status = match cond { + Some(c) if c.status == "True" && fresh => StepStatus::Ready, + Some(_) => StepStatus::Pending, + None => StepStatus::Unknown, + }; + ProgressStep { + kind, + status, + reason: cond.map(|c| c.reason.clone()), + message: cond.map(|c| c.message.clone()), + resource, + } + }; + + let steps = vec![ + make_step( + ProgressStepKind::ProxyAccepted, + proxy_conds, + HTTP_PROXY_CONDITION_ACCEPTED, + proxy_gen, + proxy_resource.clone(), + ), + make_step( + ProgressStepKind::CertificatesReady, + proxy_conds, + HTTP_PROXY_CONDITION_CERTIFICATES_READY, + proxy_gen, + proxy_resource.clone(), + ), + make_step( + ProgressStepKind::ConnectorReady, + conn_conds, + CONNECTOR_CONDITION_READY, + conn_gen, + connector_resource.clone(), + ), + make_step( + ProgressStepKind::IrohDnsPublished, + conn_conds, + CONNECTOR_CONDITION_IROH_DNS_PUBLISHED, + conn_gen, + connector_resource.clone(), + ), + make_step( + ProgressStepKind::ProxyProgrammed, + proxy_conds, + HTTP_PROXY_CONDITION_PROGRAMMED, + proxy_gen, + proxy_resource.clone(), + ), + make_step( + ProgressStepKind::ConnectorMetadataProgrammed, + proxy_conds, + HTTP_PROXY_CONDITION_CONNECTOR_METADATA_PROGRAMMED, + proxy_gen, + proxy_resource, + ), + ]; + + Self { + hostnames: proxy_hostnames(proxy), + steps, + } + } +} + impl TunnelService { pub fn new(datum: DatumCloudClient, listen: ListenNode) -> Self { Self { @@ -133,9 +355,80 @@ impl TunnelService { self.list_project(&selected.project_id).await } + /// Fetch a tunnel by its HTTPProxy resource name. Direct API lookup — + /// does NOT filter by which connector the proxy's backend references. + /// Callers that explicitly name a tunnel (`tunnel update --id`, + /// `tunnel listen --id`, etc.) want to find it regardless of whether + /// its backend currently points at this agent's connector; adoption + /// re-points the backend afterwards. pub async fn get_active(&self, tunnel_id: &str) -> Result> { + let Some(selected) = self.datum.selected_context() else { + return Ok(None); + }; + let pcp = self + .datum + .project_control_plane_client(&selected.project_id) + .await?; + let client = pcp.client(); + let proxies: Api = Api::namespaced(client.clone(), DEFAULT_PCP_NAMESPACE); + let ads: Api = Api::namespaced(client, DEFAULT_PCP_NAMESPACE); + + let Some(proxy) = proxies + .get_opt(tunnel_id) + .await + .std_context("Failed to fetch HTTPProxy")? + else { + return Ok(None); + }; + let enabled = ads + .get_opt(tunnel_id) + .await + .std_context("Failed to fetch ConnectorAdvertisement")? + .is_some(); + Ok(Some(summary_from_proxy(&proxy, enabled))) + } + + pub async fn get_active_by_endpoint(&self, endpoint: &str) -> Result> { let tunnels = self.list_active().await?; - Ok(tunnels.into_iter().find(|tunnel| tunnel.id == tunnel_id)) + let normalized = normalize_endpoint(endpoint); + Ok(tunnels.into_iter().find(|tunnel| tunnel.endpoint == normalized)) + } + + /// Fetch the rich progress view for a tunnel: every checkpoint condition + /// from both the HTTPProxy and its referenced Connector. Returns `None` + /// if the proxy doesn't exist (matches `get_active`). + pub async fn get_active_progress( + &self, + tunnel_id: &str, + ) -> Result> { + let Some(selected) = self.datum.selected_context() else { + return Ok(None); + }; + let pcp = self + .datum + .project_control_plane_client(&selected.project_id) + .await?; + let client = pcp.client(); + let proxies: Api = Api::namespaced(client.clone(), DEFAULT_PCP_NAMESPACE); + let Some(proxy) = proxies + .get_opt(tunnel_id) + .await + .std_context("Failed to fetch HTTPProxy")? + else { + return Ok(None); + }; + + let connector = if let Some(name) = proxy_connector_name(&proxy) { + let connectors: Api = Api::namespaced(client, DEFAULT_PCP_NAMESPACE); + connectors + .get_opt(&name) + .await + .std_context("Failed to fetch Connector")? + } else { + None + }; + + Ok(Some(TunnelProgress::from_resources(&proxy, connector.as_ref()))) } pub async fn create_active(&self, label: &str, endpoint: &str) -> Result { @@ -179,7 +472,7 @@ impl TunnelService { } pub async fn list_project(&self, project_id: &str) -> Result> { - let connector = self.find_connector(project_id).await?; + let connector = self.find_connector_readonly(project_id).await?; let Some(connector) = connector else { return Ok(Vec::new()); }; @@ -248,19 +541,6 @@ impl TunnelService { programmed, }); } - if !self.publish_tickets { - for tunnel in &tunnels { - if let Ok(proxy_state) = proxy_state_from_summary( - &tunnel.id, - &tunnel.endpoint, - &tunnel.label, - tunnel.enabled, - ) && let Err(err) = self.listen.set_proxy_state(proxy_state).await - { - warn!(tunnel_id = %tunnel.id, "Failed to store proxy state: {err:#}"); - } - } - } Ok(tunnels) } @@ -306,18 +586,22 @@ impl TunnelService { }, status: None, }; - proxy = proxies - .create(&PostParams::default(), &proxy) - .await - .std_context("Failed to create HTTPProxy") - .inspect_err(|err| { - warn!( - %project_id, - connector = %connector_name, - endpoint = %endpoint, - "HTTPProxy create failed: {err:#}" - ); - })?; + let post_params = PostParams::default(); + proxy = with_quota_check_retry("HTTPProxy create", || { + proxies.create(&post_params, &proxy) + }) + .await + .map_err(|err| { + warn!( + %project_id, + connector = %connector_name, + endpoint = %endpoint, + "HTTPProxy create failed: {err:#}" + ); + format_quota_error(&err, "HTTPProxy") + .unwrap_or_else(|| format!("Failed to create HTTPProxy: {err}")) + }) + .map_err(|err| n0_error::anyerr!(err))?; let proxy_name = proxy.name_any(); debug!( %project_id, @@ -341,17 +625,22 @@ impl TunnelService { spec: ad_spec, status: None, }; - ads.create(&PostParams::default(), &ad) - .await - .std_context("Failed to create ConnectorAdvertisement") - .inspect_err(|err| { - warn!( - %project_id, - proxy = %proxy_name, - connector = %connector_name, - "ConnectorAdvertisement create failed: {err:#}" - ); - })?; + let ad_post = PostParams::default(); + with_quota_check_retry("ConnectorAdvertisement create", || { + ads.create(&ad_post, &ad) + }) + .await + .map_err(|err| { + warn!( + %project_id, + proxy = %proxy_name, + connector = %connector_name, + "ConnectorAdvertisement create failed: {err:#}" + ); + format_quota_error(&err, "ConnectorAdvertisement") + .unwrap_or_else(|| format!("Failed to create ConnectorAdvertisement: {err}")) + }) + .map_err(|err| n0_error::anyerr!(err))?; debug!( %project_id, proxy = %proxy_name, @@ -395,16 +684,22 @@ impl TunnelService { }, status: None, }; - tpps.create(&PostParams::default(), &tpp) + let tpp_post = PostParams::default(); + with_quota_check_retry("TrafficProtectionPolicy create", || { + tpps.create(&tpp_post, &tpp) + }) .await - .std_context("Failed to create TrafficProtectionPolicy") - .inspect_err(|err| { + .map_err(|err| { warn!( %project_id, proxy = %proxy_name, "TrafficProtectionPolicy create failed: {err:#}" ); - })?; + format_quota_error(&err, "TrafficProtectionPolicy").unwrap_or_else(|| { + format!("Failed to create TrafficProtectionPolicy: {err}") + }) + }) + .map_err(|err| n0_error::anyerr!(err))?; debug!( %project_id, proxy = %proxy_name, @@ -473,39 +768,61 @@ impl TunnelService { .await .std_context("Failed to fetch HTTPProxy")?; let hostnames = existing.spec.hostnames.clone().unwrap_or_default(); - - let patch = json!({ - "metadata": { - "annotations": { - DISPLAY_NAME_ANNOTATION: label, + let desired_rules = vec![https_redirect_rule(), proxy_rule(&endpoint, &connector_name)]; + + // Skip the PATCH when the existing spec already matches what we'd + // write. A no-op patch still bumps metadata.generation on some API + // servers, which triggers a downstream Envoy re-reconcile and a + // window where the data plane returns 5xx — exactly the resume- + // induced churn the UI doesn't suffer because its enable path + // never touches HTTPProxy.spec. Making this verb idempotent at the + // lib boundary means every caller (CLI, UI Edit dialog, future + // datumctl plugin) gets the no-churn behavior for free. + if http_proxy_spec_matches(&existing, label, &desired_rules) { + debug!( + %project_id, + proxy = %tunnel_id, + "HTTPProxy spec already matches desired state; skipping patch" + ); + } else { + let patch = json!({ + "metadata": { + "annotations": { + DISPLAY_NAME_ANNOTATION: label, + } + }, + "spec": { + "hostnames": hostnames, + "rules": desired_rules, } - }, - "spec": { - "hostnames": hostnames, - "rules": [https_redirect_rule(), proxy_rule(&endpoint, &connector_name)], - } - }); - proxies - .patch(tunnel_id, &PatchParams::default(), &Patch::Merge(&patch)) - .await - .std_context("Failed to update HTTPProxy")?; - - if let Ok(existing_ad) = ads.get_opt(tunnel_id).await - && existing_ad.is_some() - { - let ad_patch = json!({ - "spec": advertisement_spec(&connector_name, target) }); - ads.patch(tunnel_id, &PatchParams::default(), &Patch::Merge(&ad_patch)) + proxies + .patch(tunnel_id, &PatchParams::default(), &Patch::Merge(&patch)) .await - .std_context("Failed to update ConnectorAdvertisement")?; + .std_context("Failed to update HTTPProxy")?; } - let enabled = ads + let existing_ad = ads .get_opt(tunnel_id) .await - .std_context("Failed to load ConnectorAdvertisement")? - .is_some(); + .std_context("Failed to fetch ConnectorAdvertisement")?; + if let Some(existing_ad) = existing_ad.as_ref() { + let desired_ad_spec = advertisement_spec(&connector_name, target); + if advertisement_spec_matches(existing_ad, &desired_ad_spec) { + debug!( + %project_id, + advertisement = %tunnel_id, + "ConnectorAdvertisement spec already matches; skipping patch" + ); + } else { + let ad_patch = json!({ "spec": desired_ad_spec }); + ads.patch(tunnel_id, &PatchParams::default(), &Patch::Merge(&ad_patch)) + .await + .std_context("Failed to update ConnectorAdvertisement")?; + } + } + + let enabled = existing_ad.is_some(); let summary = TunnelSummary { id: tunnel_id.to_string(), @@ -594,9 +911,12 @@ impl TunnelService { spec: ad_spec, status: None, }; - ads.create(&PostParams::default(), &ad) - .await - .std_context("Failed to create ConnectorAdvertisement")?; + let ad_post = PostParams::default(); + with_quota_check_retry("ConnectorAdvertisement create", || { + ads.create(&ad_post, &ad) + }) + .await + .std_context("Failed to create ConnectorAdvertisement")?; } } } else if ads @@ -653,13 +973,7 @@ impl TunnelService { tunnel_id: &str, ) -> Result { let connector = self.find_connector(project_id).await?; - let Some(connector) = connector else { - return Ok(TunnelDeleteOutcome { - project_id: project_id.to_string(), - connector_deleted: false, - }); - }; - let connector_name = connector.name_any(); + let connector_name = connector.as_ref().map(|c| c.name_any()); let pcp = self.datum.project_control_plane_client(project_id).await?; let client = pcp.client(); @@ -713,41 +1027,43 @@ impl TunnelService { warn!(%tunnel_id, "Failed to remove proxy state: {err:#}"); } - let remaining = proxies - .list(&ListParams::default()) - .await - .std_context("Failed to list remaining HTTPProxy objects")?; let mut connector_deleted = false; - let mut remaining_for_connector = remaining - .items - .into_iter() - .filter(|proxy| proxy_uses_connector(proxy, &connector_name)) - .peekable(); - if remaining_for_connector.peek().is_none() { - let ad_selector = format!("{ADVERTISEMENT_CONNECTOR_FIELD}={connector_name}"); - let ads_list = ads - .list(&ListParams::default().fields(&ad_selector)) + if let Some(connector_name) = connector_name { + let remaining = proxies + .list(&ListParams::default()) .await - .std_context("Failed to list remaining ConnectorAdvertisements")?; - for ad in ads_list.items { - if let Some(name) = ad.metadata.name.clone() - && let Err(err) = ads.delete(&name, &DeleteParams::default()).await - { - warn!(%name, "Failed to delete connector advertisement: {err:#}"); + .std_context("Failed to list remaining HTTPProxy objects")?; + let mut remaining_for_connector = remaining + .items + .into_iter() + .filter(|proxy| proxy_uses_connector(proxy, &connector_name)) + .peekable(); + if remaining_for_connector.peek().is_none() { + let ad_selector = format!("{ADVERTISEMENT_CONNECTOR_FIELD}={connector_name}"); + let ads_list = ads + .list(&ListParams::default().fields(&ad_selector)) + .await + .std_context("Failed to list remaining ConnectorAdvertisements")?; + for ad in ads_list.items { + if let Some(name) = ad.metadata.name.clone() + && let Err(err) = ads.delete(&name, &DeleteParams::default()).await + { + warn!(%name, "Failed to delete connector advertisement: {err:#}"); + } } - } - if connectors - .get_opt(&connector_name) - .await - .std_context("Failed to load Connector")? - .is_some() - { - connectors - .delete(&connector_name, &DeleteParams::default()) + if connectors + .get_opt(&connector_name) .await - .std_context("Failed to delete Connector")?; - connector_deleted = true; + .std_context("Failed to load Connector")? + .is_some() + { + connectors + .delete(&connector_name, &DeleteParams::default()) + .await + .std_context("Failed to delete Connector")?; + connector_deleted = true; + } } } @@ -757,6 +1073,29 @@ impl TunnelService { }) } + async fn find_connector_readonly(&self, project_id: &str) -> Result> { + let pcp = self.datum.project_control_plane_client(project_id).await?; + let client = pcp.client(); + let connectors: Api = Api::namespaced(client, DEFAULT_PCP_NAMESPACE); + let endpoint_id = self.listen.endpoint_id().to_string(); + let selector = format!("{CONNECTOR_SELECTOR_FIELD}={endpoint_id}"); + let list = connectors + .list(&ListParams::default().fields(&selector)) + .await + .std_context("Failed to list connectors")?; + if list.items.is_empty() { + return Ok(None); + } + if list.items.len() > 1 { + debug!( + %selector, + count = list.items.len(), + "Multiple connectors found for endpoint, using first" + ); + } + Ok(Some(list.items.into_iter().next().unwrap())) + } + async fn find_connector(&self, project_id: &str) -> Result> { let pcp = self.datum.project_control_plane_client(project_id).await?; let client = pcp.client(); @@ -849,10 +1188,12 @@ impl TunnelService { }, status: None, }; - connector = connectors - .create(&PostParams::default(), &connector) - .await - .std_context("Failed to create Connector")?; + let conn_post = PostParams::default(); + connector = with_quota_check_retry("Connector create", || { + connectors.create(&conn_post, &connector) + }) + .await + .std_context("Failed to create Connector")?; if let Some(details) = build_connection_details(&self.listen) { let details_value = serde_json::to_value(details) @@ -935,7 +1276,12 @@ fn build_connection_details(listen: &ListenNode) -> Option String { +/// Canonicalize an endpoint string the way `TunnelSummary.endpoint` is +/// stored: trim whitespace, leave URLs that already have a scheme alone, +/// otherwise prepend `http://`. Exposed so the CLI can compare a +/// user-supplied endpoint against an existing tunnel's stored endpoint +/// before deciding whether to adopt or fail. +pub fn normalize_endpoint(endpoint: &str) -> String { let endpoint = endpoint.trim(); if endpoint.is_empty() { return endpoint.to_string(); @@ -965,6 +1311,89 @@ fn proxy_hostnames(proxy: &HTTPProxy) -> Vec { .unwrap_or_default() } +/// Build a `TunnelSummary` from an `HTTPProxy` and a boolean indicating +/// whether an advertisement exists (which is how `enabled` is derived +/// throughout the rest of the file). Shared between `list_project` and +/// `get_active` so they don't drift apart. +fn summary_from_proxy(proxy: &HTTPProxy, enabled: bool) -> TunnelSummary { + let id = proxy.metadata.name.clone().unwrap_or_default(); + let label = proxy + .metadata + .annotations + .as_ref() + .and_then(|annotations| annotations.get(DISPLAY_NAME_ANNOTATION)) + .cloned() + .unwrap_or_else(|| id.clone()); + let endpoint = + normalize_endpoint(&proxy_backend_endpoint(proxy).unwrap_or_default()); + let conds = proxy.status.as_ref().and_then(|s| s.conditions.as_deref()); + TunnelSummary { + id, + label, + endpoint, + hostnames: proxy_hostnames(proxy), + enabled, + accepted: condition_is_true(conds, HTTP_PROXY_CONDITION_ACCEPTED), + programmed: condition_is_true(conds, HTTP_PROXY_CONDITION_PROGRAMMED), + } +} + +/// True when the HTTPProxy's display label annotation and rules already +/// match what `update_project` would write. Used to short-circuit the +/// PATCH so a no-op update doesn't bump `metadata.generation` and trigger +/// a downstream Envoy re-reconcile (see the resume-induced 5xx window). +fn http_proxy_spec_matches( + existing: &HTTPProxy, + desired_label: &str, + desired_rules: &[HTTPProxyRule], +) -> bool { + let existing_label = existing + .metadata + .annotations + .as_ref() + .and_then(|a| a.get(DISPLAY_NAME_ANNOTATION)) + .map(String::as_str); + if existing_label != Some(desired_label) { + return false; + } + // Compare via serde Value rather than structural equality on the Rust + // types so we get a stable representation that doesn't drift when + // Option<...> fields with serde defaults serialize differently. + let Ok(existing_rules_value) = serde_json::to_value(&existing.spec.rules) else { + return false; + }; + let Ok(desired_rules_value) = serde_json::to_value(desired_rules) else { + return false; + }; + existing_rules_value == desired_rules_value +} + +/// True when the ConnectorAdvertisement's spec already matches what +/// `update_project` would write. Same idempotency motivation as +/// `http_proxy_spec_matches`. +fn advertisement_spec_matches( + existing: &ConnectorAdvertisement, + desired: &ConnectorAdvertisementSpec, +) -> bool { + let Ok(existing_value) = serde_json::to_value(&existing.spec) else { + return false; + }; + let Ok(desired_value) = serde_json::to_value(desired) else { + return false; + }; + existing_value == desired_value +} + +/// Extract the connector name from the first backend that references one. +fn proxy_connector_name(proxy: &HTTPProxy) -> Option { + proxy + .spec + .rules + .iter() + .flat_map(|rule| rule.backends.iter().flatten()) + .find_map(|backend| backend.connector.as_ref().map(|c| c.name.clone())) +} + /// Rule that matches requests with x-forwarded-proto: http and redirects to HTTPS (301). /// Evaluated first so HTTP traffic is upgraded before hitting the backend rule. fn https_redirect_rule() -> HTTPProxyRule { @@ -1125,6 +1554,79 @@ async fn patch_device_annotations(api: &Api, connector: &mut Connecto } } +fn format_quota_error(err: &dyn std::error::Error, resource_type: &str) -> Option { + let err_msg = err.to_string(); + // Transient quota-check timeout — the error literally says "Please try + // again in a moment". Don't relabel it as "exceeded"; with the retry + // wrapper applied at creation sites we'll usually never get here, and + // when we do the original message is the most accurate signal. + if err_msg.contains("took too long to be checked against your quota") { + return None; + } + if err_msg.contains("quota") || err_msg.contains("Insufficient quota") { + return Some(format!( + "Quota limit exceeded for {resource_type} resources.\n\n\ + You've reached the limit for creating {resource_type} resources in this project.\n\n\ + To fix this, you can:\n \ + - Delete unused tunnels to free up capacity\n \ + - Contact support to request a higher quota limit\n\n\ + Run 'tunnel list' to see existing tunnels." + )); + } + None +} + +/// True if `err` is the operator's transient quota-check timeout (a 403 +/// whose message says "Please try again in a moment"). Distinct from +/// real quota exhaustion, which produces a different message and +/// shouldn't be retried. +fn is_quota_check_timeout(err: &kube::Error) -> bool { + matches!( + err, + kube::Error::Api(e) + if e.code == 403 + && e.message.contains("took too long to be checked against your quota") + ) +} + +/// Retry a kube API call up to ~15 seconds while it keeps tripping the +/// operator's quota-check timeout. Other errors return immediately so +/// real failures still surface fast. Prints a one-line stderr notice on +/// the first retry so the user knows we're waiting on the server. +async fn with_quota_check_retry(op_name: &str, mut f: F) -> kube::Result +where + F: FnMut() -> Fut, + Fut: std::future::Future>, +{ + let delays = [ + std::time::Duration::from_secs(1), + std::time::Duration::from_secs(2), + std::time::Duration::from_secs(4), + std::time::Duration::from_secs(8), + ]; + for (i, delay) in delays.iter().enumerate() { + match f().await { + Ok(v) => return Ok(v), + Err(err) if is_quota_check_timeout(&err) => { + if i == 0 { + eprintln!( + " … quota check timed out for {op_name}; retrying for up to 15s" + ); + } + warn!( + op = op_name, + attempt = i + 1, + next_delay_s = delay.as_secs(), + "quota check timed out; retrying" + ); + tokio::time::sleep(*delay).await; + } + Err(err) => return Err(err), + } + } + f().await +} + fn publish_tickets_enabled() -> bool { std::env::var("DATUM_CONNECT_PUBLISH_TICKETS") .map(|value| matches!(value.as_str(), "1" | "true" | "TRUE" | "yes" | "YES")) @@ -1141,3 +1643,378 @@ fn create_traffic_protection_policies_enabled() -> bool { .map(|value| matches!(value.as_str(), "1" | "true" | "TRUE" | "yes" | "YES")) .unwrap_or(false) } + +#[cfg(test)] +mod tests { + use super::*; + use crate::datum_apis::connector::{ConnectorSpec, ConnectorStatus}; + use crate::datum_apis::http_proxy::{HTTPProxySpec, HTTPProxyStatus}; + use k8s_openapi::apimachinery::pkg::apis::meta::v1::{Condition, Time}; + use kube::api::ObjectMeta; + + fn cond(type_: &str, status: &str, reason: &str, message: &str) -> Condition { + Condition { + type_: type_.to_string(), + status: status.to_string(), + reason: reason.to_string(), + message: message.to_string(), + last_transition_time: Time(chrono::DateTime::UNIX_EPOCH), + observed_generation: None, + } + } + + fn proxy(conds: Vec) -> HTTPProxy { + let mut p = HTTPProxy::new( + "tunnel-test", + HTTPProxySpec { + hostnames: None, + rules: vec![], + }, + ); + p.metadata = ObjectMeta { + name: Some("tunnel-test".into()), + ..Default::default() + }; + p.status = Some(HTTPProxyStatus { + addresses: None, + hostnames: Some(vec!["ground-pearl.datumproxy.net".into()]), + conditions: Some(conds), + }); + p + } + + fn connector(conds: Vec) -> Connector { + let mut c = Connector::new( + "datum-connect-test", + ConnectorSpec { + connector_class_name: "datum-connect".into(), + capabilities: None, + }, + ); + c.status = Some(ConnectorStatus { + capabilities: None, + conditions: Some(conds), + connection_details: None, + lease_ref: None, + }); + c + } + + #[test] + fn progress_unknown_when_controllers_silent() { + let p = proxy(vec![]); + let progress = TunnelProgress::from_resources(&p, None); + assert_eq!(progress.steps.len(), 6); + assert!( + progress.steps.iter().all(|s| s.status == StepStatus::Unknown), + "no conditions yet → every step Unknown" + ); + assert!(!progress.all_ready()); + assert!(progress.terminal_failure().is_none()); + } + + #[test] + fn progress_all_ready_when_every_condition_true() { + let p = proxy(vec![ + cond(HTTP_PROXY_CONDITION_ACCEPTED, "True", "Accepted", ""), + cond(HTTP_PROXY_CONDITION_CERTIFICATES_READY, "True", "AllCertificatesReady", ""), + cond(HTTP_PROXY_CONDITION_PROGRAMMED, "True", "Programmed", ""), + cond( + HTTP_PROXY_CONDITION_CONNECTOR_METADATA_PROGRAMMED, + "True", + "ConnectorMetadataApplied", + "", + ), + ]); + let c = connector(vec![ + cond(CONNECTOR_CONDITION_READY, "True", "ConnectorReady", ""), + cond(CONNECTOR_CONDITION_IROH_DNS_PUBLISHED, "True", "Owner", ""), + ]); + let progress = TunnelProgress::from_resources(&p, Some(&c)); + assert!(progress.all_ready()); + assert!(progress.terminal_failure().is_none()); + } + + #[test] + fn progress_flags_deferred_to_owner_as_terminal() { + // This is the silent-tunnel failure: the iroh DNS record is owned by + // a different project's Connector. Waiting longer won't help — the + // CLI must bail and surface the owner so the user can act. + let p = proxy(vec![cond(HTTP_PROXY_CONDITION_ACCEPTED, "True", "Accepted", "")]); + let owner_msg = + "iroh DNS record is owned by Connector /other-project/default/datum-connect-xyz"; + let c = connector(vec![ + cond(CONNECTOR_CONDITION_READY, "True", "ConnectorReady", ""), + cond( + CONNECTOR_CONDITION_IROH_DNS_PUBLISHED, + "False", + CONNECTOR_REASON_DEFERRED_TO_OWNER, + owner_msg, + ), + ]); + let progress = TunnelProgress::from_resources(&p, Some(&c)); + let fail = progress.terminal_failure().expect("terminal failure detected"); + assert_eq!(fail.kind, ProgressStepKind::IrohDnsPublished); + assert_eq!(fail.message.as_deref(), Some(owner_msg)); + assert!(!progress.all_ready()); + } + + #[test] + fn progress_pending_for_false_but_non_terminal_reason() { + // CertificatesReady=False with reason "Issuing" should stay Pending + // (still progressing) — not Ready, not terminal. + let p = proxy(vec![cond( + HTTP_PROXY_CONDITION_CERTIFICATES_READY, + "False", + "Issuing", + "Certificate request submitted", + )]); + let progress = TunnelProgress::from_resources(&p, None); + let cert_step = progress + .step(ProgressStepKind::CertificatesReady) + .expect("step exists"); + assert_eq!(cert_step.status, StepStatus::Pending); + assert!(progress.terminal_failure().is_none()); + } + + #[test] + fn progress_step_carries_resource_label() { + // Every step should know which Kubernetes resource backs it so the + // CLI can render "[HTTPProxy/tunnel-test]" or + // "[Connector/datum-connect-test]" alongside the line — that's + // what the user copy-pastes into `datumctl describe`. + let p = proxy(vec![]); + let c = connector(vec![]); + let progress = TunnelProgress::from_resources(&p, Some(&c)); + + for step in &progress.steps { + let resource = step.resource.as_deref().expect("resource label set"); + let expected_kind = step.kind.resource_kind(); + assert!( + resource.starts_with(&format!("{expected_kind}/")), + "step {:?} should be backed by {expected_kind}, got {resource}", + step.kind, + ); + } + + // Connector-backed steps fall back to None when no connector exists. + let progress_no_conn = TunnelProgress::from_resources(&p, None); + let iroh = progress_no_conn + .step(ProgressStepKind::IrohDnsPublished) + .unwrap(); + assert!( + iroh.resource.is_none(), + "connector-backed step has no resource when connector is missing" + ); + let proxy_step = progress_no_conn + .step(ProgressStepKind::ProxyAccepted) + .unwrap(); + assert_eq!( + proxy_step.resource.as_deref(), + Some("HTTPProxy/tunnel-test") + ); + } + + fn api_error(code: u16, message: &str) -> kube::Error { + kube::Error::Api(kube::core::ErrorResponse { + status: "Failure".into(), + message: message.into(), + reason: if code == 403 { "Forbidden".into() } else { "Unknown".into() }, + code, + }) + } + + #[test] + fn quota_check_timeout_classifier_matches_transient_403() { + // The exact phrase the operator emits when the quota check itself + // times out — distinct from real quota exhaustion. The error message + // literally says "Please try again in a moment". + let err = api_error( + 403, + "connectoradvertisements.networking.datumapis.com \"tunnel-x\" is forbidden: \ + Your request took too long to be checked against your quota. Please try again \ + in a moment — if this keeps happening, contact support.", + ); + assert!(is_quota_check_timeout(&err)); + + // Real exhaustion shouldn't trigger retry. + let exhausted = api_error(403, "Insufficient quota for ConnectorAdvertisement"); + assert!(!is_quota_check_timeout(&exhausted)); + + // 401 with similar text shouldn't match — different failure class. + let unauthorized = api_error(401, "took too long to be checked against your quota"); + assert!(!is_quota_check_timeout(&unauthorized)); + + // format_quota_error should NOT mangle the timeout message into a + // misleading "Quota limit exceeded" string. + assert!( + format_quota_error(&err, "ConnectorAdvertisement").is_none(), + "transient timeout must propagate verbatim, not become 'exceeded'" + ); + // It SHOULD format real exhaustion. + assert!(format_quota_error(&exhausted, "ConnectorAdvertisement").is_some()); + } + + #[test] + fn progress_pending_when_status_is_stale_for_current_generation() { + // `tunnel listen --id` PATCHes the HTTPProxy spec to re-point the + // backend at the current connector, bumping generation 1 → 2. The + // controller's prior True conditions still carry observedGeneration=1 + // until it re-reconciles. Treating those as Ready was the bug + // behind "Tunnel ready after 0 sec" while the edge served 503s + // for minutes — Envoy was still on the previous-generation config. + let mut stale = cond( + HTTP_PROXY_CONDITION_PROGRAMMED, + "True", + "Programmed", + "Stale from previous generation", + ); + stale.observed_generation = Some(1); + let mut p_stale = proxy(vec![stale]); + p_stale.metadata.generation = Some(2); + let progress_stale = TunnelProgress::from_resources(&p_stale, None); + let step = progress_stale + .step(ProgressStepKind::ProxyProgrammed) + .expect("step exists"); + assert_eq!( + step.status, + StepStatus::Pending, + "True condition with observedGeneration < generation must be Pending" + ); + assert!(!progress_stale.all_ready()); + + // Once the controller observes the new generation, status flips Ready. + let mut fresh = cond(HTTP_PROXY_CONDITION_PROGRAMMED, "True", "Programmed", ""); + fresh.observed_generation = Some(2); + let mut p_fresh = proxy(vec![fresh]); + p_fresh.metadata.generation = Some(2); + let progress_fresh = TunnelProgress::from_resources(&p_fresh, None); + assert_eq!( + progress_fresh + .step(ProgressStepKind::ProxyProgrammed) + .unwrap() + .status, + StepStatus::Ready, + "matched observedGeneration must be Ready" + ); + } + + fn proxy_with_backend(label: &str, endpoint: &str, connector_name: &str) -> HTTPProxy { + let mut p = HTTPProxy::new( + "tunnel-test", + HTTPProxySpec { + hostnames: Some(vec!["test.datumproxy.net".into()]), + rules: vec![https_redirect_rule(), proxy_rule(endpoint, connector_name)], + }, + ); + let mut ann = std::collections::BTreeMap::new(); + ann.insert(DISPLAY_NAME_ANNOTATION.to_string(), label.to_string()); + p.metadata = ObjectMeta { + name: Some("tunnel-test".into()), + annotations: Some(ann), + ..Default::default() + }; + p + } + + #[test] + fn http_proxy_spec_matches_skips_no_op_resume() { + // The CLI resume path now goes through update_active which calls + // update_project. When the existing tunnel already points at the + // current connector with the same endpoint and label, the lib must + // recognize that and skip the PATCH — sending one would bump + // metadata.generation and trigger a downstream Envoy re-reconcile. + let existing = + proxy_with_backend("my-label", "http://127.0.0.1:11434", "datum-connect-mhxj5"); + let desired_rules = vec![ + https_redirect_rule(), + proxy_rule("http://127.0.0.1:11434", "datum-connect-mhxj5"), + ]; + assert!(http_proxy_spec_matches( + &existing, + "my-label", + &desired_rules + )); + } + + #[test] + fn http_proxy_spec_matches_detects_each_drift_axis() { + let existing = + proxy_with_backend("my-label", "http://127.0.0.1:11434", "datum-connect-mhxj5"); + + // Different connector — adoption across identity change must patch. + let rules_new_connector = vec![ + https_redirect_rule(), + proxy_rule("http://127.0.0.1:11434", "datum-connect-NEW"), + ]; + assert!(!http_proxy_spec_matches( + &existing, + "my-label", + &rules_new_connector + )); + + // Different endpoint — backend retarget must patch. + let rules_new_endpoint = vec![ + https_redirect_rule(), + proxy_rule("http://127.0.0.1:9999", "datum-connect-mhxj5"), + ]; + assert!(!http_proxy_spec_matches( + &existing, + "my-label", + &rules_new_endpoint + )); + + // Different label — rename must patch. + let rules_same = vec![ + https_redirect_rule(), + proxy_rule("http://127.0.0.1:11434", "datum-connect-mhxj5"), + ]; + assert!(!http_proxy_spec_matches( + &existing, + "different-label", + &rules_same + )); + + // No annotation at all — must patch. + let mut bare = existing.clone(); + bare.metadata.annotations = None; + assert!(!http_proxy_spec_matches(&bare, "my-label", &rules_same)); + } + + fn target(host: &str, port: u16) -> ParsedTarget { + ParsedTarget { + address: host.to_string(), + port, + } + } + + fn advertisement_with_target(connector_name: &str, host: &str, port: u16) -> ConnectorAdvertisement { + ConnectorAdvertisement { + metadata: ObjectMeta { + name: Some("tunnel-test".into()), + ..Default::default() + }, + spec: advertisement_spec(connector_name, target(host, port)), + status: None, + } + } + + #[test] + fn advertisement_spec_matches_skips_no_op() { + let existing = advertisement_with_target("datum-connect-mhxj5", "127.0.0.1", 11434); + let desired = advertisement_spec("datum-connect-mhxj5", target("127.0.0.1", 11434)); + assert!(advertisement_spec_matches(&existing, &desired)); + } + + #[test] + fn advertisement_spec_matches_detects_drift() { + let existing = advertisement_with_target("datum-connect-mhxj5", "127.0.0.1", 11434); + let desired_new_port = + advertisement_spec("datum-connect-mhxj5", target("127.0.0.1", 9999)); + assert!(!advertisement_spec_matches(&existing, &desired_new_port)); + + let desired_new_conn = + advertisement_spec("datum-connect-NEW", target("127.0.0.1", 11434)); + assert!(!advertisement_spec_matches(&existing, &desired_new_conn)); + } +}