From 65bfc3f20e91a1e9206053ca9157c39e80863ef6 Mon Sep 17 00:00:00 2001 From: Sergio Arroutbi Date: Mon, 11 May 2026 14:51:46 +0200 Subject: [PATCH] Fix dedup timing race with monotonic Instant MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The dedup tracker used chrono::DateTime (wall-clock) with num_seconds() truncation, causing the background task to skip every other tick: 29.8s truncated to 29 < 30 → rejected. Switch to tokio::time::Instant (monotonic, sub-nanosecond precision) and replace tokio::time::interval with sleep-after-processing so the 30s gap starts after mark_recorded, not before. Co-Authored-By: Claude Opus 4.6 Signed-off-by: Sergio Arroutbi --- src/state.rs | 16 +++++++-------- src/tasks/observations.rs | 43 ++++++++++++++++----------------------- 2 files changed, 24 insertions(+), 35 deletions(-) diff --git a/src/state.rs b/src/state.rs index 6e73c14..96cd102 100644 --- a/src/state.rs +++ b/src/state.rs @@ -1,8 +1,9 @@ use std::collections::HashMap; use std::path::PathBuf; use std::sync::{Arc, RwLock}; +use std::time::Duration; -use chrono::{DateTime, Utc}; +use tokio::time::Instant; use crate::config::SshConfig; use crate::keylime::client::KeylimeClient; @@ -11,11 +12,11 @@ use crate::repository::{ }; use crate::settings_store::{self, PersistedKeylime, PersistedSettings}; -const DEDUP_INTERVAL_SECS: i64 = 30; +const DEDUP_INTERVAL: Duration = Duration::from_secs(30); struct AttestationSnapshot { success: bool, - recorded_at: DateTime, + recorded_at: Instant, } #[derive(Clone)] @@ -94,10 +95,7 @@ impl AppState { if snapshot.success != success { return true; } - let elapsed = Utc::now() - .signed_duration_since(snapshot.recorded_at) - .num_seconds(); - elapsed >= DEDUP_INTERVAL_SECS + snapshot.recorded_at.elapsed() >= DEDUP_INTERVAL } } } @@ -118,7 +116,7 @@ impl AppState { agent_id.to_string(), AttestationSnapshot { success, - recorded_at: Utc::now(), + recorded_at: Instant::now(), }, ); } @@ -182,7 +180,7 @@ mod tests { "agent-1".to_string(), AttestationSnapshot { success: true, - recorded_at: Utc::now() - chrono::Duration::seconds(DEDUP_INTERVAL_SECS + 1), + recorded_at: Instant::now() - DEDUP_INTERVAL - Duration::from_secs(1), }, ); } diff --git a/src/tasks/observations.rs b/src/tasks/observations.rs index df6fd24..b47efe6 100644 --- a/src/tasks/observations.rs +++ b/src/tasks/observations.rs @@ -2,7 +2,6 @@ use std::collections::HashSet; use std::time::Duration; use tokio::sync::watch; -use tokio::time::interval; use tracing::{info, warn}; use crate::api::handlers::attestations::record_agent_observations; @@ -16,7 +15,7 @@ pub async fn background_observation_loop( interval_secs: u64, mut shutdown_rx: watch::Receiver<()>, ) { - let mut ticker = interval(Duration::from_secs(interval_secs)); + let sleep_duration = Duration::from_secs(interval_secs); let mut tick_count: u64 = 0; let mut total_observations: u64 = 0; @@ -26,14 +25,6 @@ pub async fn background_observation_loop( ); loop { - tokio::select! { - _ = ticker.tick() => {}, - _ = shutdown_rx.changed() => { - info!(total_observations, "background observation task shutting down"); - return; - } - } - tick_count += 1; record_agent_observations(&state).await; total_observations += 1; @@ -41,6 +32,14 @@ pub async fn background_observation_loop( if tick_count % RECONCILIATION_EVERY_N_TICKS == 0 { reconcile_fleet(&state).await; } + + tokio::select! { + _ = tokio::time::sleep(sleep_duration) => {}, + _ = shutdown_rx.changed() => { + info!(total_observations, "background observation task shutting down"); + return; + } + } } } @@ -156,8 +155,9 @@ mod tests { let handle = tokio::spawn(background_observation_loop(state, 3600, rx)); + tokio::task::yield_now().await; tx.send(()).unwrap(); - let result = tokio::time::timeout(Duration::from_secs(2), handle).await; + let result = tokio::time::timeout(Duration::from_secs(5), handle).await; assert!( result.is_ok(), "loop should exit promptly on shutdown signal" @@ -167,24 +167,15 @@ mod tests { #[tokio::test] async fn api_error_does_not_crash_loop() { let state = test_state(); - let (tx, rx) = watch::channel(()); + let (_tx, rx) = watch::channel(()); - let handle = tokio::spawn(async move { - let mut ticker = interval(Duration::from_millis(50)); - let mut shutdown = rx; - for _ in 0..3 { - tokio::select! { - _ = ticker.tick() => {}, - _ = shutdown.changed() => return, - } - record_agent_observations(&state).await; - } - }); + let handle = tokio::spawn(background_observation_loop(state, 1, rx)); let result = tokio::time::timeout(Duration::from_secs(5), handle).await; - assert!(result.is_ok(), "loop should survive API errors"); - - drop(tx); + assert!( + result.is_err(), + "loop should keep running despite API errors" + ); } #[test]