Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 49 additions & 1 deletion connect-lib/bin/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,12 @@ type ReloadHandle = Handle<EnvFilter, Registry>;
static RELOAD_HANDLE: OnceLock<ReloadHandle> = OnceLock::new();

fn init_tracing() {
let default_directive = "datum_connect=info";
let debug = debug_enabled();
let default_directive = if debug {
"datum_connect=debug,connect_lib=debug"
} else {
"datum_connect=info"
};
let filter = EnvFilter::try_from_default_env()
.unwrap_or_else(|_| EnvFilter::new(default_directive));
let (filter_layer, handle) = reload::Layer::new(filter);
Expand All @@ -59,8 +64,32 @@ fn init_tracing() {
.with(tracing_subscriber::fmt::layer().with_writer(std::io::stderr))
.try_init();
let _ = RELOAD_HANDLE.set(handle);
if debug {
eprintln!("[datum-connect] debug logging enabled (token refresh, heartbeat, etc.)");
}
}

/// Whether verbose debug logging is enabled. Toggled by the `--debug` CLI flag
/// or the `DATUM_CONNECT_DEBUG=1` env var. When enabled, the tracing filter is
/// bumped to `debug` for the `datum_connect` and `connect_lib` targets so that
/// token-refresh events (proactive + forced) and heartbeat 401 handling print
/// to the console (stderr).
fn debug_enabled() -> bool {
if std::env::var("DATUM_CONNECT_DEBUG")
.map(|v| v == "1" || v.eq_ignore_ascii_case("true"))
.unwrap_or(false)
{
return true;
}
// Fall back to the global Args flag captured at parse time. Set by
// `Args::parse()` before the runtime-dependent code runs; the binary
// parses args after init_tracing(), so the env var is the primary path
// for the very first subscriber, and DEBUG_FLAG covers the flag case.
DEBUG_FLAG.get().copied().unwrap_or(false)
}

static DEBUG_FLAG: OnceLock<bool> = OnceLock::new();

fn silence_tracing() {
if let Some(handle) = RELOAD_HANDLE.get() {
let _ = handle.modify(|f| *f = EnvFilter::new("off"));
Expand All @@ -86,6 +115,10 @@ struct Args {
project: Option<String>,
#[clap(long, global = true)]
json: bool,
/// Enable verbose debug logging on stderr (token refresh events, heartbeat
/// 401 handling, etc.). Also enabled by `DATUM_CONNECT_DEBUG=1`.
#[clap(long, global = true)]
debug: bool,
#[clap(subcommand)]
command: Commands,
}
Expand Down Expand Up @@ -174,6 +207,21 @@ async fn run() -> n0_error::Result<()> {

let args = Args::parse();

// Honour the `--debug` CLI flag by bumping the tracing filter to debug.
// init_tracing() runs before Args::parse(), so the env var path covered
// the initial subscriber; this reloads the filter for the flag case.
// `RUST_LOG` always takes precedence and is left untouched when set.
let rust_log_set = std::env::var("RUST_LOG").is_ok();
if args.debug && !rust_log_set {
let _ = DEBUG_FLAG.set(true);
if let Some(handle) = RELOAD_HANDLE.get() {
let _ = handle.modify(|f| {
*f = EnvFilter::new("datum_connect=debug,connect_lib=debug");
});
eprintln!("[datum-connect] debug logging enabled (token refresh, heartbeat, etc.)");
}
}

let json = args.json;

let project_id = match args.project {
Expand Down
147 changes: 136 additions & 11 deletions connect-lib/lib/src/datum_cloud/external_token_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use arc_swap::ArcSwap;
use base64::Engine;
use secrecy::{ExposeSecret, SecretString};
use tokio::sync::watch;
use tracing::{debug, warn};
use tracing::{debug, info, warn};

/// Errors that can occur when constructing an [`ExternalTokenSource`] from environment.
#[derive(Debug, thiserror::Error)]
Expand Down Expand Up @@ -127,6 +127,10 @@ impl ExternalTokenSource {
/// and calls [`swap_token()`](Self::swap_token) with the result.
pub fn force_refresh(&self) {
let current = *self.refresh_trigger.borrow();
info!(
trigger_count = current.wrapping_add(1),
"token refresh: forced refresh requested (401 or stale auth observed)"
);
let _ = self.refresh_trigger.send(current.wrapping_add(1));
}

Expand Down Expand Up @@ -169,6 +173,21 @@ impl ExternalTokenSource {
std::time::SystemTime::now() + std::time::Duration::from_secs(3600)
});

if let Some(exp) = initial_exp {
debug!(
exp = exp,
next_refresh_in_secs = next_refresh
.duration_since(std::time::SystemTime::now())
.map(|d| d.as_secs())
.unwrap_or(0),
"token refresh loop started; proactive refresh scheduled 60s before JWT expiry"
);
} else {
debug!(
"token refresh loop started; no JWT expiry claim, defaulting to 1h refresh interval"
);
}

let mut backoff = std::time::Duration::from_secs(5);
const MAX_BACKOFF: std::time::Duration = std::time::Duration::from_secs(60);

Expand All @@ -183,31 +202,43 @@ impl ExternalTokenSource {
};

// Wait either for the timer or a force_refresh signal
tokio::select! {
_ = tokio::time::sleep(wait) => {},
let forced = tokio::select! {
_ = tokio::time::sleep(wait) => {
debug!("token refresh: proactive timer fired");
false
}
_ = refresh_rx.changed() => {
debug!("ExternalTokenSource: forced refresh triggered");
info!("token refresh: forced refresh signalled (401 or stale auth)");
true
}
}
};

// Execute helper to get a fresh token
match Self::exec_helper(&helper, &session) {
Ok(new_token) => {
let prev_exp = parse_jwt_expiry(&self.token()).ok().flatten();
let new_exp = parse_jwt_expiry(&new_token).ok().flatten();
self.swap_token(new_token.clone());
backoff = std::time::Duration::from_secs(5); // Reset backoff

info!(
forced,
new_exp = ?new_exp,
prev_exp = ?prev_exp,
"token refresh: succeeded; token swapped and watchers notified"
);

// Parse new expiry for next refresh
next_refresh = match parse_jwt_expiry(&new_token) {
Ok(Some(exp)) => std::time::UNIX_EPOCH
next_refresh = match new_exp {
Some(exp) => std::time::UNIX_EPOCH
+ std::time::Duration::from_secs(exp.saturating_sub(60)),
_ => {
std::time::SystemTime::now()
+ std::time::Duration::from_secs(3600)
None => {
std::time::SystemTime::now() + std::time::Duration::from_secs(3600)
}
};
}
Err(e) => {
warn!("token refresh failed: {e}");
warn!(forced, "token refresh failed: {e}; retrying in {:?}", backoff);
// Retry with backoff
next_refresh = std::time::SystemTime::now() + backoff;
backoff = std::cmp::min(backoff * 2, MAX_BACKOFF);
Expand Down Expand Up @@ -485,4 +516,98 @@ mod tests {
source.force_refresh();
assert_eq!(*rx.borrow(), 2);
}

/// Verifies the end-to-end refresh path: when `force_refresh()` is
/// signalled (e.g. after a 401), the background loop re-executes the
/// credentials helper and swaps in the new token, notifying watchers.
///
/// This guards against the "stale auth" regression where the heartbeat
/// observed a 401 but never actually triggered a refresh — the token
/// stayed dead until the proactive timer eventually fired.
#[tokio::test]
async fn force_refresh_swaps_token_via_loop() {
let _lock = crate::ENV_LOCK.lock().unwrap();
let dir = TempDir::new();

// Helper that emits a distinct JWT on every invocation by reading
// and incrementing a counter file. This lets the test observe that
// the loop actually re-executed the helper (not just that the signal
// was sent).
let counter_path = dir.path().join("counter");
std::fs::write(&counter_path, "0").expect("should write counter");
let helper_path = dir.path().join("counter-helper.sh");
let counter_str = counter_path.to_string_lossy().replace('\'', "'\\''");
let script = format!(
"#!/bin/sh\n\
n=$(cat '{counter_str}')\n\
n=$((n + 1))\n\
echo \"$n\" > '{counter_str}'\n\
exp=$((1700000000 + n))\n\
header=$(printf '{{\"alg\":\"HS256\",\"typ\":\"JWT\"}}' | base64 | tr -d '=' | tr '/+' '_-')\n\
payload=$(printf '{{\"exp\":%d,\"sub\":\"rotating\"}}' \"$exp\" | base64 | tr -d '=' | tr '/+' '_-')\n\
printf '%s.%s.rotated\\n' \"$header\" \"$payload\"\n",
);
std::fs::write(&helper_path, script).expect("should write helper script");
#[cfg(unix)]
std::fs::set_permissions(
&helper_path,
std::os::unix::fs::PermissionsExt::from_mode(0o755),
)
.expect("should set executable permission");

unsafe {
std::env::set_var("DATUM_CREDENTIALS_HELPER", helper_path.to_string_lossy().as_ref());
std::env::set_var("DATUM_SESSION", "test-session");
}

// Use a token with a far-future expiry so the proactive timer does
// not fire during the test — only the forced refresh should swap.
let initial = make_jwt_with_exp(9999999999);
std::fs::write(&counter_path, "0").expect("should reset counter");
// Build the source by hand so from_env() doesn't consume the first
// helper invocation (we want the *loop* to be the one rotating).
let (token_tx, _) = watch::channel(initial.clone());
let (refresh_tx, _) = watch::channel(0u64);
let source = ExternalTokenSource {
token: std::sync::Arc::new(ArcSwap::from_pointee(SecretString::new(
initial.clone().into(),
))),
token_tx: std::sync::Arc::new(token_tx),
refresh_trigger: std::sync::Arc::new(refresh_tx),
};

let mut rx = source.watch();
assert_eq!(*rx.borrow(), initial, "watch initial value");

source.start_refresh(
helper_path.to_string_lossy().to_string(),
"test-session".to_string(),
);

// Nothing should have rotated yet (proactive timer is far in the
// future). Give the loop a moment to prove a negative.
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
assert_eq!(source.token(), initial, "no proactive refresh expected yet");

// Force a refresh (as the heartbeat does on a 401) and wait for the
// loop to re-exec the helper and swap the token.
source.force_refresh();
for _ in 0..40 {
if source.token() != initial {
break;
}
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
}

let new_token = source.token();
assert_ne!(
new_token, initial,
"force_refresh must have rotated the token"
);
assert!(
new_token.ends_with(".rotated"),
"rotated token should come from the counter helper: {new_token}"
);
assert_eq!(*rx.borrow(), new_token, "watchers notified of new token");
}
}
16 changes: 16 additions & 0 deletions connect-lib/lib/src/datum_cloud/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,22 @@ impl DatumCloudClient {
self.token_source.token()
}

/// Force an immediate token refresh by signalling the background refresh
/// loop in [`ExternalTokenSource`] to re-execute the credentials helper
/// now, ahead of its proactive schedule.
///
/// Call this when a 401 response is observed from the API: the proactive
/// refresh timer only fires near JWT expiry, so a token rejected early
/// (clock skew, revocation, IdP-side rotation) would otherwise leave
/// callers retrying with the same dead token until the timer catches up.
/// The refresh loop swaps the new token into the shared [`ExternalTokenSource`]
/// and notifies watchers; the next [`Self::project_control_plane_client`]
/// call (and any `ProjectControlPlaneClient` rebuilt via its token watch)
/// picks up the fresh token.
pub fn force_token_refresh(&self) {
self.token_source.force_refresh();
}

pub fn api_url(&self) -> Cow<'static, str> {
self.env.api_url()
}
Expand Down
36 changes: 17 additions & 19 deletions connect-lib/lib/src/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,25 +309,22 @@ fn classify_lease_error(err: &kube::Error) -> LeaseErrorAction {
}
}

/// Force an OAuth token refresh after a 401. The proactive timer only refreshes
/// when the token is near expiry, so a server-side rejection that arrives early
/// (clock skew, revocation, etc.) would otherwise leave the heartbeat retrying
/// with the same dead token until the timer eventually fires.
/// Force a token refresh after a 401. The proactive refresh timer in
/// [`ExternalTokenSource`] only fires when the access token is within 60s of
/// JWT expiry, so a token rejected before that (clock skew, revocation,
/// IdP-side rotation) would otherwise leave the heartbeat retrying with the
/// same dead token until the timer eventually catches up — the classic
/// "stale auth" tunnel failure.
///
/// When auth is already in [`LoginState::Missing`] (e.g. after a previous
/// permanent refresh failure), this returns immediately without contacting the
/// IdP — the auth layer has already surfaced the loss to the operator and
/// there is nothing to refresh until they log in again.
/// This signals the in-process refresh loop to re-execute the
/// `DATUM_CREDENTIALS_HELPER` subprocess immediately. The loop swaps the new
/// token into the shared [`ExternalTokenSource`] and notifies watchers; the
/// next `project_control_plane_client()` call picks up the fresh token.
///
/// Plugin-mode adaptation (connect-lib fork): connect-lib does not own the
/// OAuth flow — token refresh is driven by the parent process (datumctl)
/// via the `DATUM_CREDENTIALS_HELPER` subprocess, which swaps the new token
/// into `ExternalTokenSource` out-of-band. From inside the heartbeat loop
/// all we can do is log the 401 trigger; the next pcp-client construction
/// will pick up whatever token the helper has provided. The
/// `LoginState::Missing` guard remains in place so that if a future
/// LoginState-driven mechanism marks the session as dead, we stop
/// hammering the kube path on every 401.
/// When auth is already in [`LoginState::Missing`] (e.g. after a previous
/// permanent refresh failure), this returns immediately without contacting
/// the helper — the auth layer has already surfaced the loss to the operator
/// and there is nothing to refresh until they log in again.
async fn force_refresh_auth(project_id: &str, datum: &DatumCloudClient) {
if matches!(datum.login_state(), crate::datum_cloud::LoginState::Missing) {
debug!(
Expand All @@ -336,10 +333,11 @@ async fn force_refresh_auth(project_id: &str, datum: &DatumCloudClient) {
);
return;
}
debug!(
info!(
%project_id,
"heartbeat: 401 observed; token refresh is external in plugin mode (datumctl credentials helper)"
"heartbeat: 401 observed; forcing token refresh via credentials helper"
);
datum.force_token_refresh();
}

async fn run_project(
Expand Down