diff --git a/Cargo.lock b/Cargo.lock index f50072c..d89ab5e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2584,6 +2584,7 @@ dependencies = [ "chrono", "clap", "dirs", + "futures-core", "getrandom 0.4.2", "hex", "libc", diff --git a/Cargo.toml b/Cargo.toml index a26ac14..3f1957f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -53,6 +53,7 @@ tower-mcp = { version = "0.8", features = ["http", "testing"] } schemars = "1.2" regex = "1.12.3" async-trait = "0.1.89" +futures-core = "0.3" url = "2.5" [dev-dependencies] diff --git a/README.md b/README.md index a9058c1..753907d 100644 --- a/README.md +++ b/README.md @@ -76,6 +76,15 @@ That's it! Your agents are now coordinating via Redis. > **Note:** `tt bootstrap` delegates to an AI agent to download Redis from GitHub and compile it for your machine. Alternatively: `brew install redis` (macOS) or `apt install redis-server` (Ubuntu). +## Runtime Model + +`tt spawn` still launches a background `tt agent-loop` worker. The worker now supports two execution modes: + +- **One-shot**: default. Each turn spawns a fresh coding CLI process. +- **Persistent streaming**: opt in with `[agent].persistent = true`. Claude-backed workers keep a live `stream-json` subprocess across turns, can accept urgent messages mid-turn, persist a resumable session ID on the agent record, and emit structured per-turn events onto the Redis event stream. + +During rollout, unsupported CLIs automatically fall back to the one-shot path even if persistence is enabled. + ## 🎯 Mission Mode Start an autonomous mission that handles multiple GitHub issues with dependency-aware scheduling: diff --git a/docs/src/cli/spawn.md b/docs/src/cli/spawn.md index c2a49f1..b67e7f2 100644 --- a/docs/src/cli/spawn.md +++ b/docs/src/cli/spawn.md @@ -153,6 +153,8 @@ tt spawn reviewer --role reviewer & - Repeats until `--max-rounds` reached 4. **Agent stops** with state `Stopped` +By default, each turn uses a fresh CLI subprocess. If `[agent].persistent = true` and the selected CLI has a streaming adapter, `tt agent-loop` keeps one long-lived subprocess alive across turns instead. Today that persistent path is available for Claude-style `stream-json` sessions; unsupported CLIs automatically fall back to the one-shot model. + ## Agent Naming Choose descriptive names. With `--role`, names no longer need to describe the role: diff --git a/src/agent.rs b/src/agent.rs index 773b177..819038e 100644 --- a/src/agent.rs +++ b/src/agent.rs @@ -758,6 +758,9 @@ pub struct Agent { /// Defaults to created_at if never set. #[serde(default = "chrono::Utc::now")] pub last_active_at: DateTime, + /// Persistent runtime session identifier for resume-capable CLIs. + #[serde(default)] + pub runtime_session_id: Option, } impl Agent { @@ -786,6 +789,7 @@ impl Agent { tasks_completed: 0, rounds_completed: 0, last_active_at: now, + runtime_session_id: None, } } @@ -864,6 +868,7 @@ impl Agent { tasks_completed: 0, rounds_completed: 0, last_active_at: now, + runtime_session_id: None, } } } diff --git a/src/agent_runtime.rs b/src/agent_runtime.rs new file mode 100644 index 0000000..760a0c2 --- /dev/null +++ b/src/agent_runtime.rs @@ -0,0 +1,978 @@ +/* + * Copyright (c) 2024-Present, Jeremy Plichta + * Licensed under the MIT License + */ + +//! Agent runtime adapters. + +use std::collections::{HashMap, VecDeque}; +use std::path::{Path, PathBuf}; +use std::pin::Pin; +use std::process::{Command, ExitStatus, Stdio}; +use std::sync::{Arc, Mutex}; +use std::task::{Context, Poll}; + +use async_trait::async_trait; +use futures_core::Stream; +use serde_json::{Value, json}; +use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; +use tokio::process::{Child as TokioChild, ChildStdin, ChildStdout, Command as TokioCommand}; +use tokio::sync::{mpsc, oneshot}; +use tokio::task::JoinHandle; + +use crate::Result; + +#[must_use] +pub fn supports_persistent_runtime(cli_name: &str) -> bool { + matches!(cli_name, "codex" | "codex-mini") +} + +#[must_use] +pub fn build_cli_command(cli_name: &str, cli_cmd: &str, prompt_file: &Path) -> String { + let prompt_file = shell_quote(prompt_file); + if cli_name == "auggie" { + format!("{} --instruction-file {}", cli_cmd, prompt_file) + } else { + format!("cat {} | {}", prompt_file, cli_cmd) + } +} + +#[must_use] +fn shell_quote(path: &Path) -> String { + shell_quote_str(&path.to_string_lossy()) +} + +#[must_use] +fn shell_quote_str(value: &str) -> String { + format!("'{}'", value.replace('\'', "'\"'\"'")) +} + +#[derive(Debug, Clone)] +pub struct AgentTurn { + pub prompt: String, + pub prompt_file: PathBuf, + pub output_file: PathBuf, +} + +#[derive(Debug, Clone)] +pub enum AgentInput { + UserMessage(AgentTurn), + UrgentMessage(AgentTurn), + Cancel, +} + +#[derive(Debug, Clone)] +pub enum AgentEvent { + SessionReady { + session_id: String, + }, + TurnStarted, + AssistantDelta(String), + ToolCall { + name: String, + args: serde_json::Value, + }, + TurnCompleted { + summary: Option, + }, + AwaitingInput, + SessionError(String), + Exited(ExitStatus), +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ShutdownMode { + Graceful, + Immediate, +} + +#[derive(Debug)] +pub struct AgentTurnResult { + pub status: std::io::Result, +} + +pub type AgentEventStream<'a> = Pin + Send + 'a>>; + +pub struct AgentEventReceiver { + rx: mpsc::UnboundedReceiver, +} + +impl AgentEventReceiver { + pub async fn recv(&mut self) -> Option { + self.rx.recv().await + } +} + +pub struct AgentRuntime { + pub agent: RuntimeAgent, + pub events: AgentEventReceiver, +} + +pub struct RuntimeAgent { + inner: Box, + event_tx: mpsc::UnboundedSender, +} + +impl RuntimeAgent { + fn new(inner: Box, event_tx: mpsc::UnboundedSender) -> Self { + Self { inner, event_tx } + } + + pub async fn send(&mut self, input: AgentInput) -> Result { + let result = self.inner.send(input).await; + self.flush_events(); + result + } + + pub async fn shutdown(&mut self, mode: ShutdownMode) -> Result> { + let result = self.inner.shutdown(mode).await; + self.flush_events(); + result + } + + #[must_use] + pub fn session_id(&self) -> Option { + self.inner.session_id() + } + + fn flush_events(&mut self) { + for event in self.inner.take_events() { + let _ = self.event_tx.send(event); + } + } +} + +#[async_trait] +pub trait CodingAgent: Send { + async fn send(&mut self, input: AgentInput) -> Result; + fn events(&mut self) -> AgentEventStream<'_>; + fn take_events(&mut self) -> Vec; + async fn shutdown(&mut self, mode: ShutdownMode) -> Result>; + + fn session_id(&self) -> Option { + None + } +} + +#[derive(Debug, Clone)] +pub struct OneShotAgentConfig { + pub cli_name: String, + pub cli_cmd: String, + pub workdir: PathBuf, + pub env: Vec<(String, String)>, +} + +#[derive(Debug, Clone)] +pub struct StreamingAgentConfig { + pub cli_name: String, + pub cli_cmd: String, + pub workdir: PathBuf, + pub env: Vec<(String, String)>, + pub resume_session_id: Option, +} + +#[derive(Debug)] +pub struct OneShotAgent { + config: OneShotAgentConfig, + events: VecDeque, + last_exit_status: Option, +} + +impl OneShotAgent { + #[must_use] + pub fn new(config: OneShotAgentConfig) -> Self { + Self { + config, + events: VecDeque::new(), + last_exit_status: None, + } + } + + #[must_use] + pub fn runtime(config: OneShotAgentConfig) -> AgentRuntime { + let (tx, rx) = mpsc::unbounded_channel(); + AgentRuntime { + agent: RuntimeAgent::new(Box::new(Self::new(config)), tx), + events: AgentEventReceiver { rx }, + } + } + + fn run_turn(&mut self, turn: AgentTurn) -> Result { + self.events.push_back(AgentEvent::TurnStarted); + std::fs::write(&turn.prompt_file, &turn.prompt)?; + + let status = (|| -> std::io::Result { + let output = std::fs::File::create(&turn.output_file)?; + let shell_cmd = build_cli_command( + &self.config.cli_name, + &self.config.cli_cmd, + &turn.prompt_file, + ); + let mut cmd = Command::new("sh"); + cmd.arg("-c") + .arg(&shell_cmd) + .current_dir(&self.config.workdir) + .stdin(Stdio::null()) + .stdout(output.try_clone()?) + .stderr(output); + + for (key, value) in &self.config.env { + cmd.env(key, value); + } + + cmd.status() + })(); + + let _ = std::fs::remove_file(&turn.prompt_file); + + match &status { + Ok(exit_status) => { + self.last_exit_status = Some(exit_status.clone()); + self.events + .push_back(AgentEvent::Exited(exit_status.clone())); + if exit_status.success() { + self.events + .push_back(AgentEvent::TurnCompleted { summary: None }); + } else { + self.events.push_back(AgentEvent::SessionError(format!( + "CLI exited with status {}", + describe_exit_status(exit_status) + ))); + } + } + Err(err) => { + self.events.push_back(AgentEvent::SessionError(format!( + "Failed to run CLI: {}", + err + ))); + } + } + + Ok(AgentTurnResult { status }) + } +} + +#[async_trait] +impl CodingAgent for OneShotAgent { + async fn send(&mut self, input: AgentInput) -> Result { + match input { + AgentInput::UserMessage(turn) | AgentInput::UrgentMessage(turn) => self.run_turn(turn), + AgentInput::Cancel => { + self.events.push_back(AgentEvent::SessionError( + "Cancel is not supported by the one-shot runtime".to_string(), + )); + Ok(AgentTurnResult { + status: Err(std::io::Error::other( + "Cancel is not supported by the one-shot runtime", + )), + }) + } + } + } + + fn events(&mut self) -> AgentEventStream<'_> { + Box::pin(BufferedEventStream { + events: &mut self.events, + }) + } + + fn take_events(&mut self) -> Vec { + self.events.drain(..).collect() + } + + async fn shutdown(&mut self, _mode: ShutdownMode) -> Result> { + Ok(self.last_exit_status.clone()) + } +} + +#[derive(Debug, Clone)] +struct CodexAppServerLaunchConfig { + launch_cmd: String, + workdir: PathBuf, + env: Vec<(String, String)>, + resume_thread_id: Option, + model: Option, + reasoning_effort: Option, +} + +impl CodexAppServerLaunchConfig { + fn from_streaming(config: StreamingAgentConfig) -> Self { + let (model, reasoning_effort) = codex_model_overrides(&config.cli_name); + Self { + launch_cmd: build_persistent_launch_command(&config.cli_name, &config.cli_cmd), + workdir: config.workdir, + env: config.env, + resume_thread_id: config.resume_session_id, + model, + reasoning_effort, + } + } +} + +#[derive(Debug, Default)] +struct PendingRequestState { + next_id: u64, + pending: HashMap>>, +} + +#[derive(Debug)] +pub struct CodexAppServerAgent { + config: CodexAppServerLaunchConfig, + child: TokioChild, + stdin: Option, + events: VecDeque, + last_exit_status: Option, + thread_id: Arc>>, + active_turn_id: Arc>>, + output_files: Arc>>, + request_state: Arc>, + event_tx: mpsc::UnboundedSender, + reader_task: JoinHandle<()>, +} + +impl CodexAppServerAgent { + async fn spawn( + config: CodexAppServerLaunchConfig, + event_tx: mpsc::UnboundedSender, + ) -> Result { + let mut cmd = TokioCommand::new("sh"); + cmd.arg("-lc") + .arg(&config.launch_cmd) + .current_dir(&config.workdir) + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .stderr(Stdio::inherit()); + for (key, value) in &config.env { + cmd.env(key, value); + } + + let mut child = cmd.spawn()?; + let stdin = child + .stdin + .take() + .ok_or_else(|| std::io::Error::other("codex app-server stdin unavailable"))?; + let stdout = child + .stdout + .take() + .ok_or_else(|| std::io::Error::other("codex app-server stdout unavailable"))?; + + let thread_id = Arc::new(Mutex::new(None)); + let active_turn_id = Arc::new(Mutex::new(None)); + let output_files = Arc::new(Mutex::new(HashMap::new())); + let request_state = Arc::new(Mutex::new(PendingRequestState::default())); + let reader_task = spawn_codex_reader( + stdout, + event_tx.clone(), + thread_id.clone(), + active_turn_id.clone(), + output_files.clone(), + request_state.clone(), + ); + + let mut agent = Self { + config, + child, + stdin: Some(stdin), + events: VecDeque::new(), + last_exit_status: None, + thread_id, + active_turn_id, + output_files, + request_state, + event_tx, + reader_task, + }; + + agent.initialize().await?; + let thread_id = agent.start_or_resume_thread().await?; + let _ = agent.event_tx.send(AgentEvent::SessionReady { + session_id: thread_id, + }); + + Ok(agent) + } + + async fn initialize(&mut self) -> Result<()> { + let params = json!({ + "clientInfo": { + "name": "tinytown", + "title": "Tinytown", + "version": env!("CARGO_PKG_VERSION"), + } + }); + let _ = self.send_request("initialize", params).await?; + self.send_notification("initialized", json!({})).await?; + Ok(()) + } + + async fn start_or_resume_thread(&mut self) -> Result { + let params = json!({ + "approvalPolicy": "never", + "cwd": self.config.workdir, + "sandbox": "danger-full-access", + "model": self.config.model, + }); + + let result = if let Some(thread_id) = self.config.resume_thread_id.as_ref() { + let mut resume = params; + if let Some(object) = resume.as_object_mut() { + object.insert("threadId".to_string(), Value::String(thread_id.clone())); + } + self.send_request("thread/resume", resume).await? + } else { + self.send_request("thread/start", params).await? + }; + + let thread_id = result + .get("thread") + .and_then(|thread| thread.get("id")) + .and_then(Value::as_str) + .ok_or_else(|| std::io::Error::other("codex app-server thread id missing"))? + .to_string(); + *self.thread_id.lock().expect("thread id mutex poisoned") = Some(thread_id.clone()); + Ok(thread_id) + } + + async fn send_request(&mut self, method: &str, params: Value) -> Result { + let (request_id, rx) = { + let mut state = self + .request_state + .lock() + .expect("request state mutex poisoned"); + state.next_id += 1; + let request_id = state.next_id; + let (tx, rx) = oneshot::channel(); + state.pending.insert(request_id, tx); + (request_id, rx) + }; + + self.write_json(&json!({ + "id": request_id, + "method": method, + "params": params, + })) + .await?; + + match rx.await { + Ok(Ok(result)) => Ok(result), + Ok(Err(err)) => Err(err.into()), + Err(_) => Err(std::io::Error::other(format!( + "codex app-server request dropped before response: {}", + method + )) + .into()), + } + } + + async fn send_notification(&mut self, method: &str, params: Value) -> Result<()> { + self.write_json(&json!({ + "method": method, + "params": params, + })) + .await + } + + async fn write_json(&mut self, payload: &Value) -> Result<()> { + let stdin = self + .stdin + .as_mut() + .ok_or_else(|| std::io::Error::other("codex app-server stdin closed"))?; + stdin.write_all(payload.to_string().as_bytes()).await?; + stdin.write_all(b"\n").await?; + stdin.flush().await?; + Ok(()) + } + + async fn start_turn(&mut self, turn: AgentTurn) -> Result { + std::fs::write(&turn.prompt_file, &turn.prompt)?; + + let mut params = json!({ + "approvalPolicy": "never", + "cwd": self.config.workdir, + "threadId": self + .session_id() + .ok_or_else(|| std::io::Error::other("codex thread id missing"))?, + "input": [{"type": "text", "text": turn.prompt}], + }); + if let Some(object) = params.as_object_mut() { + if let Some(model) = self.config.model.as_ref() { + object.insert("model".to_string(), Value::String(model.clone())); + } + if let Some(effort) = self.config.reasoning_effort.as_ref() { + object.insert("effort".to_string(), Value::String(effort.clone())); + } + object.insert( + "sandboxPolicy".to_string(), + Value::String("danger-full-access".to_string()), + ); + } + + let result = self.send_request("turn/start", params).await; + let _ = std::fs::remove_file(&turn.prompt_file); + + match result { + Ok(payload) => { + let turn_id = payload + .get("turn") + .and_then(|value| value.get("id")) + .and_then(Value::as_str) + .ok_or_else(|| std::io::Error::other("codex turn id missing"))? + .to_string(); + *self + .active_turn_id + .lock() + .expect("active turn mutex poisoned") = Some(turn_id.clone()); + queue_output_file(&self.output_files, &turn_id, turn.output_file)?; + Ok(AgentTurnResult { + status: Ok(success_exit_status()), + }) + } + Err(err) => { + self.events.push_back(AgentEvent::SessionError(format!( + "Failed to start Codex turn: {}", + err + ))); + Ok(AgentTurnResult { + status: Err(as_io_error(err)), + }) + } + } + } + + async fn steer_turn(&mut self, turn: AgentTurn, turn_id: String) -> Result { + std::fs::write(&turn.prompt_file, &turn.prompt)?; + let result = self + .send_request( + "turn/steer", + json!({ + "threadId": self + .session_id() + .ok_or_else(|| std::io::Error::other("codex thread id missing"))?, + "expectedTurnId": turn_id, + "input": [{"type": "text", "text": turn.prompt}], + }), + ) + .await; + let _ = std::fs::remove_file(&turn.prompt_file); + + match result { + Ok(_) => Ok(AgentTurnResult { + status: Ok(success_exit_status()), + }), + Err(err) => { + self.events.push_back(AgentEvent::SessionError(format!( + "Failed to steer Codex turn: {}", + err + ))); + Ok(AgentTurnResult { + status: Err(as_io_error(err)), + }) + } + } + } + + async fn interrupt_active_turn(&mut self) -> Result<()> { + let Some(thread_id) = self.session_id() else { + return Ok(()); + }; + let Some(turn_id) = self + .active_turn_id + .lock() + .expect("active turn mutex poisoned") + .clone() + else { + return Ok(()); + }; + + let _ = self + .send_request( + "turn/interrupt", + json!({ + "threadId": thread_id, + "turnId": turn_id, + }), + ) + .await?; + Ok(()) + } +} + +fn spawn_codex_reader( + stdout: ChildStdout, + event_tx: mpsc::UnboundedSender, + thread_id: Arc>>, + active_turn_id: Arc>>, + output_files: Arc>>, + request_state: Arc>, +) -> JoinHandle<()> { + tokio::spawn(async move { + let mut lines = BufReader::new(stdout).lines(); + + while let Ok(Some(line)) = lines.next_line().await { + handle_codex_line( + &line, + &event_tx, + &thread_id, + &active_turn_id, + &output_files, + &request_state, + ); + } + + fail_pending_requests( + &request_state, + "codex app-server closed before sending a response", + ); + let _ = event_tx.send(AgentEvent::SessionError( + "Codex app-server stream closed unexpectedly".to_string(), + )); + let _ = event_tx.send(AgentEvent::Exited(exit_status_from_code(1))); + }) +} + +fn handle_codex_line( + line: &str, + event_tx: &mpsc::UnboundedSender, + thread_id: &Arc>>, + active_turn_id: &Arc>>, + output_files: &Arc>>, + request_state: &Arc>, +) { + let Ok(payload) = serde_json::from_str::(line) else { + let _ = event_tx.send(AgentEvent::SessionError(format!( + "Invalid codex app-server event: {}", + line + ))); + return; + }; + + if let Some(id) = payload.get("id").and_then(Value::as_u64) { + let response = if let Some(result) = payload.get("result") { + Ok(result.clone()) + } else if let Some(err) = payload.get("error") { + let message = err + .get("message") + .and_then(Value::as_str) + .unwrap_or("unknown JSON-RPC error"); + Err(std::io::Error::other(message.to_string())) + } else { + Err(std::io::Error::other("missing JSON-RPC result")) + }; + + if let Some(tx) = request_state + .lock() + .expect("request state mutex poisoned") + .pending + .remove(&id) + { + let _ = tx.send(response); + } + return; + } + + let Some(method) = payload.get("method").and_then(Value::as_str) else { + return; + }; + let params = payload.get("params").unwrap_or(&Value::Null); + + match method { + "thread/started" => { + if let Some(id) = params + .get("thread") + .and_then(|thread| thread.get("id")) + .and_then(Value::as_str) + { + *thread_id.lock().expect("thread id mutex poisoned") = Some(id.to_string()); + } + } + "turn/started" => { + if let Some(id) = params + .get("turn") + .and_then(|turn| turn.get("id")) + .and_then(Value::as_str) + { + *active_turn_id.lock().expect("active turn mutex poisoned") = Some(id.to_string()); + } + let _ = event_tx.send(AgentEvent::TurnStarted); + } + "item/agentMessage/delta" => { + if let Some(delta) = params.get("delta").and_then(Value::as_str) { + if let Some(turn_id) = params.get("turnId").and_then(Value::as_str) { + append_to_turn_output(output_files, turn_id, delta); + } + let _ = event_tx.send(AgentEvent::AssistantDelta(delta.to_string())); + } + } + "item/completed" => { + if let Some((name, args)) = + extract_tool_call(params.get("item").unwrap_or(&Value::Null)) + { + let _ = event_tx.send(AgentEvent::ToolCall { name, args }); + } + } + "turn/completed" => { + let turn_id = params + .get("turn") + .and_then(|turn| turn.get("id")) + .and_then(Value::as_str) + .map(str::to_string); + if let Some(turn_id) = turn_id.as_ref() { + finish_turn_output(output_files, turn_id); + let mut active = active_turn_id.lock().expect("active turn mutex poisoned"); + if active.as_deref() == Some(turn_id.as_str()) { + *active = None; + } + } + + let summary = params + .get("turn") + .and_then(extract_turn_summary) + .or_else(|| turn_id.map(|id| format!("turn {}", id))); + let _ = event_tx.send(AgentEvent::TurnCompleted { summary }); + let _ = event_tx.send(AgentEvent::AwaitingInput); + } + _ => {} + } +} + +fn fail_pending_requests(request_state: &Arc>, message: &str) { + let pending = { + let mut state = request_state.lock().expect("request state mutex poisoned"); + std::mem::take(&mut state.pending) + }; + + for (_, tx) in pending { + let _ = tx.send(Err(std::io::Error::other(message.to_string()))); + } +} + +fn extract_tool_call(item: &Value) -> Option<(String, Value)> { + match item.get("type").and_then(Value::as_str) { + Some("commandExecution") => Some(( + item.get("command") + .and_then(Value::as_str) + .unwrap_or("commandExecution") + .to_string(), + json!({ + "status": item.get("status").and_then(Value::as_str), + }), + )), + Some("mcpToolCall") => { + let server = item.get("server").and_then(Value::as_str).unwrap_or("mcp"); + let tool = item.get("tool").and_then(Value::as_str).unwrap_or("tool"); + Some(( + format!("{}:{}", server, tool), + item.get("arguments").cloned().unwrap_or(Value::Null), + )) + } + _ => None, + } +} + +fn extract_turn_summary(turn: &Value) -> Option { + if let Some(summary) = turn.get("summary").and_then(Value::as_str) + && !summary.trim().is_empty() + { + return Some(summary.trim().to_string()); + } + + turn.get("status") + .and_then(Value::as_str) + .map(|status| status.to_string()) +} + +fn queue_output_file( + output_files: &Arc>>, + turn_id: &str, + output_file: PathBuf, +) -> std::io::Result<()> { + let _ = std::fs::File::create(&output_file)?; + output_files + .lock() + .expect("output file mutex poisoned") + .insert(turn_id.to_string(), output_file); + Ok(()) +} + +fn append_to_turn_output( + output_files: &Arc>>, + turn_id: &str, + text: &str, +) { + let path = output_files + .lock() + .expect("output file mutex poisoned") + .get(turn_id) + .cloned(); + if let Some(path) = path + && let Ok(mut file) = std::fs::OpenOptions::new().append(true).open(path) + { + let _ = std::io::Write::write_all(&mut file, text.as_bytes()); + } +} + +fn finish_turn_output(output_files: &Arc>>, turn_id: &str) { + output_files + .lock() + .expect("output file mutex poisoned") + .remove(turn_id); +} + +#[must_use] +fn build_persistent_launch_command(cli_name: &str, cli_cmd: &str) -> String { + if supports_persistent_runtime(cli_name) && cli_cmd.trim_start().starts_with("codex") { + "codex app-server --listen stdio://".to_string() + } else { + cli_cmd.to_string() + } +} + +#[must_use] +fn codex_model_overrides(cli_name: &str) -> (Option, Option) { + match cli_name { + "codex-mini" => (Some("gpt-5.4-mini".to_string()), Some("medium".to_string())), + _ => (None, None), + } +} + +pub struct StreamingAgent; + +impl StreamingAgent { + pub async fn runtime(config: StreamingAgentConfig) -> Result { + if !supports_persistent_runtime(&config.cli_name) { + return Err(std::io::Error::other(format!( + "persistent runtime not supported for {}", + config.cli_name + )) + .into()); + } + + let (tx, rx) = mpsc::unbounded_channel(); + let agent = CodexAppServerAgent::spawn( + CodexAppServerLaunchConfig::from_streaming(config), + tx.clone(), + ) + .await?; + + Ok(AgentRuntime { + agent: RuntimeAgent::new(Box::new(agent), tx), + events: AgentEventReceiver { rx }, + }) + } +} + +#[async_trait] +impl CodingAgent for CodexAppServerAgent { + async fn send(&mut self, input: AgentInput) -> Result { + match input { + AgentInput::UserMessage(turn) => self.start_turn(turn).await, + AgentInput::UrgentMessage(turn) => { + let active_turn_id = self + .active_turn_id + .lock() + .expect("active turn mutex poisoned") + .clone(); + if let Some(active_turn_id) = active_turn_id { + self.steer_turn(turn, active_turn_id).await + } else { + self.start_turn(turn).await + } + } + AgentInput::Cancel => { + self.interrupt_active_turn().await?; + Ok(AgentTurnResult { + status: Ok(success_exit_status()), + }) + } + } + } + + fn events(&mut self) -> AgentEventStream<'_> { + Box::pin(BufferedEventStream { + events: &mut self.events, + }) + } + + fn take_events(&mut self) -> Vec { + self.events.drain(..).collect() + } + + async fn shutdown(&mut self, mode: ShutdownMode) -> Result> { + match mode { + ShutdownMode::Graceful => { + if let Some(mut stdin) = self.stdin.take() { + let _ = stdin.shutdown().await; + } + let status = match tokio::time::timeout( + std::time::Duration::from_secs(2), + self.child.wait(), + ) + .await + { + Ok(result) => result?, + Err(_) => { + self.child.start_kill()?; + self.child.wait().await? + } + }; + self.last_exit_status = Some(status.clone()); + self.reader_task.abort(); + self.events.push_back(AgentEvent::Exited(status.clone())); + Ok(Some(status)) + } + ShutdownMode::Immediate => { + let _ = self.interrupt_active_turn().await; + let _ = self.stdin.take(); + self.child.start_kill()?; + let status = self.child.wait().await?; + self.last_exit_status = Some(status.clone()); + self.reader_task.abort(); + self.events.push_back(AgentEvent::Exited(status.clone())); + Ok(Some(status)) + } + } + } + + fn session_id(&self) -> Option { + self.thread_id + .lock() + .expect("thread id mutex poisoned") + .clone() + } +} + +struct BufferedEventStream<'a> { + events: &'a mut VecDeque, +} + +impl Stream for BufferedEventStream<'_> { + type Item = AgentEvent; + + fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(self.events.pop_front()) + } +} + +fn describe_exit_status(status: &ExitStatus) -> String { + status + .code() + .map(|code| code.to_string()) + .unwrap_or_else(|| "terminated by signal".to_string()) +} + +fn as_io_error(err: crate::error::Error) -> std::io::Error { + match err { + crate::error::Error::Io(inner) => inner, + other => std::io::Error::other(other.to_string()), + } +} + +#[must_use] +fn success_exit_status() -> ExitStatus { + exit_status_from_code(0) +} + +#[cfg(unix)] +fn exit_status_from_code(code: i32) -> ExitStatus { + std::os::unix::process::ExitStatusExt::from_raw(code << 8) +} diff --git a/src/channel.rs b/src/channel.rs index 3b235e9..3f098cd 100644 --- a/src/channel.rs +++ b/src/channel.rs @@ -394,6 +394,12 @@ impl Channel { fields_to_delete.push("parent_agent_id"); } + if let Some(ref session_id) = agent.runtime_session_id { + fields.push(("runtime_session_id".to_string(), session_id.clone())); + } else { + fields_to_delete.push("runtime_session_id"); + } + // Use a pipeline to make HDEL + HSET atomic let mut pipe = redis::pipe(); if !fields_to_delete.is_empty() { @@ -482,6 +488,7 @@ impl Channel { let nickname = fields.get("nickname").cloned(); let role_id = fields.get("role_id").cloned(); let parent_agent_id = fields.get("parent_agent_id").and_then(|s| s.parse().ok()); + let runtime_session_id = fields.get("runtime_session_id").cloned(); let spawn_mode: crate::agent::SpawnMode = fields .get("spawn_mode") .map(|s| serde_json::from_str(&format!("\"{}\"", s)).unwrap_or_default()) @@ -503,6 +510,7 @@ impl Channel { tasks_completed, rounds_completed, last_active_at, + runtime_session_id, }) } diff --git a/src/config.rs b/src/config.rs index 1c2d941..e4f215b 100644 --- a/src/config.rs +++ b/src/config.rs @@ -109,6 +109,10 @@ pub struct AgentConfig { /// Seconds an idle worker should wait before draining and exiting. #[serde(default = "default_agent_idle_timeout_secs")] pub idle_timeout_secs: u64, + + /// Keep agent CLI subprocesses alive across turns when supported. + #[serde(default)] + pub persistent: bool, } /// Authentication mode for townhall. @@ -336,6 +340,7 @@ impl Default for AgentConfig { fn default() -> Self { Self { idle_timeout_secs: default_agent_idle_timeout_secs(), + persistent: false, } } } diff --git a/src/events.rs b/src/events.rs index 897e0d4..98229bc 100644 --- a/src/events.rs +++ b/src/events.rs @@ -41,6 +41,12 @@ pub enum EventType { AgentStateChanged, AgentSpawned, AgentStopped, + AgentTurnStarted, + AgentTurnCompleted, + AgentAwaitingInput, + AgentToolCall, + AgentAssistantDelta, + AgentRuntimeError, TaskAssigned, TaskCompleted, TaskFailed, diff --git a/src/lib.rs b/src/lib.rs index 19d2988..ed40b77 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -43,6 +43,7 @@ //! ``` pub mod agent; +pub mod agent_runtime; pub mod app; pub mod channel; pub mod config; @@ -61,6 +62,11 @@ pub mod town; pub use agent::{ Agent, AgentId, AgentState, AgentType, RoleDefinition, SpawnMode, builtin_roles, roles, }; +pub use agent_runtime::{ + AgentEvent, AgentEventReceiver, AgentInput, AgentRuntime, AgentTurn, AgentTurnResult, + CodingAgent, OneShotAgent, OneShotAgentConfig, ShutdownMode, StreamingAgent, + StreamingAgentConfig, build_cli_command, supports_persistent_runtime, +}; pub use app::audit::{AuditEvent, AuditResult, audit_middleware}; pub use app::auth::{AuthError, AuthState, Principal, auth_middleware, generate_api_key}; pub use app::mcp::{McpState, create_mcp_router}; diff --git a/src/main.rs b/src/main.rs index 5fb2783..666c23c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,31 +5,22 @@ //! Tinytown CLI - Simple multi-agent orchestration. +use std::collections::VecDeque; use std::path::{Path, PathBuf}; use clap::{Parser, Subcommand}; use tracing::{info, warn}; use tracing_subscriber::EnvFilter; -use tinytown::{AgentState, GlobalConfig, Result, Task, Town, plan}; +use tinytown::{ + AgentEvent, AgentInput, AgentRuntime, AgentState, AgentTurn, GlobalConfig, OneShotAgent, + OneShotAgentConfig, Result, ShutdownMode, StreamingAgent, StreamingAgentConfig, Task, Town, + build_cli_command, plan, +}; const TT_AGENT_ID_ENV: &str = "TINYTOWN_AGENT_ID"; const TT_AGENT_NAME_ENV: &str = "TINYTOWN_AGENT_NAME"; -/// Build a shell command to run an agent CLI with a prompt/instruction file. -/// Different CLIs have different ways to accept input: -/// - auggie: uses --instruction-file flag -/// - claude, codex, etc.: accept input via stdin (pipe or redirect) -fn build_cli_command(cli_name: &str, cli_cmd: &str, prompt_file: &std::path::Path) -> String { - if cli_name == "auggie" { - // Auggie uses --instruction-file flag - format!("{} --instruction-file '{}'", cli_cmd, prompt_file.display()) - } else { - // Other CLIs accept input via stdin - format!("cat '{}' | {}", prompt_file.display(), cli_cmd) - } -} - fn idle_timeout_elapsed( agent: &tinytown::Agent, idle_timeout_secs: u64, @@ -340,6 +331,667 @@ async fn format_actionable_section( section } +#[derive(Clone)] +struct PendingTurn { + display_round: u32, + actionable_messages: Vec<(tinytown::Message, bool)>, +} + +fn runtime_env(agent_id: tinytown::AgentId, name: &str) -> Vec<(String, String)> { + vec![ + (TT_AGENT_ID_ENV.to_string(), agent_id.to_string()), + (TT_AGENT_NAME_ENV.to_string(), name.to_string()), + ] +} + +async fn persist_runtime_session_id( + channel: &tinytown::Channel, + agent_id: tinytown::AgentId, + session_id: &str, +) -> Result<()> { + if let Some(mut agent) = channel.get_agent_state(agent_id).await? { + if agent.runtime_session_id.as_deref() != Some(session_id) { + agent.runtime_session_id = Some(session_id.to_string()); + channel.set_agent_state(&agent).await?; + } + } + Ok(()) +} + +async fn emit_agent_runtime_event( + channel: &tinytown::Channel, + agent_id: tinytown::AgentId, + event_type: tinytown::EventType, + message: impl Into, +) { + channel + .emit_event(&tinytown::TownEvent::new(event_type, message).with_agent(agent_id)) + .await; +} + +async fn requeue_pending_turns( + channel: &tinytown::Channel, + agent_id: tinytown::AgentId, + pending_turns: &mut VecDeque, +) -> Result<()> { + let mut requeued = 0usize; + while let Some(turn) = pending_turns.pop_front() { + for (msg, was_urgent) in &turn.actionable_messages { + if *was_urgent { + channel.send_urgent(msg).await?; + } else { + channel.send(msg).await?; + } + requeued += 1; + } + } + + if requeued > 0 { + info!(" ↩️ Re-queued {} actionable message(s)", requeued); + } + + if let Some(mut agent) = channel.get_agent_state(agent_id).await? { + if let Some(task_id) = agent.current_task + && let Some(mut task) = channel.get_task(task_id).await? + && task.state == tinytown::TaskState::Running + { + task.assign(agent_id); + channel.set_task(&task).await?; + } + agent.current_task = None; + channel.set_agent_state(&agent).await?; + } + + Ok(()) +} + +async fn build_streaming_runtime( + town_path: &Path, + cli_name: &str, + cli_cmd: &str, + agent_id: tinytown::AgentId, + name: &str, + resume_session_id: Option, +) -> Result { + StreamingAgent::runtime(StreamingAgentConfig { + cli_name: cli_name.to_string(), + cli_cmd: cli_cmd.to_string(), + workdir: town_path.to_path_buf(), + env: runtime_env(agent_id, name), + resume_session_id, + }) + .await +} + +async fn run_persistent_agent_loop( + town_path: &Path, + config: &tinytown::Config, + channel: &tinytown::Channel, + name: &str, + agent_id: tinytown::AgentId, + max_rounds: u32, + cli_name: &str, + cli_cmd: &str, + idle_timeout_secs: u64, +) -> Result<()> { + use std::time::Duration; + + let resume_session_id = channel + .get_agent_state(agent_id) + .await? + .and_then(|agent| agent.runtime_session_id); + let AgentRuntime { + agent: mut runtime, + mut events, + } = build_streaming_runtime( + town_path, + cli_name, + cli_cmd, + agent_id, + name, + resume_session_id, + ) + .await?; + let mut runtime_alive = true; + let mut round: u32 = 0; + let mut pending_turns: VecDeque = VecDeque::new(); + let mut tick = tokio::time::interval(Duration::from_millis(250)); + tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); + + loop { + if round >= max_rounds && pending_turns.is_empty() { + break; + } + + tokio::select! { + maybe_event = events.recv() => { + let Some(event) = maybe_event else { + runtime_alive = false; + continue; + }; + + match event { + AgentEvent::SessionReady { session_id } => { + persist_runtime_session_id(channel, agent_id, &session_id).await?; + } + AgentEvent::TurnStarted => { + emit_agent_runtime_event( + channel, + agent_id, + tinytown::EventType::AgentTurnStarted, + "Agent turn started", + ) + .await; + } + AgentEvent::AssistantDelta(delta) => { + emit_agent_runtime_event( + channel, + agent_id, + tinytown::EventType::AgentAssistantDelta, + format!("Assistant delta: {}", truncate_summary(&delta, 160)), + ) + .await; + } + AgentEvent::ToolCall { name: tool_name, .. } => { + channel + .log_agent_activity( + agent_id, + &format!("πŸ”§ Tool call: {}", tool_name), + ) + .await?; + emit_agent_runtime_event( + channel, + agent_id, + tinytown::EventType::AgentToolCall, + format!("Tool call: {}", tool_name), + ) + .await; + } + AgentEvent::TurnCompleted { summary } => { + let Some(turn) = pending_turns.pop_front() else { + continue; + }; + round += 1; + let summary_text = summary + .as_deref() + .map(|text| truncate_summary(text, 100)) + .unwrap_or_else(|| "completed".to_string()); + channel + .log_agent_activity( + agent_id, + &format!("Round {}: βœ… {}", turn.display_round, summary_text), + ) + .await?; + emit_agent_runtime_event( + channel, + agent_id, + tinytown::EventType::AgentTurnCompleted, + format!("Round {} completed: {}", turn.display_round, summary_text), + ) + .await; + + if let Some(mut agent) = channel.get_agent_state(agent_id).await? { + clear_terminal_current_task(channel, &mut agent).await?; + let now = chrono::Utc::now(); + if agent.state != AgentState::Paused { + agent.state = AgentState::Idle; + agent.last_active_at = now; + } + agent.rounds_completed += 1; + agent.last_heartbeat = now; + agent.runtime_session_id = runtime.session_id(); + channel.set_agent_state(&agent).await?; + } + } + AgentEvent::AwaitingInput => { + emit_agent_runtime_event( + channel, + agent_id, + tinytown::EventType::AgentAwaitingInput, + "Agent runtime is awaiting input", + ) + .await; + } + AgentEvent::SessionError(err) => { + channel + .log_agent_activity( + agent_id, + &format!("Persistent runtime: {}", truncate_summary(&err, 140)), + ) + .await?; + emit_agent_runtime_event( + channel, + agent_id, + tinytown::EventType::AgentRuntimeError, + format!("Persistent runtime error: {}", truncate_summary(&err, 160)), + ) + .await; + } + AgentEvent::Exited(status) => { + runtime_alive = false; + if !status.success() && !pending_turns.is_empty() { + channel + .log_agent_activity( + agent_id, + &format!( + "Persistent runtime exited with error {}; re-queueing in-flight work", + status + ), + ) + .await?; + requeue_pending_turns(channel, agent_id, &mut pending_turns).await?; + } + } + } + } + _ = tick.tick() => { + if channel.should_stop(agent_id).await? { + info!(" πŸ›‘ Stop requested, shutting down persistent runtime..."); + channel + .log_agent_activity(agent_id, "πŸ›‘ stopped by request") + .await?; + let _ = runtime.shutdown(ShutdownMode::Immediate).await?; + requeue_pending_turns(channel, agent_id, &mut pending_turns).await?; + channel.clear_stop(agent_id).await?; + break; + } + + if let Some(agent_state) = channel.get_agent_state(agent_id).await? + && agent_state.state == AgentState::Paused + { + if runtime_alive && !pending_turns.is_empty() { + info!(" ⏸️ Agent interrupted; stopping live session and re-queueing work"); + let _ = runtime.shutdown(ShutdownMode::Immediate).await?; + runtime_alive = false; + requeue_pending_turns(channel, agent_id, &mut pending_turns).await?; + } + continue; + } + + if !runtime_alive { + let resume_session_id = channel + .get_agent_state(agent_id) + .await? + .and_then(|agent| agent.runtime_session_id); + let AgentRuntime { + agent: new_runtime, + events: new_events, + } = build_streaming_runtime( + town_path, + cli_name, + cli_cmd, + agent_id, + name, + resume_session_id, + ) + .await?; + runtime = new_runtime; + events = new_events; + runtime_alive = true; + } + + if pending_turns.is_empty() { + let display_round = round + 1; + let urgent_messages = channel.receive_urgent(agent_id).await?; + let regular_messages = channel.drain_inbox(agent_id).await?; + let backlog_snapshot = backlog_snapshot_for_agent(channel, name, 8).await?; + + if regular_messages.is_empty() && urgent_messages.is_empty() { + if let Some(mut agent) = channel.get_agent_state(agent_id).await? { + clear_terminal_current_task(channel, &mut agent).await?; + let now = chrono::Utc::now(); + let became_idle = + agent.state != AgentState::Paused && agent.state != AgentState::Idle; + if agent.state != AgentState::Paused { + agent.state = AgentState::Idle; + } + if became_idle { + agent.last_active_at = now; + } + agent.last_heartbeat = now; + + if idle_timeout_elapsed(&agent, idle_timeout_secs, now) { + info!( + " πŸ”» Idle timeout reached after {}s, draining and stopping...", + idle_timeout_secs + ); + agent.state = AgentState::Draining; + channel.set_agent_state(&agent).await?; + channel + .log_agent_activity( + agent_id, + &format!( + "πŸ”» Idle timeout reached after {}s; draining and stopping", + idle_timeout_secs + ), + ) + .await?; + let _ = runtime.shutdown(ShutdownMode::Graceful).await?; + break; + } + + channel.set_agent_state(&agent).await?; + } + continue; + } + + let mut breakdown = MessageBreakdown::default(); + let mut actionable_messages: Vec<(tinytown::Message, bool)> = Vec::new(); + let mut informational_summaries: Vec = Vec::new(); + let mut confirmation_counts: std::collections::BTreeMap = + std::collections::BTreeMap::new(); + + for msg in urgent_messages { + breakdown.count(&msg.msg_type); + match classify_message(&msg.msg_type) { + MessageCategory::Task + | MessageCategory::Query + | MessageCategory::OtherActionable => { + actionable_messages.push((msg, true)); + } + MessageCategory::Informational => { + informational_summaries + .push(truncate_summary(&summarize_message(&msg.msg_type), 100)); + } + MessageCategory::Confirmation => { + let key = truncate_summary(&summarize_message(&msg.msg_type), 60); + *confirmation_counts.entry(key).or_insert(0) += 1; + } + } + } + + for msg in regular_messages { + breakdown.count(&msg.msg_type); + match classify_message(&msg.msg_type) { + MessageCategory::Task + | MessageCategory::Query + | MessageCategory::OtherActionable => { + actionable_messages.push((msg, false)); + } + MessageCategory::Informational => { + informational_summaries + .push(truncate_summary(&summarize_message(&msg.msg_type), 100)); + } + MessageCategory::Confirmation => { + let key = truncate_summary(&summarize_message(&msg.msg_type), 60); + *confirmation_counts.entry(key).or_insert(0) += 1; + } + } + } + + let urgent_present = actionable_messages.iter().any(|(_, urgent)| *urgent); + let regular_present = actionable_messages.iter().any(|(_, urgent)| !*urgent); + if urgent_present && regular_present { + let mut deferred_urgent = Vec::new(); + actionable_messages.retain(|(msg, urgent)| { + if *urgent { + deferred_urgent.push(msg.clone()); + false + } else { + true + } + }); + for msg in deferred_urgent { + channel.send_urgent(&msg).await?; + } + } + + if actionable_messages.is_empty() { + if backlog_snapshot.total_matching > 0 { + actionable_messages.push(( + tinytown::Message::new( + tinytown::AgentId::supervisor(), + agent_id, + tinytown::MessageType::Query { + question: format!( + "No direct assignments right now. Backlog has {} role-matching task(s): review and claim one with `tt backlog claim {}`.", + backlog_snapshot.total_matching, name + ), + }, + ), + false, + )); + } else if backlog_snapshot.total_backlog > 0 { + channel + .log_agent_activity( + agent_id, + &format!( + "Round {}: ⏭️ no direct work and {} backlog task(s) did not match role hint", + display_round, backlog_snapshot.total_backlog + ), + ) + .await?; + continue; + } else { + channel + .log_agent_activity( + agent_id, + &format!( + "Round {}: ⏭️ auto-handled {} informational, {} confirmations", + display_round, + informational_summaries.len(), + breakdown.confirmations + ), + ) + .await?; + continue; + } + } + + let urgent_actionable = actionable_messages + .iter() + .filter(|(_, urgent)| *urgent) + .count(); + track_current_task_for_round(channel, agent_id, &actionable_messages).await?; + let actionable_section = + format_actionable_section(channel, &actionable_messages).await; + + let informational_section = if informational_summaries.is_empty() { + String::new() + } else { + let mut section = String::from("\n## Informational (batched summary)\n\n"); + for summary in informational_summaries.iter().take(8) { + section.push_str(&format!("- {}\n", summary)); + } + if informational_summaries.len() > 8 { + section.push_str(&format!( + "- ...and {} more informational message(s)\n", + informational_summaries.len() - 8 + )); + } + section + }; + + let confirmation_section = if confirmation_counts.is_empty() { + String::new() + } else { + let mut section = String::from("\n## Confirmations (auto-dismissed)\n\n"); + for (kind, count) in &confirmation_counts { + section.push_str(&format!("- {} x{}\n", kind, count)); + } + section + }; + + let role_hint = backlog_role_hint(name); + let backlog_section = { + let mut section = format!( + "\n## Backlog Snapshot\n\n- Total backlog tasks: {}\n- Role-matching backlog tasks: {}\n- Role match hint: {}\n", + backlog_snapshot.total_backlog, backlog_snapshot.total_matching, role_hint + ); + if backlog_snapshot.total_matching > 0 { + section.push_str("\nReview and claim role-matching items:\n"); + for (task_id, task) in &backlog_snapshot.tasks { + let tags = if task.tags.is_empty() { + String::new() + } else { + format!(" [{}]", task.tags.join(", ")) + }; + section.push_str(&format!( + "- {} - {}{}\n", + task_id, + truncate_summary(&task.description, 90), + tags + )); + } + } + section + }; + + let prompt = format!( + r#"# Agent: {name} + +You are agent "{name}" in Tinytown "{town_name}". + +{actionable_section}{informational_section}{confirmation_section} +## Available Commands + +```bash +tt status # Check town status and all agents +tt assign "task" # Assign actionable work +tt backlog list # Review unassigned backlog tasks +tt backlog claim {agent_name} # Claim a backlog task for yourself +tt send --query "question" # Ask for a response +tt send --info "update" # Send FYI update +tt send --ack "received" # Send acknowledgment +tt send --urgent --query "..." # Priority message for the live session +tt task current # Show your tracked current assignment +tt task complete --result "summary" # Mark a task as done +``` + +{backlog_section} +## Current State +- Round: {display_round}/{max_rounds} +- Actionable messages: {actionable_count} +- Urgent actionable: {urgent_actionable} +- Batched informational: {info_count} +- Auto-dismissed confirmations: {confirmation_count} + +## Your Workflow + +1. Handle all actionable messages listed above. +2. If you have no direct assignment or extra capacity, review backlog and claim one role-matching task. +3. Claim only work that matches your role hint; do not claim unrelated tasks. +4. Prefer direct agent-to-agent messages for concrete execution handoffs, review requests, and unblock checks. +5. Use `supervisor` / `conductor` when you need human guidance, priority changes, broader sequencing, escalation, or town-wide visibility. +6. If blocked, send a query with specific unblock needs. +7. Use `tt task current` to confirm the real Tinytown task id before completing work. +8. When finished with a task, mark it complete: `tt task complete --result \"what was done\"` +9. Send informational updates or confirmations as appropriate, including FYI summaries to supervisor/conductor when the conductor should stay informed. +"#, + name = name, + agent_name = name, + town_name = config.name, + actionable_section = actionable_section, + informational_section = informational_section, + confirmation_section = confirmation_section, + backlog_section = backlog_section, + display_round = display_round, + max_rounds = max_rounds, + actionable_count = actionable_messages.len(), + urgent_actionable = urgent_actionable, + info_count = informational_summaries.len(), + confirmation_count = breakdown.confirmations, + ); + + if let Some(mut agent) = channel.get_agent_state(agent_id).await? { + agent.state = AgentState::Working; + agent.last_active_at = chrono::Utc::now(); + agent.last_heartbeat = chrono::Utc::now(); + agent.runtime_session_id = runtime.session_id(); + channel.set_agent_state(&agent).await?; + } + if let Some(mut task) = + tinytown::TaskService::current_for_agent(channel, agent_id).await? + && !task.state.is_terminal() + && task.state != tinytown::TaskState::Running + { + task.start(); + channel.set_task(&task).await?; + } + + let turn = AgentTurn { + prompt, + prompt_file: town_path.join(format!(".tt/agent_{}_prompt.md", name)), + output_file: town_path.join(format!(".tt/logs/{}_round_{}.log", name, display_round)), + }; + runtime.send(AgentInput::UserMessage(turn)).await?; + pending_turns.push_back(PendingTurn { + display_round, + actionable_messages, + }); + } else { + let urgent_messages = channel.receive_urgent(agent_id).await?; + if urgent_messages.is_empty() { + continue; + } + + let mut actionable_messages = Vec::new(); + for msg in urgent_messages { + if matches!( + classify_message(&msg.msg_type), + MessageCategory::Task + | MessageCategory::Query + | MessageCategory::OtherActionable + ) { + actionable_messages.push((msg, true)); + } + } + if actionable_messages.is_empty() { + continue; + } + + let display_round = pending_turns + .front() + .map(|turn| turn.display_round) + .unwrap_or(round + 1); + let actionable_section = + format_actionable_section(channel, &actionable_messages).await; + let prompt = format!( + "# Agent: {name}\n\nYou are in a live Tinytown session. Prioritize this urgent follow-up immediately.\n\n{actionable_section}\n## Current State\n- Round: {display_round}/{max_rounds}\n- This message arrived while you were already working.\n", + name = name, + actionable_section = actionable_section, + display_round = display_round, + max_rounds = max_rounds, + ); + let turn = AgentTurn { + prompt, + prompt_file: town_path.join(format!(".tt/agent_{}_prompt.md", name)), + output_file: town_path.join(format!(".tt/logs/{}_round_{}.log", name, display_round)), + }; + runtime.send(AgentInput::UrgentMessage(turn)).await?; + if let Some(existing_turn) = pending_turns.back_mut() { + existing_turn + .actionable_messages + .extend(actionable_messages); + } else { + pending_turns.push_back(PendingTurn { + display_round, + actionable_messages, + }); + } + } + } + } + } + + let _ = runtime.shutdown(ShutdownMode::Graceful).await?; + + if let Some(mut agent) = channel.get_agent_state(agent_id).await? { + agent.state = AgentState::Stopped; + agent.last_heartbeat = chrono::Utc::now(); + agent.runtime_session_id = runtime.session_id(); + channel.set_agent_state(&agent).await?; + info!( + "🏁 Agent '{}' finished: {} rounds, {} tasks", + name, agent.rounds_completed, agent.tasks_completed + ); + } else { + info!("🏁 Agent '{}' finished after {} rounds", name, max_rounds); + } + + Ok(()) +} + #[derive(Parser)] #[command(name = "tt")] #[command(author, version, about = "Tinytown - Simple multi-agent orchestration using Redis", long_about = None)] @@ -3029,6 +3681,38 @@ async fn main() -> Result<()> { ); info!(" CLI: {} ({})", cli_name, cli_cmd); info!(" Idle timeout: {}s", idle_timeout_secs); + if config.agent.persistent && tinytown::supports_persistent_runtime(&cli_name) { + info!(" Runtime: persistent Codex app-server session"); + run_persistent_agent_loop( + &cli.town, + config, + channel, + &name, + agent_id, + max_rounds, + &cli_name, + &cli_cmd, + idle_timeout_secs, + ) + .await?; + return Ok(()); + } else if config.agent.persistent { + info!( + " Runtime: persistent requested but unsupported for {}; falling back to one-shot", + cli_name + ); + } + info!(" Runtime: one-shot subprocess per turn"); + + let AgentRuntime { + agent: mut runtime, + events: _events, + } = OneShotAgent::runtime(OneShotAgentConfig { + cli_name: cli_name.clone(), + cli_cmd: cli_cmd.clone(), + workdir: cli.town.clone(), + env: runtime_env(agent_id, &name), + }); // Use manual counter - only increment AFTER CLI execution (fixes round-burning bug) let mut round: u32 = 0; @@ -3353,10 +4037,6 @@ Only run commands needed to complete listed work; inbox messages for this round confirmation_count = breakdown.confirmations, ); - // Write prompt to temp file (under .tt/) - let prompt_file = cli.town.join(format!(".tt/agent_{}_prompt.md", name)); - std::fs::write(&prompt_file, &prompt)?; - // Update agent state to working if let Some(mut agent) = channel.get_agent_state(agent_id).await? { agent.state = AgentState::Working; @@ -3375,25 +4055,14 @@ Only run commands needed to complete listed work; inbox messages for this round // Run the agent CLI info!(" πŸ€– Running {}...", cli_name); - let output_file = cli - .town - .join(format!(".tt/logs/{}_round_{}.log", name, display_round)); - let output = std::fs::File::create(&output_file)?; - - let shell_cmd = build_cli_command(&cli_name, &cli_cmd, &prompt_file); - let status = std::process::Command::new("sh") - .arg("-c") - .arg(&shell_cmd) - .current_dir(&cli.town) - .env(TT_AGENT_ID_ENV, agent_id.to_string()) - .env(TT_AGENT_NAME_ENV, &name) - .stdin(std::process::Stdio::null()) - .stdout(output.try_clone()?) - .stderr(output) - .status(); - - // Clean up prompt file - let _ = std::fs::remove_file(&prompt_file); + let turn = AgentTurn { + prompt, + prompt_file: cli.town.join(format!(".tt/agent_{}_prompt.md", name)), + output_file: cli + .town + .join(format!(".tt/logs/{}_round_{}.log", name, display_round)), + }; + let status = runtime.send(AgentInput::UserMessage(turn)).await?.status; // Log activity and result let activity_msg = match &status { @@ -3470,6 +4139,8 @@ Only run commands needed to complete listed work; inbox messages for this round tokio::time::sleep(Duration::from_secs(1)).await; } + let _ = runtime.shutdown(ShutdownMode::Graceful).await?; + // Mark agent as stopped with final stats if let Some(mut agent) = channel.get_agent_state(agent_id).await? { agent.state = AgentState::Stopped; @@ -3993,6 +4664,12 @@ Now, help the user orchestrate their project! tinytown::events::EventType::AgentStopped | tinytown::events::EventType::AgentCompleted => "🏁", tinytown::events::EventType::AgentStateChanged => "πŸ”„", + tinytown::events::EventType::AgentTurnStarted => "▢️", + tinytown::events::EventType::AgentTurnCompleted => "βœ…", + tinytown::events::EventType::AgentAwaitingInput => "⏸️", + tinytown::events::EventType::AgentToolCall => "πŸ”§", + tinytown::events::EventType::AgentAssistantDelta => "πŸ’¬", + tinytown::events::EventType::AgentRuntimeError => "⚠️", tinytown::events::EventType::TaskAssigned => "πŸ“Œ", tinytown::events::EventType::TaskCompleted => "βœ…", tinytown::events::EventType::TaskFailed => "❌", diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index 2365d1f..53078c7 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -15,8 +15,8 @@ use std::time::Duration; use tempfile::TempDir; use tinytown::message::MessageType; use tinytown::{ - Agent, AgentId, AgentState, AgentType, Message, Priority, Task, TaskId, TaskService, TaskState, - Town, + Agent, AgentId, AgentState, AgentType, EventType, Message, Priority, Task, TaskId, TaskService, + TaskState, Town, }; use uuid::Uuid; @@ -73,6 +73,239 @@ fn reserve_unused_port() -> Result> { Ok(port) } +fn shell_quote(value: &str) -> String { + format!("'{}'", value.replace('\'', "'\"'\"'")) +} + +fn write_fake_codex_app_server( + town_path: &std::path::Path, + start_count_path: &std::path::Path, + input_log_path: &std::path::Path, +) -> Result> { + let script_path = town_path.join("fake_codex_app_server.py"); + std::fs::write( + &script_path, + r#"#!/usr/bin/env python3 +import json +import os +import re +import sys +import time +import threading + +thread_id = "fake-thread-1" +next_turn_id = 0 +active_turn = None +state_lock = threading.Lock() +write_lock = threading.Lock() + +start_count_path = os.environ.get("TINYTOWN_FAKE_START_COUNT") +if start_count_path: + with open(start_count_path, "a", encoding="utf-8") as handle: + handle.write("start\n") + +def write_json(payload): + with write_lock: + sys.stdout.write(json.dumps(payload) + "\n") + sys.stdout.flush() + +def log_input(text): + input_log_path = os.environ.get("TINYTOWN_FAKE_INPUT_LOG") + if not input_log_path: + return + with open(input_log_path, "a", encoding="utf-8") as handle: + handle.write(text) + handle.write("\n---\n") + +def turn_worker(turn_id, initial_text, cancel_event, extras): + global active_turn + sleep_ms = 0 + match = re.search(r"SLEEP_MS=(\d+)", initial_text) + if match: + sleep_ms = int(match.group(1)) + + elapsed = 0 + while elapsed < sleep_ms: + if cancel_event.is_set(): + break + time.sleep(0.05) + elapsed += 50 + + if cancel_event.is_set(): + write_json({ + "method": "turn/completed", + "params": { + "threadId": thread_id, + "turn": {"id": turn_id, "status": "interrupted"} + } + }) + with state_lock: + active_turn = None + return + + merged_text = initial_text + "\n" + "\n".join(extras) + + if "TOOL_CALL=1" in merged_text: + write_json({ + "method": "item/completed", + "params": { + "threadId": thread_id, + "turnId": turn_id, + "item": { + "id": "tool-1", + "type": "commandExecution", + "command": "cat README.md", + "status": "completed" + } + } + }) + + summary = "handled urgent turn" if "URGENT" in merged_text else "handled turn" + write_json({ + "method": "item/agentMessage/delta", + "params": { + "threadId": thread_id, + "turnId": turn_id, + "itemId": "assistant-1", + "delta": summary + } + }) + write_json({ + "method": "turn/completed", + "params": { + "threadId": thread_id, + "turn": {"id": turn_id, "status": "completed"} + } + }) + + with state_lock: + active_turn = None + +for raw_line in sys.stdin: + raw_line = raw_line.strip() + if not raw_line: + continue + + payload = json.loads(raw_line) + method = payload.get("method") + request_id = payload.get("id") + params = payload.get("params", {}) + + if method == "initialize": + write_json({ + "id": request_id, + "result": { + "codexHome": "/tmp/fake-codex-home", + "platformFamily": "unix", + "platformOs": "macos", + "userAgent": "fake-codex" + } + }) + continue + + if method == "initialized": + continue + + if method == "thread/start": + write_json({ + "id": request_id, + "result": { + "approvalPolicy": "never", + "approvalsReviewer": "user", + "cwd": params.get("cwd", "."), + "model": params.get("model", "gpt-5.4-codex"), + "modelProvider": "openai", + "sandbox": {"mode": "danger-full-access"}, + "thread": {"id": thread_id} + } + }) + write_json({ + "method": "thread/started", + "params": {"thread": {"id": thread_id}} + }) + continue + + if method == "thread/resume": + write_json({ + "id": request_id, + "result": { + "approvalPolicy": "never", + "approvalsReviewer": "user", + "cwd": params.get("cwd", "."), + "model": params.get("model", "gpt-5.4-codex"), + "modelProvider": "openai", + "sandbox": {"mode": "danger-full-access"}, + "thread": {"id": params.get("threadId", thread_id)} + } + }) + write_json({ + "method": "thread/started", + "params": {"thread": {"id": params.get("threadId", thread_id)}} + }) + continue + + if method == "turn/start": + text = params["input"][0]["text"] + log_input(text) + with state_lock: + next_turn_id += 1 + turn_id = f"turn-{next_turn_id}" + cancel_event = threading.Event() + extras = [] + active_turn = { + "turn_id": turn_id, + "cancel_event": cancel_event, + "extras": extras, + } + write_json({ + "id": request_id, + "result": {"turn": {"id": turn_id, "status": "inProgress"}} + }) + write_json({ + "method": "turn/started", + "params": { + "threadId": thread_id, + "turn": {"id": turn_id, "status": "inProgress"} + } + }) + threading.Thread( + target=turn_worker, + args=(turn_id, text, cancel_event, extras), + daemon=True, + ).start() + continue + + if method == "turn/steer": + text = params["input"][0]["text"] + log_input(text) + with state_lock: + current_turn = active_turn + if current_turn is not None: + current_turn["extras"].append(text) + write_json({ + "id": request_id, + "result": {"turnId": params.get("expectedTurnId")} + }) + continue + + if method == "turn/interrupt": + with state_lock: + current_turn = active_turn + if current_turn is not None: + current_turn["cancel_event"].set() + write_json({"id": request_id, "result": {}}) + continue +"#, + )?; + + Ok(format!( + "TINYTOWN_FAKE_START_COUNT={} TINYTOWN_FAKE_INPUT_LOG={} python3 {}", + shell_quote(&start_count_path.display().to_string()), + shell_quote(&input_log_path.display().to_string()), + shell_quote(&script_path.display().to_string()), + )) +} + // ============================================================================ // TOWN INITIALIZATION AND CONFIGURATION TESTS // ============================================================================ @@ -5021,6 +5254,31 @@ idle_timeout_secs = 42 Ok(()) } +/// Test that the persistent agent runtime flag can be parsed from config TOML. +#[tokio::test] +async fn test_agent_persistent_config_parse() -> Result<(), Box> { + use tinytown::Config; + + let temp_dir = TempDir::new()?; + let config_path = temp_dir.path().join("tinytown.toml"); + + std::fs::write( + &config_path, + r#" +name = "agent-persistent-test" + +[agent] +persistent = true +"#, + )?; + + let config = Config::load(temp_dir.path())?; + assert!(config.agent.persistent); + assert_eq!(config.agent.idle_timeout_secs, 300); + + Ok(()) +} + /// Test that the worker loop exits cleanly after the idle timeout elapses. #[tokio::test] async fn test_agent_loop_exits_cleanly_after_idle_timeout() -> Result<(), Box> @@ -5059,6 +5317,48 @@ async fn test_agent_loop_exits_cleanly_after_idle_timeout() -> Result<(), Box Result<(), Box> { + let town = create_test_town("persistent-agent-loop-idle-timeout").await?; + let town_path = town.config().root.clone(); + + let mut config = tinytown::Config::load(&town_path)?; + config.agent.idle_timeout_secs = 1; + config.agent.persistent = true; + config.save()?; + + let handle = town.spawn_agent("idle-worker", "claude").await?; + let agent_id = handle.id(); + + let status = tokio::task::spawn_blocking(move || { + std::process::Command::new(env!("CARGO_BIN_EXE_tt")) + .arg("--town") + .arg(&town_path) + .arg("agent-loop") + .arg("idle-worker") + .arg(agent_id.to_string()) + .arg("100") + .status() + }) + .await??; + + assert!( + status.success(), + "persistent agent-loop should exit cleanly" + ); + + let agent = town + .channel() + .get_agent_state(agent_id) + .await? + .expect("idle worker should still be registered"); + assert_eq!(agent.state, AgentState::Stopped); + + Ok(()) +} + /// Test that a stale terminal current_task does not block worker idle timeout. #[tokio::test] async fn test_agent_loop_ignores_stale_terminal_current_task() @@ -5111,6 +5411,316 @@ async fn test_agent_loop_ignores_stale_terminal_current_task() Ok(()) } +/// Test that the persistent runtime reuses a single streaming child across turns. +#[tokio::test] +async fn test_agent_loop_persistent_runtime_reuses_single_streaming_process() +-> Result<(), Box> { + let town = create_test_town("agent-loop-persistent-reuse").await?; + let town_path = town.config().root.clone(); + let start_count_path = town_path.join("fake_codex.starts"); + let input_log_path = town_path.join("fake_codex.inputs"); + let fake_command = write_fake_codex_app_server(&town_path, &start_count_path, &input_log_path)?; + + let mut config = tinytown::Config::load(&town_path)?; + config.agent.persistent = true; + config.default_cli = "codex".to_string(); + config.agent_clis.insert( + "codex".to_string(), + tinytown::agent::AgentCli::new("codex", &fake_command), + ); + config.save()?; + + let handle = town.spawn_agent("persistent-worker", "codex").await?; + let agent_id = handle.id(); + + town.channel() + .send(&Message::new( + AgentId::supervisor(), + agent_id, + MessageType::Task { + description: "FIRST_TURN TOOL_CALL=1".to_string(), + }, + )) + .await?; + + let town_path_for_loop = town_path.clone(); + let loop_handle = tokio::task::spawn_blocking(move || { + std::process::Command::new(env!("CARGO_BIN_EXE_tt")) + .arg("--town") + .arg(&town_path_for_loop) + .arg("agent-loop") + .arg("persistent-worker") + .arg(agent_id.to_string()) + .arg("2") + .status() + }); + + tokio::time::timeout(Duration::from_secs(10), async { + loop { + if let Ok(inputs) = std::fs::read_to_string(&input_log_path) + && inputs.contains("FIRST_TURN") + { + break; + } + tokio::time::sleep(Duration::from_millis(50)).await; + } + }) + .await?; + town.channel() + .send(&Message::new( + AgentId::supervisor(), + agent_id, + MessageType::Task { + description: "SECOND_TURN".to_string(), + }, + )) + .await?; + + let status = loop_handle.await??; + assert!( + status.success(), + "persistent agent-loop should exit cleanly" + ); + + let starts = std::fs::read_to_string(start_count_path)?; + assert_eq!( + starts.lines().count(), + 1, + "streaming runtime should start once" + ); + + let inputs = std::fs::read_to_string(input_log_path)?; + assert!(inputs.contains("FIRST_TURN")); + assert!(inputs.contains("SECOND_TURN")); + + let agent = town + .channel() + .get_agent_state(agent_id) + .await? + .expect("persistent worker should still be registered"); + assert_eq!(agent.state, AgentState::Stopped); + assert_eq!(agent.rounds_completed, 2); + assert_eq!(agent.runtime_session_id.as_deref(), Some("fake-thread-1")); + + Ok(()) +} + +/// Test that urgent messages can be injected into a live persistent session. +#[tokio::test] +async fn test_agent_loop_persistent_runtime_processes_urgent_message_mid_turn() +-> Result<(), Box> { + let town = create_test_town("agent-loop-persistent-urgent").await?; + let town_path = town.config().root.clone(); + let start_count_path = town_path.join("fake_codex.starts"); + let input_log_path = town_path.join("fake_codex.inputs"); + let fake_command = write_fake_codex_app_server(&town_path, &start_count_path, &input_log_path)?; + + let mut config = tinytown::Config::load(&town_path)?; + config.agent.persistent = true; + config.default_cli = "codex".to_string(); + config.agent_clis.insert( + "codex".to_string(), + tinytown::agent::AgentCli::new("codex", &fake_command), + ); + config.save()?; + + let handle = town.spawn_agent("urgent-worker", "codex").await?; + let agent_id = handle.id(); + + town.channel() + .send(&Message::new( + AgentId::supervisor(), + agent_id, + MessageType::Task { + description: "FIRST_TURN SLEEP_MS=1500".to_string(), + }, + )) + .await?; + + let town_path_for_loop = town_path.clone(); + let loop_handle = tokio::task::spawn_blocking(move || { + std::process::Command::new(env!("CARGO_BIN_EXE_tt")) + .arg("--town") + .arg(&town_path_for_loop) + .arg("agent-loop") + .arg("urgent-worker") + .arg(agent_id.to_string()) + .arg("1") + .status() + }); + + tokio::time::sleep(Duration::from_millis(300)).await; + town.channel() + .send_urgent(&Message::new( + AgentId::supervisor(), + agent_id, + MessageType::Query { + question: "URGENT_PING".to_string(), + }, + )) + .await?; + + let status = loop_handle.await??; + assert!( + status.success(), + "persistent urgent agent-loop should exit cleanly" + ); + + let starts = std::fs::read_to_string(start_count_path)?; + assert_eq!( + starts.lines().count(), + 1, + "urgent turn should not restart runtime" + ); + + let inputs = std::fs::read_to_string(input_log_path)?; + assert!(inputs.contains("SLEEP_MS=1500")); + assert!(inputs.contains("URGENT_PING")); + + let agent = town + .channel() + .get_agent_state(agent_id) + .await? + .expect("urgent worker should still be registered"); + assert_eq!(agent.state, AgentState::Stopped); + assert_eq!(agent.rounds_completed, 1); + + Ok(()) +} + +/// Test that stop requests interrupt a live persistent session instead of waiting for the turn to end. +#[tokio::test] +async fn test_agent_loop_persistent_runtime_stop_interrupts_live_turn() +-> Result<(), Box> { + let town = create_test_town("agent-loop-persistent-stop").await?; + let town_path = town.config().root.clone(); + let start_count_path = town_path.join("fake_codex.starts"); + let input_log_path = town_path.join("fake_codex.inputs"); + let fake_command = write_fake_codex_app_server(&town_path, &start_count_path, &input_log_path)?; + + let mut config = tinytown::Config::load(&town_path)?; + config.agent.persistent = true; + config.default_cli = "codex".to_string(); + config.agent_clis.insert( + "codex".to_string(), + tinytown::agent::AgentCli::new("codex", &fake_command), + ); + config.save()?; + + let handle = town.spawn_agent("stop-worker", "codex").await?; + let agent_id = handle.id(); + + town.channel() + .send(&Message::new( + AgentId::supervisor(), + agent_id, + MessageType::Task { + description: "LONG_TURN SLEEP_MS=5000".to_string(), + }, + )) + .await?; + + let town_path_for_loop = town_path.clone(); + let started = std::time::Instant::now(); + let loop_handle = tokio::task::spawn_blocking(move || { + std::process::Command::new(env!("CARGO_BIN_EXE_tt")) + .arg("--town") + .arg(&town_path_for_loop) + .arg("agent-loop") + .arg("stop-worker") + .arg(agent_id.to_string()) + .arg("10") + .status() + }); + + tokio::time::sleep(Duration::from_millis(400)).await; + town.channel().request_stop(agent_id).await?; + + let status = loop_handle.await??; + assert!(status.success(), "persistent stop path should exit cleanly"); + assert!( + started.elapsed() < Duration::from_secs(4), + "stop request should interrupt the live turn quickly" + ); + + let agent = town + .channel() + .get_agent_state(agent_id) + .await? + .expect("stop worker should still be registered"); + assert_eq!(agent.state, AgentState::Stopped); + + Ok(()) +} + +/// Test that persistent runtime events are emitted onto the per-agent Redis stream. +#[tokio::test] +async fn test_agent_loop_persistent_runtime_emits_structured_agent_events() +-> Result<(), Box> { + let town = create_test_town("agent-loop-persistent-events").await?; + let town_path = town.config().root.clone(); + let start_count_path = town_path.join("fake_codex.starts"); + let input_log_path = town_path.join("fake_codex.inputs"); + let fake_command = write_fake_codex_app_server(&town_path, &start_count_path, &input_log_path)?; + + let mut config = tinytown::Config::load(&town_path)?; + config.agent.persistent = true; + config.default_cli = "codex".to_string(); + config.agent_clis.insert( + "codex".to_string(), + tinytown::agent::AgentCli::new("codex", &fake_command), + ); + config.save()?; + + let handle = town.spawn_agent("event-worker", "codex").await?; + let agent_id = handle.id(); + + town.channel() + .send(&Message::new( + AgentId::supervisor(), + agent_id, + MessageType::Task { + description: "EVENT_TURN TOOL_CALL=1".to_string(), + }, + )) + .await?; + + let status = tokio::task::spawn_blocking({ + let town_path = town_path.clone(); + move || { + std::process::Command::new(env!("CARGO_BIN_EXE_tt")) + .arg("--town") + .arg(&town_path) + .arg("agent-loop") + .arg("event-worker") + .arg(agent_id.to_string()) + .arg("1") + .status() + } + }) + .await??; + assert!( + status.success(), + "persistent event worker should exit cleanly" + ); + + let events = town + .event_stream() + .read_agent_events(agent_id, "0-0", 50) + .await?; + let event_types: Vec = events + .into_iter() + .map(|(_, event)| event.event_type) + .collect(); + + assert!(event_types.contains(&EventType::AgentTurnStarted)); + assert!(event_types.contains(&EventType::AgentToolCall)); + assert!(event_types.contains(&EventType::AgentAssistantDelta)); + assert!(event_types.contains(&EventType::AgentTurnCompleted)); + + Ok(()) +} + /// Test that multiple consumers can read from the same docket stream. #[tokio::test] async fn test_docket_multiple_consumers() -> Result<(), Box> {