diff --git a/Cargo.lock b/Cargo.lock index c38269f..2da6d00 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -302,11 +302,11 @@ dependencies = [ [[package]] name = "bridge-echo" -version = "0.2.0" -source = "git+https://github.com/dnacenta/bridge-echo?branch=development#31cefd8a50b0cab0ff12f1f24ff66b679bf13c7f" +version = "0.3.0" +source = "git+https://github.com/dnacenta/bridge-echo?branch=development#8a1cccbc59a57b585fe73d1eeedd3825f316ed55" dependencies = [ "axum", - "echo-system-types", + "echo-system-types 0.3.0", "regex", "reqwest", "serde", @@ -405,10 +405,10 @@ dependencies = [ [[package]] name = "chat-echo" version = "0.2.2" -source = "git+https://github.com/dnacenta/chat-echo?tag=v0.2.2#144f1d298c0b4df7d48faf1ea118f030f50826ea" +source = "git+https://github.com/dnacenta/chat-echo?branch=development#db49c536e3dbf8248cf60f47da6613f16d978765" dependencies = [ "axum", - "echo-system-types", + "echo-system-types 0.3.0", "futures-util", "reqwest", "serde", @@ -899,15 +899,15 @@ dependencies = [ "libc", "option-ext", "redox_users", - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] name = "discord-echo" version = "0.1.0" -source = "git+https://github.com/dnacenta/discord-echo?tag=v0.1.0#2af8efbe6b3f175df73bb433cb68eaf002c3622b" +source = "git+https://github.com/dnacenta/discord-echo?tag=v0.1.0#f03166b5ed1dfadd50c3c590c4991518b00e0643" dependencies = [ - "echo-system-types", + "echo-system-types 0.2.0", "futures-util", "reqwest", "serde", @@ -919,13 +919,13 @@ dependencies = [ [[package]] name = "discord-voice-echo" -version = "0.1.0" -source = "git+https://github.com/dnacenta/discord-voice-echo?branch=development#18f4759f8f388007f77cb321eb409767afc48bbc" +version = "0.2.0" +source = "git+https://github.com/dnacenta/discord-voice-echo?branch=development#8f8a115135f50460d5d68d640c97af06860c94df" dependencies = [ "async-trait", "base64", "dotenvy", - "echo-system-types", + "echo-system-types 0.2.0", "futures-util", "hound", "rubato 1.0.1", @@ -992,6 +992,15 @@ dependencies = [ "serde_json", ] +[[package]] +name = "echo-system-types" +version = "0.3.0" +source = "git+https://github.com/dnacenta/echo-system-types?tag=v0.3.0#ad901031f50fe6f0bef5a9476bbc97a778d00124" +dependencies = [ + "serde", + "serde_json", +] + [[package]] name = "ed25519" version = "2.2.3" @@ -1072,7 +1081,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" dependencies = [ "libc", - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -1727,7 +1736,7 @@ dependencies = [ "libc", "percent-encoding", "pin-project-lite", - "socket2 0.5.10", + "socket2 0.6.2", "system-configuration", "tokio", "tower-service", @@ -2355,7 +2364,7 @@ version = "0.50.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5" dependencies = [ - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -2855,12 +2864,12 @@ dependencies = [ [[package]] name = "praxis-echo" -version = "0.1.0" -source = "git+https://github.com/dnacenta/praxis-echo?branch=development#352677d2f564ab069237d34e6f0b49e127904736" +version = "0.3.0" +source = "git+https://github.com/dnacenta/praxis-echo?branch=development#099e69ace1a38ea914d9839d14ea3b14289b48b0" dependencies = [ "clap", "dirs", - "echo-system-types", + "echo-system-types 0.3.0", "serde", "serde_json", ] @@ -2926,19 +2935,19 @@ dependencies = [ [[package]] name = "pulse-echo" -version = "0.1.0" -source = "git+https://github.com/dnacenta/pulse-echo?tag=v0.1.0#1c1c2e84e123d15f64069b0ef91fd2e34aac21b8" +version = "0.2.0" +source = "git+https://github.com/dnacenta/pulse-echo?branch=development#08e01d1fc7c77e1769717ac13d8e941a2efdb5fe" dependencies = [ "chrono", "dirs", - "echo-system-types", + "echo-system-types 0.3.0", "serde", "serde_json", ] [[package]] name = "pulse-null" -version = "0.9.1" +version = "0.10.0" dependencies = [ "axum", "bridge-echo", @@ -2951,8 +2960,9 @@ dependencies = [ "dialoguer", "discord-echo", "discord-voice-echo", - "echo-system-types", + "echo-system-types 0.3.0", "globset", + "libc", "praxis-echo", "pulse-echo", "recall-echo", @@ -2985,7 +2995,7 @@ dependencies = [ "quinn-udp", "rustc-hash", "rustls 0.23.37", - "socket2 0.5.10", + "socket2 0.6.2", "thiserror 2.0.18", "tokio", "tracing", @@ -3022,16 +3032,16 @@ dependencies = [ "cfg_aliases", "libc", "once_cell", - "socket2 0.5.10", + "socket2 0.6.2", "tracing", - "windows-sys 0.59.0", + "windows-sys 0.60.2", ] [[package]] name = "quote" -version = "1.0.44" +version = "1.0.45" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "21b2ebcf727b7760c461f091f9f0f539b77b8e87f2fd88131e7f1b433b3cece4" +checksum = "41f2619966050689382d2b44f664f4bc593e129785a36d6ee376ddf37259b924" dependencies = [ "proc-macro2", ] @@ -3138,12 +3148,13 @@ dependencies = [ [[package]] name = "recall-echo" -version = "0.6.0" -source = "git+https://github.com/dnacenta/recall-echo?branch=development#880c876d6be6aaa2be9514cb89fc0f64e183f842" +version = "1.0.0" +source = "git+https://github.com/dnacenta/recall-echo?branch=development#f9ee77a733403bf9d63fb2b4e6306587032b72fb" dependencies = [ "clap", "dirs", - "echo-system-types", + "echo-system-types 0.3.0", + "serde", "serde_json", ] @@ -3376,7 +3387,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys", - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -4191,7 +4202,7 @@ dependencies = [ "getrandom 0.4.2", "once_cell", "rustix", - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -4495,7 +4506,7 @@ dependencies = [ "serde_spanned", "toml_datetime", "toml_write", - "winnow 0.7.14", + "winnow 0.7.15", ] [[package]] @@ -4911,12 +4922,12 @@ checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" [[package]] name = "vigil-echo" -version = "0.1.0" -source = "git+https://github.com/dnacenta/vigil-echo?branch=development#dfe0e76ba6a4c8764ea18dd77e2d3d2abe3f4122" +version = "0.3.0" +source = "git+https://github.com/dnacenta/vigil-echo?branch=development#208986acf5d28ce722205962a2b227662b9ccd69" dependencies = [ "clap", "dirs", - "echo-system-types", + "echo-system-types 0.3.0", "owo-colors", "serde", "serde_json", @@ -4936,15 +4947,15 @@ dependencies = [ [[package]] name = "voice-echo" -version = "0.6.0" -source = "git+https://github.com/dnacenta/voice-echo?branch=development#0460cd39f97e1aecfb55ecb9a7af8b16ec67dd9a" +version = "0.7.0" +source = "git+https://github.com/dnacenta/voice-echo?branch=development#06a978e0201f3c6ab39afcb194b9457b08eb78da" dependencies = [ "axum", "base64", "bytes", "chrono", "dotenvy", - "echo-system-types", + "echo-system-types 0.2.0", "hound", "rand 0.8.5", "reqwest", @@ -5188,7 +5199,7 @@ version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" dependencies = [ - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -5584,9 +5595,9 @@ dependencies = [ [[package]] name = "winnow" -version = "0.7.14" +version = "0.7.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5a5364e9d77fcdeeaa6062ced926ee3381faa2ee02d3eb83a5c27a8825540829" +checksum = "df79d97927682d2fd8adb29682d1140b343be4ac0f08fd68b7765d9c059d3945" dependencies = [ "memchr", ] diff --git a/Cargo.toml b/Cargo.toml index 0f63f13..892e75b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pulse-null" -version = "0.9.1" +version = "0.10.0" edition = "2021" rust-version = "1.80" license = "AGPL-3.0-only" @@ -51,17 +51,18 @@ chrono-tz = "0.10" # Misc uuid = { version = "1", features = ["v4"] } chrono = { version = "0.4", features = ["serde"] } +libc = "0.2" # Shared types -echo-system-types = { git = "https://github.com/dnacenta/echo-system-types", tag = "v0.2.0" } +echo-system-types = { git = "https://github.com/dnacenta/echo-system-types", tag = "v0.3.0" } # Core dependencies (always included — these define an entity) vigil-echo = { git = "https://github.com/dnacenta/vigil-echo", branch = "development" } praxis-echo = { git = "https://github.com/dnacenta/praxis-echo", branch = "development" } -chat-echo = { git = "https://github.com/dnacenta/chat-echo", tag = "v0.2.2" } +chat-echo = { git = "https://github.com/dnacenta/chat-echo", branch = "development" } bridge-echo = { git = "https://github.com/dnacenta/bridge-echo", branch = "development" } recall-echo = { git = "https://github.com/dnacenta/recall-echo", branch = "development" } -pulse-echo = { git = "https://github.com/dnacenta/pulse-echo", tag = "v0.1.0" } +pulse-echo = { git = "https://github.com/dnacenta/pulse-echo", branch = "development" } # Plugins (optional — different interfaces to speak through) voice-echo = { git = "https://github.com/dnacenta/voice-echo", branch = "development", optional = true } diff --git a/src/chat/banner.rs b/src/chat/banner.rs index e3f92ff..5293c25 100644 --- a/src/chat/banner.rs +++ b/src/chat/banner.rs @@ -1,7 +1,8 @@ use std::path::Path; -use praxis_echo::runtime::{self as pipeline, ThresholdStatus}; -use vigil_echo::runtime::{self as vigil, CognitiveStatus, Trend}; +use echo_system_types::monitoring::{ + CognitiveMonitor, CognitiveStatus, DocumentHealth, PipelineMonitor, ThresholdStatus, Trend, +}; use crate::config::Config; @@ -53,21 +54,23 @@ pub fn render(config: &Config, root_dir: &Path, plugin_count: usize) { // Pipeline health if config.pipeline.enabled { - render_pipeline(root_dir, config); + let monitor = praxis_echo::runtime::PraxisMonitor::new(); + render_pipeline(root_dir, config, &monitor); } // Cognitive health if config.monitoring.enabled { - render_vigil(root_dir, config); + let monitor = vigil_echo::runtime::VigilMonitor::new(); + render_vigil(root_dir, config, &monitor); } println!(); } /// Render pipeline document progress bars. -fn render_pipeline(root_dir: &Path, config: &Config) { +fn render_pipeline(root_dir: &Path, config: &Config, monitor: &dyn PipelineMonitor) { let thresholds = config.pipeline.to_thresholds(); - let health = pipeline::calculate(root_dir, &thresholds); + let health = monitor.calculate(root_dir, &thresholds); println!(); @@ -83,7 +86,7 @@ fn render_pipeline(root_dir: &Path, config: &Config) { } /// Print a single document progress bar with color. -fn print_doc_bar(name: &str, doc: &pipeline::DocumentHealth) { +fn print_doc_bar(name: &str, doc: &DocumentHealth) { let bar = status_bar(doc.count, doc.hard); let count_label = format!("{}/{}", doc.count, doc.hard); @@ -118,8 +121,8 @@ fn status_bar(count: usize, hard: usize) -> String { } /// Render cognitive health signals. -fn render_vigil(root_dir: &Path, config: &Config) { - let health = vigil::assess( +fn render_vigil(root_dir: &Path, config: &Config, monitor: &dyn CognitiveMonitor) { + let health = monitor.assess( root_dir, config.monitoring.window_size, config.monitoring.min_samples, diff --git a/src/chat/repl.rs b/src/chat/repl.rs index e4d09f3..29daabf 100644 --- a/src/chat/repl.rs +++ b/src/chat/repl.rs @@ -60,6 +60,18 @@ pub async fn run( content: MessageContent::Text(input.to_string()), }); + // Compact conversation if approaching context budget + crate::context::compact_if_needed( + &mut conversation, + provider, + config.llm.context_budget, + config.llm.max_tokens, + root_dir, + entity_name, + "repl", + ) + .await; + // Tool definitions let tool_defs = if provider.supports_tools() && !tools.is_empty() { Some(tools.definitions()) @@ -169,76 +181,14 @@ pub async fn run( } } } - - // Keep conversation bounded - if conversation.len() > 100 { - let drain_count = conversation.len() - 100; - conversation.drain(..drain_count); - } } - // Save session to EPHEMERAL.md - save_session(root_dir, entity_name, &conversation); + // Archive full conversation + write EPHEMERAL summary + crate::session::end_session(root_dir, entity_name, &conversation, "repl", "session-end"); Ok(()) } -/// Save a brief session summary to EPHEMERAL.md. -fn save_session(root_dir: &Path, entity_name: &str, conversation: &[Message]) { - // Only save if there was actual conversation - let message_count = conversation.len(); - if message_count == 0 { - return; - } - - let ephemeral_path = root_dir.join("EPHEMERAL.md"); - - let now = chrono::Utc::now().format("%Y-%m-%d %H:%M UTC"); - - // Collect user messages for the summary - let user_messages: Vec<&str> = conversation - .iter() - .filter_map(|m| { - if matches!(m.role, Role::User) { - if let MessageContent::Text(ref t) = m.content { - Some(t.as_str()) - } else { - None - } - } else { - None - } - }) - .collect(); - - let topics: Vec<&str> = user_messages.iter().take(5).copied().collect(); - - let mut content = format!("## CLI Chat Session — {}\n\n", now); - content.push_str(&format!( - "Conversation with {} ({} messages)\n\n", - entity_name, message_count - )); - content.push_str("### Topics discussed\n\n"); - for topic in &topics { - // Truncate long messages - let display = if topic.len() > 80 { - format!("{}...", &topic[..77]) - } else { - topic.to_string() - }; - content.push_str(&format!("- {}\n", display)); - } - if user_messages.len() > 5 { - content.push_str(&format!("- ...and {} more\n", user_messages.len() - 5)); - } - - if let Err(e) = std::fs::write(&ephemeral_path, content) { - eprintln!(" \x1b[33mwarning\x1b[0m could not save session: {}", e); - } else { - println!(" \x1b[2msession saved to EPHEMERAL.md\x1b[0m"); - } -} - /// Print a tool execution indicator (dimmed). fn print_tool_indicator(name: &str, input: &serde_json::Value) { let detail = match name { diff --git a/src/cli/archive.rs b/src/cli/archive.rs index 65d7049..3668c1e 100644 --- a/src/cli/archive.rs +++ b/src/cli/archive.rs @@ -1,12 +1,16 @@ use console::style; +use echo_system_types::monitoring::PipelineMonitor; +use praxis_echo::runtime::PraxisMonitor; + use crate::config::Config; pub async fn list(document: Option) -> Result<(), Box> { let config = Config::load()?; let root_dir = config.root_dir()?; - let files = praxis_echo::runtime::list_archives(&root_dir, document.as_deref())?; + let monitor = PraxisMonitor::new(); + let files = monitor.list_archives(&root_dir, document.as_deref())?; println!(); if files.is_empty() { @@ -27,7 +31,8 @@ pub async fn run(document: String) -> Result<(), Box> { let config = Config::load()?; let root_dir = config.root_dir()?; - let result = praxis_echo::runtime::archive_document_by_name(&root_dir, &document)?; + let monitor = PraxisMonitor::new(); + let result = monitor.archive_by_name(&root_dir, &document)?; println!(); println!(" {} {}", style("✓").green(), result); println!(); diff --git a/src/cli/chat.rs b/src/cli/chat.rs index c2ac8fc..605d84a 100644 --- a/src/cli/chat.rs +++ b/src/cli/chat.rs @@ -15,7 +15,7 @@ pub async fn run() -> Result<(), Box> { // Build system prompt from identity documents let root_dir = config.root_dir()?; - let system_prompt = prompt::build_system_prompt(&root_dir, &config)?; + let system_prompt = prompt::build_system_prompt(&root_dir, &config, None, None)?; // Register tools let mut tools = ToolRegistry::new(); diff --git a/src/cli/down.rs b/src/cli/down.rs index ba9484a..09218ee 100644 --- a/src/cli/down.rs +++ b/src/cli/down.rs @@ -1,6 +1,40 @@ +use crate::config::Config; +use crate::pidfile; + pub async fn run() -> Result<(), Box> { - // Phase 1: simple — just tell the user to Ctrl+C - // Future: PID file, graceful shutdown signal - eprintln!("Use Ctrl+C to stop a running entity, or kill the process."); + let config = Config::load()?; + let root_dir = config.root_dir()?; + + let pid = match pidfile::read(&root_dir) { + Some(pid) => pid, + None => { + eprintln!("No running entity found (no PID file)."); + return Ok(()); + } + }; + + if !pidfile::is_alive(pid) { + eprintln!("Entity is not running (stale PID file, pid {}).", pid); + pidfile::remove(&root_dir); + return Ok(()); + } + + println!("Stopping entity (pid {})...", pid); + + if !pidfile::kill(pid) { + return Err(format!("Failed to send SIGTERM to pid {}", pid).into()); + } + + // Wait up to 10 seconds for the process to exit + for _ in 0..100 { + if !pidfile::is_alive(pid) { + pidfile::remove(&root_dir); + println!("Entity stopped."); + return Ok(()); + } + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + } + + eprintln!("Entity did not stop within 10 seconds (pid {}).", pid); Ok(()) } diff --git a/src/cli/mod.rs b/src/cli/mod.rs index bb62308..c8918dc 100644 --- a/src/cli/mod.rs +++ b/src/cli/mod.rs @@ -5,6 +5,7 @@ pub mod init; pub mod intent; pub mod pipeline; pub mod plugin; +pub mod recall; pub mod schedule; pub mod status; pub mod up; diff --git a/src/cli/pipeline.rs b/src/cli/pipeline.rs index 61cfd17..6d88f0b 100644 --- a/src/cli/pipeline.rs +++ b/src/cli/pipeline.rs @@ -1,5 +1,7 @@ use console::style; -use praxis_echo::runtime::{self, ThresholdStatus}; + +use echo_system_types::monitoring::{DocumentHealth, PipelineMonitor, ThresholdStatus}; +use praxis_echo::runtime::PraxisMonitor; use crate::config::Config; @@ -12,9 +14,10 @@ pub async fn health_cmd() -> Result<(), Box> { return Ok(()); } + let monitor = PraxisMonitor::new(); let thresholds = config.pipeline.to_thresholds(); - let pipeline_health = runtime::calculate(&root_dir, &thresholds); - let state = runtime::PipelineState::load(&root_dir); + let pipeline_health = monitor.calculate(&root_dir, &thresholds); + let state = monitor.load_state(&root_dir); println!(); println!(" {}", style("Pipeline Health").bold()); @@ -43,7 +46,7 @@ pub async fn health_cmd() -> Result<(), Box> { Ok(()) } -fn print_doc_status(name: &str, doc: &runtime::DocumentHealth) { +fn print_doc_status(name: &str, doc: &DocumentHealth) { let status_color = match doc.status { ThresholdStatus::Green => style(format!("{}/{}", doc.count, doc.hard)).green(), ThresholdStatus::Yellow => style(format!("{}/{}", doc.count, doc.hard)).yellow(), diff --git a/src/cli/recall.rs b/src/cli/recall.rs new file mode 100644 index 0000000..07d1e73 --- /dev/null +++ b/src/cli/recall.rs @@ -0,0 +1,29 @@ +use recall_echo::RecallEcho; + +use crate::config::Config; + +/// Build a RecallEcho instance with pulse-null's directory layout. +fn build_recall(config: &Config) -> Result> { + let root_dir = config.root_dir()?; + Ok(RecallEcho::new(root_dir)) +} + +pub async fn dashboard_cmd() -> Result<(), Box> { + let config = Config::load()?; + let recall = build_recall(&config)?; + let status = recall.health(); + println!("Memory health: {:?}", status); + Ok(()) +} + +pub async fn search(query: String, ranked: bool) -> Result<(), Box> { + if ranked { + recall_echo::search::run_ranked(&query, 10).map_err(|e| e.into()) + } else { + recall_echo::search::run(&query, 2).map_err(|e| e.into()) + } +} + +pub async fn distill() -> Result<(), Box> { + recall_echo::distill::run().map_err(|e| e.into()) +} diff --git a/src/cli/status.rs b/src/cli/status.rs index f7e1298..c877130 100644 --- a/src/cli/status.rs +++ b/src/cli/status.rs @@ -1,7 +1,9 @@ use crate::config::Config; +use crate::pidfile; pub async fn run() -> Result<(), Box> { let config = Config::load()?; + let root_dir = config.root_dir()?; println!("Entity: {}", config.entity.name); println!( @@ -26,19 +28,26 @@ pub async fn run() -> Result<(), Box> { ); } - // Check if server is running - let url = format!( - "http://{}:{}/health", - config.server.host, config.server.port - ); - match reqwest::get(&url).await { - Ok(resp) if resp.status().is_success() => { - println!("Status: RUNNING"); + // Check PID file first, then fall back to health endpoint + let status = match pidfile::read(&root_dir) { + Some(pid) if pidfile::is_alive(pid) => format!("RUNNING (pid {})", pid), + Some(pid) => { + pidfile::remove(&root_dir); + format!("STOPPED (stale pid {})", pid) } - _ => { - println!("Status: STOPPED"); + None => { + // No PID file — try health endpoint as fallback + let url = format!( + "http://{}:{}/health", + config.server.host, config.server.port + ); + match reqwest::get(&url).await { + Ok(resp) if resp.status().is_success() => "RUNNING".to_string(), + _ => "STOPPED".to_string(), + } } - } + }; + println!("Status: {}", status); Ok(()) } diff --git a/src/config/mod.rs b/src/config/mod.rs index a1c3378..60d44f7 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -55,6 +55,9 @@ pub struct LlmConfig { pub model: String, #[serde(default = "default_max_tokens")] pub max_tokens: u32, + /// Maximum estimated tokens in conversation before compaction triggers (0 = default 150k). + #[serde(default)] + pub context_budget: usize, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -236,9 +239,9 @@ pub struct PipelineConfig { } impl PipelineConfig { - /// Convert to praxis-echo runtime thresholds. - pub fn to_thresholds(&self) -> praxis_echo::runtime::Thresholds { - praxis_echo::runtime::Thresholds { + /// Convert to shared pipeline thresholds. + pub fn to_thresholds(&self) -> echo_system_types::monitoring::PipelineThresholds { + echo_system_types::monitoring::PipelineThresholds { learning_soft: self.learning_soft, learning_hard: self.learning_hard, thoughts_soft: self.thoughts_soft, diff --git a/src/context.rs b/src/context.rs new file mode 100644 index 0000000..a631b32 --- /dev/null +++ b/src/context.rs @@ -0,0 +1,200 @@ +use std::path::Path; + +use echo_system_types::llm::{ContentBlock, LmProvider, Message, MessageContent, Role}; + +/// Default context budget in estimated tokens (leaves room for system prompt + response). +const DEFAULT_CONTEXT_BUDGET: usize = 150_000; + +/// How many of the most recent messages to always keep uncompacted. +const KEEP_RECENT: usize = 20; + +/// Minimum messages before compaction is even considered. +const MIN_MESSAGES_FOR_COMPACTION: usize = 30; + +/// Rough chars-per-token estimate for English text. +const CHARS_PER_TOKEN: usize = 4; + +/// Estimate the token count of a single message. +pub fn estimate_message_tokens(msg: &Message) -> usize { + let chars = match &msg.content { + MessageContent::Text(s) => s.len(), + MessageContent::Blocks(blocks) => blocks + .iter() + .map(|block| match block { + ContentBlock::Text { text } => text.len(), + ContentBlock::ToolUse { name, input, .. } => name.len() + input.to_string().len(), + ContentBlock::ToolResult { content, .. } => content.len(), + }) + .sum(), + }; + // Add overhead for role/structure (~20 tokens) + (chars / CHARS_PER_TOKEN) + 20 +} + +/// Estimate the total token count of a conversation. +pub fn estimate_conversation_tokens(conversation: &[Message]) -> usize { + conversation.iter().map(estimate_message_tokens).sum() +} + +/// Extract text content from a message for summarization purposes. +fn message_to_text(msg: &Message) -> String { + match &msg.content { + MessageContent::Text(s) => s.clone(), + MessageContent::Blocks(blocks) => blocks + .iter() + .filter_map(|block| match block { + ContentBlock::Text { text } => Some(text.as_str()), + ContentBlock::ToolUse { name, .. } => Some(name.as_str()), + ContentBlock::ToolResult { content, .. } => { + // Truncate large tool results in the summary input + if content.len() > 500 { + None + } else { + Some(content.as_str()) + } + } + }) + .collect::>() + .join(" "), + } +} + +/// Build a summarization prompt from the messages being compacted. +fn build_summary_prompt(messages: &[Message]) -> String { + let mut lines = Vec::new(); + for msg in messages { + let role = match msg.role { + Role::User => "User", + Role::Assistant => "Assistant", + }; + let text = message_to_text(msg); + if !text.is_empty() { + // Truncate extremely long messages in the summarization input + let truncated = if text.len() > 2000 { + format!("{}...", &text[..1997]) + } else { + text + }; + lines.push(format!("{}: {}", role, truncated)); + } + } + lines.join("\n") +} + +/// Compact a conversation by summarizing older messages. +/// +/// If the conversation is under the token budget or too short, returns it unchanged. +/// Otherwise, summarizes the oldest messages (keeping the most recent ones intact) +/// and replaces them with a single summary message. +pub async fn compact_if_needed( + conversation: &mut Vec, + provider: &dyn LmProvider, + context_budget: usize, + max_tokens: u32, + root_dir: &Path, + entity_name: &str, + channel: &str, +) { + let budget = if context_budget > 0 { + context_budget + } else { + DEFAULT_CONTEXT_BUDGET + }; + + // Don't compact small conversations + if conversation.len() < MIN_MESSAGES_FOR_COMPACTION { + return; + } + + let total_tokens = estimate_conversation_tokens(conversation); + if total_tokens <= budget { + return; + } + + tracing::info!( + "Context compaction triggered: ~{} tokens (budget {}), {} messages", + total_tokens, + budget, + conversation.len() + ); + + // Split: older messages to summarize, recent messages to keep + let keep_count = KEEP_RECENT.min(conversation.len()); + let split_at = conversation.len() - keep_count; + + if split_at < 2 { + // Not enough old messages to summarize — just trim + let drain_count = conversation.len().saturating_sub(keep_count); + conversation.drain(..drain_count); + return; + } + + let old_messages = &conversation[..split_at]; + + // Archive the messages about to be compacted + let meta = crate::session::ArchiveMeta { + trigger: "compaction".to_string(), + channel: channel.to_string(), + entity_name: entity_name.to_string(), + }; + if let Err(e) = crate::session::archive_conversation(root_dir, old_messages, &meta) { + tracing::warn!("Failed to archive compacted messages: {}", e); + } + + let summary_input = build_summary_prompt(old_messages); + + let summarize_prompt = format!( + "Summarize this conversation concisely, preserving key decisions, code context, \ + task state, and important details. Focus on what matters for continuing the \ + conversation. Be direct — no preamble.\n\n{}", + summary_input + ); + + let summary_messages = vec![Message { + role: Role::User, + content: MessageContent::Text(summarize_prompt), + }]; + + // Use the same provider to generate the summary + let summary_text = match provider + .invoke( + "You are a concise summarizer. Output only the summary.", + &summary_messages, + max_tokens.min(2048), + None, + ) + .await + { + Ok(result) => result.text(), + Err(e) => { + tracing::warn!( + "Context compaction failed: {}. Falling back to simple trim.", + e + ); + // Fall back to simple trim + conversation.drain(..split_at); + return; + } + }; + + // Replace old messages with the summary + conversation.drain(..split_at); + conversation.insert( + 0, + Message { + role: Role::User, + content: MessageContent::Text(format!( + "[Context summary of earlier conversation]\n{}", + summary_text + )), + }, + ); + + let new_tokens = estimate_conversation_tokens(conversation); + tracing::info!( + "Compacted {} messages into summary. ~{} → ~{} tokens", + split_at, + total_tokens, + new_tokens + ); +} diff --git a/src/init/wizard.rs b/src/init/wizard.rs index 8c342a9..1c4e70e 100644 --- a/src/init/wizard.rs +++ b/src/init/wizard.rs @@ -243,6 +243,7 @@ fn create_directory_structure(dir: &Path) -> Result<(), Box, + }, } #[derive(Subcommand)] @@ -143,6 +151,20 @@ enum IntentAction { Clear, } +#[derive(Subcommand)] +enum RecallAction { + /// Search conversation archives + Search { + /// Search query + query: String, + /// Use ranked scoring instead of line-level matches + #[arg(long)] + ranked: bool, + }, + /// Analyze and auto-distill MEMORY.md + Distill, +} + #[derive(Subcommand)] enum ArchiveAction { /// List archived files @@ -246,6 +268,19 @@ async fn main() { std::process::exit(1); } } + Commands::Recall { action } => { + let result = match action { + None => cli::recall::dashboard_cmd().await, + Some(RecallAction::Search { query, ranked }) => { + cli::recall::search(query, ranked).await + } + Some(RecallAction::Distill) => cli::recall::distill().await, + }; + if let Err(e) = result { + eprintln!("Error: {e}"); + std::process::exit(1); + } + } Commands::Intent { action } => { let result = match action { IntentAction::List => cli::intent::list().await, diff --git a/src/pidfile.rs b/src/pidfile.rs new file mode 100644 index 0000000..a0f96c9 --- /dev/null +++ b/src/pidfile.rs @@ -0,0 +1,38 @@ +use std::fs; +use std::path::{Path, PathBuf}; + +const PID_FILENAME: &str = ".pulse-null.pid"; + +/// Get the PID file path for an entity root directory. +pub fn path(root_dir: &Path) -> PathBuf { + root_dir.join(PID_FILENAME) +} + +/// Write the current process PID to the PID file. +pub fn write(root_dir: &Path) -> std::io::Result<()> { + let pid = std::process::id(); + fs::write(path(root_dir), pid.to_string()) +} + +/// Read the PID from the PID file, if it exists and is valid. +pub fn read(root_dir: &Path) -> Option { + fs::read_to_string(path(root_dir)) + .ok() + .and_then(|s| s.trim().parse().ok()) +} + +/// Remove the PID file. +pub fn remove(root_dir: &Path) { + let _ = fs::remove_file(path(root_dir)); +} + +/// Check if a process with the given PID is still alive. +pub fn is_alive(pid: u32) -> bool { + Path::new(&format!("/proc/{}", pid)).exists() +} + +/// Send SIGTERM to the given PID. Returns true if the signal was sent. +pub fn kill(pid: u32) -> bool { + // SAFETY: sending SIGTERM to a process ID is a standard Unix operation + unsafe { libc::kill(pid as i32, libc::SIGTERM) == 0 } +} diff --git a/src/plugins/manager.rs b/src/plugins/manager.rs index be0abd6..1ffcae0 100644 --- a/src/plugins/manager.rs +++ b/src/plugins/manager.rs @@ -200,6 +200,7 @@ mod tests { api_key: None, model: "test".into(), max_tokens: 1024, + context_budget: 0, }, security: SecurityConfig { secret: None, diff --git a/src/scheduler/intent.rs b/src/scheduler/intent.rs index 82254e1..5626165 100644 --- a/src/scheduler/intent.rs +++ b/src/scheduler/intent.rs @@ -441,7 +441,12 @@ async fn execute_intent( } // Build system prompt - let system_prompt = match prompt::build_system_prompt(&root_dir, &state.config) { + let system_prompt = match prompt::build_system_prompt( + &root_dir, + &state.config, + state.pipeline_monitor.as_ref(), + state.cognitive_monitor.as_ref(), + ) { Ok(p) => p, Err(e) => { tracing::error!( @@ -616,8 +621,8 @@ async fn execute_intent( log_intent_execution(&root_dir, intent, &parsed.clean_content); // Record outcome for pulse-echo - if state.config.pulse.enabled { - let outcome = pulse_echo::runtime::build_outcome( + if let Some(ref tracker) = state.outcome_tracker { + let outcome = tracker.build_outcome( &intent.id, &intent.description, &parsed.clean_content, @@ -625,27 +630,26 @@ async fn execute_intent( result.total_input_tokens, result.total_output_tokens, ); - if let Err(e) = - pulse_echo::runtime::record_outcome(&root_dir, outcome, state.config.pulse.max_outcomes) + if let Err(e) = tracker.record_outcome(&root_dir, outcome, state.config.pulse.max_outcomes) { tracing::error!("Failed to record outcome for intent '{}': {}", intent.id, e); } } // Extract cognitive signals and check for health changes - if state.config.monitoring.enabled { + if let Some(ref monitor) = state.cognitive_monitor { let window = state.config.monitoring.window_size; let min_samples = state.config.monitoring.min_samples; - let health_before = vigil_echo::runtime::assess(&root_dir, window, min_samples); + let health_before = monitor.assess(&root_dir, window, min_samples); let previous_status = health_before.status.to_string(); - let frame = vigil_echo::runtime::extract(&result.response_text, &intent.id); - if let Err(e) = vigil_echo::runtime::record(&root_dir, frame, window) { + let frame = monitor.extract(&result.response_text, &intent.id); + if let Err(e) = monitor.record(&root_dir, frame, window) { tracing::error!("Failed to record signals for intent '{}': {}", intent.id, e); } - let health_after = vigil_echo::runtime::assess(&root_dir, window, min_samples); + let health_after = monitor.assess(&root_dir, window, min_samples); if health_after.sufficient_data && health_after.status != health_before.status { state.event_bus.emit(EntityEvent::CognitiveHealthChanged { previous: previous_status, @@ -656,13 +660,15 @@ async fn execute_intent( } // Update pipeline state - if state.config.pipeline.enabled { + if let Some(ref monitor) = state.pipeline_monitor { let thresholds = state.config.pipeline.to_thresholds(); - let health = praxis_echo::runtime::calculate(&root_dir, &thresholds); - let new_counts = praxis_echo::runtime::counts_from_health(&health); - let mut pipeline_state = praxis_echo::runtime::PipelineState::load(&root_dir); - pipeline_state.update_counts(&new_counts); - let _ = pipeline_state.save(&root_dir); + let health = monitor.calculate(&root_dir, &thresholds); + let new_counts = monitor.counts_from_health(&health); + let mut pipeline_state = monitor.load_state(&root_dir); + pipeline_state.update_counts(&new_counts, &chrono::Utc::now().to_rfc3339()); + if let Err(e) = monitor.save_state(&root_dir, &pipeline_state) { + tracing::error!("Failed to save pipeline state: {}", e); + } // Emit PipelineAlert for documents at hard limit let docs = [ @@ -673,7 +679,7 @@ async fn execute_intent( ("PRAXIS", &health.praxis), ]; for (name, doc_health) in &docs { - if doc_health.status == praxis_echo::runtime::ThresholdStatus::Red { + if doc_health.status == echo_system_types::monitoring::ThresholdStatus::Red { state.event_bus.emit(EntityEvent::PipelineAlert { document: name.to_string(), count: doc_health.count, @@ -689,7 +695,7 @@ async fn execute_intent( }); } - let archived = praxis_echo::runtime::check_and_archive(&root_dir, &thresholds, &health); + let archived = monitor.check_and_archive(&root_dir, &thresholds, &health); for doc in &archived { tracing::info!("Auto-archived overflow from {} (intent)", doc); } diff --git a/src/scheduler/runner.rs b/src/scheduler/runner.rs index 4e6b9b6..6cecb9a 100644 --- a/src/scheduler/runner.rs +++ b/src/scheduler/runner.rs @@ -100,7 +100,12 @@ async fn execute_task( } } - let system_prompt = match prompt::build_system_prompt(&root_dir, &state.config) { + let system_prompt = match prompt::build_system_prompt( + &root_dir, + &state.config, + state.pipeline_monitor.as_ref(), + state.cognitive_monitor.as_ref(), + ) { Ok(p) => p, Err(e) => { tracing::error!("Cannot build system prompt for task '{}': {}", task.id, e); @@ -295,8 +300,8 @@ async fn execute_task( log_execution(&root_dir, task, &parsed.clean_content); // Record outcome for pulse-echo - if state.config.pulse.enabled { - let outcome = pulse_echo::runtime::build_outcome( + if let Some(ref tracker) = state.outcome_tracker { + let outcome = tracker.build_outcome( &task.id, &task.name, &parsed.clean_content, @@ -304,29 +309,28 @@ async fn execute_task( input_tokens, output_tokens, ); - if let Err(e) = - pulse_echo::runtime::record_outcome(&root_dir, outcome, state.config.pulse.max_outcomes) + if let Err(e) = tracker.record_outcome(&root_dir, outcome, state.config.pulse.max_outcomes) { tracing::error!("Failed to record outcome for task '{}': {}", task.id, e); } } // Post-execution: extract cognitive signals and check for health changes - if state.config.monitoring.enabled { + if let Some(ref monitor) = state.cognitive_monitor { let window = state.config.monitoring.window_size; let min_samples = state.config.monitoring.min_samples; // Assess health BEFORE recording new signals (to detect change) - let health_before = vigil_echo::runtime::assess(&root_dir, window, min_samples); + let health_before = monitor.assess(&root_dir, window, min_samples); let previous_status = health_before.status.to_string(); - let frame = vigil_echo::runtime::extract(&response_text, &task.id); - if let Err(e) = vigil_echo::runtime::record(&root_dir, frame, window) { + let frame = monitor.extract(&response_text, &task.id); + if let Err(e) = monitor.record(&root_dir, frame, window) { tracing::error!("Failed to record signals for task '{}': {}", task.id, e); } // Assess health AFTER recording new signals - let health_after = vigil_echo::runtime::assess(&root_dir, window, min_samples); + let health_after = monitor.assess(&root_dir, window, min_samples); if health_after.sufficient_data && health_after.status != health_before.status { state.event_bus.emit(EntityEvent::CognitiveHealthChanged { previous: previous_status, @@ -337,14 +341,14 @@ async fn execute_task( } // Post-execution: update pipeline state and auto-archive - if state.config.pipeline.enabled { + if let Some(ref monitor) = state.pipeline_monitor { let thresholds = state.config.pipeline.to_thresholds(); - let health = praxis_echo::runtime::calculate(&root_dir, &thresholds); - let new_counts = praxis_echo::runtime::counts_from_health(&health); + let health = monitor.calculate(&root_dir, &thresholds); + let new_counts = monitor.counts_from_health(&health); - let mut pipeline_state = praxis_echo::runtime::PipelineState::load(&root_dir); - pipeline_state.update_counts(&new_counts); - if let Err(e) = pipeline_state.save(&root_dir) { + let mut pipeline_state = monitor.load_state(&root_dir); + pipeline_state.update_counts(&new_counts, &Utc::now().to_rfc3339()); + if let Err(e) = monitor.save_state(&root_dir, &pipeline_state) { tracing::error!("Failed to save pipeline state: {}", e); } @@ -357,7 +361,7 @@ async fn execute_task( ("PRAXIS", &health.praxis), ]; for (name, doc_health) in &docs { - if doc_health.status == praxis_echo::runtime::ThresholdStatus::Red { + if doc_health.status == echo_system_types::monitoring::ThresholdStatus::Red { state.event_bus.emit(EntityEvent::PipelineAlert { document: name.to_string(), count: doc_health.count, @@ -373,7 +377,7 @@ async fn execute_task( }); } - let archived = praxis_echo::runtime::check_and_archive(&root_dir, &thresholds, &health); + let archived = monitor.check_and_archive(&root_dir, &thresholds, &health); for doc in &archived { tracing::info!("Auto-archived overflow from {}", doc); } diff --git a/src/server/auth.rs b/src/server/auth.rs index 583203a..c5e0ed4 100644 --- a/src/server/auth.rs +++ b/src/server/auth.rs @@ -80,6 +80,7 @@ mod tests { api_key: None, model: "test".into(), max_tokens: 1024, + context_budget: 0, }, security: SecurityConfig { secret, @@ -102,6 +103,10 @@ mod tests { system_prompt: RwLock::new(String::new()), tools: ToolRegistry::new(), event_bus: Arc::new(EventBus::new(16)), + root_dir: std::env::temp_dir(), + pipeline_monitor: None, + cognitive_monitor: None, + outcome_tracker: None, }) } diff --git a/src/server/e2e_tests.rs b/src/server/e2e_tests.rs index b9790c6..3f21486 100644 --- a/src/server/e2e_tests.rs +++ b/src/server/e2e_tests.rs @@ -98,6 +98,7 @@ fn test_config() -> Config { api_key: None, model: "mock-model".to_string(), max_tokens: 1024, + context_budget: 0, }, security: SecurityConfig { secret: None, @@ -133,6 +134,10 @@ fn build_state(provider: MockProvider, tools: ToolRegistry) -> Arc { system_prompt: RwLock::new("You are a test entity.".to_string()), tools, event_bus: Arc::new(EventBus::new(16)), + root_dir: std::env::temp_dir(), + pipeline_monitor: None, + cognitive_monitor: None, + outcome_tracker: None, }) } diff --git a/src/server/handlers/chat.rs b/src/server/handlers/chat.rs index 9baa582..ee8e039 100644 --- a/src/server/handlers/chat.rs +++ b/src/server/handlers/chat.rs @@ -74,6 +74,18 @@ pub async fn chat( content: MessageContent::Text(user_message), }); + // Compact conversation if approaching context budget + crate::context::compact_if_needed( + &mut conversation, + state.provider.as_ref(), + state.config.llm.context_budget, + state.config.llm.max_tokens, + &state.root_dir, + &state.config.entity.name, + &req.channel, + ) + .await; + // Build tool definitions (only if provider supports tools) let tool_defs = if state.provider.supports_tools() && !state.tools.is_empty() { Some(state.tools.definitions()) @@ -125,12 +137,6 @@ pub async fn chat( // Done — extract text and return let text = result.text(); - // Keep conversation bounded (last 100 messages) - if conversation.len() > 100 { - let drain_count = conversation.len() - 100; - conversation.drain(..drain_count); - } - // Emit PostConversation event emit_post_conversation( &state, diff --git a/src/server/handlers/dashboard.rs b/src/server/handlers/dashboard.rs index 4addf3b..fc016ee 100644 --- a/src/server/handlers/dashboard.rs +++ b/src/server/handlers/dashboard.rs @@ -3,8 +3,7 @@ use std::sync::Arc; use axum::extract::State; use axum::Json; -use praxis_echo::runtime::{self as pipeline, ThresholdStatus}; -use vigil_echo::runtime::{self as vigil, CognitiveStatus, Trend}; +use echo_system_types::monitoring::{CognitiveStatus, DocumentHealth, ThresholdStatus, Trend}; use crate::server::AppState; @@ -23,10 +22,10 @@ pub async fn dashboard(State(state): State>) -> Json>) -> Json>) -> Json serde_json::Value { +fn doc_json(doc: &DocumentHealth) -> serde_json::Value { serde_json::json!({ "count": doc.count, "hard_limit": doc.hard, diff --git a/src/server/mod.rs b/src/server/mod.rs index 416a9ff..8f6552c 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -7,6 +7,7 @@ pub mod prompt; pub mod rate_limit; pub mod trust; +use std::path::PathBuf; use std::sync::Arc; use axum::middleware; @@ -17,11 +18,13 @@ use tokio::sync::RwLock; use crate::claude_provider::ClaudeProvider; use crate::config::Config; use crate::events::EventBus; +use crate::pidfile; use crate::plugins::manager::PluginManager; use crate::scheduler::intent::IntentQueue; use crate::scheduler::Schedule; use crate::tools::ToolRegistry; use echo_system_types::llm::{LmProvider, Message}; +use echo_system_types::monitoring::{CognitiveMonitor, OutcomeTracker, PipelineMonitor}; /// Shared application state pub struct AppState { @@ -31,6 +34,10 @@ pub struct AppState { pub system_prompt: RwLock, pub tools: ToolRegistry, pub event_bus: Arc, + pub root_dir: PathBuf, + pub pipeline_monitor: Option>, + pub cognitive_monitor: Option>, + pub outcome_tracker: Option>, } pub async fn start(config: Config) -> Result<(), Box> { @@ -43,9 +50,34 @@ pub async fn start(config: Config) -> Result<(), Box> { config.llm.model.clone(), )); - // Build system prompt from SELF.md + CLAUDE.md + MEMORY.md let root_dir = config.root_dir()?; - let system_prompt = prompt::build_system_prompt(&root_dir, &config)?; + + // Construct monitoring trait objects based on config + let pipeline_monitor: Option> = if config.pipeline.enabled { + Some(Arc::new(praxis_echo::runtime::PraxisMonitor::new())) + } else { + None + }; + + let cognitive_monitor: Option> = if config.monitoring.enabled { + Some(Arc::new(vigil_echo::runtime::VigilMonitor::new())) + } else { + None + }; + + let outcome_tracker: Option> = if config.pulse.enabled { + Some(Arc::new(pulse_echo::runtime::PulseTracker::new())) + } else { + None + }; + + // Build system prompt from SELF.md + CLAUDE.md + MEMORY.md + let system_prompt = prompt::build_system_prompt( + &root_dir, + &config, + pipeline_monitor.as_ref(), + cognitive_monitor.as_ref(), + )?; // Register built-in tools let mut tools = ToolRegistry::new(); @@ -94,6 +126,10 @@ pub async fn start(config: Config) -> Result<(), Box> { system_prompt: RwLock::new(system_prompt), tools, event_bus: Arc::clone(&event_bus), + root_dir: root_dir.clone(), + pipeline_monitor, + cognitive_monitor, + outcome_tracker, }); // Load schedule and intent queue, start scheduler @@ -141,7 +177,7 @@ pub async fn start(config: Config) -> Result<(), Box> { Arc::clone(&state), auth::require_auth, )) - .with_state(state) + .with_state(Arc::clone(&state)) .layer(middleware::from_fn_with_state( limiter, rate_limit::rate_limit, @@ -151,9 +187,38 @@ pub async fn start(config: Config) -> Result<(), Box> { let addr = format!("{}:{}", config.server.host, config.server.port); let listener = tokio::net::TcpListener::bind(&addr).await?; + // Write PID file so `pulse-null down` can find us + pidfile::write(&root_dir)?; tracing::info!("Listening on {}", addr); - axum::serve(listener, app).await?; + // Graceful shutdown on SIGTERM or SIGINT + let shutdown = async { + let mut sigterm = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) + .expect("failed to install SIGTERM handler"); + let sigint = tokio::signal::ctrl_c(); + tokio::select! { + _ = sigterm.recv() => tracing::info!("Received SIGTERM"), + _ = sigint => tracing::info!("Received SIGINT"), + } + }; + + axum::serve(listener, app) + .with_graceful_shutdown(shutdown) + .await?; + + // Archive conversation on shutdown + { + let conversation = state.conversation.read().await; + if !conversation.is_empty() { + crate::session::end_session( + &root_dir, + &config.entity.name, + &conversation, + "http", + "server-shutdown", + ); + } + } // Clean up plugins on shutdown plugin_manager.stop_all().await; @@ -163,5 +228,9 @@ pub async fn start(config: Config) -> Result<(), Box> { handle.abort(); } + // Remove PID file + pidfile::remove(&root_dir); + tracing::info!("Shutdown complete"); + Ok(()) } diff --git a/src/server/prompt.rs b/src/server/prompt.rs index 8cc681b..5a7ce2b 100644 --- a/src/server/prompt.rs +++ b/src/server/prompt.rs @@ -1,4 +1,7 @@ use std::path::Path; +use std::sync::Arc; + +use echo_system_types::monitoring::{CognitiveMonitor, PipelineMonitor}; use crate::config::Config; use crate::scheduler::cost::CostTracker; @@ -8,6 +11,8 @@ use crate::scheduler::intent::IntentQueue; pub fn build_system_prompt( root_dir: &Path, config: &Config, + pipeline_monitor: Option<&Arc>, + cognitive_monitor: Option<&Arc>, ) -> Result> { let mut parts = Vec::new(); @@ -48,11 +53,11 @@ pub fn build_system_prompt( } // Pipeline health — document counts and threshold status - if config.pipeline.enabled { + if let Some(monitor) = pipeline_monitor { let thresholds = config.pipeline.to_thresholds(); - let pipeline_state = praxis_echo::runtime::PipelineState::load(root_dir); - let pipeline_health = praxis_echo::runtime::calculate(root_dir, &thresholds); - let pipeline_text = praxis_echo::runtime::render( + let pipeline_state = monitor.load_state(root_dir); + let pipeline_health = monitor.calculate(root_dir, &thresholds); + let pipeline_text = monitor.render_for_prompt( &pipeline_health, pipeline_state.sessions_without_movement, config.pipeline.freeze_threshold, @@ -64,13 +69,13 @@ pub fn build_system_prompt( } // Cognitive health — metacognitive monitoring assessment - if config.monitoring.enabled { - let cognitive_health = vigil_echo::runtime::assess( + if let Some(monitor) = cognitive_monitor { + let cognitive_health = monitor.assess( root_dir, config.monitoring.window_size, config.monitoring.min_samples, ); - let cognitive_text = vigil_echo::runtime::render(&cognitive_health); + let cognitive_text = monitor.render_for_prompt(&cognitive_health); parts.push(format!( "\n{}\n", cognitive_text diff --git a/src/server/trust.rs b/src/server/trust.rs index 7e72550..48318bf 100644 --- a/src/server/trust.rs +++ b/src/server/trust.rs @@ -58,6 +58,7 @@ mod tests { api_key: None, model: "test".into(), max_tokens: 1024, + context_budget: 0, }, security: SecurityConfig { secret: None, diff --git a/src/session.rs b/src/session.rs new file mode 100644 index 0000000..3fd53c0 --- /dev/null +++ b/src/session.rs @@ -0,0 +1,440 @@ +use std::fs; +use std::io::Write as IoWrite; +use std::path::{Path, PathBuf}; + +use chrono::Utc; +use echo_system_types::llm::{ContentBlock, Message, MessageContent, Role}; + +/// Metadata for an archive log entry. +pub struct ArchiveMeta { + pub trigger: String, + pub channel: String, + pub entity_name: String, +} + +/// Serialize a conversation to grep-searchable markdown. +pub fn conversation_to_markdown(conversation: &[Message]) -> String { + let mut output = String::new(); + + for (i, msg) in conversation.iter().enumerate() { + if i > 0 { + output.push_str("\n---\n\n"); + } + + let role_label = match msg.role { + Role::User => "User", + Role::Assistant => "Assistant", + }; + + match &msg.content { + MessageContent::Text(text) => { + output.push_str(&format!("### {}\n\n{}\n", role_label, text)); + } + MessageContent::Blocks(blocks) => { + output.push_str(&format!("### {}\n\n", role_label)); + for block in blocks { + match block { + ContentBlock::Text { text } => { + output.push_str(text); + output.push('\n'); + } + ContentBlock::ToolUse { id, name, input } => { + let input_display = serde_json::to_string_pretty(input) + .unwrap_or_else(|_| input.to_string()); + output.push_str(&format!( + "**Tool: {}** (id: {})\n```json\n{}\n```\n\n", + name, id, input_display + )); + } + ContentBlock::ToolResult { + tool_use_id, + content, + is_error, + } => { + let status = if *is_error == Some(true) { + " [ERROR]" + } else { + "" + }; + let display = if content.len() > 2000 { + format!( + "{}...\n\n[truncated, {} bytes total]", + &content[..2000], + content.len() + ) + } else { + content.clone() + }; + output.push_str(&format!( + "**Tool Result**{} (for: {})\n```\n{}\n```\n\n", + status, tool_use_id, display + )); + } + } + } + } + } + } + + output +} + +/// Archives directory for conversations. +fn conversations_dir(root_dir: &Path) -> PathBuf { + root_dir.join("archives").join("conversations") +} + +/// Index file path. +fn index_path(root_dir: &Path) -> PathBuf { + conversations_dir(root_dir).join("INDEX.md") +} + +/// Scan for the highest conversation-NNN.md number. Returns 0 if none exist. +fn highest_log_number(dir: &Path) -> u32 { + let entries = match fs::read_dir(dir) { + Ok(e) => e, + Err(_) => return 0, + }; + + let mut max = 0u32; + for entry in entries.flatten() { + let name = entry.file_name(); + let name = name.to_string_lossy(); + if let Some(num_str) = name + .strip_prefix("conversation-") + .and_then(|s| s.strip_suffix(".md")) + { + if let Ok(n) = num_str.parse::() { + if n > max { + max = n; + } + } + } + } + max +} + +/// Write a full conversation archive. Returns the path to the created file. +pub fn archive_conversation( + root_dir: &Path, + conversation: &[Message], + meta: &ArchiveMeta, +) -> Result { + if conversation.is_empty() { + return Err("Nothing to archive (empty conversation)".to_string()); + } + + let conv_dir = conversations_dir(root_dir); + fs::create_dir_all(&conv_dir) + .map_err(|e| format!("Failed to create conversations archive dir: {e}"))?; + + let next_num = highest_log_number(&conv_dir) + 1; + let now = Utc::now(); + let date_full = now.format("%Y-%m-%dT%H:%M:%SZ").to_string(); + let date_short = now.format("%Y-%m-%d").to_string(); + let message_count = conversation.len(); + + let conversation_md = conversation_to_markdown(conversation); + + let content = format!( + "---\nlog: {next_num}\ndate: \"{date_full}\"\ntrigger: {trigger}\nchannel: {channel}\nentity: \"{entity}\"\nmessage_count: {message_count}\n---\n\n# Conversation {next_num:03}\n\n{conversation_md}", + trigger = meta.trigger, + channel = meta.channel, + entity = meta.entity_name, + ); + + let log_path = conv_dir.join(format!("conversation-{next_num:03}.md")); + fs::write(&log_path, &content) + .map_err(|e| format!("Failed to write conversation archive: {e}"))?; + + append_index( + root_dir, + next_num, + &date_short, + &meta.trigger, + &meta.channel, + message_count, + )?; + + Ok(log_path) +} + +/// Append an entry to INDEX.md. Creates it if missing. +fn append_index( + root_dir: &Path, + log_num: u32, + date: &str, + trigger: &str, + channel: &str, + message_count: usize, +) -> Result<(), String> { + let idx = index_path(root_dir); + + if !idx.exists() { + fs::write( + &idx, + "# Conversation Archive Index\n\n| Log | Date | Trigger | Channel | Messages |\n|-----|------|---------|---------|----------|\n", + ) + .map_err(|e| format!("Failed to create INDEX.md: {e}"))?; + } + + let mut file = fs::OpenOptions::new() + .append(true) + .open(&idx) + .map_err(|e| format!("Failed to open INDEX.md: {e}"))?; + + writeln!( + file, + "| {log_num:03} | {date} | {trigger} | {channel} | {message_count} |" + ) + .map_err(|e| format!("Failed to write to INDEX.md: {e}"))?; + + Ok(()) +} + +/// Full session-end routine: archive conversation + write EPHEMERAL summary. +pub fn end_session( + root_dir: &Path, + entity_name: &str, + conversation: &[Message], + channel: &str, + trigger: &str, +) { + if conversation.is_empty() { + return; + } + + // Path 1: Archive full conversation + let meta = ArchiveMeta { + trigger: trigger.to_string(), + channel: channel.to_string(), + entity_name: entity_name.to_string(), + }; + + match archive_conversation(root_dir, conversation, &meta) { + Ok(path) => { + tracing::info!("Conversation archived to {}", path.display()); + } + Err(e) => { + tracing::warn!("Failed to archive conversation: {}", e); + } + } + + // Path 2: Write lightweight EPHEMERAL summary + write_ephemeral_summary(root_dir, entity_name, conversation); +} + +/// Write a lightweight session summary to memory/EPHEMERAL.md. +fn write_ephemeral_summary(root_dir: &Path, entity_name: &str, conversation: &[Message]) { + if conversation.is_empty() { + return; + } + + let ephemeral_path = root_dir.join("memory").join("EPHEMERAL.md"); + let now = Utc::now().format("%Y-%m-%d %H:%M UTC"); + + let user_messages: Vec<&str> = conversation + .iter() + .filter_map(|m| { + if matches!(m.role, Role::User) { + if let MessageContent::Text(ref t) = m.content { + Some(t.as_str()) + } else { + None + } + } else { + None + } + }) + .collect(); + + let topics: Vec<&str> = user_messages.iter().take(5).copied().collect(); + + let mut content = format!("## Chat Session — {}\n\n", now); + content.push_str(&format!( + "Conversation with {} ({} messages)\n\n", + entity_name, + conversation.len() + )); + content.push_str("### Topics discussed\n\n"); + for topic in &topics { + let display = if topic.len() > 80 { + format!("{}...", &topic[..77]) + } else { + topic.to_string() + }; + content.push_str(&format!("- {}\n", display)); + } + if user_messages.len() > 5 { + content.push_str(&format!("- ...and {} more\n", user_messages.len() - 5)); + } + + if let Err(e) = fs::write(&ephemeral_path, content) { + tracing::warn!("Could not save session summary: {}", e); + } else { + println!(" \x1b[2msession saved to EPHEMERAL.md\x1b[0m"); + } +} + +#[cfg(test)] +mod tests { + use super::*; + use echo_system_types::llm::{ContentBlock, Message, MessageContent, Role}; + + #[test] + fn empty_conversation_produces_empty_markdown() { + let md = conversation_to_markdown(&[]); + assert!(md.is_empty()); + } + + #[test] + fn text_message_renders_correctly() { + let conversation = vec![ + Message { + role: Role::User, + content: MessageContent::Text("Hello".into()), + }, + Message { + role: Role::Assistant, + content: MessageContent::Text("Hi there".into()), + }, + ]; + let md = conversation_to_markdown(&conversation); + assert!(md.contains("### User")); + assert!(md.contains("Hello")); + assert!(md.contains("### Assistant")); + assert!(md.contains("Hi there")); + } + + #[test] + fn tool_use_renders_as_readable_block() { + let conversation = vec![Message { + role: Role::Assistant, + content: MessageContent::Blocks(vec![ContentBlock::ToolUse { + id: "t1".into(), + name: "file_read".into(), + input: serde_json::json!({"path": "SELF.md"}), + }]), + }]; + let md = conversation_to_markdown(&conversation); + assert!(md.contains("**Tool: file_read**")); + assert!(md.contains("SELF.md")); + } + + #[test] + fn tool_result_renders_with_content() { + let conversation = vec![Message { + role: Role::User, + content: MessageContent::Blocks(vec![ContentBlock::ToolResult { + tool_use_id: "t1".into(), + content: "file contents here".into(), + is_error: None, + }]), + }]; + let md = conversation_to_markdown(&conversation); + assert!(md.contains("**Tool Result**")); + assert!(md.contains("file contents here")); + } + + #[test] + fn large_tool_result_gets_truncated() { + let large_content = "x".repeat(3000); + let conversation = vec![Message { + role: Role::User, + content: MessageContent::Blocks(vec![ContentBlock::ToolResult { + tool_use_id: "t1".into(), + content: large_content, + is_error: None, + }]), + }]; + let md = conversation_to_markdown(&conversation); + assert!(md.contains("[truncated, 3000 bytes total]")); + } + + #[test] + fn error_tool_result_shows_error_marker() { + let conversation = vec![Message { + role: Role::User, + content: MessageContent::Blocks(vec![ContentBlock::ToolResult { + tool_use_id: "t1".into(), + content: "not found".into(), + is_error: Some(true), + }]), + }]; + let md = conversation_to_markdown(&conversation); + assert!(md.contains("[ERROR]")); + } + + #[test] + fn archive_conversation_creates_file_and_index() { + let tmp = tempfile::tempdir().unwrap(); + let root = tmp.path(); + fs::create_dir_all(root.join("archives/conversations")).unwrap(); + + let conversation = vec![Message { + role: Role::User, + content: MessageContent::Text("test".into()), + }]; + let meta = ArchiveMeta { + trigger: "session-end".into(), + channel: "repl".into(), + entity_name: "TestEntity".into(), + }; + + let path = archive_conversation(root, &conversation, &meta).unwrap(); + assert!(path.exists()); + + let content = fs::read_to_string(&path).unwrap(); + assert!(content.starts_with("---\n")); + assert!(content.contains("log: 1")); + assert!(content.contains("trigger: session-end")); + assert!(content.contains("channel: repl")); + assert!(content.contains("message_count: 1")); + assert!(content.contains("# Conversation 001")); + + let idx = root.join("archives/conversations/INDEX.md"); + assert!(idx.exists()); + let index_content = fs::read_to_string(&idx).unwrap(); + assert!(index_content.contains("| 001 |")); + } + + #[test] + fn archive_sequences_correctly() { + let tmp = tempfile::tempdir().unwrap(); + let root = tmp.path(); + fs::create_dir_all(root.join("archives/conversations")).unwrap(); + fs::write( + root.join("archives/conversations/conversation-003.md"), + "old", + ) + .unwrap(); + + let conversation = vec![Message { + role: Role::User, + content: MessageContent::Text("test".into()), + }]; + let meta = ArchiveMeta { + trigger: "session-end".into(), + channel: "repl".into(), + entity_name: "Test".into(), + }; + + let path = archive_conversation(root, &conversation, &meta).unwrap(); + assert!(path.to_string_lossy().contains("conversation-004.md")); + } + + #[test] + fn empty_conversation_returns_error() { + let tmp = tempfile::tempdir().unwrap(); + let result = archive_conversation( + tmp.path(), + &[], + &ArchiveMeta { + trigger: "session-end".into(), + channel: "repl".into(), + entity_name: "Test".into(), + }, + ); + assert!(result.is_err()); + } +}