diff --git a/src/agentsight/src/config.rs b/src/agentsight/src/config.rs index 041da7de..e659f7fc 100644 --- a/src/agentsight/src/config.rs +++ b/src/agentsight/src/config.rs @@ -238,6 +238,8 @@ struct JsonFullConfig { #[serde(default)] deadloop: Option, #[serde(default)] + reactive: Option, + #[serde(default)] cgroup_filter_enabled: Option, #[serde(default)] cgroup_ids: Option>, @@ -254,6 +256,17 @@ struct JsonDeadloop { kill_after_count: Option, } +/// Reactive exporter 配置区段 +#[derive(serde::Deserialize, Clone, Debug)] +struct JsonReactive { + #[serde(default)] + enabled: Option, + #[serde(default)] + debounce_secs: Option, + #[serde(default)] + workspace: Option, +} + /// Runtime 动态配置区段(支持热加载,无需重启) #[derive(serde::Deserialize, Clone, Debug)] pub struct JsonRuntime { @@ -502,6 +515,14 @@ pub struct AgentsightConfig { pub deadloop_kill_enabled: bool, /// 触发 kill 的循环次数阈值(检测到 N 次后 kill) pub deadloop_kill_after_count: usize, + + // --- Reactive Exporter Configuration --- + /// Enable the reactive exporter (observe→act pipeline) + pub reactive_enabled: Option, + /// Minimum seconds between consecutive checkpoint actions + pub reactive_debounce_secs: Option, + /// Workspace path for ws-ckpt checkpoint + pub reactive_workspace: Option, } impl Default for AgentsightConfig { @@ -562,6 +583,11 @@ impl Default for AgentsightConfig { // DeadLoop auto-kill defaults (disabled by default) deadloop_kill_enabled: false, deadloop_kill_after_count: 3, + + // Reactive exporter defaults (disabled by default) + reactive_enabled: None, + reactive_debounce_secs: None, + reactive_workspace: None, } } } @@ -703,6 +729,13 @@ impl AgentsightConfig { } } + // 解析 reactive exporter 配置 + if let Some(ref r) = parsed.reactive { + self.reactive_enabled = r.enabled; + self.reactive_debounce_secs = r.debounce_secs; + self.reactive_workspace = r.workspace.clone(); + } + // Parse cgroup filter settings if let Some(v) = parsed.cgroup_filter_enabled { self.cgroup_filter_enabled = v; diff --git 
a/src/agentsight/src/genai/mod.rs b/src/agentsight/src/genai/mod.rs
index 023ee655..7efbccfd 100644
--- a/src/agentsight/src/genai/mod.rs
+++ b/src/agentsight/src/genai/mod.rs
@@ -11,6 +11,7 @@ pub mod instance_id;
 pub mod logtail;
 pub mod encrypt;
 pub mod anolisa_release;
+pub mod reactive;
 
 pub use semantic::{
     GenAISemanticEvent, LLMCall, LLMRequest, LLMResponse,
diff --git a/src/agentsight/src/genai/reactive.rs b/src/agentsight/src/genai/reactive.rs
new file mode 100644
index 00000000..8fa48514
--- /dev/null
+++ b/src/agentsight/src/genai/reactive.rs
@@ -0,0 +1,692 @@
+//! Reactive exporter: turns critical GenAI observations into `ws-ckpt` checkpoints.
+//!
+//! Events are inspected on the caller's thread and forwarded over a bounded
+//! channel to a background thread, which debounces them and shells out to
+//! `ws-ckpt checkpoint`. Sends never block: when the channel is full the
+//! message is dropped and the drop is logged at debug level.
+
+use std::collections::HashMap;
+use std::process::{Command, Stdio};
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::mpsc::{self, Receiver, RecvTimeoutError, SyncSender};
+use std::sync::Arc;
+use std::thread;
+use std::time::{Duration, Instant};
+
+use super::exporter::GenAIExporter;
+use super::semantic::GenAISemanticEvent;
+
+/// Capacity of the bounded channel feeding the background thread.
+const CHANNEL_CAPACITY: usize = 32;
+/// How long the worker blocks on the channel before re-checking the shutdown flag.
+const RECV_POLL_INTERVAL: Duration = Duration::from_secs(1);
+/// Upper bound on the runtime of one `ws-ckpt checkpoint` invocation.
+const CHECKPOINT_TIMEOUT: Duration = Duration::from_secs(10);
+/// Sleep between `try_wait` polls while `ws-ckpt` is running.
+const CHILD_POLL_INTERVAL: Duration = Duration::from_millis(100);
+/// Length of the per-agent token window; also the per-agent advisory debounce.
+const ADVISORY_WINDOW: Duration = Duration::from_secs(3600);
+/// Cumulative input tokens per window (with no cache usage) that trigger an advisory.
+const ADVISORY_TOKEN_THRESHOLD: u64 = 200_000;
+
+/// Settings for [`ReactiveExporter`].
+#[derive(Debug, Clone)]
+pub struct ReactiveConfig {
+    /// When `false`, [`ReactiveExporter::new`] returns `None`.
+    pub enabled: bool,
+    /// Minimum seconds between two successful checkpoints.
+    pub debounce_secs: u64,
+    /// Workspace passed to `ws-ckpt -w`; falls back to `$AGENTSIGHT_WORKSPACE`, then `/root`.
+    pub workspace_path: Option<String>,
+}
+
+impl Default for ReactiveConfig {
+    fn default() -> Self {
+        Self {
+            enabled: false,
+            debounce_secs: 30,
+            workspace_path: None,
+        }
+    }
+}
+
+/// Messages handled by the background thread.
+// `Advisory` is handled by the worker but never constructed in this module yet.
+#[allow(dead_code)]
+enum Msg {
+    Checkpoint {
+        reason: String,
+        conversation_id: Option<String>,
+    },
+    InterruptionAlert {
+        interruption_type: String,
+        conversation_id: Option<String>,
+    },
+    TokenAccum {
+        agent_name: String,
+        input_tokens: u64,
+        has_cache: bool,
+    },
+    Advisory {
+        message: String,
+    },
+    Shutdown,
+}
+
+/// Non-blocking send: a full or disconnected channel drops `msg` with a debug log.
+fn try_enqueue(tx: &SyncSender<Msg>, msg: Msg, what: &str) {
+    if let Err(e) = tx.try_send(msg) {
+        log::debug!("[reactive] dropped {what}: {e}");
+    }
+}
+
+/// Per-agent input-token accounting for the "no prompt caching" advisory.
+struct AgentTokenState {
+    cumulative: u64,
+    any_cache_hit: bool,
+    window_start: Instant,
+    last_advisory: Option<Instant>,
+}
+
+impl AgentTokenState {
+    fn new() -> Self {
+        Self {
+            cumulative: 0,
+            any_cache_hit: false,
+            window_start: Instant::now(),
+            last_advisory: None,
+        }
+    }
+
+    /// Adds one call's usage and returns `true` when an advisory is due.
+    fn record(&mut self, input_tokens: u64, has_cache: bool) -> bool {
+        if self.window_start.elapsed() > ADVISORY_WINDOW {
+            self.cumulative = 0;
+            self.any_cache_hit = false;
+            self.window_start = Instant::now();
+        }
+        // Saturate instead of overflowing (which would panic in debug builds).
+        self.cumulative = self.cumulative.saturating_add(input_tokens);
+        self.any_cache_hit |= has_cache;
+
+        let due = self.cumulative >= ADVISORY_TOKEN_THRESHOLD
+            && !self.any_cache_hit
+            && self
+                .last_advisory
+                .map_or(true, |t| t.elapsed() > ADVISORY_WINDOW);
+        if due {
+            self.last_advisory = Some(Instant::now());
+        }
+        due
+    }
+}
+
+/// Runs `ws-ckpt checkpoint` and waits up to [`CHECKPOINT_TIMEOUT`] for it.
+///
+/// Returns `true` only when the process exits successfully. On timeout or wait
+/// error the child is killed and reaped so it cannot linger as a zombie.
+fn run_ws_ckpt(workspace: &str, snapshot_id: &str, message: &str) -> bool {
+    let spawned = Command::new("ws-ckpt")
+        .args(["checkpoint", "-w", workspace, "-i", snapshot_id, "-m", message])
+        .stdout(Stdio::null())
+        .stderr(Stdio::null())
+        .spawn();
+    let mut child = match spawned {
+        Ok(child) => child,
+        Err(e) => {
+            log::warn!("[reactive] ws-ckpt spawn failed: {e}");
+            return false;
+        }
+    };
+
+    let deadline = Instant::now() + CHECKPOINT_TIMEOUT;
+    loop {
+        match child.try_wait() {
+            Ok(Some(status)) if status.success() => return true,
+            Ok(Some(status)) => {
+                log::warn!("[reactive] ws-ckpt exited {status}");
+                return false;
+            }
+            Ok(None) if Instant::now() < deadline => {
+                thread::sleep(CHILD_POLL_INTERVAL);
+                continue;
+            }
+            Ok(None) => log::warn!("[reactive] ws-ckpt timed out, killing"),
+            Err(e) => log::warn!("[reactive] ws-ckpt wait error: {e}"),
+        }
+        // Timed out or state unknown: kill and reap so no zombie is left behind.
+        let _ = child.kill();
+        let _ = child.wait();
+        return false;
+    }
+}
+
+/// Maps an LLM call error message to a checkpoint reason when it is critical.
+fn classify_error(error: &str) -> Option<&'static str> {
+    let lower = error.to_lowercase();
+    // Match "oom" only at the start of a word so "room" or "zoom" do not count.
+    let mentions_oom = lower
+        .split(|c: char| !c.is_ascii_alphanumeric())
+        .any(|word| word.starts_with("oom"));
+    if mentions_oom
+        || lower.contains("crash")
+        || lower.contains("sigkill")
+        || lower.contains("signal 9")
+    {
+        Some("agent_crash")
+    } else if lower.contains("context_length_exceeded")
+        || lower.contains("context_window")
+        || lower.contains("maximum context length")
+    {
+        Some("context_overflow")
+    } else {
+        None
+    }
+}
+
+/// State owned by the background thread.
+struct Worker {
+    workspace: String,
+    debounce: Duration,
+    ws_ckpt_available: bool,
+    /// Time of the last successful checkpoint; `None` until the first one.
+    last_ckpt: Option<Instant>,
+    agent_tokens: HashMap<String, AgentTokenState>,
+}
+
+impl Worker {
+    /// Processes messages until shutdown is requested or every sender is dropped.
+    fn run(mut self, rx: Receiver<Msg>, shutdown: Arc<AtomicBool>) {
+        while !shutdown.load(Ordering::Relaxed) {
+            let msg = match rx.recv_timeout(RECV_POLL_INTERVAL) {
+                Ok(msg) => msg,
+                Err(RecvTimeoutError::Timeout) => continue,
+                Err(RecvTimeoutError::Disconnected) => break,
+            };
+            match msg {
+                Msg::Shutdown => break,
+                Msg::Advisory { message } => log::info!("[reactive] advisory: {message}"),
+                Msg::InterruptionAlert {
+                    interruption_type,
+                    conversation_id,
+                } => self.checkpoint(
+                    "interruption alert",
+                    &interruption_type,
+                    conversation_id.as_deref(),
+                ),
+                Msg::Checkpoint {
+                    reason,
+                    conversation_id,
+                } => self.checkpoint("checkpoint", &reason, conversation_id.as_deref()),
+                Msg::TokenAccum {
+                    agent_name,
+                    input_tokens,
+                    has_cache,
+                } => self.accumulate(agent_name, input_tokens, has_cache),
+            }
+        }
+    }
+
+    /// Creates a checkpoint for `reason` unless debounced or `ws-ckpt` is unavailable.
+    ///
+    /// `source` only labels the debounce log line.
+    fn checkpoint(&mut self, source: &str, reason: &str, conversation_id: Option<&str>) {
+        if self.last_ckpt.map_or(false, |t| t.elapsed() < self.debounce) {
+            log::debug!("[reactive] debounced {source} ({reason})");
+            return;
+        }
+        if !self.ws_ckpt_available {
+            log::info!("[reactive] would checkpoint for {reason} but ws-ckpt unavailable");
+            return;
+        }
+
+        let snapshot_id = format!(
+            "auto-{}-{}",
+            chrono::Utc::now().format("%Y%m%dT%H%M%S"),
+            reason
+        );
+        let message = format!(
+            "reactive: {} (conv={})",
+            reason,
+            conversation_id.unwrap_or("unknown")
+        );
+        if run_ws_ckpt(&self.workspace, &snapshot_id, &message) {
+            log::info!("[reactive] checkpoint created: {snapshot_id}");
+            self.last_ckpt = Some(Instant::now());
+        }
+    }
+
+    /// Adds one call's token usage and logs an advisory when the threshold is crossed.
+    fn accumulate(&mut self, agent_name: String, input_tokens: u64, has_cache: bool) {
+        let state = self
+            .agent_tokens
+            .entry(agent_name.clone())
+            .or_insert_with(AgentTokenState::new);
+        if state.record(input_tokens, has_cache) {
+            log::info!(
+                "[reactive] advisory: agent '{}' consumed {} input tokens with no prompt caching",
+                agent_name,
+                state.cumulative
+            );
+        }
+    }
+}
+
+/// Lightweight handle for forwarding interruption alerts to the [`ReactiveExporter`]
+/// background thread. `Send + Sync + Clone` — safe to store in `AgentSight` and call
+/// from `detect_and_store_interruptions()`.
+#[derive(Clone)]
+pub struct ReactiveNotifier {
+    tx: SyncSender<Msg>,
+}
+
+impl ReactiveNotifier {
+    /// Queues an interruption alert without blocking; dropped if the channel is full.
+    pub fn notify_interruption(&self, interruption_type: &str, conversation_id: Option<String>) {
+        try_enqueue(
+            &self.tx,
+            Msg::InterruptionAlert {
+                interruption_type: interruption_type.to_string(),
+                conversation_id,
+            },
+            "interruption alert",
+        );
+    }
+}
+
+/// Exporter that reacts to critical events by checkpointing the workspace.
+///
+/// Dropping it stops and joins the background thread.
+pub struct ReactiveExporter {
+    tx: SyncSender<Msg>,
+    shutdown: Arc<AtomicBool>,
+    handle: Option<thread::JoinHandle<()>>,
+}
+
+impl ReactiveExporter {
+    /// Starts the background thread.
+    ///
+    /// Returns `None` when `config.enabled` is `false` or the thread cannot be
+    /// spawned. A missing `ws-ckpt` binary does NOT yield `None`: the exporter
+    /// still runs, but checkpoint requests are only logged.
+    pub fn new(config: ReactiveConfig) -> Option<(Self, ReactiveNotifier)> {
+        if !config.enabled {
+            return None;
+        }
+
+        let ws_ckpt_available = Command::new("ws-ckpt")
+            .arg("--version")
+            .stdout(Stdio::null())
+            .stderr(Stdio::null())
+            .status()
+            .is_ok();
+        if !ws_ckpt_available {
+            log::warn!("[reactive] ws-ckpt not found, checkpoint action disabled");
+        }
+
+        let worker = Worker {
+            workspace: config
+                .workspace_path
+                .or_else(|| std::env::var("AGENTSIGHT_WORKSPACE").ok())
+                .unwrap_or_else(|| "/root".to_string()),
+            debounce: Duration::from_secs(config.debounce_secs),
+            ws_ckpt_available,
+            // `None` rather than `Instant::now() - debounce`: that subtraction panics
+            // when the debounce exceeds the time representable before "now".
+            last_ckpt: None,
+            agent_tokens: HashMap::new(),
+        };
+
+        let (tx, rx) = mpsc::sync_channel::<Msg>(CHANNEL_CAPACITY);
+        let shutdown = Arc::new(AtomicBool::new(false));
+        let worker_shutdown = Arc::clone(&shutdown);
+        let spawned = thread::Builder::new()
+            .name("reactive-exporter".into())
+            .spawn(move || worker.run(rx, worker_shutdown));
+        let handle = match spawned {
+            Ok(handle) => handle,
+            Err(e) => {
+                log::warn!("[reactive] failed to spawn worker thread: {e}");
+                return None;
+            }
+        };
+
+        let notifier = ReactiveNotifier { tx: tx.clone() };
+        Some((
+            Self {
+                tx,
+                shutdown,
+                handle: Some(handle),
+            },
+            notifier,
+        ))
+    }
+
+    /// Send an interruption alert from the existing detection pipeline.
+    /// Same behavior as [`ReactiveNotifier::notify_interruption`].
+    pub fn notify_interruption(&self, interruption_type: &str, conversation_id: Option<String>) {
+        try_enqueue(
+            &self.tx,
+            Msg::InterruptionAlert {
+                interruption_type: interruption_type.to_string(),
+                conversation_id,
+            },
+            "interruption alert",
+        );
+    }
+
+    /// Returns `(reason, conversation_id)` for the first LLM call whose error text
+    /// looks like an agent crash or a context-window overflow.
+    fn detect_critical(events: &[GenAISemanticEvent]) -> Option<(String, Option<String>)> {
+        for event in events {
+            if let GenAISemanticEvent::LLMCall(call) = event {
+                let reason = call.error.as_deref().and_then(classify_error);
+                if let Some(reason) = reason {
+                    let conv_id = call.metadata.get("conversation_id").cloned();
+                    return Some((reason.to_string(), conv_id));
+                }
+            }
+        }
+        None
+    }
+}
+
+impl GenAIExporter for ReactiveExporter {
+    fn name(&self) -> &str {
+        "reactive"
+    }
+
+    fn export(&self, events: &[GenAISemanticEvent]) {
+        if let Some((reason, conversation_id)) = Self::detect_critical(events) {
+            try_enqueue(
+                &self.tx,
+                Msg::Checkpoint {
+                    reason,
+                    conversation_id,
+                },
+                "checkpoint request",
+            );
+        }
+        // Per-call token accumulation for the cumulative advisory.
+        for event in events {
+            if let GenAISemanticEvent::LLMCall(call) = event {
+                if let Some(ref usage) = call.token_usage {
+                    let has_cache = usage.cache_read_input_tokens.unwrap_or(0) > 0
+                        || usage.cache_creation_input_tokens.unwrap_or(0) > 0;
+                    let agent_name = call
+                        .agent_name
+                        .clone()
+                        .unwrap_or_else(|| call.process_name.clone());
+                    try_enqueue(
+                        &self.tx,
+                        Msg::TokenAccum {
+                            agent_name,
+                            input_tokens: u64::from(usage.input_tokens),
+                            has_cache,
+                        },
+                        "token usage sample",
+                    );
+                }
+            }
+        }
+    }
+}
+
+impl Drop for ReactiveExporter {
+    fn drop(&mut self) {
+        // The flag covers the case where the channel is full and `Shutdown` is dropped.
+        self.shutdown.store(true, Ordering::Relaxed);
+        try_enqueue(&self.tx, Msg::Shutdown, "shutdown request");
+        if let Some(handle) = self.handle.take() {
+            let _ = handle.join();
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::genai::exporter::GenAIExporter;
+    use crate::genai::semantic::{GenAISemanticEvent, LLMCall, LLMRequest, LLMResponse, TokenUsage};
+    use std::collections::HashMap;
+
+    fn make_call(error: Option<&str>, input_tokens: u32, cache_read: Option<u32>) -> GenAISemanticEvent {
+        let mut metadata = HashMap::new();
+        metadata.insert("conversation_id".to_string(), "conv-1".to_string());
+        GenAISemanticEvent::LLMCall(LLMCall {
+            call_id: "test".into(),
+            start_timestamp_ns: 0,
+            end_timestamp_ns: 0,
+            duration_ns: 0,
+            provider: "openai".into(),
+            model: "gpt-4".into(),
+            request: LLMRequest {
+                messages: vec![],
+                temperature: None,
+                max_tokens: None,
+                frequency_penalty: None,
+                presence_penalty: None,
+                top_p: None,
+                top_k: None,
+                seed: None,
+                stop_sequences: None,
+                stream: false,
+                tools: None,
+                raw_body: None,
+            },
+            response: LLMResponse {
+                messages: vec![],
+                streamed: false,
+                raw_body: None,
+            },
+            token_usage: Some(TokenUsage {
+                input_tokens,
+                output_tokens: 100,
+                total_tokens: input_tokens + 100,
+                cache_creation_input_tokens: None,
+                cache_read_input_tokens: cache_read,
+            }),
+            error: error.map(String::from),
+            pid: 1234,
+            process_name: "test-agent".into(),
+            agent_name: Some("TestAgent".into()),
+            metadata,
+        })
+    }
+
+    /// Starts an enabled exporter rooted at `/tmp`.
+    ///
+    /// `new()` returns `None` only when disabled or when the thread cannot be
+    /// spawned (a missing `ws-ckpt` still yields `Some`), so failing here is a
+    /// genuine test failure rather than a reason to skip.
+    fn start_exporter(debounce_secs: u64) -> ReactiveExporter {
+        let config = ReactiveConfig {
+            enabled: true,
+            debounce_secs,
+            workspace_path: Some("/tmp".to_string()),
+        };
+        let (exporter, _notifier) =
+            ReactiveExporter::new(config).expect("enabled config must start the exporter");
+        exporter
+    }
+
+    #[test]
+    fn classify_error_matches_oom_only_as_word_prefix() {
+        assert_eq!(classify_error("Process killed by OOM killer"), Some("agent_crash"));
+        assert_eq!(classify_error("container OOMKilled"), Some("agent_crash"));
+        assert_eq!(classify_error("no room left in the Zoom meeting"), None);
+    }
+
+    #[test]
+    fn huge_debounce_does_not_panic() {
+        let mut worker = Worker {
+            workspace: "/tmp".to_string(),
+            debounce: Duration::from_secs(u64::MAX),
+            ws_ckpt_available: false,
+            last_ckpt: None,
+            agent_tokens: HashMap::new(),
+        };
+        worker.checkpoint("checkpoint", "agent_crash", None);
+        assert!(worker.last_ckpt.is_none());
+    }
+
+    #[test]
+    fn token_state_advises_once_per_window_without_cache() {
+        let mut state = AgentTokenState::new();
+        assert!(!state.record(150_000, false));
+        assert!(state.record(50_000, false));
+        // Already advised in this window.
+        assert!(!state.record(50_000, false));
+
+        let mut cached = AgentTokenState::new();
+        assert!(!cached.record(u64::MAX, true));
+        // Saturates instead of overflowing; a cache hit suppresses the advisory.
+        assert!(!cached.record(1, false));
+    }
+
+    #[test]
+    fn detect_critical_finds_crash() {
+        let events = vec![make_call(Some("process crashed with OOM killer"), 1000, None)];
+        let result = ReactiveExporter::detect_critical(&events);
+        assert!(result.is_some());
+        let (reason, conv) = result.unwrap();
+        assert_eq!(reason, "agent_crash");
+        assert_eq!(conv.as_deref(), Some("conv-1"));
+    }
+
+    #[test]
+    fn detect_critical_ignores_normal_errors() {
+        let events = vec![make_call(Some("HTTP 429 rate limited"), 1000, None)];
+        assert!(ReactiveExporter::detect_critical(&events).is_none());
+    }
+
+    #[test]
+    fn detect_critical_ignores_no_error() {
+        let events = vec![make_call(None, 1000, None)];
+        assert!(ReactiveExporter::detect_critical(&events).is_none());
+    }
+
+    #[test]
+    fn detect_critical_finds_context_overflow() {
+        let events = vec![make_call(
+            Some("This model's maximum context length is 128000 tokens"),
+            1000,
+            None,
+        )];
+        let result = ReactiveExporter::detect_critical(&events);
+        assert!(result.is_some());
+        let (reason, _) = result.unwrap();
+        assert_eq!(reason, "context_overflow");
+    }
+
+    #[test]
+    fn detect_critical_finds_context_length_exceeded() {
+        let events = vec![make_call(
+            Some("context_length_exceeded: input too long"),
+            1000,
+            None,
+        )];
+        let (reason, _) = ReactiveExporter::detect_critical(&events).unwrap();
+        assert_eq!(reason, "context_overflow");
+    }
+
+    #[test]
+    fn notify_interruption_does_not_panic() {
+        let exporter = start_exporter(1);
+        exporter.notify_interruption("retry_storm", Some("conv-99".into()));
+        std::thread::sleep(Duration::from_millis(100));
+        drop(exporter);
+    }
+
+    #[test]
+    fn export_does_not_panic_on_disabled() {
+        let config = ReactiveConfig {
+            enabled: false,
+            ..Default::default()
+        };
+        assert!(ReactiveExporter::new(config).is_none());
+    }
+
+    /// Integration: a crash event travels export → channel → background thread →
+    /// checkpoint attempt, and nothing panics or hangs along the way.
+    #[test]
+    fn export_crash_event_triggers_checkpoint_attempt() {
+        let exporter = start_exporter(2);
+        exporter.export(&[make_call(Some("Process killed by OOM killer"), 1000, None)]);
+
+        // ws-ckpt without a running daemon hangs on socket connect until the
+        // 10s deadline kills it, so wait past that deadline.
+        std::thread::sleep(Duration::from_secs(13));
+
+        // By now the 2s debounce has elapsed, so this may start a second attempt.
+        exporter.export(&[make_call(Some("Another OOM crash"), 1000, None)]);
+        std::thread::sleep(Duration::from_millis(200));
+
+        // Drop joins the thread: ~1s if idle, up to 10s if a second ws-ckpt is
+        // still being waited on.
+        let start = Instant::now();
+        drop(exporter);
+        let drop_time = start.elapsed();
+        assert!(
+            drop_time < Duration::from_secs(12),
+            "Drop took too long ({drop_time:?}), background thread stuck"
+        );
+    }
+
+    /// Integration: a context-overflow error triggers the same pipeline.
+    #[test]
+    fn export_context_overflow_triggers_checkpoint() {
+        let exporter = start_exporter(2);
+        exporter.export(&[make_call(
+            Some("This model's maximum context length is 128000 tokens. You requested 200000."),
+            1000,
+            None,
+        )]);
+
+        // Wait for the ws-ckpt attempt, including its timeout.
+        std::thread::sleep(Duration::from_secs(13));
+        let start = Instant::now();
+        drop(exporter);
+        assert!(start.elapsed() < Duration::from_secs(5));
+    }
+
+    /// Integration: notify_interruption triggers a checkpoint attempt
+    /// (simulates unified.rs forwarding a RetryStorm detection).
+    #[test]
+    fn notify_interruption_triggers_checkpoint() {
+        let exporter = start_exporter(2);
+        exporter.notify_interruption("retry_storm", Some("conv-42".into()));
+
+        // Wait for the ws-ckpt attempt, including its timeout.
+        std::thread::sleep(Duration::from_secs(13));
+        let start = Instant::now();
+        drop(exporter);
+        assert!(start.elapsed() < Duration::from_secs(5));
+    }
+
+    /// Integration: TokenAccum messages flow through the pipeline and shutdown
+    /// stays fast. The advisory decision itself is covered by
+    /// `token_state_advises_once_per_window_without_cache`.
+    #[test]
+    fn cumulative_advisory_fires_at_threshold() {
+        let exporter = start_exporter(60);
+
+        // 5 calls × 50K tokens = 250K total, no cache.
+        for _ in 0..5 {
+            exporter.export(&[make_call(None, 50_000, None)]);
+        }
+        std::thread::sleep(Duration::from_millis(500));
+
+        // A later cached call marks the window as cached; the advisory has
+        // already been logged at 200K by then.
+        exporter.export(&[make_call(None, 10_000, Some(5_000))]);
+        std::thread::sleep(Duration::from_millis(200));
+
+        let start = Instant::now();
+        drop(exporter);
+        assert!(
+            start.elapsed() < Duration::from_secs(3),
+            "Drop should be fast (no ws-ckpt in this test path)"
+        );
+    }
+}
diff --git a/src/agentsight/src/unified.rs b/src/agentsight/src/unified.rs
index 1b57c76a..bae91d7e 100644
--- a/src/agentsight/src/unified.rs
+++ b/src/agentsight/src/unified.rs
@@ -103,6 +103,8 @@ pub struct AgentSight {
     deadloop_kill_enabled: bool,
     /// DeadLoop auto-kill: trigger threshold (kill after N detections)
     deadloop_kill_after_count: usize,
+    /// Notifier for forwarding Critical interruptions to ReactiveExporter
+    reactive_notifier: Option<crate::genai::reactive::ReactiveNotifier>,
 }
 
 /// GenAI events waiting for session_id resolution via ResponseSessionMapper.
@@ -381,6 +383,24 @@ impl AgentSight { } } + // Register ReactiveExporter (observe→act: checkpoint on critical interruptions) + let reactive_notifier = { + let reactive_config = crate::genai::reactive::ReactiveConfig { + enabled: config.reactive_enabled.unwrap_or(false), + debounce_secs: config.reactive_debounce_secs.unwrap_or(30), + workspace_path: config.reactive_workspace.clone(), + }; + if let Some((exporter, notifier)) = + crate::genai::reactive::ReactiveExporter::new(reactive_config) + { + log::info!("ReactiveExporter enabled"); + genai_exporters.push(Box::new(exporter)); + Some(notifier) + } else { + None + } + }; + // Create analyzer with tokenizer if configured let analyzer = if let Some(ref tokenizer_path) = config.tokenizer_path { if Path::new(tokenizer_path).exists() { @@ -504,6 +524,7 @@ impl AgentSight { pending_logtail, deadloop_kill_enabled: config.deadloop_kill_enabled, deadloop_kill_after_count: config.deadloop_kill_after_count, + reactive_notifier, }) } @@ -1082,6 +1103,9 @@ impl AgentSight { "RetryStorm detected: {} × {:?} in conversation {}", count, ie.interruption_type, cid ); + if let Some(ref n) = self.reactive_notifier { + n.notify_interruption("retry_storm", Some(cid.to_string())); + } } } } @@ -1141,6 +1165,9 @@ impl AgentSight { "DeadLoop detected in conversation {}: {:?}", cid, loop_event.detail ); + if let Some(ref n) = self.reactive_notifier { + n.notify_interruption("dead_loop", Some(cid.to_string())); + } // ── Auto-kill 止血 ── if self.deadloop_kill_enabled {