From fa6584bb2313f8a7ae9eb1122c1dd7da55dd85fd Mon Sep 17 00:00:00 2001 From: Sergio Arroutbi Date: Mon, 11 May 2026 13:30:00 +0200 Subject: [PATCH] Add background attestation observation (FR-087) Spawn a periodic background task that records attestation observations independently of frontend API calls, eliminating timeline gaps when the dashboard is not being viewed. Includes fleet reconciliation every 10th tick and graceful shutdown via watch channel. Co-Authored-By: Claude Opus 4.6 Signed-off-by: Sergio Arroutbi --- src/api/handlers/settings.rs | 2 + src/config.rs | 5 + src/lib.rs | 1 + src/main.rs | 25 +++- src/state.rs | 11 ++ src/tasks/mod.rs | 3 + src/tasks/observations.rs | 220 +++++++++++++++++++++++++++++++++++ 7 files changed, 266 insertions(+), 1 deletion(-) create mode 100644 src/tasks/mod.rs create mode 100644 src/tasks/observations.rs diff --git a/src/api/handlers/settings.rs b/src/api/handlers/settings.rs index 4d8dd9c..9ce3373 100644 --- a/src/api/handlers/settings.rs +++ b/src/api/handlers/settings.rs @@ -48,6 +48,7 @@ pub async fn update_keylime( registrar_url: body.registrar_url.clone(), mtls: state.keylime().mtls_config().cloned(), timeout_secs: 30, + observation_interval_secs: 30, circuit_breaker: Default::default(), }; @@ -161,6 +162,7 @@ pub async fn update_certificates( registrar_url: kl.registrar_url().to_string(), mtls, timeout_secs: 30, + observation_interval_secs: 30, circuit_breaker: Default::default(), }; drop(kl); diff --git a/src/config.rs b/src/config.rs index 2d30261..31ecb7d 100644 --- a/src/config.rs +++ b/src/config.rs @@ -35,6 +35,8 @@ pub struct KeylimeConfig { pub mtls: Option, #[serde(default = "default_timeout_secs")] pub timeout_secs: u64, + #[serde(default = "default_observation_interval_secs")] + pub observation_interval_secs: u64, #[serde(default)] pub circuit_breaker: CircuitBreakerConfig, } @@ -217,6 +219,9 @@ fn default_retention_days() -> u32 { fn default_hash_algorithm() -> String { "sha256".to_string() } +fn default_observation_interval_secs() -> u64 { + 30 +} fn default_ssh_port() -> u16 { 22 } diff --git a/src/lib.rs b/src/lib.rs index 1e7c8ea..15c98ce 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -11,3 +11,4 @@ pub mod repository; pub mod settings_store; pub mod state; pub mod storage; +pub mod tasks; diff --git a/src/main.rs b/src/main.rs index ff8a527..547bd09 100644 --- a/src/main.rs +++ b/src/main.rs @@ -14,6 +14,7 @@ use keylime_webtool_backend::repository::{ }; use keylime_webtool_backend::settings_store; use keylime_webtool_backend::state::AppState; +use keylime_webtool_backend::tasks::background_observation_loop; #[tokio::main] async fn main() -> anyhow::Result<()> { @@ -43,11 +44,17 @@ async fn main() -> anyhow::Result<()> { let mtls = persisted.and_then(|s| s.mtls); + let observation_interval_secs: u64 = std::env::var("OBSERVATION_INTERVAL_SECS") + .ok() + .and_then(|v| v.parse().ok()) + .unwrap_or(30); + let keylime_config = KeylimeConfig { verifier_url, registrar_url, mtls, timeout_secs: 30, + observation_interval_secs, circuit_breaker: Default::default(), }; @@ -94,13 +101,29 @@ async fn main() -> anyhow::Result<()> { config_path, ); + let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(()); + + tokio::spawn(background_observation_loop( + state.clone(), + observation_interval_secs, + shutdown_rx, + )); + let app = routes::build_router(state); let addr = SocketAddr::from(([0, 0, 0, 0], 8080)); tracing::info!("listening on {addr}"); let listener = TcpListener::bind(addr).await?; - axum::serve(listener, app).await?; + axum::serve(listener, app) + .with_graceful_shutdown(async move { + tokio::signal::ctrl_c() + .await + .expect("failed to listen for ctrl-c"); + tracing::info!("shutdown signal received"); + drop(shutdown_tx); + }) + .await?; Ok(()) } diff --git a/src/state.rs b/src/state.rs index 10fb314..6e73c14 100644 --- a/src/state.rs +++ b/src/state.rs @@ -102,6 +102,16 @@ impl AppState { } } + pub fn tracked_agent_ids(&self) -> Vec { + let tracker = self.attestation_tracker.read().unwrap(); + tracker.keys().cloned().collect() + } + + pub fn tracked_success(&self, agent_id: &str) -> Option { + let tracker = self.attestation_tracker.read().unwrap(); + tracker.get(agent_id).map(|s| s.success) + } + pub fn mark_recorded(&self, agent_id: &str, success: bool) { let mut tracker = self.attestation_tracker.write().unwrap(); tracker.insert( @@ -127,6 +137,7 @@ mod tests { registrar_url: "http://localhost:3001".into(), mtls: None, timeout_secs: 5, + observation_interval_secs: 30, circuit_breaker: Default::default(), }; let client = KeylimeClient::new(config).unwrap(); diff --git a/src/tasks/mod.rs b/src/tasks/mod.rs new file mode 100644 index 0000000..5805d50 --- /dev/null +++ b/src/tasks/mod.rs @@ -0,0 +1,3 @@ +mod observations; + +pub use observations::background_observation_loop; diff --git a/src/tasks/observations.rs b/src/tasks/observations.rs new file mode 100644 index 0000000..df6fd24 --- /dev/null +++ b/src/tasks/observations.rs @@ -0,0 +1,220 @@ +use std::collections::HashSet; +use std::time::Duration; + +use tokio::sync::watch; +use tokio::time::interval; +use tracing::{info, warn}; + +use crate::api::handlers::attestations::record_agent_observations; +use crate::models::agent::AgentState; +use crate::state::AppState; + +const RECONCILIATION_EVERY_N_TICKS: u64 = 10; + +pub async fn background_observation_loop( + state: AppState, + interval_secs: u64, + mut shutdown_rx: watch::Receiver<()>, +) { + let mut ticker = interval(Duration::from_secs(interval_secs)); + let mut tick_count: u64 = 0; + let mut total_observations: u64 = 0; + + info!( + interval_secs, + "background observation task started (FR-087)" + ); + + loop { + tokio::select! { + _ = ticker.tick() => {}, + _ = shutdown_rx.changed() => { + info!(total_observations, "background observation task shutting down"); + return; + } + } + + tick_count += 1; + record_agent_observations(&state).await; + total_observations += 1; + + if tick_count % RECONCILIATION_EVERY_N_TICKS == 0 { + reconcile_fleet(&state).await; + } + } +} + +async fn reconcile_fleet(state: &AppState) { + let verifier_ids = match state.keylime().list_verifier_agents().await { + Ok(ids) => ids, + Err(e) => { + warn!("reconciliation: failed to list agents: {e}"); + return; + } + }; + + let tracked_ids: HashSet = state.tracked_agent_ids().into_iter().collect(); + let verifier_set: HashSet<&String> = verifier_ids.iter().collect(); + + let new_agents: Vec<_> = verifier_ids + .iter() + .filter(|id| !tracked_ids.contains(*id)) + .collect(); + if !new_agents.is_empty() { + info!( + count = new_agents.len(), + "reconciliation: discovered untracked agents" + ); + } + + let removed_agents: Vec<_> = tracked_ids + .iter() + .filter(|id| !verifier_set.contains(id)) + .collect(); + if !removed_agents.is_empty() { + info!( + count = removed_agents.len(), + "reconciliation: agents no longer in verifier" + ); + } + + let mut corrections = 0u64; + for id_str in &verifier_ids { + let tracked = state.tracked_success(id_str); + let Some(tracked_success) = tracked else { + continue; + }; + + let agent = match state.keylime().get_verifier_agent(id_str).await { + Ok(a) => a, + Err(_) => continue, + }; + + let agent_state = if agent.is_push_mode() { + AgentState::from_push_agent(&agent) + } else { + AgentState::from_operational_state(&agent.operational_state) + .unwrap_or(AgentState::Failed) + }; + + let actual_success = !agent_state.is_failed(); + if actual_success != tracked_success { + corrections += 1; + info!( + agent_id = id_str, + tracked = tracked_success, + actual = actual_success, + "reconciliation: corrected stale tracker entry" + ); + state.mark_recorded(id_str, actual_success); + } + } + + info!( + corrections, + verifier_count = verifier_ids.len(), + tracked_count = tracked_ids.len(), + "reconciliation sweep complete" + ); +} + +#[cfg(test)] +mod tests { + use super::*; + use std::sync::Arc; + + use crate::config::KeylimeConfig; + use crate::keylime::client::KeylimeClient; + use crate::repository::{InMemoryCacheBackend, Repositories}; + + fn test_state() -> AppState { + let config = KeylimeConfig { + verifier_url: "http://localhost:3000".into(), + registrar_url: "http://localhost:3001".into(), + mtls: None, + timeout_secs: 5, + observation_interval_secs: 1, + circuit_breaker: Default::default(), + }; + let client = KeylimeClient::new(config).unwrap(); + let repos = Repositories::in_memory(); + AppState::new( + client, + repos.alert, + repos.attestation, + repos.policy, + repos.audit, + Arc::new(InMemoryCacheBackend::new()), + None, + ) + } + + #[tokio::test] + async fn shutdown_signal_stops_loop() { + let state = test_state(); + let (tx, rx) = watch::channel(()); + + let handle = tokio::spawn(background_observation_loop(state, 3600, rx)); + + tx.send(()).unwrap(); + let result = tokio::time::timeout(Duration::from_secs(2), handle).await; + assert!( + result.is_ok(), + "loop should exit promptly on shutdown signal" + ); + } + + #[tokio::test] + async fn api_error_does_not_crash_loop() { + let state = test_state(); + let (tx, rx) = watch::channel(()); + + let handle = tokio::spawn(async move { + let mut ticker = interval(Duration::from_millis(50)); + let mut shutdown = rx; + for _ in 0..3 { + tokio::select! { + _ = ticker.tick() => {}, + _ = shutdown.changed() => return, + } + record_agent_observations(&state).await; + } + }); + + let result = tokio::time::timeout(Duration::from_secs(5), handle).await; + assert!(result.is_ok(), "loop should survive API errors"); + + drop(tx); + } + + #[test] + fn reconciliation_tick_cadence() { + for tick in 1..=30 { + let should_reconcile = tick % RECONCILIATION_EVERY_N_TICKS == 0; + match tick { + 10 | 20 | 30 => assert!(should_reconcile, "tick {tick} should reconcile"), + _ => assert!(!should_reconcile, "tick {tick} should not reconcile"), + } + } + } + + #[test] + fn dedup_tracker_suppresses_duplicate() { + let state = test_state(); + state.mark_recorded("agent-1", true); + assert!( + !state.should_record_attestation("agent-1", true), + "duplicate within interval should be suppressed" + ); + } + + #[test] + fn state_change_bypasses_dedup() { + let state = test_state(); + state.mark_recorded("agent-1", true); + assert!( + state.should_record_attestation("agent-1", false), + "pass -> fail should record immediately" + ); + } +}