diff --git a/Cargo.lock b/Cargo.lock index c38269f..992d077 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2953,6 +2953,7 @@ dependencies = [ "discord-voice-echo", "echo-system-types", "globset", + "libc", "praxis-echo", "pulse-echo", "recall-echo", diff --git a/Cargo.toml b/Cargo.toml index 0f63f13..1e7fe0f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -51,6 +51,7 @@ chrono-tz = "0.10" # Misc uuid = { version = "1", features = ["v4"] } chrono = { version = "0.4", features = ["serde"] } +libc = "0.2" # Shared types echo-system-types = { git = "https://github.com/dnacenta/echo-system-types", tag = "v0.2.0" } diff --git a/src/chat/repl.rs b/src/chat/repl.rs index e4d09f3..603f5c4 100644 --- a/src/chat/repl.rs +++ b/src/chat/repl.rs @@ -60,6 +60,15 @@ pub async fn run( content: MessageContent::Text(input.to_string()), }); + // Compact conversation if approaching context budget + crate::context::compact_if_needed( + &mut conversation, + provider, + config.llm.context_budget, + config.llm.max_tokens, + ) + .await; + // Tool definitions let tool_defs = if provider.supports_tools() && !tools.is_empty() { Some(tools.definitions()) @@ -169,12 +178,6 @@ pub async fn run( } } } - - // Keep conversation bounded - if conversation.len() > 100 { - let drain_count = conversation.len() - 100; - conversation.drain(..drain_count); - } } // Save session to EPHEMERAL.md diff --git a/src/cli/down.rs b/src/cli/down.rs index ba9484a..09218ee 100644 --- a/src/cli/down.rs +++ b/src/cli/down.rs @@ -1,6 +1,40 @@ +use crate::config::Config; +use crate::pidfile; + pub async fn run() -> Result<(), Box> { - // Phase 1: simple — just tell the user to Ctrl+C - // Future: PID file, graceful shutdown signal - eprintln!("Use Ctrl+C to stop a running entity, or kill the process."); + let config = Config::load()?; + let root_dir = config.root_dir()?; + + let pid = match pidfile::read(&root_dir) { + Some(pid) => pid, + None => { + eprintln!("No running entity found (no PID file)."); + return Ok(()); + } + }; + + if !pidfile::is_alive(pid) { + eprintln!("Entity is not running (stale PID file, pid {}).", pid); + pidfile::remove(&root_dir); + return Ok(()); + } + + println!("Stopping entity (pid {})...", pid); + + if !pidfile::kill(pid) { + return Err(format!("Failed to send SIGTERM to pid {}", pid).into()); + } + + // Wait up to 10 seconds for the process to exit + for _ in 0..100 { + if !pidfile::is_alive(pid) { + pidfile::remove(&root_dir); + println!("Entity stopped."); + return Ok(()); + } + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + } + + eprintln!("Entity did not stop within 10 seconds (pid {}).", pid); Ok(()) } diff --git a/src/cli/status.rs b/src/cli/status.rs index f7e1298..c877130 100644 --- a/src/cli/status.rs +++ b/src/cli/status.rs @@ -1,7 +1,9 @@ use crate::config::Config; +use crate::pidfile; pub async fn run() -> Result<(), Box> { let config = Config::load()?; + let root_dir = config.root_dir()?; println!("Entity: {}", config.entity.name); println!( @@ -26,19 +28,26 @@ pub async fn run() -> Result<(), Box> { ); } - // Check if server is running - let url = format!( - "http://{}:{}/health", - config.server.host, config.server.port - ); - match reqwest::get(&url).await { - Ok(resp) if resp.status().is_success() => { - println!("Status: RUNNING"); + // Check PID file first, then fall back to health endpoint + let status = match pidfile::read(&root_dir) { + Some(pid) if pidfile::is_alive(pid) => format!("RUNNING (pid {})", pid), + Some(pid) => { + pidfile::remove(&root_dir); + format!("STOPPED (stale pid {})", pid) } - _ => { - println!("Status: STOPPED"); + None => { + // No PID file — try health endpoint as fallback + let url = format!( + "http://{}:{}/health", + config.server.host, config.server.port + ); + match reqwest::get(&url).await { + Ok(resp) if resp.status().is_success() => "RUNNING".to_string(), + _ => "STOPPED".to_string(), + } } - } + }; + println!("Status: {}", status); Ok(()) } diff --git a/src/config/mod.rs b/src/config/mod.rs index a1c3378..6687cc1 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -55,6 +55,9 @@ pub struct LlmConfig { pub model: String, #[serde(default = "default_max_tokens")] pub max_tokens: u32, + /// Maximum estimated tokens in conversation before compaction triggers (0 = default 150k). + #[serde(default)] + pub context_budget: usize, } #[derive(Debug, Clone, Serialize, Deserialize)] diff --git a/src/context.rs b/src/context.rs new file mode 100644 index 0000000..ec0316c --- /dev/null +++ b/src/context.rs @@ -0,0 +1,184 @@ +use echo_system_types::llm::{ContentBlock, LmProvider, Message, MessageContent, Role}; + +/// Default context budget in estimated tokens (leaves room for system prompt + response). +const DEFAULT_CONTEXT_BUDGET: usize = 150_000; + +/// How many of the most recent messages to always keep uncompacted. +const KEEP_RECENT: usize = 20; + +/// Minimum messages before compaction is even considered. +const MIN_MESSAGES_FOR_COMPACTION: usize = 30; + +/// Rough chars-per-token estimate for English text. +const CHARS_PER_TOKEN: usize = 4; + +/// Estimate the token count of a single message. +pub fn estimate_message_tokens(msg: &Message) -> usize { + let chars = match &msg.content { + MessageContent::Text(s) => s.len(), + MessageContent::Blocks(blocks) => blocks + .iter() + .map(|block| match block { + ContentBlock::Text { text } => text.len(), + ContentBlock::ToolUse { name, input, .. } => name.len() + input.to_string().len(), + ContentBlock::ToolResult { content, .. } => content.len(), + }) + .sum(), + }; + // Add overhead for role/structure (~20 tokens) + (chars / CHARS_PER_TOKEN) + 20 +} + +/// Estimate the total token count of a conversation. +pub fn estimate_conversation_tokens(conversation: &[Message]) -> usize { + conversation.iter().map(estimate_message_tokens).sum() +} + +/// Extract text content from a message for summarization purposes. +fn message_to_text(msg: &Message) -> String { + match &msg.content { + MessageContent::Text(s) => s.clone(), + MessageContent::Blocks(blocks) => blocks + .iter() + .filter_map(|block| match block { + ContentBlock::Text { text } => Some(text.as_str()), + ContentBlock::ToolUse { name, .. } => Some(name.as_str()), + ContentBlock::ToolResult { content, .. } => { + // Truncate large tool results in the summary input + if content.len() > 500 { + None + } else { + Some(content.as_str()) + } + } + }) + .collect::>() + .join(" "), + } +} + +/// Build a summarization prompt from the messages being compacted. +fn build_summary_prompt(messages: &[Message]) -> String { + let mut lines = Vec::new(); + for msg in messages { + let role = match msg.role { + Role::User => "User", + Role::Assistant => "Assistant", + }; + let text = message_to_text(msg); + if !text.is_empty() { + // Truncate extremely long messages in the summarization input + let truncated = if text.len() > 2000 { + format!("{}...", &text[..1997]) + } else { + text + }; + lines.push(format!("{}: {}", role, truncated)); + } + } + lines.join("\n") +} + +/// Compact a conversation by summarizing older messages. +/// +/// If the conversation is under the token budget or too short, returns it unchanged. +/// Otherwise, summarizes the oldest messages (keeping the most recent ones intact) +/// and replaces them with a single summary message. +pub async fn compact_if_needed( + conversation: &mut Vec, + provider: &dyn LmProvider, + context_budget: usize, + max_tokens: u32, +) { + let budget = if context_budget > 0 { + context_budget + } else { + DEFAULT_CONTEXT_BUDGET + }; + + // Don't compact small conversations + if conversation.len() < MIN_MESSAGES_FOR_COMPACTION { + return; + } + + let total_tokens = estimate_conversation_tokens(conversation); + if total_tokens <= budget { + return; + } + + tracing::info!( + "Context compaction triggered: ~{} tokens (budget {}), {} messages", + total_tokens, + budget, + conversation.len() + ); + + // Split: older messages to summarize, recent messages to keep + let keep_count = KEEP_RECENT.min(conversation.len()); + let split_at = conversation.len() - keep_count; + + if split_at < 2 { + // Not enough old messages to summarize — just trim + let drain_count = conversation.len().saturating_sub(keep_count); + conversation.drain(..drain_count); + return; + } + + let old_messages = &conversation[..split_at]; + let summary_input = build_summary_prompt(old_messages); + + let summarize_prompt = format!( + "Summarize this conversation concisely, preserving key decisions, code context, \ + task state, and important details. Focus on what matters for continuing the \ + conversation. Be direct — no preamble.\n\n{}", + summary_input + ); + + let summary_messages = vec![Message { + role: Role::User, + content: MessageContent::Text(summarize_prompt), + }]; + + // Use the same provider to generate the summary + let summary_text = match provider + .invoke( + "You are a concise summarizer. Output only the summary.", + &summary_messages, + max_tokens.min(2048), + None, + ) + .await + { + Ok(result) => result.text(), + Err(e) => { + tracing::warn!( + "Context compaction failed: {}. Falling back to simple trim.", + e + ); + // Fall back to simple trim + conversation.drain(..split_at); + return; + } + }; + + // Replace old messages with the summary + conversation.drain(..split_at); + conversation.insert( + 0, + Message { + role: Role::User, + content: MessageContent::Text(format!( + "[Context summary of earlier conversation]\n{}", + summary_text + )), + }, + ); + + let new_tokens = estimate_conversation_tokens(conversation); + tracing::info!( + "Compacted {} messages into summary. ~{} → ~{} tokens", + split_at, + total_tokens, + new_tokens + ); +} diff --git a/src/main.rs b/src/main.rs index b40ecf4..a3dadcb 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,8 +4,10 @@ mod chat; mod claude_provider; mod cli; mod config; +mod context; mod events; mod init; +mod pidfile; mod plugins; mod scheduler; mod server; diff --git a/src/pidfile.rs b/src/pidfile.rs new file mode 100644 index 0000000..a0f96c9 --- /dev/null +++ b/src/pidfile.rs @@ -0,0 +1,38 @@ +use std::fs; +use std::path::{Path, PathBuf}; + +const PID_FILENAME: &str = ".pulse-null.pid"; + +/// Get the PID file path for an entity root directory. +pub fn path(root_dir: &Path) -> PathBuf { + root_dir.join(PID_FILENAME) +} + +/// Write the current process PID to the PID file. +pub fn write(root_dir: &Path) -> std::io::Result<()> { + let pid = std::process::id(); + fs::write(path(root_dir), pid.to_string()) +} + +/// Read the PID from the PID file, if it exists and is valid. +pub fn read(root_dir: &Path) -> Option { + fs::read_to_string(path(root_dir)) + .ok() + .and_then(|s| s.trim().parse().ok()) +} + +/// Remove the PID file. +pub fn remove(root_dir: &Path) { + let _ = fs::remove_file(path(root_dir)); +} + +/// Check if a process with the given PID is still alive. +pub fn is_alive(pid: u32) -> bool { + Path::new(&format!("/proc/{}", pid)).exists() +} + +/// Send SIGTERM to the given PID. Returns true if the signal was sent. +pub fn kill(pid: u32) -> bool { + // SAFETY: sending SIGTERM to a process ID is a standard Unix operation + unsafe { libc::kill(pid as i32, libc::SIGTERM) == 0 } +} diff --git a/src/plugins/manager.rs b/src/plugins/manager.rs index be0abd6..1ffcae0 100644 --- a/src/plugins/manager.rs +++ b/src/plugins/manager.rs @@ -200,6 +200,7 @@ mod tests { api_key: None, model: "test".into(), max_tokens: 1024, + context_budget: 0, }, security: SecurityConfig { secret: None, diff --git a/src/server/auth.rs b/src/server/auth.rs index 583203a..c890e78 100644 --- a/src/server/auth.rs +++ b/src/server/auth.rs @@ -80,6 +80,7 @@ mod tests { api_key: None, model: "test".into(), max_tokens: 1024, + context_budget: 0, }, security: SecurityConfig { secret, diff --git a/src/server/e2e_tests.rs b/src/server/e2e_tests.rs index b9790c6..8caff05 100644 --- a/src/server/e2e_tests.rs +++ b/src/server/e2e_tests.rs @@ -98,6 +98,7 @@ fn test_config() -> Config { api_key: None, model: "mock-model".to_string(), max_tokens: 1024, + context_budget: 0, }, security: SecurityConfig { secret: None, diff --git a/src/server/handlers/chat.rs b/src/server/handlers/chat.rs index 9baa582..46b0d22 100644 --- a/src/server/handlers/chat.rs +++ b/src/server/handlers/chat.rs @@ -74,6 +74,15 @@ pub async fn chat( content: MessageContent::Text(user_message), }); + // Compact conversation if approaching context budget + crate::context::compact_if_needed( + &mut conversation, + state.provider.as_ref(), + state.config.llm.context_budget, + state.config.llm.max_tokens, + ) + .await; + // Build tool definitions (only if provider supports tools) let tool_defs = if state.provider.supports_tools() && !state.tools.is_empty() { Some(state.tools.definitions()) @@ -125,12 +134,6 @@ pub async fn chat( // Done — extract text and return let text = result.text(); - // Keep conversation bounded (last 100 messages) - if conversation.len() > 100 { - let drain_count = conversation.len() - 100; - conversation.drain(..drain_count); - } - // Emit PostConversation event emit_post_conversation( &state, diff --git a/src/server/mod.rs b/src/server/mod.rs index 416a9ff..857e5cb 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -17,6 +17,7 @@ use tokio::sync::RwLock; use crate::claude_provider::ClaudeProvider; use crate::config::Config; use crate::events::EventBus; +use crate::pidfile; use crate::plugins::manager::PluginManager; use crate::scheduler::intent::IntentQueue; use crate::scheduler::Schedule; @@ -151,9 +152,24 @@ pub async fn start(config: Config) -> Result<(), Box> { let addr = format!("{}:{}", config.server.host, config.server.port); let listener = tokio::net::TcpListener::bind(&addr).await?; + // Write PID file so `pulse-null down` can find us + pidfile::write(&root_dir)?; tracing::info!("Listening on {}", addr); - axum::serve(listener, app).await?; + // Graceful shutdown on SIGTERM or SIGINT + let shutdown = async { + let mut sigterm = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) + .expect("failed to install SIGTERM handler"); + let sigint = tokio::signal::ctrl_c(); + tokio::select! { + _ = sigterm.recv() => tracing::info!("Received SIGTERM"), + _ = sigint => tracing::info!("Received SIGINT"), + } + }; + + axum::serve(listener, app) + .with_graceful_shutdown(shutdown) + .await?; // Clean up plugins on shutdown plugin_manager.stop_all().await; @@ -163,5 +179,9 @@ pub async fn start(config: Config) -> Result<(), Box> { handle.abort(); } + // Remove PID file + pidfile::remove(&root_dir); + tracing::info!("Shutdown complete"); + Ok(()) } diff --git a/src/server/trust.rs b/src/server/trust.rs index 7e72550..48318bf 100644 --- a/src/server/trust.rs +++ b/src/server/trust.rs @@ -58,6 +58,7 @@ mod tests { api_key: None, model: "test".into(), max_tokens: 1024, + context_budget: 0, }, security: SecurityConfig { secret: None,