From 642aec434f4d06eb780e17f2e3b5fcd812f254c4 Mon Sep 17 00:00:00 2001 From: Drew Raines Date: Tue, 23 Jun 2026 17:59:00 +0000 Subject: [PATCH] fix: force token refresh on 401 and add debug logging MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The heartbeat's 401 handler only logged the rejection and never triggered a refresh — a stale comment claimed refresh was 'external/out-of-band'. So when a token was rejected early (clock skew, revocation, IdP rotation), the heartbeat kept retrying with the dead token until the proactive timer eventually fired (up to ~1h), causing tunnels to stop working. - Add DatumCloudClient::force_token_refresh(), which delegates to ExternalTokenSource::force_refresh(). - Rewrite heartbeat force_refresh_auth to actually call it and fix the misleading doc comment; the LoginState::Missing guard is preserved. - Add a --debug CLI flag and DATUM_CONNECT_DEBUG=1 env var to datum-connect; either one bumps the tracing filter to debug for the datum_connect and connect_lib targets so refresh events print to stderr (RUST_LOG, when set, still takes precedence). - Add structured info!/debug! events in ExternalTokenSource covering forced-refresh requests, proactive-timer fires, successful swaps (with old/new JWT expiry and a forced-vs-proactive flag), and failures. - Add a force_refresh_swaps_token_via_loop test guarding the regression. 
--- connect-lib/bin/src/main.rs | 50 +++++- .../src/datum_cloud/external_token_source.rs | 147 ++++++++++++++++-- connect-lib/lib/src/datum_cloud/mod.rs | 16 ++ connect-lib/lib/src/heartbeat.rs | 36 ++--- 4 files changed, 218 insertions(+), 31 deletions(-) diff --git a/connect-lib/bin/src/main.rs b/connect-lib/bin/src/main.rs index 8814548..450085b 100644 --- a/connect-lib/bin/src/main.rs +++ b/connect-lib/bin/src/main.rs @@ -48,7 +48,12 @@ type ReloadHandle = Handle; static RELOAD_HANDLE: OnceLock = OnceLock::new(); fn init_tracing() { - let default_directive = "datum_connect=info"; + let debug = debug_enabled(); + let default_directive = if debug { + "datum_connect=debug,connect_lib=debug" + } else { + "datum_connect=info" + }; let filter = EnvFilter::try_from_default_env() .unwrap_or_else(|_| EnvFilter::new(default_directive)); let (filter_layer, handle) = reload::Layer::new(filter); @@ -59,8 +64,32 @@ fn init_tracing() { .with(tracing_subscriber::fmt::layer().with_writer(std::io::stderr)) .try_init(); let _ = RELOAD_HANDLE.set(handle); + if debug { + eprintln!("[datum-connect] debug logging enabled (token refresh, heartbeat, etc.)"); + } +} + +/// Whether verbose debug logging is enabled. Toggled by the `--debug` CLI flag +/// or the `DATUM_CONNECT_DEBUG=1` env var. When enabled, the tracing filter is +/// bumped to `debug` for the `datum_connect` and `connect_lib` targets so that +/// token-refresh events (proactive + forced) and heartbeat 401 handling print +/// to the console (stderr). +fn debug_enabled() -> bool { + if std::env::var("DATUM_CONNECT_DEBUG") + .map(|v| v == "1" || v.eq_ignore_ascii_case("true")) + .unwrap_or(false) + { + return true; + } + // Fall back to the global Args flag captured at parse time. Set by + // `Args::parse()` before the runtime-dependent code runs; the binary + // parses args after init_tracing(), so the env var is the primary path + // for the very first subscriber, and DEBUG_FLAG covers the flag case. 
+ DEBUG_FLAG.get().copied().unwrap_or(false) } +static DEBUG_FLAG: OnceLock = OnceLock::new(); + fn silence_tracing() { if let Some(handle) = RELOAD_HANDLE.get() { let _ = handle.modify(|f| *f = EnvFilter::new("off")); @@ -86,6 +115,10 @@ struct Args { project: Option, #[clap(long, global = true)] json: bool, + /// Enable verbose debug logging on stderr (token refresh events, heartbeat + /// 401 handling, etc.). Also enabled by `DATUM_CONNECT_DEBUG=1`. + #[clap(long, global = true)] + debug: bool, #[clap(subcommand)] command: Commands, } @@ -174,6 +207,21 @@ async fn run() -> n0_error::Result<()> { let args = Args::parse(); + // Honour the `--debug` CLI flag by bumping the tracing filter to debug. + // init_tracing() runs before Args::parse(), so the env var path covered + // the initial subscriber; this reloads the filter for the flag case. + // `RUST_LOG` always takes precedence and is left untouched when set. + let rust_log_set = std::env::var("RUST_LOG").is_ok(); + if args.debug && !rust_log_set { + let _ = DEBUG_FLAG.set(true); + if let Some(handle) = RELOAD_HANDLE.get() { + let _ = handle.modify(|f| { + *f = EnvFilter::new("datum_connect=debug,connect_lib=debug"); + }); + eprintln!("[datum-connect] debug logging enabled (token refresh, heartbeat, etc.)"); + } + } + let json = args.json; let project_id = match args.project { diff --git a/connect-lib/lib/src/datum_cloud/external_token_source.rs b/connect-lib/lib/src/datum_cloud/external_token_source.rs index 9c0c7ca..87ab745 100644 --- a/connect-lib/lib/src/datum_cloud/external_token_source.rs +++ b/connect-lib/lib/src/datum_cloud/external_token_source.rs @@ -5,7 +5,7 @@ use arc_swap::ArcSwap; use base64::Engine; use secrecy::{ExposeSecret, SecretString}; use tokio::sync::watch; -use tracing::{debug, warn}; +use tracing::{debug, info, warn}; /// Errors that can occur when constructing an [`ExternalTokenSource`] from environment. 
#[derive(Debug, thiserror::Error)] @@ -127,6 +127,10 @@ impl ExternalTokenSource { /// and calls [`swap_token()`](Self::swap_token) with the result. pub fn force_refresh(&self) { let current = *self.refresh_trigger.borrow(); + info!( + trigger_count = current.wrapping_add(1), + "token refresh: forced refresh requested (401 or stale auth observed)" + ); let _ = self.refresh_trigger.send(current.wrapping_add(1)); } @@ -169,6 +173,21 @@ impl ExternalTokenSource { std::time::SystemTime::now() + std::time::Duration::from_secs(3600) }); + if let Some(exp) = initial_exp { + debug!( + exp = exp, + next_refresh_in_secs = next_refresh + .duration_since(std::time::SystemTime::now()) + .map(|d| d.as_secs()) + .unwrap_or(0), + "token refresh loop started; proactive refresh scheduled 60s before JWT expiry" + ); + } else { + debug!( + "token refresh loop started; no JWT expiry claim, defaulting to 1h refresh interval" + ); + } + let mut backoff = std::time::Duration::from_secs(5); const MAX_BACKOFF: std::time::Duration = std::time::Duration::from_secs(60); @@ -183,31 +202,43 @@ impl ExternalTokenSource { }; // Wait either for the timer or a force_refresh signal - tokio::select! { - _ = tokio::time::sleep(wait) => {}, + let forced = tokio::select! 
{ + _ = tokio::time::sleep(wait) => { + debug!("token refresh: proactive timer fired"); + false + } _ = refresh_rx.changed() => { - debug!("ExternalTokenSource: forced refresh triggered"); + info!("token refresh: forced refresh signalled (401 or stale auth)"); + true } - } + }; // Execute helper to get a fresh token match Self::exec_helper(&helper, &session) { Ok(new_token) => { + let prev_exp = parse_jwt_expiry(&self.token()).ok().flatten(); + let new_exp = parse_jwt_expiry(&new_token).ok().flatten(); self.swap_token(new_token.clone()); backoff = std::time::Duration::from_secs(5); // Reset backoff + info!( + forced, + new_exp = ?new_exp, + prev_exp = ?prev_exp, + "token refresh: succeeded; token swapped and watchers notified" + ); + // Parse new expiry for next refresh - next_refresh = match parse_jwt_expiry(&new_token) { - Ok(Some(exp)) => std::time::UNIX_EPOCH + next_refresh = match new_exp { + Some(exp) => std::time::UNIX_EPOCH + std::time::Duration::from_secs(exp.saturating_sub(60)), - _ => { - std::time::SystemTime::now() - + std::time::Duration::from_secs(3600) + None => { + std::time::SystemTime::now() + std::time::Duration::from_secs(3600) } }; } Err(e) => { - warn!("token refresh failed: {e}"); + warn!(forced, "token refresh failed: {e}; retrying in {:?}", backoff); // Retry with backoff next_refresh = std::time::SystemTime::now() + backoff; backoff = std::cmp::min(backoff * 2, MAX_BACKOFF); @@ -485,4 +516,98 @@ mod tests { source.force_refresh(); assert_eq!(*rx.borrow(), 2); } + + /// Verifies the end-to-end refresh path: when `force_refresh()` is + /// signalled (e.g. after a 401), the background loop re-executes the + /// credentials helper and swaps in the new token, notifying watchers. + /// + /// This guards against the "stale auth" regression where the heartbeat + /// observed a 401 but never actually triggered a refresh — the token + /// stayed dead until the proactive timer eventually fired. 
+ #[tokio::test] + async fn force_refresh_swaps_token_via_loop() { + let _lock = crate::ENV_LOCK.lock().unwrap(); + let dir = TempDir::new(); + + // Helper that emits a distinct JWT on every invocation by reading + // and incrementing a counter file. This lets the test observe that + // the loop actually re-executed the helper (not just that the signal + // was sent). + let counter_path = dir.path().join("counter"); + std::fs::write(&counter_path, "0").expect("should write counter"); + let helper_path = dir.path().join("counter-helper.sh"); + let counter_str = counter_path.to_string_lossy().replace('\'', "'\\''"); + let script = format!( + "#!/bin/sh\n\ + n=$(cat '{counter_str}')\n\ + n=$((n + 1))\n\ + echo \"$n\" > '{counter_str}'\n\ + exp=$((1700000000 + n))\n\ + header=$(printf '{{\"alg\":\"HS256\",\"typ\":\"JWT\"}}' | base64 | tr -d '=' | tr '/+' '_-')\n\ + payload=$(printf '{{\"exp\":%d,\"sub\":\"rotating\"}}' \"$exp\" | base64 | tr -d '=' | tr '/+' '_-')\n\ + printf '%s.%s.rotated\\n' \"$header\" \"$payload\"\n", + ); + std::fs::write(&helper_path, script).expect("should write helper script"); + #[cfg(unix)] + std::fs::set_permissions( + &helper_path, + std::os::unix::fs::PermissionsExt::from_mode(0o755), + ) + .expect("should set executable permission"); + + unsafe { + std::env::set_var("DATUM_CREDENTIALS_HELPER", helper_path.to_string_lossy().as_ref()); + std::env::set_var("DATUM_SESSION", "test-session"); + } + + // Use a token with a far-future expiry so the proactive timer does + // not fire during the test — only the forced refresh should swap. + let initial = make_jwt_with_exp(9999999999); + std::fs::write(&counter_path, "0").expect("should reset counter"); + // Build the source by hand so from_env() doesn't consume the first + // helper invocation (we want the *loop* to be the one rotating). 
+ let (token_tx, _) = watch::channel(initial.clone()); + let (refresh_tx, _) = watch::channel(0u64); + let source = ExternalTokenSource { + token: std::sync::Arc::new(ArcSwap::from_pointee(SecretString::new( + initial.clone().into(), + ))), + token_tx: std::sync::Arc::new(token_tx), + refresh_trigger: std::sync::Arc::new(refresh_tx), + }; + + let mut rx = source.watch(); + assert_eq!(*rx.borrow(), initial, "watch initial value"); + + source.start_refresh( + helper_path.to_string_lossy().to_string(), + "test-session".to_string(), + ); + + // Nothing should have rotated yet (proactive timer is far in the + // future). Give the loop a moment to prove a negative. + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + assert_eq!(source.token(), initial, "no proactive refresh expected yet"); + + // Force a refresh (as the heartbeat does on a 401) and wait for the + // loop to re-exec the helper and swap the token. + source.force_refresh(); + for _ in 0..40 { + if source.token() != initial { + break; + } + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + } + + let new_token = source.token(); + assert_ne!( + new_token, initial, + "force_refresh must have rotated the token" + ); + assert!( + new_token.ends_with(".rotated"), + "rotated token should come from the counter helper: {new_token}" + ); + assert_eq!(*rx.borrow(), new_token, "watchers notified of new token"); + } } diff --git a/connect-lib/lib/src/datum_cloud/mod.rs b/connect-lib/lib/src/datum_cloud/mod.rs index c50d7fb..4327c6a 100644 --- a/connect-lib/lib/src/datum_cloud/mod.rs +++ b/connect-lib/lib/src/datum_cloud/mod.rs @@ -200,6 +200,22 @@ impl DatumCloudClient { self.token_source.token() } + /// Force an immediate token refresh by signalling the background refresh + /// loop in [`ExternalTokenSource`] to re-execute the credentials helper + /// now, ahead of its proactive schedule. 
+ /// + /// Call this when a 401 response is observed from the API: the proactive + /// refresh timer only fires near JWT expiry, so a token rejected early + /// (clock skew, revocation, IdP-side rotation) would otherwise leave + /// callers retrying with the same dead token until the timer catches up. + /// The refresh loop swaps the new token into the shared [`ExternalTokenSource`] + /// and notifies watchers; the next [`Self::project_control_plane_client`] + /// call (and any `ProjectControlPlaneClient` rebuilt via its token watch) + /// picks up the fresh token. + pub fn force_token_refresh(&self) { + self.token_source.force_refresh(); + } + pub fn api_url(&self) -> Cow<'static, str> { self.env.api_url() } diff --git a/connect-lib/lib/src/heartbeat.rs b/connect-lib/lib/src/heartbeat.rs index 2cd1019..72edb27 100644 --- a/connect-lib/lib/src/heartbeat.rs +++ b/connect-lib/lib/src/heartbeat.rs @@ -309,25 +309,22 @@ fn classify_lease_error(err: &kube::Error) -> LeaseErrorAction { } } -/// Force an OAuth token refresh after a 401. The proactive timer only refreshes -/// when the token is near expiry, so a server-side rejection that arrives early -/// (clock skew, revocation, etc.) would otherwise leave the heartbeat retrying -/// with the same dead token until the timer eventually fires. +/// Force a token refresh after a 401. The proactive refresh timer in +/// [`ExternalTokenSource`] only fires when the access token is within 60s of +/// JWT expiry, so a token rejected before that (clock skew, revocation, +/// IdP-side rotation) would otherwise leave the heartbeat retrying with the +/// same dead token until the timer eventually catches up — the classic +/// "stale auth" tunnel failure. /// -/// When auth is already in [`LoginState::Missing`] (e.g. 
after a previous -/// permanent refresh failure), this returns immediately without contacting the -/// IdP — the auth layer has already surfaced the loss to the operator and -/// there is nothing to refresh until they log in again. +/// This signals the in-process refresh loop to re-execute the +/// `DATUM_CREDENTIALS_HELPER` subprocess immediately. The loop swaps the new +/// token into the shared [`ExternalTokenSource`] and notifies watchers; the +/// next `project_control_plane_client()` call picks up the fresh token. /// -/// Plugin-mode adaptation (connect-lib fork): connect-lib does not own the -/// OAuth flow — token refresh is driven by the parent process (datumctl) -/// via the `DATUM_CREDENTIALS_HELPER` subprocess, which swaps the new token -/// into `ExternalTokenSource` out-of-band. From inside the heartbeat loop -/// all we can do is log the 401 trigger; the next pcp-client construction -/// will pick up whatever token the helper has provided. The -/// `LoginState::Missing` guard remains in place so that if a future -/// LoginState-driven mechanism marks the session as dead, we stop -/// hammering the kube path on every 401. +/// When auth is already in [`LoginState::Missing`] (e.g. after a previous +/// permanent refresh failure), this returns immediately without contacting +/// the helper — the auth layer has already surfaced the loss to the operator +/// and there is nothing to refresh until they log in again. async fn force_refresh_auth(project_id: &str, datum: &DatumCloudClient) { if matches!(datum.login_state(), crate::datum_cloud::LoginState::Missing) { debug!( @@ -336,10 +333,11 @@ async fn force_refresh_auth(project_id: &str, datum: &DatumCloudClient) { ); return; } - debug!( + info!( %project_id, - "heartbeat: 401 observed; token refresh is external in plugin mode (datumctl credentials helper)" + "heartbeat: 401 observed; forcing token refresh via credentials helper" ); + datum.force_token_refresh(); } async fn run_project(