From 8a66c826d423bb84587f09604ebc95ed4be8312d Mon Sep 17 00:00:00 2001 From: Jeremy Plichta Date: Wed, 22 Apr 2026 20:23:51 -0600 Subject: [PATCH 1/3] feat: add persistent streaming agent runtime --- Cargo.lock | 1 + Cargo.toml | 1 + README.md | 9 + docs/src/cli/spawn.md | 2 + src/agent.rs | 5 + src/agent_runtime.rs | 864 +++++++++++++++++++++++++++++++++++++ src/channel.rs | 8 + src/config.rs | 5 + src/events.rs | 6 + src/lib.rs | 6 + src/main.rs | 736 +++++++++++++++++++++++++++++-- tests/integration_tests.rs | 470 +++++++++++++++++++- 12 files changed, 2073 insertions(+), 40 deletions(-) create mode 100644 src/agent_runtime.rs diff --git a/Cargo.lock b/Cargo.lock index f50072c..d89ab5e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2584,6 +2584,7 @@ dependencies = [ "chrono", "clap", "dirs", + "futures-core", "getrandom 0.4.2", "hex", "libc", diff --git a/Cargo.toml b/Cargo.toml index a26ac14..3f1957f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -53,6 +53,7 @@ tower-mcp = { version = "0.8", features = ["http", "testing"] } schemars = "1.2" regex = "1.12.3" async-trait = "0.1.89" +futures-core = "0.3" url = "2.5" [dev-dependencies] diff --git a/README.md b/README.md index a9058c1..753907d 100644 --- a/README.md +++ b/README.md @@ -76,6 +76,15 @@ That's it! Your agents are now coordinating via Redis. > **Note:** `tt bootstrap` delegates to an AI agent to download Redis from GitHub and compile it for your machine. Alternatively: `brew install redis` (macOS) or `apt install redis-server` (Ubuntu). +## Runtime Model + +`tt spawn` still launches a background `tt agent-loop` worker. The worker now supports two execution modes: + +- **One-shot**: default. Each turn spawns a fresh coding CLI process. +- **Persistent streaming**: opt in with `[agent].persistent = true`. 
Claude-backed workers keep a live `stream-json` subprocess across turns, can accept urgent messages mid-turn, persist a resumable session ID on the agent record, and emit structured per-turn events onto the Redis event stream. + +During rollout, unsupported CLIs automatically fall back to the one-shot path even if persistence is enabled. + ## 🎯 Mission Mode Start an autonomous mission that handles multiple GitHub issues with dependency-aware scheduling: diff --git a/docs/src/cli/spawn.md b/docs/src/cli/spawn.md index c2a49f1..b67e7f2 100644 --- a/docs/src/cli/spawn.md +++ b/docs/src/cli/spawn.md @@ -153,6 +153,8 @@ tt spawn reviewer --role reviewer & - Repeats until `--max-rounds` reached 4. **Agent stops** with state `Stopped` +By default, each turn uses a fresh CLI subprocess. If `[agent].persistent = true` and the selected CLI has a streaming adapter, `tt agent-loop` keeps one long-lived subprocess alive across turns instead. Today that persistent path is available for Claude-style `stream-json` sessions; unsupported CLIs automatically fall back to the one-shot model. + ## Agent Naming Choose descriptive names. With `--role`, names no longer need to describe the role: diff --git a/src/agent.rs b/src/agent.rs index 773b177..819038e 100644 --- a/src/agent.rs +++ b/src/agent.rs @@ -758,6 +758,9 @@ pub struct Agent { /// Defaults to created_at if never set. #[serde(default = "chrono::Utc::now")] pub last_active_at: DateTime, + /// Persistent runtime session identifier for resume-capable CLIs. 
+ #[serde(default)] + pub runtime_session_id: Option, } impl Agent { @@ -786,6 +789,7 @@ impl Agent { tasks_completed: 0, rounds_completed: 0, last_active_at: now, + runtime_session_id: None, } } @@ -864,6 +868,7 @@ impl Agent { tasks_completed: 0, rounds_completed: 0, last_active_at: now, + runtime_session_id: None, } } } diff --git a/src/agent_runtime.rs b/src/agent_runtime.rs new file mode 100644 index 0000000..d791854 --- /dev/null +++ b/src/agent_runtime.rs @@ -0,0 +1,864 @@ +/* + * Copyright (c) 2024-Present, Jeremy Plichta + * Licensed under the MIT License + */ + +//! Agent runtime adapters. + +use std::collections::VecDeque; +use std::path::{Path, PathBuf}; +use std::pin::Pin; +use std::process::{Command, ExitStatus, Stdio}; +use std::sync::{Arc, Mutex}; +use std::task::{Context, Poll}; + +use async_trait::async_trait; +use futures_core::Stream; +use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, Lines}; +use tokio::process::{Child as TokioChild, ChildStdin, ChildStdout, Command as TokioCommand}; +use tokio::sync::mpsc; +use tokio::task::JoinHandle; +use uuid::Uuid; + +use crate::Result; + +const TURN_STATUS_MARKER: &str = "__TINYTOWN_TURN_STATUS__"; + +#[must_use] +pub fn build_cli_command(cli_name: &str, cli_cmd: &str, prompt_file: &Path) -> String { + let prompt_file = shell_quote(prompt_file); + if cli_name == "auggie" { + format!("{} --instruction-file {}", cli_cmd, prompt_file) + } else { + format!("cat {} | {}", prompt_file, cli_cmd) + } +} + +#[must_use] +fn shell_quote(path: &Path) -> String { + shell_quote_str(&path.to_string_lossy()) +} + +#[must_use] +fn shell_quote_str(value: &str) -> String { + format!("'{}'", value.replace('\'', "'\"'\"'")) +} + +#[must_use] +fn build_turn_script( + cli_name: &str, + cli_cmd: &str, + prompt_file: &Path, + output_file: &Path, +) -> String { + let shell_cmd = build_cli_command(cli_name, cli_cmd, prompt_file); + let output_file = shell_quote(output_file); + format!("{shell_cmd} > {output_file} 
2>&1\nprintf '%s%s\\n' '{TURN_STATUS_MARKER}' \"$?\"\n") +} + +#[must_use] +fn parse_turn_status(line: &str) -> Option { + line.strip_prefix(TURN_STATUS_MARKER) + .and_then(|status| status.trim().parse::().ok()) +} + +#[derive(Debug, Clone)] +pub struct AgentTurn { + pub prompt: String, + pub prompt_file: PathBuf, + pub output_file: PathBuf, +} + +#[derive(Debug, Clone)] +pub enum AgentInput { + UserMessage(AgentTurn), + UrgentMessage(AgentTurn), + Cancel, +} + +#[derive(Debug, Clone)] +pub enum AgentEvent { + SessionReady { + session_id: String, + }, + TurnStarted, + AssistantDelta(String), + ToolCall { + name: String, + args: serde_json::Value, + }, + TurnCompleted { + summary: Option, + }, + AwaitingInput, + SessionError(String), + Exited(ExitStatus), +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ShutdownMode { + Graceful, + Immediate, +} + +#[derive(Debug)] +pub struct AgentTurnResult { + pub status: std::io::Result, +} + +pub type AgentEventStream<'a> = Pin + Send + 'a>>; + +pub struct AgentEventReceiver { + rx: mpsc::UnboundedReceiver, +} + +impl AgentEventReceiver { + pub async fn recv(&mut self) -> Option { + self.rx.recv().await + } +} + +pub struct AgentRuntime { + pub agent: RuntimeAgent, + pub events: AgentEventReceiver, +} + +pub struct RuntimeAgent { + inner: Box, + event_tx: mpsc::UnboundedSender, +} + +impl RuntimeAgent { + fn new(inner: Box, event_tx: mpsc::UnboundedSender) -> Self { + Self { inner, event_tx } + } + + pub async fn send(&mut self, input: AgentInput) -> Result { + let result = self.inner.send(input).await; + self.flush_events(); + result + } + + pub async fn shutdown(&mut self, mode: ShutdownMode) -> Result> { + let result = self.inner.shutdown(mode).await; + self.flush_events(); + result + } + + #[must_use] + pub fn session_id(&self) -> Option { + self.inner.session_id() + } + + fn flush_events(&mut self) { + for event in self.inner.take_events() { + let _ = self.event_tx.send(event); + } + } +} + +#[async_trait] +pub 
trait CodingAgent: Send { + async fn send(&mut self, input: AgentInput) -> Result; + fn events(&mut self) -> AgentEventStream<'_>; + fn take_events(&mut self) -> Vec; + async fn shutdown(&mut self, mode: ShutdownMode) -> Result>; + + fn session_id(&self) -> Option { + None + } +} + +#[derive(Debug, Clone)] +pub struct OneShotAgentConfig { + pub cli_name: String, + pub cli_cmd: String, + pub workdir: PathBuf, + pub env: Vec<(String, String)>, +} + +#[derive(Debug, Clone)] +pub struct StreamingAgentConfig { + pub cli_name: String, + pub cli_cmd: String, + pub workdir: PathBuf, + pub env: Vec<(String, String)>, + pub resume_session_id: Option, +} + +#[derive(Debug)] +pub struct PersistentShellAgent { + config: OneShotAgentConfig, + events: VecDeque, + last_exit_status: Option, + session: Option, + session_id: Option, +} + +#[derive(Debug)] +struct PersistentShellSession { + child: TokioChild, + stdin: ChildStdin, + stdout: Lines>, +} + +impl PersistentShellSession { + async fn start(config: &OneShotAgentConfig) -> Result { + let mut cmd = TokioCommand::new("sh"); + cmd.current_dir(&config.workdir) + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .stderr(Stdio::inherit()); + for (key, value) in &config.env { + cmd.env(key, value); + } + + let mut child = cmd.spawn()?; + let stdin = child + .stdin + .take() + .ok_or_else(|| std::io::Error::other("persistent runtime stdin unavailable"))?; + let stdout = child + .stdout + .take() + .ok_or_else(|| std::io::Error::other("persistent runtime stdout unavailable"))?; + + Ok(Self { + child, + stdin, + stdout: BufReader::new(stdout).lines(), + }) + } +} + +impl PersistentShellAgent { + #[must_use] + pub fn new(config: OneShotAgentConfig) -> Self { + Self::with_session_id(config, None) + } + + #[must_use] + pub fn with_session_id(config: OneShotAgentConfig, session_id: Option) -> Self { + Self { + config, + events: VecDeque::new(), + last_exit_status: None, + session: None, + session_id, + } + } + + async fn session(&mut self) 
-> Result<&mut PersistentShellSession> { + if self.session.is_none() { + self.session = Some(PersistentShellSession::start(&self.config).await?); + } + Ok(self.session.as_mut().expect("session initialized")) + } + + async fn run_turn(&mut self, turn: AgentTurn) -> Result { + self.events.push_back(AgentEvent::TurnStarted); + std::fs::write(&turn.prompt_file, &turn.prompt)?; + + let script = build_turn_script( + &self.config.cli_name, + &self.config.cli_cmd, + &turn.prompt_file, + &turn.output_file, + ); + let send_result = self.send_script(script).await; + let _ = std::fs::remove_file(&turn.prompt_file); + + match &send_result { + Ok(status) => { + self.last_exit_status = Some(status.clone()); + if status.success() { + self.events + .push_back(AgentEvent::TurnCompleted { summary: None }); + self.events.push_back(AgentEvent::AwaitingInput); + } else { + self.events.push_back(AgentEvent::SessionError(format!( + "CLI exited with status {}", + describe_exit_status(status) + ))); + } + } + Err(err) => { + self.events.push_back(AgentEvent::SessionError(format!( + "Persistent runtime failed: {}", + err + ))); + } + } + + Ok(AgentTurnResult { + status: send_result, + }) + } + + async fn send_script(&mut self, script: String) -> std::io::Result { + let session = self.session().await.map_err(as_io_error)?; + session.stdin.write_all(script.as_bytes()).await?; + session.stdin.flush().await?; + + loop { + match session.stdout.next_line().await? 
{ + Some(line) => { + if let Some(status) = parse_turn_status(&line) { + return Ok(exit_status_from_code(status)); + } + } + None => { + let exit_status = session.child.wait().await?; + self.last_exit_status = Some(exit_status.clone()); + self.events + .push_back(AgentEvent::Exited(exit_status.clone())); + self.session = None; + return Err(std::io::Error::other(format!( + "persistent runtime exited unexpectedly ({})", + describe_exit_status(&exit_status) + ))); + } + } + } + } +} + +#[derive(Debug)] +struct StreamingProcessAgent { + child: TokioChild, + stdin: Option, + events: VecDeque, + session_id: Arc>>, + output_files: Arc>>, + event_tx: mpsc::UnboundedSender, + reader_task: JoinHandle<()>, +} + +#[derive(Default)] +struct StreamingReaderState { + text_summary: String, + tool_name: Option, + tool_json: String, +} + +impl StreamingProcessAgent { + fn spawn( + config: StreamingAgentConfig, + event_tx: mpsc::UnboundedSender, + ) -> Result { + let mut command_line = config.cli_cmd.clone(); + if let Some(session_id) = config.resume_session_id.as_ref() { + command_line.push_str(" --resume "); + command_line.push_str(&shell_quote_str(session_id)); + } + + let mut cmd = TokioCommand::new("sh"); + cmd.arg("-lc") + .arg(command_line) + .current_dir(&config.workdir) + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .stderr(Stdio::inherit()); + for (key, value) in &config.env { + cmd.env(key, value); + } + + let mut child = cmd.spawn()?; + let stdin = child + .stdin + .take() + .ok_or_else(|| std::io::Error::other("streaming runtime stdin unavailable"))?; + let stdout = child + .stdout + .take() + .ok_or_else(|| std::io::Error::other("streaming runtime stdout unavailable"))?; + + let session_id = Arc::new(Mutex::new(config.resume_session_id)); + let output_files = Arc::new(Mutex::new(VecDeque::new())); + let reader_task = spawn_streaming_reader( + stdout, + event_tx.clone(), + session_id.clone(), + output_files.clone(), + ); + + Ok(Self { + child, + stdin: Some(stdin), 
+ events: VecDeque::new(), + session_id, + output_files, + event_tx, + reader_task, + }) + } +} + +fn spawn_streaming_reader( + stdout: ChildStdout, + event_tx: mpsc::UnboundedSender, + session_id: Arc>>, + output_files: Arc>>, +) -> JoinHandle<()> { + tokio::spawn(async move { + let mut lines = BufReader::new(stdout).lines(); + let mut state = StreamingReaderState::default(); + while let Ok(Some(line)) = lines.next_line().await { + handle_streaming_line(&line, &event_tx, &session_id, &output_files, &mut state); + } + }) +} + +fn handle_streaming_line( + line: &str, + event_tx: &mpsc::UnboundedSender, + session_id: &Arc>>, + output_files: &Arc>>, + state: &mut StreamingReaderState, +) { + let Ok(payload) = serde_json::from_str::(line) else { + let _ = event_tx.send(AgentEvent::SessionError(format!( + "Invalid streaming event: {}", + line + ))); + return; + }; + + if let Some(id) = payload + .get("session_id") + .and_then(serde_json::Value::as_str) + { + *session_id.lock().expect("session id mutex poisoned") = Some(id.to_string()); + } + + match payload.get("type").and_then(serde_json::Value::as_str) { + Some("system") + if payload.get("subtype").and_then(serde_json::Value::as_str) == Some("init") => + { + if let Some(id) = payload + .get("session_id") + .and_then(serde_json::Value::as_str) + { + let _ = event_tx.send(AgentEvent::SessionReady { + session_id: id.to_string(), + }); + } + } + Some("stream_event") => { + let Some(event) = payload.get("event") else { + return; + }; + match event.get("type").and_then(serde_json::Value::as_str) { + Some("content_block_start") + if event + .get("content_block") + .and_then(|block| block.get("type")) + .and_then(serde_json::Value::as_str) + == Some("tool_use") => + { + state.tool_name = event + .get("content_block") + .and_then(|block| block.get("name")) + .and_then(serde_json::Value::as_str) + .map(str::to_string); + state.tool_json.clear(); + } + Some("content_block_delta") => { + if let Some(delta) = event.get("delta") { 
+ match delta.get("type").and_then(serde_json::Value::as_str) { + Some("input_json_delta") => { + if let Some(partial) = delta + .get("partial_json") + .and_then(serde_json::Value::as_str) + { + state.tool_json.push_str(partial); + } + } + Some("text_delta") => { + if let Some(text) = + delta.get("text").and_then(serde_json::Value::as_str) + { + state.text_summary.push_str(text); + append_to_current_output(output_files, text); + let _ = + event_tx.send(AgentEvent::AssistantDelta(text.to_string())); + } + } + _ => {} + } + } + } + Some("content_block_stop") => { + if let Some(name) = state.tool_name.take() { + let args = serde_json::from_str(&state.tool_json) + .unwrap_or_else(|_| serde_json::Value::String(state.tool_json.clone())); + state.tool_json.clear(); + let _ = event_tx.send(AgentEvent::ToolCall { name, args }); + } + } + Some("message_stop") => { + let summary = if state.text_summary.trim().is_empty() { + None + } else { + Some(state.text_summary.trim().to_string()) + }; + state.text_summary.clear(); + finish_current_output(output_files); + let _ = event_tx.send(AgentEvent::TurnCompleted { summary }); + let _ = event_tx.send(AgentEvent::AwaitingInput); + } + _ => {} + } + } + _ => {} + } +} + +fn queue_output_file( + output_files: &Arc>>, + output_file: PathBuf, +) -> std::io::Result<()> { + let _ = std::fs::File::create(&output_file)?; + output_files + .lock() + .expect("output queue mutex poisoned") + .push_back(output_file); + Ok(()) +} + +fn append_to_current_output(output_files: &Arc>>, text: &str) { + let current = output_files + .lock() + .expect("output queue mutex poisoned") + .front() + .cloned(); + if let Some(path) = current + && let Ok(mut file) = std::fs::OpenOptions::new().append(true).open(path) + { + let _ = std::io::Write::write_all(&mut file, text.as_bytes()); + } +} + +fn finish_current_output(output_files: &Arc>>) { + let _ = output_files + .lock() + .expect("output queue mutex poisoned") + .pop_front(); +} + +pub struct StreamingAgent; 
+ +impl StreamingAgent { + pub fn runtime(config: StreamingAgentConfig) -> Result { + let (tx, rx) = mpsc::unbounded_channel(); + let agent: Box = if config.cli_name == "claude" { + Box::new(StreamingProcessAgent::spawn(config, tx.clone())?) + } else { + let session_id = config + .resume_session_id + .unwrap_or_else(|| Uuid::new_v4().to_string()); + let _ = tx.send(AgentEvent::SessionReady { + session_id: session_id.clone(), + }); + Box::new(PersistentShellAgent::with_session_id( + OneShotAgentConfig { + cli_name: config.cli_name, + cli_cmd: config.cli_cmd, + workdir: config.workdir, + env: config.env, + }, + Some(session_id), + )) + }; + + Ok(AgentRuntime { + agent: RuntimeAgent::new(agent, tx), + events: AgentEventReceiver { rx }, + }) + } +} + +#[derive(Debug)] +pub struct OneShotAgent { + config: OneShotAgentConfig, + events: VecDeque, + last_exit_status: Option, +} + +impl OneShotAgent { + #[must_use] + pub fn new(config: OneShotAgentConfig) -> Self { + Self { + config, + events: VecDeque::new(), + last_exit_status: None, + } + } + + #[must_use] + pub fn runtime(config: OneShotAgentConfig) -> AgentRuntime { + let (tx, rx) = mpsc::unbounded_channel(); + AgentRuntime { + agent: RuntimeAgent::new(Box::new(Self::new(config)), tx), + events: AgentEventReceiver { rx }, + } + } + + fn run_turn(&mut self, turn: AgentTurn) -> Result { + self.events.push_back(AgentEvent::TurnStarted); + std::fs::write(&turn.prompt_file, &turn.prompt)?; + + let status = (|| -> std::io::Result { + let output = std::fs::File::create(&turn.output_file)?; + let shell_cmd = build_cli_command( + &self.config.cli_name, + &self.config.cli_cmd, + &turn.prompt_file, + ); + let mut cmd = Command::new("sh"); + cmd.arg("-c") + .arg(&shell_cmd) + .current_dir(&self.config.workdir) + .stdin(Stdio::null()) + .stdout(output.try_clone()?) 
+ .stderr(output); + + for (key, value) in &self.config.env { + cmd.env(key, value); + } + + cmd.status() + })(); + + let _ = std::fs::remove_file(&turn.prompt_file); + + match &status { + Ok(exit_status) => { + self.last_exit_status = Some(exit_status.clone()); + self.events + .push_back(AgentEvent::Exited(exit_status.clone())); + if exit_status.success() { + self.events + .push_back(AgentEvent::TurnCompleted { summary: None }); + } else { + self.events.push_back(AgentEvent::SessionError(format!( + "CLI exited with status {}", + describe_exit_status(exit_status) + ))); + } + } + Err(err) => { + self.events.push_back(AgentEvent::SessionError(format!( + "Failed to run CLI: {}", + err + ))); + } + } + + Ok(AgentTurnResult { status }) + } +} + +#[async_trait] +impl CodingAgent for OneShotAgent { + async fn send(&mut self, input: AgentInput) -> Result { + match input { + AgentInput::UserMessage(turn) | AgentInput::UrgentMessage(turn) => self.run_turn(turn), + AgentInput::Cancel => { + self.events.push_back(AgentEvent::SessionError( + "Cancel is not supported by the one-shot runtime".to_string(), + )); + Ok(AgentTurnResult { + status: Err(std::io::Error::other( + "Cancel is not supported by the one-shot runtime", + )), + }) + } + } + } + + fn events(&mut self) -> AgentEventStream<'_> { + Box::pin(BufferedEventStream { + events: &mut self.events, + }) + } + + fn take_events(&mut self) -> Vec { + self.events.drain(..).collect() + } + + async fn shutdown(&mut self, _mode: ShutdownMode) -> Result> { + Ok(self.last_exit_status.clone()) + } +} + +#[async_trait] +impl CodingAgent for PersistentShellAgent { + async fn send(&mut self, input: AgentInput) -> Result { + match input { + AgentInput::UserMessage(turn) | AgentInput::UrgentMessage(turn) => { + self.run_turn(turn).await + } + AgentInput::Cancel => { + self.events.push_back(AgentEvent::SessionError( + "Cancel is not supported by the persistent runtime".to_string(), + )); + Ok(AgentTurnResult { + status: 
Err(std::io::Error::other( + "Cancel is not supported by the persistent runtime", + )), + }) + } + } + } + + fn events(&mut self) -> AgentEventStream<'_> { + Box::pin(BufferedEventStream { + events: &mut self.events, + }) + } + + fn take_events(&mut self) -> Vec { + self.events.drain(..).collect() + } + + async fn shutdown(&mut self, mode: ShutdownMode) -> Result> { + let Some(session) = self.session.as_mut() else { + return Ok(self.last_exit_status.clone()); + }; + + let status = match mode { + ShutdownMode::Graceful => { + session.stdin.write_all(b"exit\n").await?; + session.stdin.flush().await?; + session.child.wait().await? + } + ShutdownMode::Immediate => { + session.child.start_kill()?; + session.child.wait().await? + } + }; + + self.last_exit_status = Some(status.clone()); + self.events.push_back(AgentEvent::Exited(status.clone())); + self.session = None; + Ok(Some(status)) + } + + fn session_id(&self) -> Option { + self.session_id.clone() + } +} + +#[async_trait] +impl CodingAgent for StreamingProcessAgent { + async fn send(&mut self, input: AgentInput) -> Result { + match input { + AgentInput::UserMessage(turn) | AgentInput::UrgentMessage(turn) => { + std::fs::write(&turn.prompt_file, &turn.prompt)?; + queue_output_file(&self.output_files, turn.output_file)?; + let payload = serde_json::json!({ + "type": "user", + "message": { + "content": [{ "type": "text", "text": turn.prompt }] + } + }); + let _ = self.event_tx.send(AgentEvent::TurnStarted); + let stdin = self + .stdin + .as_mut() + .ok_or_else(|| std::io::Error::other("streaming runtime stdin closed"))?; + stdin.write_all(payload.to_string().as_bytes()).await?; + stdin.write_all(b"\n").await?; + stdin.flush().await?; + Ok(AgentTurnResult { + status: Ok(success_exit_status()), + }) + } + AgentInput::Cancel => { + let _ = self.stdin.take(); + self.child.start_kill()?; + let status = self.child.wait().await?; + let _ = self.event_tx.send(AgentEvent::Exited(status.clone())); + Ok(AgentTurnResult { status: 
Ok(status) }) + } + } + } + + fn events(&mut self) -> AgentEventStream<'_> { + Box::pin(BufferedEventStream { + events: &mut self.events, + }) + } + + fn take_events(&mut self) -> Vec { + self.events.drain(..).collect() + } + + async fn shutdown(&mut self, mode: ShutdownMode) -> Result> { + let status = match mode { + ShutdownMode::Graceful => { + if let Some(mut stdin) = self.stdin.take() { + let _ = stdin.shutdown().await; + } + self.child.wait().await? + } + ShutdownMode::Immediate => { + let _ = self.stdin.take(); + self.child.start_kill()?; + self.child.wait().await? + } + }; + self.reader_task.abort(); + let _ = self.event_tx.send(AgentEvent::Exited(status.clone())); + Ok(Some(status)) + } + + fn session_id(&self) -> Option { + self.session_id + .lock() + .expect("session id mutex poisoned") + .clone() + } +} + +struct BufferedEventStream<'a> { + events: &'a mut VecDeque, +} + +impl Stream for BufferedEventStream<'_> { + type Item = AgentEvent; + + fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(self.events.pop_front()) + } +} + +fn describe_exit_status(status: &ExitStatus) -> String { + status + .code() + .map(|code| code.to_string()) + .unwrap_or_else(|| "terminated by signal".to_string()) +} + +fn as_io_error(err: crate::error::Error) -> std::io::Error { + match err { + crate::error::Error::Io(inner) => inner, + other => std::io::Error::other(other.to_string()), + } +} + +#[must_use] +fn success_exit_status() -> ExitStatus { + exit_status_from_code(0) +} + +#[cfg(unix)] +fn exit_status_from_code(code: i32) -> ExitStatus { + std::os::unix::process::ExitStatusExt::from_raw(code << 8) +} + +#[cfg(windows)] +fn exit_status_from_code(code: i32) -> ExitStatus { + std::os::windows::process::ExitStatusExt::from_raw(code as u32) +} diff --git a/src/channel.rs b/src/channel.rs index 3b235e9..3f098cd 100644 --- a/src/channel.rs +++ b/src/channel.rs @@ -394,6 +394,12 @@ impl Channel { fields_to_delete.push("parent_agent_id"); } + 
if let Some(ref session_id) = agent.runtime_session_id { + fields.push(("runtime_session_id".to_string(), session_id.clone())); + } else { + fields_to_delete.push("runtime_session_id"); + } + // Use a pipeline to make HDEL + HSET atomic let mut pipe = redis::pipe(); if !fields_to_delete.is_empty() { @@ -482,6 +488,7 @@ impl Channel { let nickname = fields.get("nickname").cloned(); let role_id = fields.get("role_id").cloned(); let parent_agent_id = fields.get("parent_agent_id").and_then(|s| s.parse().ok()); + let runtime_session_id = fields.get("runtime_session_id").cloned(); let spawn_mode: crate::agent::SpawnMode = fields .get("spawn_mode") .map(|s| serde_json::from_str(&format!("\"{}\"", s)).unwrap_or_default()) @@ -503,6 +510,7 @@ impl Channel { tasks_completed, rounds_completed, last_active_at, + runtime_session_id, }) } diff --git a/src/config.rs b/src/config.rs index 1c2d941..e4f215b 100644 --- a/src/config.rs +++ b/src/config.rs @@ -109,6 +109,10 @@ pub struct AgentConfig { /// Seconds an idle worker should wait before draining and exiting. #[serde(default = "default_agent_idle_timeout_secs")] pub idle_timeout_secs: u64, + + /// Keep agent CLI subprocesses alive across turns when supported. + #[serde(default)] + pub persistent: bool, } /// Authentication mode for townhall. @@ -336,6 +340,7 @@ impl Default for AgentConfig { fn default() -> Self { Self { idle_timeout_secs: default_agent_idle_timeout_secs(), + persistent: false, } } } diff --git a/src/events.rs b/src/events.rs index 897e0d4..98229bc 100644 --- a/src/events.rs +++ b/src/events.rs @@ -41,6 +41,12 @@ pub enum EventType { AgentStateChanged, AgentSpawned, AgentStopped, + AgentTurnStarted, + AgentTurnCompleted, + AgentAwaitingInput, + AgentToolCall, + AgentAssistantDelta, + AgentRuntimeError, TaskAssigned, TaskCompleted, TaskFailed, diff --git a/src/lib.rs b/src/lib.rs index 19d2988..61bcbb5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -43,6 +43,7 @@ //! 
``` pub mod agent; +pub mod agent_runtime; pub mod app; pub mod channel; pub mod config; @@ -61,6 +62,11 @@ pub mod town; pub use agent::{ Agent, AgentId, AgentState, AgentType, RoleDefinition, SpawnMode, builtin_roles, roles, }; +pub use agent_runtime::{ + AgentEvent, AgentEventReceiver, AgentInput, AgentRuntime, AgentTurn, AgentTurnResult, + CodingAgent, OneShotAgent, OneShotAgentConfig, ShutdownMode, StreamingAgent, + StreamingAgentConfig, build_cli_command, +}; pub use app::audit::{AuditEvent, AuditResult, audit_middleware}; pub use app::auth::{AuthError, AuthState, Principal, auth_middleware, generate_api_key}; pub use app::mcp::{McpState, create_mcp_router}; diff --git a/src/main.rs b/src/main.rs index 5fb2783..471825e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,31 +5,22 @@ //! Tinytown CLI - Simple multi-agent orchestration. +use std::collections::VecDeque; use std::path::{Path, PathBuf}; use clap::{Parser, Subcommand}; use tracing::{info, warn}; use tracing_subscriber::EnvFilter; -use tinytown::{AgentState, GlobalConfig, Result, Task, Town, plan}; +use tinytown::{ + AgentEvent, AgentInput, AgentRuntime, AgentState, AgentTurn, GlobalConfig, OneShotAgent, + OneShotAgentConfig, Result, ShutdownMode, StreamingAgent, StreamingAgentConfig, Task, Town, + build_cli_command, plan, +}; const TT_AGENT_ID_ENV: &str = "TINYTOWN_AGENT_ID"; const TT_AGENT_NAME_ENV: &str = "TINYTOWN_AGENT_NAME"; -/// Build a shell command to run an agent CLI with a prompt/instruction file. 
-/// Different CLIs have different ways to accept input: -/// - auggie: uses --instruction-file flag -/// - claude, codex, etc.: accept input via stdin (pipe or redirect) -fn build_cli_command(cli_name: &str, cli_cmd: &str, prompt_file: &std::path::Path) -> String { - if cli_name == "auggie" { - // Auggie uses --instruction-file flag - format!("{} --instruction-file '{}'", cli_cmd, prompt_file.display()) - } else { - // Other CLIs accept input via stdin - format!("cat '{}' | {}", prompt_file.display(), cli_cmd) - } -} - fn idle_timeout_elapsed( agent: &tinytown::Agent, idle_timeout_secs: u64, @@ -340,6 +331,655 @@ async fn format_actionable_section( section } +#[derive(Clone)] +struct PendingTurn { + display_round: u32, + actionable_messages: Vec<(tinytown::Message, bool)>, +} + +fn runtime_env(agent_id: tinytown::AgentId, name: &str) -> Vec<(String, String)> { + vec![ + (TT_AGENT_ID_ENV.to_string(), agent_id.to_string()), + (TT_AGENT_NAME_ENV.to_string(), name.to_string()), + ] +} + +async fn persist_runtime_session_id( + channel: &tinytown::Channel, + agent_id: tinytown::AgentId, + session_id: &str, +) -> Result<()> { + if let Some(mut agent) = channel.get_agent_state(agent_id).await? 
{ + if agent.runtime_session_id.as_deref() != Some(session_id) { + agent.runtime_session_id = Some(session_id.to_string()); + channel.set_agent_state(&agent).await?; + } + } + Ok(()) +} + +async fn emit_agent_runtime_event( + channel: &tinytown::Channel, + agent_id: tinytown::AgentId, + event_type: tinytown::EventType, + message: impl Into, +) { + channel + .emit_event(&tinytown::TownEvent::new(event_type, message).with_agent(agent_id)) + .await; +} + +async fn requeue_pending_turns( + channel: &tinytown::Channel, + agent_id: tinytown::AgentId, + pending_turns: &mut VecDeque, +) -> Result<()> { + let mut requeued = 0usize; + while let Some(turn) = pending_turns.pop_front() { + for (msg, was_urgent) in &turn.actionable_messages { + if *was_urgent { + channel.send_urgent(msg).await?; + } else { + channel.send(msg).await?; + } + requeued += 1; + } + } + + if requeued > 0 { + info!(" ↩️ Re-queued {} actionable message(s)", requeued); + } + + if let Some(mut agent) = channel.get_agent_state(agent_id).await? { + if let Some(task_id) = agent.current_task + && let Some(mut task) = channel.get_task(task_id).await? 
+ && task.state == tinytown::TaskState::Running + { + task.assign(agent_id); + channel.set_task(&task).await?; + } + agent.current_task = None; + channel.set_agent_state(&agent).await?; + } + + Ok(()) +} + +fn build_streaming_runtime( + town_path: &Path, + cli_name: &str, + cli_cmd: &str, + agent_id: tinytown::AgentId, + name: &str, + resume_session_id: Option, +) -> Result { + StreamingAgent::runtime(StreamingAgentConfig { + cli_name: cli_name.to_string(), + cli_cmd: cli_cmd.to_string(), + workdir: town_path.to_path_buf(), + env: runtime_env(agent_id, name), + resume_session_id, + }) +} + +async fn run_persistent_agent_loop( + town_path: &Path, + config: &tinytown::Config, + channel: &tinytown::Channel, + name: &str, + agent_id: tinytown::AgentId, + max_rounds: u32, + cli_name: &str, + cli_cmd: &str, + idle_timeout_secs: u64, +) -> Result<()> { + use std::time::Duration; + + let resume_session_id = channel + .get_agent_state(agent_id) + .await? + .and_then(|agent| agent.runtime_session_id); + let AgentRuntime { + agent: mut runtime, + mut events, + } = build_streaming_runtime( + town_path, + cli_name, + cli_cmd, + agent_id, + name, + resume_session_id, + )?; + let mut runtime_alive = true; + let mut round: u32 = 0; + let mut pending_turns: VecDeque = VecDeque::new(); + let mut tick = tokio::time::interval(Duration::from_millis(250)); + tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); + + loop { + if round >= max_rounds && pending_turns.is_empty() { + break; + } + + tokio::select! 
{ + maybe_event = events.recv() => { + let Some(event) = maybe_event else { + runtime_alive = false; + continue; + }; + + match event { + AgentEvent::SessionReady { session_id } => { + persist_runtime_session_id(channel, agent_id, &session_id).await?; + } + AgentEvent::TurnStarted => { + emit_agent_runtime_event( + channel, + agent_id, + tinytown::EventType::AgentTurnStarted, + "Agent turn started", + ) + .await; + } + AgentEvent::AssistantDelta(delta) => { + emit_agent_runtime_event( + channel, + agent_id, + tinytown::EventType::AgentAssistantDelta, + format!("Assistant delta: {}", truncate_summary(&delta, 160)), + ) + .await; + } + AgentEvent::ToolCall { name: tool_name, .. } => { + channel + .log_agent_activity( + agent_id, + &format!("πŸ”§ Tool call: {}", tool_name), + ) + .await?; + emit_agent_runtime_event( + channel, + agent_id, + tinytown::EventType::AgentToolCall, + format!("Tool call: {}", tool_name), + ) + .await; + } + AgentEvent::TurnCompleted { summary } => { + let Some(turn) = pending_turns.pop_front() else { + continue; + }; + round += 1; + let summary_text = summary + .as_deref() + .map(|text| truncate_summary(text, 100)) + .unwrap_or_else(|| "completed".to_string()); + channel + .log_agent_activity( + agent_id, + &format!("Round {}: βœ… {}", turn.display_round, summary_text), + ) + .await?; + emit_agent_runtime_event( + channel, + agent_id, + tinytown::EventType::AgentTurnCompleted, + format!("Round {} completed: {}", turn.display_round, summary_text), + ) + .await; + + if let Some(mut agent) = channel.get_agent_state(agent_id).await? 
{ + clear_terminal_current_task(channel, &mut agent).await?; + let now = chrono::Utc::now(); + if agent.state != AgentState::Paused { + agent.state = AgentState::Idle; + agent.last_active_at = now; + } + agent.rounds_completed += 1; + agent.last_heartbeat = now; + agent.runtime_session_id = runtime.session_id(); + channel.set_agent_state(&agent).await?; + } + } + AgentEvent::AwaitingInput => { + emit_agent_runtime_event( + channel, + agent_id, + tinytown::EventType::AgentAwaitingInput, + "Agent runtime is awaiting input", + ) + .await; + } + AgentEvent::SessionError(err) => { + channel + .log_agent_activity( + agent_id, + &format!("Persistent runtime: {}", truncate_summary(&err, 140)), + ) + .await?; + emit_agent_runtime_event( + channel, + agent_id, + tinytown::EventType::AgentRuntimeError, + format!("Persistent runtime error: {}", truncate_summary(&err, 160)), + ) + .await; + } + AgentEvent::Exited(status) => { + runtime_alive = false; + if !status.success() && !pending_turns.is_empty() { + channel + .log_agent_activity( + agent_id, + &format!( + "Persistent runtime exited with error {}; re-queueing in-flight work", + status + ), + ) + .await?; + requeue_pending_turns(channel, agent_id, &mut pending_turns).await?; + } + } + } + } + _ = tick.tick() => { + if channel.should_stop(agent_id).await? { + info!(" πŸ›‘ Stop requested, shutting down persistent runtime..."); + channel + .log_agent_activity(agent_id, "πŸ›‘ stopped by request") + .await?; + let _ = runtime.shutdown(ShutdownMode::Immediate).await?; + requeue_pending_turns(channel, agent_id, &mut pending_turns).await?; + channel.clear_stop(agent_id).await?; + break; + } + + if let Some(agent_state) = channel.get_agent_state(agent_id).await? 
+ && agent_state.state == AgentState::Paused + { + if runtime_alive && !pending_turns.is_empty() { + info!(" ⏸️ Agent interrupted; stopping live session and re-queueing work"); + let _ = runtime.shutdown(ShutdownMode::Immediate).await?; + runtime_alive = false; + requeue_pending_turns(channel, agent_id, &mut pending_turns).await?; + } + continue; + } + + if !runtime_alive { + let resume_session_id = channel + .get_agent_state(agent_id) + .await? + .and_then(|agent| agent.runtime_session_id); + let AgentRuntime { + agent: new_runtime, + events: new_events, + } = build_streaming_runtime( + town_path, + cli_name, + cli_cmd, + agent_id, + name, + resume_session_id, + )?; + runtime = new_runtime; + events = new_events; + runtime_alive = true; + } + + if pending_turns.is_empty() { + let display_round = round + 1; + let urgent_messages = channel.receive_urgent(agent_id).await?; + let regular_messages = channel.drain_inbox(agent_id).await?; + let backlog_snapshot = backlog_snapshot_for_agent(channel, name, 8).await?; + + if regular_messages.is_empty() && urgent_messages.is_empty() { + if let Some(mut agent) = channel.get_agent_state(agent_id).await? 
{ + clear_terminal_current_task(channel, &mut agent).await?; + let now = chrono::Utc::now(); + let became_idle = + agent.state != AgentState::Paused && agent.state != AgentState::Idle; + if agent.state != AgentState::Paused { + agent.state = AgentState::Idle; + } + if became_idle { + agent.last_active_at = now; + } + agent.last_heartbeat = now; + + if idle_timeout_elapsed(&agent, idle_timeout_secs, now) { + info!( + " πŸ”» Idle timeout reached after {}s, draining and stopping...", + idle_timeout_secs + ); + agent.state = AgentState::Draining; + channel.set_agent_state(&agent).await?; + channel + .log_agent_activity( + agent_id, + &format!( + "πŸ”» Idle timeout reached after {}s; draining and stopping", + idle_timeout_secs + ), + ) + .await?; + let _ = runtime.shutdown(ShutdownMode::Graceful).await?; + break; + } + + channel.set_agent_state(&agent).await?; + } + continue; + } + + let mut breakdown = MessageBreakdown::default(); + let mut actionable_messages: Vec<(tinytown::Message, bool)> = Vec::new(); + let mut informational_summaries: Vec = Vec::new(); + let mut confirmation_counts: std::collections::BTreeMap = + std::collections::BTreeMap::new(); + + for msg in urgent_messages { + breakdown.count(&msg.msg_type); + match classify_message(&msg.msg_type) { + MessageCategory::Task + | MessageCategory::Query + | MessageCategory::OtherActionable => { + actionable_messages.push((msg, true)); + } + MessageCategory::Informational => { + informational_summaries + .push(truncate_summary(&summarize_message(&msg.msg_type), 100)); + } + MessageCategory::Confirmation => { + let key = truncate_summary(&summarize_message(&msg.msg_type), 60); + *confirmation_counts.entry(key).or_insert(0) += 1; + } + } + } + + for msg in regular_messages { + breakdown.count(&msg.msg_type); + match classify_message(&msg.msg_type) { + MessageCategory::Task + | MessageCategory::Query + | MessageCategory::OtherActionable => { + actionable_messages.push((msg, false)); + } + 
MessageCategory::Informational => { + informational_summaries + .push(truncate_summary(&summarize_message(&msg.msg_type), 100)); + } + MessageCategory::Confirmation => { + let key = truncate_summary(&summarize_message(&msg.msg_type), 60); + *confirmation_counts.entry(key).or_insert(0) += 1; + } + } + } + + let urgent_present = actionable_messages.iter().any(|(_, urgent)| *urgent); + let regular_present = actionable_messages.iter().any(|(_, urgent)| !*urgent); + if urgent_present && regular_present { + let mut deferred_urgent = Vec::new(); + actionable_messages.retain(|(msg, urgent)| { + if *urgent { + deferred_urgent.push(msg.clone()); + false + } else { + true + } + }); + for msg in deferred_urgent { + channel.send_urgent(&msg).await?; + } + } + + if actionable_messages.is_empty() { + if backlog_snapshot.total_matching > 0 { + actionable_messages.push(( + tinytown::Message::new( + tinytown::AgentId::supervisor(), + agent_id, + tinytown::MessageType::Query { + question: format!( + "No direct assignments right now. 
Backlog has {} role-matching task(s): review and claim one with `tt backlog claim {}`.", + backlog_snapshot.total_matching, name + ), + }, + ), + false, + )); + } else if backlog_snapshot.total_backlog > 0 { + channel + .log_agent_activity( + agent_id, + &format!( + "Round {}: ⏭️ no direct work and {} backlog task(s) did not match role hint", + display_round, backlog_snapshot.total_backlog + ), + ) + .await?; + continue; + } else { + channel + .log_agent_activity( + agent_id, + &format!( + "Round {}: ⏭️ auto-handled {} informational, {} confirmations", + display_round, + informational_summaries.len(), + breakdown.confirmations + ), + ) + .await?; + continue; + } + } + + let urgent_actionable = actionable_messages + .iter() + .filter(|(_, urgent)| *urgent) + .count(); + track_current_task_for_round(channel, agent_id, &actionable_messages).await?; + let actionable_section = + format_actionable_section(channel, &actionable_messages).await; + + let informational_section = if informational_summaries.is_empty() { + String::new() + } else { + let mut section = String::from("\n## Informational (batched summary)\n\n"); + for summary in informational_summaries.iter().take(8) { + section.push_str(&format!("- {}\n", summary)); + } + if informational_summaries.len() > 8 { + section.push_str(&format!( + "- ...and {} more informational message(s)\n", + informational_summaries.len() - 8 + )); + } + section + }; + + let confirmation_section = if confirmation_counts.is_empty() { + String::new() + } else { + let mut section = String::from("\n## Confirmations (auto-dismissed)\n\n"); + for (kind, count) in &confirmation_counts { + section.push_str(&format!("- {} x{}\n", kind, count)); + } + section + }; + + let role_hint = backlog_role_hint(name); + let backlog_section = { + let mut section = format!( + "\n## Backlog Snapshot\n\n- Total backlog tasks: {}\n- Role-matching backlog tasks: {}\n- Role match hint: {}\n", + backlog_snapshot.total_backlog, backlog_snapshot.total_matching, 
role_hint + ); + if backlog_snapshot.total_matching > 0 { + section.push_str("\nReview and claim role-matching items:\n"); + for (task_id, task) in &backlog_snapshot.tasks { + let tags = if task.tags.is_empty() { + String::new() + } else { + format!(" [{}]", task.tags.join(", ")) + }; + section.push_str(&format!( + "- {} - {}{}\n", + task_id, + truncate_summary(&task.description, 90), + tags + )); + } + } + section + }; + + let prompt = format!( + r#"# Agent: {name} + +You are agent "{name}" in Tinytown "{town_name}". + +{actionable_section}{informational_section}{confirmation_section} +## Available Commands + +```bash +tt status # Check town status and all agents +tt assign "task" # Assign actionable work +tt backlog list # Review unassigned backlog tasks +tt backlog claim {agent_name} # Claim a backlog task for yourself +tt send --query "question" # Ask for a response +tt send --info "update" # Send FYI update +tt send --ack "received" # Send acknowledgment +tt send --urgent --query "..." # Priority message for the live session +tt task current # Show your tracked current assignment +tt task complete --result "summary" # Mark a task as done +``` + +{backlog_section} +## Current State +- Round: {display_round}/{max_rounds} +- Actionable messages: {actionable_count} +- Urgent actionable: {urgent_actionable} +- Batched informational: {info_count} +- Auto-dismissed confirmations: {confirmation_count} + +## Your Workflow + +1. Handle all actionable messages listed above. +2. If you have no direct assignment or extra capacity, review backlog and claim one role-matching task. +3. Claim only work that matches your role hint; do not claim unrelated tasks. +4. Prefer direct agent-to-agent messages for concrete execution handoffs, review requests, and unblock checks. +5. Use `supervisor` / `conductor` when you need human guidance, priority changes, broader sequencing, escalation, or town-wide visibility. +6. If blocked, send a query with specific unblock needs. +7. 
Use `tt task current` to confirm the real Tinytown task id before completing work. +8. When finished with a task, mark it complete: `tt task complete --result \"what was done\"` +9. Send informational updates or confirmations as appropriate, including FYI summaries to supervisor/conductor when the conductor should stay informed. +"#, + name = name, + agent_name = name, + town_name = config.name, + actionable_section = actionable_section, + informational_section = informational_section, + confirmation_section = confirmation_section, + backlog_section = backlog_section, + display_round = display_round, + max_rounds = max_rounds, + actionable_count = actionable_messages.len(), + urgent_actionable = urgent_actionable, + info_count = informational_summaries.len(), + confirmation_count = breakdown.confirmations, + ); + + if let Some(mut agent) = channel.get_agent_state(agent_id).await? { + agent.state = AgentState::Working; + agent.last_active_at = chrono::Utc::now(); + agent.last_heartbeat = chrono::Utc::now(); + agent.runtime_session_id = runtime.session_id(); + channel.set_agent_state(&agent).await?; + } + if let Some(mut task) = + tinytown::TaskService::current_for_agent(channel, agent_id).await? 
+ && !task.state.is_terminal() + && task.state != tinytown::TaskState::Running + { + task.start(); + channel.set_task(&task).await?; + } + + let turn = AgentTurn { + prompt, + prompt_file: town_path.join(format!(".tt/agent_{}_prompt.md", name)), + output_file: town_path.join(format!(".tt/logs/{}_round_{}.log", name, display_round)), + }; + runtime.send(AgentInput::UserMessage(turn)).await?; + pending_turns.push_back(PendingTurn { + display_round, + actionable_messages, + }); + } else { + let urgent_messages = channel.receive_urgent(agent_id).await?; + if urgent_messages.is_empty() { + continue; + } + + let mut actionable_messages = Vec::new(); + for msg in urgent_messages { + if matches!( + classify_message(&msg.msg_type), + MessageCategory::Task + | MessageCategory::Query + | MessageCategory::OtherActionable + ) { + actionable_messages.push((msg, true)); + } + } + if actionable_messages.is_empty() { + continue; + } + + let display_round = round + pending_turns.len() as u32 + 1; + let actionable_section = + format_actionable_section(channel, &actionable_messages).await; + let prompt = format!( + "# Agent: {name}\n\nYou are in a live Tinytown session. Prioritize this urgent follow-up immediately.\n\n{actionable_section}\n## Current State\n- Round: {display_round}/{max_rounds}\n- This message arrived while you were already working.\n", + name = name, + actionable_section = actionable_section, + display_round = display_round, + max_rounds = max_rounds, + ); + let turn = AgentTurn { + prompt, + prompt_file: town_path.join(format!(".tt/agent_{}_prompt.md", name)), + output_file: town_path.join(format!(".tt/logs/{}_round_{}.log", name, display_round)), + }; + runtime.send(AgentInput::UrgentMessage(turn)).await?; + pending_turns.push_back(PendingTurn { + display_round, + actionable_messages, + }); + } + } + } + } + + let _ = runtime.shutdown(ShutdownMode::Graceful).await?; + + if let Some(mut agent) = channel.get_agent_state(agent_id).await? 
{ + agent.state = AgentState::Stopped; + agent.last_heartbeat = chrono::Utc::now(); + agent.runtime_session_id = runtime.session_id(); + channel.set_agent_state(&agent).await?; + info!( + "🏁 Agent '{}' finished: {} rounds, {} tasks", + name, agent.rounds_completed, agent.tasks_completed + ); + } else { + info!("🏁 Agent '{}' finished after {} rounds", name, max_rounds); + } + + Ok(()) +} + #[derive(Parser)] #[command(name = "tt")] #[command(author, version, about = "Tinytown - Simple multi-agent orchestration using Redis", long_about = None)] @@ -3029,6 +3669,33 @@ async fn main() -> Result<()> { ); info!(" CLI: {} ({})", cli_name, cli_cmd); info!(" Idle timeout: {}s", idle_timeout_secs); + if config.agent.persistent { + info!(" Runtime: persistent session"); + run_persistent_agent_loop( + &cli.town, + config, + channel, + &name, + agent_id, + max_rounds, + &cli_name, + &cli_cmd, + idle_timeout_secs, + ) + .await?; + return Ok(()); + } + info!(" Runtime: one-shot subprocess per turn"); + + let AgentRuntime { + agent: mut runtime, + events: _events, + } = OneShotAgent::runtime(OneShotAgentConfig { + cli_name: cli_name.clone(), + cli_cmd: cli_cmd.clone(), + workdir: cli.town.clone(), + env: runtime_env(agent_id, &name), + }); // Use manual counter - only increment AFTER CLI execution (fixes round-burning bug) let mut round: u32 = 0; @@ -3353,10 +4020,6 @@ Only run commands needed to complete listed work; inbox messages for this round confirmation_count = breakdown.confirmations, ); - // Write prompt to temp file (under .tt/) - let prompt_file = cli.town.join(format!(".tt/agent_{}_prompt.md", name)); - std::fs::write(&prompt_file, &prompt)?; - // Update agent state to working if let Some(mut agent) = channel.get_agent_state(agent_id).await? 
{ agent.state = AgentState::Working; @@ -3375,25 +4038,14 @@ Only run commands needed to complete listed work; inbox messages for this round // Run the agent CLI info!(" πŸ€– Running {}...", cli_name); - let output_file = cli - .town - .join(format!(".tt/logs/{}_round_{}.log", name, display_round)); - let output = std::fs::File::create(&output_file)?; - - let shell_cmd = build_cli_command(&cli_name, &cli_cmd, &prompt_file); - let status = std::process::Command::new("sh") - .arg("-c") - .arg(&shell_cmd) - .current_dir(&cli.town) - .env(TT_AGENT_ID_ENV, agent_id.to_string()) - .env(TT_AGENT_NAME_ENV, &name) - .stdin(std::process::Stdio::null()) - .stdout(output.try_clone()?) - .stderr(output) - .status(); - - // Clean up prompt file - let _ = std::fs::remove_file(&prompt_file); + let turn = AgentTurn { + prompt, + prompt_file: cli.town.join(format!(".tt/agent_{}_prompt.md", name)), + output_file: cli + .town + .join(format!(".tt/logs/{}_round_{}.log", name, display_round)), + }; + let status = runtime.send(AgentInput::UserMessage(turn)).await?.status; // Log activity and result let activity_msg = match &status { @@ -3470,6 +4122,8 @@ Only run commands needed to complete listed work; inbox messages for this round tokio::time::sleep(Duration::from_secs(1)).await; } + let _ = runtime.shutdown(ShutdownMode::Graceful).await?; + // Mark agent as stopped with final stats if let Some(mut agent) = channel.get_agent_state(agent_id).await? { agent.state = AgentState::Stopped; @@ -3993,6 +4647,12 @@ Now, help the user orchestrate their project! 
tinytown::events::EventType::AgentStopped | tinytown::events::EventType::AgentCompleted => "🏁", tinytown::events::EventType::AgentStateChanged => "πŸ”„", + tinytown::events::EventType::AgentTurnStarted => "▢️", + tinytown::events::EventType::AgentTurnCompleted => "βœ…", + tinytown::events::EventType::AgentAwaitingInput => "⏸️", + tinytown::events::EventType::AgentToolCall => "πŸ”§", + tinytown::events::EventType::AgentAssistantDelta => "πŸ’¬", + tinytown::events::EventType::AgentRuntimeError => "⚠️", tinytown::events::EventType::TaskAssigned => "πŸ“Œ", tinytown::events::EventType::TaskCompleted => "βœ…", tinytown::events::EventType::TaskFailed => "❌", diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index 2365d1f..84db344 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -15,8 +15,8 @@ use std::time::Duration; use tempfile::TempDir; use tinytown::message::MessageType; use tinytown::{ - Agent, AgentId, AgentState, AgentType, Message, Priority, Task, TaskId, TaskService, TaskState, - Town, + Agent, AgentId, AgentState, AgentType, EventType, Message, Priority, Task, TaskId, TaskService, + TaskState, Town, }; use uuid::Uuid; @@ -73,6 +73,105 @@ fn reserve_unused_port() -> Result> { Ok(port) } +fn shell_quote(value: &str) -> String { + format!("'{}'", value.replace('\'', "'\"'\"'")) +} + +fn write_fake_streaming_claude( + town_path: &std::path::Path, + start_count_path: &std::path::Path, + input_log_path: &std::path::Path, +) -> Result> { + let script_path = town_path.join("fake_claude_stream.py"); + std::fs::write( + &script_path, + r#"#!/usr/bin/env python3 +import json +import os +import re +import sys +import time + +resume_id = None +args = sys.argv[1:] +for idx, arg in enumerate(args): + if arg in ("--resume", "-r") and idx + 1 < len(args): + resume_id = args[idx + 1] + +session_id = resume_id or "fake-session-1" +start_count_path = os.environ.get("TINYTOWN_FAKE_START_COUNT") +if start_count_path: + with 
open(start_count_path, "a", encoding="utf-8") as handle: + handle.write("start\n") + +print(json.dumps({"type": "system", "subtype": "init", "session_id": session_id}), flush=True) + +for raw_line in sys.stdin: + raw_line = raw_line.strip() + if not raw_line: + continue + payload = json.loads(raw_line) + if payload.get("type") != "user": + continue + text = payload["message"]["content"][0]["text"] + input_log_path = os.environ.get("TINYTOWN_FAKE_INPUT_LOG") + if input_log_path: + with open(input_log_path, "a", encoding="utf-8") as handle: + handle.write(text) + handle.write("\n---\n") + + match = re.search(r"SLEEP_MS=(\d+)", text) + if match: + time.sleep(int(match.group(1)) / 1000.0) + + if "TOOL_CALL=1" in text: + print(json.dumps({ + "type": "stream_event", + "event": { + "type": "content_block_start", + "content_block": {"type": "tool_use", "name": "Read"} + }, + "session_id": session_id + }), flush=True) + print(json.dumps({ + "type": "stream_event", + "event": { + "type": "content_block_delta", + "delta": {"type": "input_json_delta", "partial_json": "{\"path\":\"README.md\"}"} + }, + "session_id": session_id + }), flush=True) + print(json.dumps({ + "type": "stream_event", + "event": {"type": "content_block_stop"}, + "session_id": session_id + }), flush=True) + + summary = "handled urgent turn" if "URGENT" in text else "handled turn" + print(json.dumps({ + "type": "stream_event", + "event": { + "type": "content_block_delta", + "delta": {"type": "text_delta", "text": summary} + }, + "session_id": session_id + }), flush=True) + print(json.dumps({ + "type": "stream_event", + "event": {"type": "message_stop"}, + "session_id": session_id + }), flush=True) +"#, + )?; + + Ok(format!( + "TINYTOWN_FAKE_START_COUNT={} TINYTOWN_FAKE_INPUT_LOG={} python3 {}", + shell_quote(&start_count_path.display().to_string()), + shell_quote(&input_log_path.display().to_string()), + shell_quote(&script_path.display().to_string()), + )) +} + // 
============================================================================ // TOWN INITIALIZATION AND CONFIGURATION TESTS // ============================================================================ @@ -5021,6 +5120,31 @@ idle_timeout_secs = 42 Ok(()) } +/// Test that the persistent agent runtime flag can be parsed from config TOML. +#[tokio::test] +async fn test_agent_persistent_config_parse() -> Result<(), Box> { + use tinytown::Config; + + let temp_dir = TempDir::new()?; + let config_path = temp_dir.path().join("tinytown.toml"); + + std::fs::write( + &config_path, + r#" +name = "agent-persistent-test" + +[agent] +persistent = true +"#, + )?; + + let config = Config::load(temp_dir.path())?; + assert!(config.agent.persistent); + assert_eq!(config.agent.idle_timeout_secs, 300); + + Ok(()) +} + /// Test that the worker loop exits cleanly after the idle timeout elapses. #[tokio::test] async fn test_agent_loop_exits_cleanly_after_idle_timeout() -> Result<(), Box> @@ -5059,6 +5183,48 @@ async fn test_agent_loop_exits_cleanly_after_idle_timeout() -> Result<(), Box Result<(), Box> { + let town = create_test_town("persistent-agent-loop-idle-timeout").await?; + let town_path = town.config().root.clone(); + + let mut config = tinytown::Config::load(&town_path)?; + config.agent.idle_timeout_secs = 1; + config.agent.persistent = true; + config.save()?; + + let handle = town.spawn_agent("idle-worker", "claude").await?; + let agent_id = handle.id(); + + let status = tokio::task::spawn_blocking(move || { + std::process::Command::new(env!("CARGO_BIN_EXE_tt")) + .arg("--town") + .arg(&town_path) + .arg("agent-loop") + .arg("idle-worker") + .arg(agent_id.to_string()) + .arg("100") + .status() + }) + .await??; + + assert!( + status.success(), + "persistent agent-loop should exit cleanly" + ); + + let agent = town + .channel() + .get_agent_state(agent_id) + .await? 
+ .expect("idle worker should still be registered"); + assert_eq!(agent.state, AgentState::Stopped); + + Ok(()) +} + /// Test that a stale terminal current_task does not block worker idle timeout. #[tokio::test] async fn test_agent_loop_ignores_stale_terminal_current_task() @@ -5111,6 +5277,306 @@ async fn test_agent_loop_ignores_stale_terminal_current_task() Ok(()) } +/// Test that the persistent runtime reuses a single streaming child across turns. +#[tokio::test] +async fn test_agent_loop_persistent_runtime_reuses_single_streaming_process() +-> Result<(), Box> { + let town = create_test_town("agent-loop-persistent-reuse").await?; + let town_path = town.config().root.clone(); + let start_count_path = town_path.join("fake_claude.starts"); + let input_log_path = town_path.join("fake_claude.inputs"); + let fake_command = write_fake_streaming_claude(&town_path, &start_count_path, &input_log_path)?; + + let mut config = tinytown::Config::load(&town_path)?; + config.agent.persistent = true; + config.default_cli = "claude".to_string(); + config.agent_clis.insert( + "claude".to_string(), + tinytown::agent::AgentCli::new("claude", &fake_command), + ); + config.save()?; + + let handle = town.spawn_agent("persistent-worker", "claude").await?; + let agent_id = handle.id(); + + town.channel() + .send(&Message::new( + AgentId::supervisor(), + agent_id, + MessageType::Task { + description: "FIRST_TURN TOOL_CALL=1".to_string(), + }, + )) + .await?; + + let town_path_for_loop = town_path.clone(); + let loop_handle = tokio::task::spawn_blocking(move || { + std::process::Command::new(env!("CARGO_BIN_EXE_tt")) + .arg("--town") + .arg(&town_path_for_loop) + .arg("agent-loop") + .arg("persistent-worker") + .arg(agent_id.to_string()) + .arg("2") + .status() + }); + + tokio::time::sleep(Duration::from_millis(500)).await; + town.channel() + .send(&Message::new( + AgentId::supervisor(), + agent_id, + MessageType::Task { + description: "SECOND_TURN".to_string(), + }, + )) + .await?; + + 
let status = loop_handle.await??; + assert!( + status.success(), + "persistent agent-loop should exit cleanly" + ); + + let starts = std::fs::read_to_string(start_count_path)?; + assert_eq!( + starts.lines().count(), + 1, + "streaming runtime should start once" + ); + + let inputs = std::fs::read_to_string(input_log_path)?; + assert!(inputs.contains("FIRST_TURN")); + assert!(inputs.contains("SECOND_TURN")); + + let agent = town + .channel() + .get_agent_state(agent_id) + .await? + .expect("persistent worker should still be registered"); + assert_eq!(agent.state, AgentState::Stopped); + assert_eq!(agent.rounds_completed, 2); + assert_eq!(agent.runtime_session_id.as_deref(), Some("fake-session-1")); + + Ok(()) +} + +/// Test that urgent messages can be injected into a live persistent session. +#[tokio::test] +async fn test_agent_loop_persistent_runtime_processes_urgent_message_mid_turn() +-> Result<(), Box> { + let town = create_test_town("agent-loop-persistent-urgent").await?; + let town_path = town.config().root.clone(); + let start_count_path = town_path.join("fake_claude.starts"); + let input_log_path = town_path.join("fake_claude.inputs"); + let fake_command = write_fake_streaming_claude(&town_path, &start_count_path, &input_log_path)?; + + let mut config = tinytown::Config::load(&town_path)?; + config.agent.persistent = true; + config.default_cli = "claude".to_string(); + config.agent_clis.insert( + "claude".to_string(), + tinytown::agent::AgentCli::new("claude", &fake_command), + ); + config.save()?; + + let handle = town.spawn_agent("urgent-worker", "claude").await?; + let agent_id = handle.id(); + + town.channel() + .send(&Message::new( + AgentId::supervisor(), + agent_id, + MessageType::Task { + description: "FIRST_TURN SLEEP_MS=1500".to_string(), + }, + )) + .await?; + + let town_path_for_loop = town_path.clone(); + let loop_handle = tokio::task::spawn_blocking(move || { + std::process::Command::new(env!("CARGO_BIN_EXE_tt")) + .arg("--town") + 
.arg(&town_path_for_loop) + .arg("agent-loop") + .arg("urgent-worker") + .arg(agent_id.to_string()) + .arg("2") + .status() + }); + + tokio::time::sleep(Duration::from_millis(300)).await; + town.channel() + .send_urgent(&Message::new( + AgentId::supervisor(), + agent_id, + MessageType::Query { + question: "URGENT_PING".to_string(), + }, + )) + .await?; + + let status = loop_handle.await??; + assert!( + status.success(), + "persistent urgent agent-loop should exit cleanly" + ); + + let starts = std::fs::read_to_string(start_count_path)?; + assert_eq!( + starts.lines().count(), + 1, + "urgent turn should not restart runtime" + ); + + let inputs = std::fs::read_to_string(input_log_path)?; + assert!(inputs.contains("SLEEP_MS=1500")); + assert!(inputs.contains("URGENT_PING")); + + let agent = town + .channel() + .get_agent_state(agent_id) + .await? + .expect("urgent worker should still be registered"); + assert_eq!(agent.state, AgentState::Stopped); + assert_eq!(agent.rounds_completed, 2); + + Ok(()) +} + +/// Test that stop requests interrupt a live persistent session instead of waiting for the turn to end. 
+#[tokio::test] +async fn test_agent_loop_persistent_runtime_stop_interrupts_live_turn() +-> Result<(), Box> { + let town = create_test_town("agent-loop-persistent-stop").await?; + let town_path = town.config().root.clone(); + let start_count_path = town_path.join("fake_claude.starts"); + let input_log_path = town_path.join("fake_claude.inputs"); + let fake_command = write_fake_streaming_claude(&town_path, &start_count_path, &input_log_path)?; + + let mut config = tinytown::Config::load(&town_path)?; + config.agent.persistent = true; + config.default_cli = "claude".to_string(); + config.agent_clis.insert( + "claude".to_string(), + tinytown::agent::AgentCli::new("claude", &fake_command), + ); + config.save()?; + + let handle = town.spawn_agent("stop-worker", "claude").await?; + let agent_id = handle.id(); + + town.channel() + .send(&Message::new( + AgentId::supervisor(), + agent_id, + MessageType::Task { + description: "LONG_TURN SLEEP_MS=5000".to_string(), + }, + )) + .await?; + + let town_path_for_loop = town_path.clone(); + let started = std::time::Instant::now(); + let loop_handle = tokio::task::spawn_blocking(move || { + std::process::Command::new(env!("CARGO_BIN_EXE_tt")) + .arg("--town") + .arg(&town_path_for_loop) + .arg("agent-loop") + .arg("stop-worker") + .arg(agent_id.to_string()) + .arg("10") + .status() + }); + + tokio::time::sleep(Duration::from_millis(400)).await; + town.channel().request_stop(agent_id).await?; + + let status = loop_handle.await??; + assert!(status.success(), "persistent stop path should exit cleanly"); + assert!( + started.elapsed() < Duration::from_secs(4), + "stop request should interrupt the live turn quickly" + ); + + let agent = town + .channel() + .get_agent_state(agent_id) + .await? + .expect("stop worker should still be registered"); + assert_eq!(agent.state, AgentState::Stopped); + + Ok(()) +} + +/// Test that persistent runtime events are emitted onto the per-agent Redis stream. 
+#[tokio::test] +async fn test_agent_loop_persistent_runtime_emits_structured_agent_events() +-> Result<(), Box> { + let town = create_test_town("agent-loop-persistent-events").await?; + let town_path = town.config().root.clone(); + let start_count_path = town_path.join("fake_claude.starts"); + let input_log_path = town_path.join("fake_claude.inputs"); + let fake_command = write_fake_streaming_claude(&town_path, &start_count_path, &input_log_path)?; + + let mut config = tinytown::Config::load(&town_path)?; + config.agent.persistent = true; + config.default_cli = "claude".to_string(); + config.agent_clis.insert( + "claude".to_string(), + tinytown::agent::AgentCli::new("claude", &fake_command), + ); + config.save()?; + + let handle = town.spawn_agent("event-worker", "claude").await?; + let agent_id = handle.id(); + + town.channel() + .send(&Message::new( + AgentId::supervisor(), + agent_id, + MessageType::Task { + description: "EVENT_TURN TOOL_CALL=1".to_string(), + }, + )) + .await?; + + let status = tokio::task::spawn_blocking({ + let town_path = town_path.clone(); + move || { + std::process::Command::new(env!("CARGO_BIN_EXE_tt")) + .arg("--town") + .arg(&town_path) + .arg("agent-loop") + .arg("event-worker") + .arg(agent_id.to_string()) + .arg("1") + .status() + } + }) + .await??; + assert!( + status.success(), + "persistent event worker should exit cleanly" + ); + + let events = town + .event_stream() + .read_agent_events(agent_id, "0-0", 50) + .await?; + let event_types: Vec = events + .into_iter() + .map(|(_, event)| event.event_type) + .collect(); + + assert!(event_types.contains(&EventType::AgentTurnStarted)); + assert!(event_types.contains(&EventType::AgentToolCall)); + assert!(event_types.contains(&EventType::AgentAssistantDelta)); + assert!(event_types.contains(&EventType::AgentTurnCompleted)); + + Ok(()) +} + /// Test that multiple consumers can read from the same docket stream. 
#[tokio::test] async fn test_docket_multiple_consumers() -> Result<(), Box> { From 528b723065faa555237ae1390ac057fd8b3c7d90 Mon Sep 17 00:00:00 2001 From: Jeremy Plichta Date: Thu, 23 Apr 2026 16:30:18 -0600 Subject: [PATCH 2/3] refactor: retarget persistent runtime to codex app-server --- src/agent_runtime.rs | 1052 ++++++++++++++++++++---------------- src/lib.rs | 2 +- src/main.rs | 37 +- tests/integration_tests.rs | 322 ++++++++--- 4 files changed, 842 insertions(+), 571 deletions(-) diff --git a/src/agent_runtime.rs b/src/agent_runtime.rs index d791854..979373c 100644 --- a/src/agent_runtime.rs +++ b/src/agent_runtime.rs @@ -5,7 +5,7 @@ //! Agent runtime adapters. -use std::collections::VecDeque; +use std::collections::{HashMap, VecDeque}; use std::path::{Path, PathBuf}; use std::pin::Pin; use std::process::{Command, ExitStatus, Stdio}; @@ -14,15 +14,18 @@ use std::task::{Context, Poll}; use async_trait::async_trait; use futures_core::Stream; -use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, Lines}; +use serde_json::{Value, json}; +use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; use tokio::process::{Child as TokioChild, ChildStdin, ChildStdout, Command as TokioCommand}; -use tokio::sync::mpsc; +use tokio::sync::{mpsc, oneshot}; use tokio::task::JoinHandle; -use uuid::Uuid; use crate::Result; -const TURN_STATUS_MARKER: &str = "__TINYTOWN_TURN_STATUS__"; +#[must_use] +pub fn supports_persistent_runtime(cli_name: &str) -> bool { + matches!(cli_name, "codex" | "codex-mini") +} #[must_use] pub fn build_cli_command(cli_name: &str, cli_cmd: &str, prompt_file: &Path) -> String { @@ -44,24 +47,6 @@ fn shell_quote_str(value: &str) -> String { format!("'{}'", value.replace('\'', "'\"'\"'")) } -#[must_use] -fn build_turn_script( - cli_name: &str, - cli_cmd: &str, - prompt_file: &Path, - output_file: &Path, -) -> String { - let shell_cmd = build_cli_command(cli_name, cli_cmd, prompt_file); - let output_file = shell_quote(output_file); - 
format!("{shell_cmd} > {output_file} 2>&1\nprintf '%s%s\\n' '{TURN_STATUS_MARKER}' \"$?\"\n") -} - -#[must_use] -fn parse_turn_status(line: &str) -> Option { - line.strip_prefix(TURN_STATUS_MARKER) - .and_then(|status| status.trim().parse::().ok()) -} - #[derive(Debug, Clone)] pub struct AgentTurn { pub prompt: String, @@ -187,174 +172,172 @@ pub struct StreamingAgentConfig { } #[derive(Debug)] -pub struct PersistentShellAgent { +pub struct OneShotAgent { config: OneShotAgentConfig, events: VecDeque, last_exit_status: Option, - session: Option, - session_id: Option, -} - -#[derive(Debug)] -struct PersistentShellSession { - child: TokioChild, - stdin: ChildStdin, - stdout: Lines>, -} - -impl PersistentShellSession { - async fn start(config: &OneShotAgentConfig) -> Result { - let mut cmd = TokioCommand::new("sh"); - cmd.current_dir(&config.workdir) - .stdin(Stdio::piped()) - .stdout(Stdio::piped()) - .stderr(Stdio::inherit()); - for (key, value) in &config.env { - cmd.env(key, value); - } - - let mut child = cmd.spawn()?; - let stdin = child - .stdin - .take() - .ok_or_else(|| std::io::Error::other("persistent runtime stdin unavailable"))?; - let stdout = child - .stdout - .take() - .ok_or_else(|| std::io::Error::other("persistent runtime stdout unavailable"))?; - - Ok(Self { - child, - stdin, - stdout: BufReader::new(stdout).lines(), - }) - } } -impl PersistentShellAgent { +impl OneShotAgent { #[must_use] pub fn new(config: OneShotAgentConfig) -> Self { - Self::with_session_id(config, None) - } - - #[must_use] - pub fn with_session_id(config: OneShotAgentConfig, session_id: Option) -> Self { Self { config, events: VecDeque::new(), last_exit_status: None, - session: None, - session_id, } } - async fn session(&mut self) -> Result<&mut PersistentShellSession> { - if self.session.is_none() { - self.session = Some(PersistentShellSession::start(&self.config).await?); + #[must_use] + pub fn runtime(config: OneShotAgentConfig) -> AgentRuntime { + let (tx, rx) = 
mpsc::unbounded_channel(); + AgentRuntime { + agent: RuntimeAgent::new(Box::new(Self::new(config)), tx), + events: AgentEventReceiver { rx }, } - Ok(self.session.as_mut().expect("session initialized")) } - async fn run_turn(&mut self, turn: AgentTurn) -> Result { + fn run_turn(&mut self, turn: AgentTurn) -> Result { self.events.push_back(AgentEvent::TurnStarted); std::fs::write(&turn.prompt_file, &turn.prompt)?; - let script = build_turn_script( - &self.config.cli_name, - &self.config.cli_cmd, - &turn.prompt_file, - &turn.output_file, - ); - let send_result = self.send_script(script).await; + let status = (|| -> std::io::Result { + let output = std::fs::File::create(&turn.output_file)?; + let shell_cmd = build_cli_command( + &self.config.cli_name, + &self.config.cli_cmd, + &turn.prompt_file, + ); + let mut cmd = Command::new("sh"); + cmd.arg("-c") + .arg(&shell_cmd) + .current_dir(&self.config.workdir) + .stdin(Stdio::null()) + .stdout(output.try_clone()?) + .stderr(output); + + for (key, value) in &self.config.env { + cmd.env(key, value); + } + + cmd.status() + })(); + let _ = std::fs::remove_file(&turn.prompt_file); - match &send_result { - Ok(status) => { - self.last_exit_status = Some(status.clone()); - if status.success() { + match &status { + Ok(exit_status) => { + self.last_exit_status = Some(exit_status.clone()); + self.events + .push_back(AgentEvent::Exited(exit_status.clone())); + if exit_status.success() { self.events .push_back(AgentEvent::TurnCompleted { summary: None }); - self.events.push_back(AgentEvent::AwaitingInput); } else { self.events.push_back(AgentEvent::SessionError(format!( "CLI exited with status {}", - describe_exit_status(status) + describe_exit_status(exit_status) ))); } } Err(err) => { self.events.push_back(AgentEvent::SessionError(format!( - "Persistent runtime failed: {}", + "Failed to run CLI: {}", err ))); } } - Ok(AgentTurnResult { - status: send_result, + Ok(AgentTurnResult { status }) + } +} + +#[async_trait] +impl CodingAgent 
for OneShotAgent { + async fn send(&mut self, input: AgentInput) -> Result { + match input { + AgentInput::UserMessage(turn) | AgentInput::UrgentMessage(turn) => self.run_turn(turn), + AgentInput::Cancel => { + self.events.push_back(AgentEvent::SessionError( + "Cancel is not supported by the one-shot runtime".to_string(), + )); + Ok(AgentTurnResult { + status: Err(std::io::Error::other( + "Cancel is not supported by the one-shot runtime", + )), + }) + } + } + } + + fn events(&mut self) -> AgentEventStream<'_> { + Box::pin(BufferedEventStream { + events: &mut self.events, }) } - async fn send_script(&mut self, script: String) -> std::io::Result { - let session = self.session().await.map_err(as_io_error)?; - session.stdin.write_all(script.as_bytes()).await?; - session.stdin.flush().await?; + fn take_events(&mut self) -> Vec { + self.events.drain(..).collect() + } - loop { - match session.stdout.next_line().await? { - Some(line) => { - if let Some(status) = parse_turn_status(&line) { - return Ok(exit_status_from_code(status)); - } - } - None => { - let exit_status = session.child.wait().await?; - self.last_exit_status = Some(exit_status.clone()); - self.events - .push_back(AgentEvent::Exited(exit_status.clone())); - self.session = None; - return Err(std::io::Error::other(format!( - "persistent runtime exited unexpectedly ({})", - describe_exit_status(&exit_status) - ))); - } - } + async fn shutdown(&mut self, _mode: ShutdownMode) -> Result> { + Ok(self.last_exit_status.clone()) + } +} + +#[derive(Debug, Clone)] +struct CodexAppServerLaunchConfig { + launch_cmd: String, + workdir: PathBuf, + env: Vec<(String, String)>, + resume_thread_id: Option, + model: Option, + reasoning_effort: Option, +} + +impl CodexAppServerLaunchConfig { + fn from_streaming(config: StreamingAgentConfig) -> Self { + let (model, reasoning_effort) = codex_model_overrides(&config.cli_name); + Self { + launch_cmd: build_persistent_launch_command(&config.cli_name, &config.cli_cmd), + workdir: 
config.workdir, + env: config.env, + resume_thread_id: config.resume_session_id, + model, + reasoning_effort, } } } +#[derive(Debug, Default)] +struct PendingRequestState { + next_id: u64, + pending: HashMap>>, +} + #[derive(Debug)] -struct StreamingProcessAgent { +pub struct CodexAppServerAgent { + config: CodexAppServerLaunchConfig, child: TokioChild, stdin: Option, events: VecDeque, - session_id: Arc>>, - output_files: Arc>>, + last_exit_status: Option, + thread_id: Arc>>, + active_turn_id: Arc>>, + output_files: Arc>>, + request_state: Arc>, event_tx: mpsc::UnboundedSender, reader_task: JoinHandle<()>, } -#[derive(Default)] -struct StreamingReaderState { - text_summary: String, - tool_name: Option, - tool_json: String, -} - -impl StreamingProcessAgent { - fn spawn( - config: StreamingAgentConfig, +impl CodexAppServerAgent { + async fn spawn( + config: CodexAppServerLaunchConfig, event_tx: mpsc::UnboundedSender, ) -> Result { - let mut command_line = config.cli_cmd.clone(); - if let Some(session_id) = config.resume_session_id.as_ref() { - command_line.push_str(" --resume "); - command_line.push_str(&shell_quote_str(session_id)); - } - let mut cmd = TokioCommand::new("sh"); cmd.arg("-lc") - .arg(command_line) + .arg(&config.launch_cmd) .current_dir(&config.workdir) .stdin(Stdio::piped()) .stdout(Stdio::piped()) @@ -367,421 +350,536 @@ impl StreamingProcessAgent { let stdin = child .stdin .take() - .ok_or_else(|| std::io::Error::other("streaming runtime stdin unavailable"))?; + .ok_or_else(|| std::io::Error::other("codex app-server stdin unavailable"))?; let stdout = child .stdout .take() - .ok_or_else(|| std::io::Error::other("streaming runtime stdout unavailable"))?; + .ok_or_else(|| std::io::Error::other("codex app-server stdout unavailable"))?; - let session_id = Arc::new(Mutex::new(config.resume_session_id)); - let output_files = Arc::new(Mutex::new(VecDeque::new())); - let reader_task = spawn_streaming_reader( + let thread_id = Arc::new(Mutex::new(None)); + 
let active_turn_id = Arc::new(Mutex::new(None)); + let output_files = Arc::new(Mutex::new(HashMap::new())); + let request_state = Arc::new(Mutex::new(PendingRequestState::default())); + let reader_task = spawn_codex_reader( stdout, event_tx.clone(), - session_id.clone(), + thread_id.clone(), + active_turn_id.clone(), output_files.clone(), + request_state.clone(), ); - Ok(Self { + let mut agent = Self { + config, child, stdin: Some(stdin), events: VecDeque::new(), - session_id, + last_exit_status: None, + thread_id, + active_turn_id, output_files, + request_state, event_tx, reader_task, - }) + }; + + agent.initialize().await?; + let thread_id = agent.start_or_resume_thread().await?; + let _ = agent.event_tx.send(AgentEvent::SessionReady { + session_id: thread_id, + }); + + Ok(agent) + } + + async fn initialize(&mut self) -> Result<()> { + let params = json!({ + "clientInfo": { + "name": "tinytown", + "title": "Tinytown", + "version": env!("CARGO_PKG_VERSION"), + } + }); + let _ = self.send_request("initialize", params).await?; + self.send_notification("initialized", json!({})).await?; + Ok(()) + } + + async fn start_or_resume_thread(&mut self) -> Result { + let params = json!({ + "approvalPolicy": "never", + "cwd": self.config.workdir, + "sandbox": "danger-full-access", + "model": self.config.model, + }); + + let result = if let Some(thread_id) = self.config.resume_thread_id.as_ref() { + let mut resume = params; + if let Some(object) = resume.as_object_mut() { + object.insert("threadId".to_string(), Value::String(thread_id.clone())); + } + self.send_request("thread/resume", resume).await? + } else { + self.send_request("thread/start", params).await? + }; + + let thread_id = result + .get("thread") + .and_then(|thread| thread.get("id")) + .and_then(Value::as_str) + .ok_or_else(|| std::io::Error::other("codex app-server thread id missing"))? 
+ .to_string(); + *self.thread_id.lock().expect("thread id mutex poisoned") = Some(thread_id.clone()); + Ok(thread_id) + } + + async fn send_request(&mut self, method: &str, params: Value) -> Result { + let (request_id, rx) = { + let mut state = self + .request_state + .lock() + .expect("request state mutex poisoned"); + state.next_id += 1; + let request_id = state.next_id; + let (tx, rx) = oneshot::channel(); + state.pending.insert(request_id, tx); + (request_id, rx) + }; + + self.write_json(&json!({ + "id": request_id, + "method": method, + "params": params, + })) + .await?; + + match rx.await { + Ok(Ok(result)) => Ok(result), + Ok(Err(err)) => Err(err.into()), + Err(_) => Err(std::io::Error::other(format!( + "codex app-server request dropped before response: {}", + method + )) + .into()), + } + } + + async fn send_notification(&mut self, method: &str, params: Value) -> Result<()> { + self.write_json(&json!({ + "method": method, + "params": params, + })) + .await + } + + async fn write_json(&mut self, payload: &Value) -> Result<()> { + let stdin = self + .stdin + .as_mut() + .ok_or_else(|| std::io::Error::other("codex app-server stdin closed"))?; + stdin.write_all(payload.to_string().as_bytes()).await?; + stdin.write_all(b"\n").await?; + stdin.flush().await?; + Ok(()) + } + + async fn start_turn(&mut self, turn: AgentTurn) -> Result { + std::fs::write(&turn.prompt_file, &turn.prompt)?; + + let mut params = json!({ + "approvalPolicy": "never", + "cwd": self.config.workdir, + "threadId": self + .session_id() + .ok_or_else(|| std::io::Error::other("codex thread id missing"))?, + "input": [{"type": "text", "text": turn.prompt}], + }); + if let Some(object) = params.as_object_mut() { + if let Some(model) = self.config.model.as_ref() { + object.insert("model".to_string(), Value::String(model.clone())); + } + if let Some(effort) = self.config.reasoning_effort.as_ref() { + object.insert("effort".to_string(), Value::String(effort.clone())); + } + object.insert( + 
"sandboxPolicy".to_string(), + Value::String("danger-full-access".to_string()), + ); + } + + let result = self.send_request("turn/start", params).await; + let _ = std::fs::remove_file(&turn.prompt_file); + + match result { + Ok(payload) => { + let turn_id = payload + .get("turn") + .and_then(|value| value.get("id")) + .and_then(Value::as_str) + .ok_or_else(|| std::io::Error::other("codex turn id missing"))? + .to_string(); + *self + .active_turn_id + .lock() + .expect("active turn mutex poisoned") = Some(turn_id.clone()); + queue_output_file(&self.output_files, &turn_id, turn.output_file)?; + Ok(AgentTurnResult { + status: Ok(success_exit_status()), + }) + } + Err(err) => { + self.events.push_back(AgentEvent::SessionError(format!( + "Failed to start Codex turn: {}", + err + ))); + Ok(AgentTurnResult { + status: Err(as_io_error(err)), + }) + } + } + } + + async fn steer_turn(&mut self, turn: AgentTurn, turn_id: String) -> Result { + std::fs::write(&turn.prompt_file, &turn.prompt)?; + let result = self + .send_request( + "turn/steer", + json!({ + "threadId": self + .session_id() + .ok_or_else(|| std::io::Error::other("codex thread id missing"))?, + "expectedTurnId": turn_id, + "input": [{"type": "text", "text": turn.prompt}], + }), + ) + .await; + let _ = std::fs::remove_file(&turn.prompt_file); + + match result { + Ok(_) => Ok(AgentTurnResult { + status: Ok(success_exit_status()), + }), + Err(err) => { + self.events.push_back(AgentEvent::SessionError(format!( + "Failed to steer Codex turn: {}", + err + ))); + Ok(AgentTurnResult { + status: Err(as_io_error(err)), + }) + } + } + } + + async fn interrupt_active_turn(&mut self) -> Result<()> { + let Some(thread_id) = self.session_id() else { + return Ok(()); + }; + let Some(turn_id) = self + .active_turn_id + .lock() + .expect("active turn mutex poisoned") + .clone() + else { + return Ok(()); + }; + + let _ = self + .send_request( + "turn/interrupt", + json!({ + "threadId": thread_id, + "turnId": turn_id, + }), + ) + 
.await?; + Ok(()) } } -fn spawn_streaming_reader( +fn spawn_codex_reader( stdout: ChildStdout, event_tx: mpsc::UnboundedSender, - session_id: Arc>>, - output_files: Arc>>, + thread_id: Arc>>, + active_turn_id: Arc>>, + output_files: Arc>>, + request_state: Arc>, ) -> JoinHandle<()> { tokio::spawn(async move { let mut lines = BufReader::new(stdout).lines(); - let mut state = StreamingReaderState::default(); + while let Ok(Some(line)) = lines.next_line().await { - handle_streaming_line(&line, &event_tx, &session_id, &output_files, &mut state); + handle_codex_line( + &line, + &event_tx, + &thread_id, + &active_turn_id, + &output_files, + &request_state, + ); } + + fail_pending_requests( + &request_state, + "codex app-server closed before sending a response", + ); }) } -fn handle_streaming_line( +fn handle_codex_line( line: &str, event_tx: &mpsc::UnboundedSender, - session_id: &Arc>>, - output_files: &Arc>>, - state: &mut StreamingReaderState, + thread_id: &Arc>>, + active_turn_id: &Arc>>, + output_files: &Arc>>, + request_state: &Arc>, ) { - let Ok(payload) = serde_json::from_str::(line) else { + let Ok(payload) = serde_json::from_str::(line) else { let _ = event_tx.send(AgentEvent::SessionError(format!( - "Invalid streaming event: {}", + "Invalid codex app-server event: {}", line ))); return; }; - if let Some(id) = payload - .get("session_id") - .and_then(serde_json::Value::as_str) - { - *session_id.lock().expect("session id mutex poisoned") = Some(id.to_string()); - } + if let Some(id) = payload.get("id").and_then(Value::as_u64) { + let response = if let Some(result) = payload.get("result") { + Ok(result.clone()) + } else if let Some(err) = payload.get("error") { + let message = err + .get("message") + .and_then(Value::as_str) + .unwrap_or("unknown JSON-RPC error"); + Err(std::io::Error::other(message.to_string())) + } else { + Err(std::io::Error::other("missing JSON-RPC result")) + }; - match payload.get("type").and_then(serde_json::Value::as_str) { - 
Some("system") - if payload.get("subtype").and_then(serde_json::Value::as_str) == Some("init") => + if let Some(tx) = request_state + .lock() + .expect("request state mutex poisoned") + .pending + .remove(&id) { - if let Some(id) = payload - .get("session_id") - .and_then(serde_json::Value::as_str) + let _ = tx.send(response); + } + return; + } + + let Some(method) = payload.get("method").and_then(Value::as_str) else { + return; + }; + let params = payload.get("params").unwrap_or(&Value::Null); + + match method { + "thread/started" => { + if let Some(id) = params + .get("thread") + .and_then(|thread| thread.get("id")) + .and_then(Value::as_str) { - let _ = event_tx.send(AgentEvent::SessionReady { - session_id: id.to_string(), - }); + *thread_id.lock().expect("thread id mutex poisoned") = Some(id.to_string()); } } - Some("stream_event") => { - let Some(event) = payload.get("event") else { - return; - }; - match event.get("type").and_then(serde_json::Value::as_str) { - Some("content_block_start") - if event - .get("content_block") - .and_then(|block| block.get("type")) - .and_then(serde_json::Value::as_str) - == Some("tool_use") => - { - state.tool_name = event - .get("content_block") - .and_then(|block| block.get("name")) - .and_then(serde_json::Value::as_str) - .map(str::to_string); - state.tool_json.clear(); - } - Some("content_block_delta") => { - if let Some(delta) = event.get("delta") { - match delta.get("type").and_then(serde_json::Value::as_str) { - Some("input_json_delta") => { - if let Some(partial) = delta - .get("partial_json") - .and_then(serde_json::Value::as_str) - { - state.tool_json.push_str(partial); - } - } - Some("text_delta") => { - if let Some(text) = - delta.get("text").and_then(serde_json::Value::as_str) - { - state.text_summary.push_str(text); - append_to_current_output(output_files, text); - let _ = - event_tx.send(AgentEvent::AssistantDelta(text.to_string())); - } - } - _ => {} - } - } - } - Some("content_block_stop") => { - if let 
Some(name) = state.tool_name.take() { - let args = serde_json::from_str(&state.tool_json) - .unwrap_or_else(|_| serde_json::Value::String(state.tool_json.clone())); - state.tool_json.clear(); - let _ = event_tx.send(AgentEvent::ToolCall { name, args }); - } + "turn/started" => { + if let Some(id) = params + .get("turn") + .and_then(|turn| turn.get("id")) + .and_then(Value::as_str) + { + *active_turn_id.lock().expect("active turn mutex poisoned") = Some(id.to_string()); + } + let _ = event_tx.send(AgentEvent::TurnStarted); + } + "item/agentMessage/delta" => { + if let Some(delta) = params.get("delta").and_then(Value::as_str) { + if let Some(turn_id) = params.get("turnId").and_then(Value::as_str) { + append_to_turn_output(output_files, turn_id, delta); } - Some("message_stop") => { - let summary = if state.text_summary.trim().is_empty() { - None - } else { - Some(state.text_summary.trim().to_string()) - }; - state.text_summary.clear(); - finish_current_output(output_files); - let _ = event_tx.send(AgentEvent::TurnCompleted { summary }); - let _ = event_tx.send(AgentEvent::AwaitingInput); + let _ = event_tx.send(AgentEvent::AssistantDelta(delta.to_string())); + } + } + "item/completed" => { + if let Some((name, args)) = + extract_tool_call(params.get("item").unwrap_or(&Value::Null)) + { + let _ = event_tx.send(AgentEvent::ToolCall { name, args }); + } + } + "turn/completed" => { + let turn_id = params + .get("turn") + .and_then(|turn| turn.get("id")) + .and_then(Value::as_str) + .map(str::to_string); + if let Some(turn_id) = turn_id.as_ref() { + finish_turn_output(output_files, turn_id); + let mut active = active_turn_id.lock().expect("active turn mutex poisoned"); + if active.as_deref() == Some(turn_id.as_str()) { + *active = None; } - _ => {} } + + let summary = params + .get("turn") + .and_then(extract_turn_summary) + .or_else(|| turn_id.map(|id| format!("turn {}", id))); + let _ = event_tx.send(AgentEvent::TurnCompleted { summary }); + let _ = 
event_tx.send(AgentEvent::AwaitingInput); } _ => {} } } +fn fail_pending_requests(request_state: &Arc>, message: &str) { + let pending = { + let mut state = request_state.lock().expect("request state mutex poisoned"); + std::mem::take(&mut state.pending) + }; + + for (_, tx) in pending { + let _ = tx.send(Err(std::io::Error::other(message.to_string()))); + } +} + +fn extract_tool_call(item: &Value) -> Option<(String, Value)> { + match item.get("type").and_then(Value::as_str) { + Some("commandExecution") => Some(( + item.get("command") + .and_then(Value::as_str) + .unwrap_or("commandExecution") + .to_string(), + json!({ + "status": item.get("status").and_then(Value::as_str), + }), + )), + Some("mcpToolCall") => { + let server = item.get("server").and_then(Value::as_str).unwrap_or("mcp"); + let tool = item.get("tool").and_then(Value::as_str).unwrap_or("tool"); + Some(( + format!("{}:{}", server, tool), + item.get("arguments").cloned().unwrap_or(Value::Null), + )) + } + _ => None, + } +} + +fn extract_turn_summary(turn: &Value) -> Option { + if let Some(summary) = turn.get("summary").and_then(Value::as_str) + && !summary.trim().is_empty() + { + return Some(summary.trim().to_string()); + } + + turn.get("status") + .and_then(Value::as_str) + .map(|status| status.to_string()) +} + fn queue_output_file( - output_files: &Arc>>, + output_files: &Arc>>, + turn_id: &str, output_file: PathBuf, ) -> std::io::Result<()> { let _ = std::fs::File::create(&output_file)?; output_files .lock() - .expect("output queue mutex poisoned") - .push_back(output_file); + .expect("output file mutex poisoned") + .insert(turn_id.to_string(), output_file); Ok(()) } -fn append_to_current_output(output_files: &Arc>>, text: &str) { - let current = output_files +fn append_to_turn_output( + output_files: &Arc>>, + turn_id: &str, + text: &str, +) { + let path = output_files .lock() - .expect("output queue mutex poisoned") - .front() + .expect("output file mutex poisoned") + .get(turn_id) .cloned(); - if 
let Some(path) = current + if let Some(path) = path && let Ok(mut file) = std::fs::OpenOptions::new().append(true).open(path) { let _ = std::io::Write::write_all(&mut file, text.as_bytes()); } } -fn finish_current_output(output_files: &Arc>>) { - let _ = output_files +fn finish_turn_output(output_files: &Arc>>, turn_id: &str) { + output_files .lock() - .expect("output queue mutex poisoned") - .pop_front(); + .expect("output file mutex poisoned") + .remove(turn_id); } -pub struct StreamingAgent; - -impl StreamingAgent { - pub fn runtime(config: StreamingAgentConfig) -> Result { - let (tx, rx) = mpsc::unbounded_channel(); - let agent: Box = if config.cli_name == "claude" { - Box::new(StreamingProcessAgent::spawn(config, tx.clone())?) - } else { - let session_id = config - .resume_session_id - .unwrap_or_else(|| Uuid::new_v4().to_string()); - let _ = tx.send(AgentEvent::SessionReady { - session_id: session_id.clone(), - }); - Box::new(PersistentShellAgent::with_session_id( - OneShotAgentConfig { - cli_name: config.cli_name, - cli_cmd: config.cli_cmd, - workdir: config.workdir, - env: config.env, - }, - Some(session_id), - )) - }; - - Ok(AgentRuntime { - agent: RuntimeAgent::new(agent, tx), - events: AgentEventReceiver { rx }, - }) +#[must_use] +fn build_persistent_launch_command(cli_name: &str, cli_cmd: &str) -> String { + if supports_persistent_runtime(cli_name) && cli_cmd.trim_start().starts_with("codex") { + "codex app-server --listen stdio://".to_string() + } else { + cli_cmd.to_string() } } -#[derive(Debug)] -pub struct OneShotAgent { - config: OneShotAgentConfig, - events: VecDeque, - last_exit_status: Option, -} - -impl OneShotAgent { - #[must_use] - pub fn new(config: OneShotAgentConfig) -> Self { - Self { - config, - events: VecDeque::new(), - last_exit_status: None, - } - } - - #[must_use] - pub fn runtime(config: OneShotAgentConfig) -> AgentRuntime { - let (tx, rx) = mpsc::unbounded_channel(); - AgentRuntime { - agent: 
RuntimeAgent::new(Box::new(Self::new(config)), tx), - events: AgentEventReceiver { rx }, - } +#[must_use] +fn codex_model_overrides(cli_name: &str) -> (Option, Option) { + match cli_name { + "codex-mini" => (Some("gpt-5.4-mini".to_string()), Some("medium".to_string())), + _ => (None, None), } +} - fn run_turn(&mut self, turn: AgentTurn) -> Result { - self.events.push_back(AgentEvent::TurnStarted); - std::fs::write(&turn.prompt_file, &turn.prompt)?; - - let status = (|| -> std::io::Result { - let output = std::fs::File::create(&turn.output_file)?; - let shell_cmd = build_cli_command( - &self.config.cli_name, - &self.config.cli_cmd, - &turn.prompt_file, - ); - let mut cmd = Command::new("sh"); - cmd.arg("-c") - .arg(&shell_cmd) - .current_dir(&self.config.workdir) - .stdin(Stdio::null()) - .stdout(output.try_clone()?) - .stderr(output); - - for (key, value) in &self.config.env { - cmd.env(key, value); - } - - cmd.status() - })(); - - let _ = std::fs::remove_file(&turn.prompt_file); +pub struct StreamingAgent; - match &status { - Ok(exit_status) => { - self.last_exit_status = Some(exit_status.clone()); - self.events - .push_back(AgentEvent::Exited(exit_status.clone())); - if exit_status.success() { - self.events - .push_back(AgentEvent::TurnCompleted { summary: None }); - } else { - self.events.push_back(AgentEvent::SessionError(format!( - "CLI exited with status {}", - describe_exit_status(exit_status) - ))); - } - } - Err(err) => { - self.events.push_back(AgentEvent::SessionError(format!( - "Failed to run CLI: {}", - err - ))); - } +impl StreamingAgent { + pub async fn runtime(config: StreamingAgentConfig) -> Result { + if !supports_persistent_runtime(&config.cli_name) { + return Err(std::io::Error::other(format!( + "persistent runtime not supported for {}", + config.cli_name + )) + .into()); } - Ok(AgentTurnResult { status }) - } -} - -#[async_trait] -impl CodingAgent for OneShotAgent { - async fn send(&mut self, input: AgentInput) -> Result { - match input { - 
AgentInput::UserMessage(turn) | AgentInput::UrgentMessage(turn) => self.run_turn(turn), - AgentInput::Cancel => { - self.events.push_back(AgentEvent::SessionError( - "Cancel is not supported by the one-shot runtime".to_string(), - )); - Ok(AgentTurnResult { - status: Err(std::io::Error::other( - "Cancel is not supported by the one-shot runtime", - )), - }) - } - } - } + let (tx, rx) = mpsc::unbounded_channel(); + let agent = CodexAppServerAgent::spawn( + CodexAppServerLaunchConfig::from_streaming(config), + tx.clone(), + ) + .await?; - fn events(&mut self) -> AgentEventStream<'_> { - Box::pin(BufferedEventStream { - events: &mut self.events, + Ok(AgentRuntime { + agent: RuntimeAgent::new(Box::new(agent), tx), + events: AgentEventReceiver { rx }, }) } - - fn take_events(&mut self) -> Vec { - self.events.drain(..).collect() - } - - async fn shutdown(&mut self, _mode: ShutdownMode) -> Result> { - Ok(self.last_exit_status.clone()) - } } #[async_trait] -impl CodingAgent for PersistentShellAgent { +impl CodingAgent for CodexAppServerAgent { async fn send(&mut self, input: AgentInput) -> Result { match input { - AgentInput::UserMessage(turn) | AgentInput::UrgentMessage(turn) => { - self.run_turn(turn).await + AgentInput::UserMessage(turn) => self.start_turn(turn).await, + AgentInput::UrgentMessage(turn) => { + let active_turn_id = self + .active_turn_id + .lock() + .expect("active turn mutex poisoned") + .clone(); + if let Some(active_turn_id) = active_turn_id { + self.steer_turn(turn, active_turn_id).await + } else { + self.start_turn(turn).await + } } AgentInput::Cancel => { - self.events.push_back(AgentEvent::SessionError( - "Cancel is not supported by the persistent runtime".to_string(), - )); - Ok(AgentTurnResult { - status: Err(std::io::Error::other( - "Cancel is not supported by the persistent runtime", - )), - }) - } - } - } - - fn events(&mut self) -> AgentEventStream<'_> { - Box::pin(BufferedEventStream { - events: &mut self.events, - }) - } - - fn 
take_events(&mut self) -> Vec { - self.events.drain(..).collect() - } - - async fn shutdown(&mut self, mode: ShutdownMode) -> Result> { - let Some(session) = self.session.as_mut() else { - return Ok(self.last_exit_status.clone()); - }; - - let status = match mode { - ShutdownMode::Graceful => { - session.stdin.write_all(b"exit\n").await?; - session.stdin.flush().await?; - session.child.wait().await? - } - ShutdownMode::Immediate => { - session.child.start_kill()?; - session.child.wait().await? - } - }; - - self.last_exit_status = Some(status.clone()); - self.events.push_back(AgentEvent::Exited(status.clone())); - self.session = None; - Ok(Some(status)) - } - - fn session_id(&self) -> Option { - self.session_id.clone() - } -} - -#[async_trait] -impl CodingAgent for StreamingProcessAgent { - async fn send(&mut self, input: AgentInput) -> Result { - match input { - AgentInput::UserMessage(turn) | AgentInput::UrgentMessage(turn) => { - std::fs::write(&turn.prompt_file, &turn.prompt)?; - queue_output_file(&self.output_files, turn.output_file)?; - let payload = serde_json::json!({ - "type": "user", - "message": { - "content": [{ "type": "text", "text": turn.prompt }] - } - }); - let _ = self.event_tx.send(AgentEvent::TurnStarted); - let stdin = self - .stdin - .as_mut() - .ok_or_else(|| std::io::Error::other("streaming runtime stdin closed"))?; - stdin.write_all(payload.to_string().as_bytes()).await?; - stdin.write_all(b"\n").await?; - stdin.flush().await?; + self.interrupt_active_turn().await?; Ok(AgentTurnResult { status: Ok(success_exit_status()), }) } - AgentInput::Cancel => { - let _ = self.stdin.take(); - self.child.start_kill()?; - let status = self.child.wait().await?; - let _ = self.event_tx.send(AgentEvent::Exited(status.clone())); - Ok(AgentTurnResult { status: Ok(status) }) - } } } @@ -796,28 +894,45 @@ impl CodingAgent for StreamingProcessAgent { } async fn shutdown(&mut self, mode: ShutdownMode) -> Result> { - let status = match mode { + match mode { 
ShutdownMode::Graceful => { if let Some(mut stdin) = self.stdin.take() { let _ = stdin.shutdown().await; } - self.child.wait().await? + let status = match tokio::time::timeout( + std::time::Duration::from_secs(2), + self.child.wait(), + ) + .await + { + Ok(result) => result?, + Err(_) => { + self.child.start_kill()?; + self.child.wait().await? + } + }; + self.last_exit_status = Some(status.clone()); + self.reader_task.abort(); + self.events.push_back(AgentEvent::Exited(status.clone())); + Ok(Some(status)) } ShutdownMode::Immediate => { + let _ = self.interrupt_active_turn().await; let _ = self.stdin.take(); self.child.start_kill()?; - self.child.wait().await? + let status = self.child.wait().await?; + self.last_exit_status = Some(status.clone()); + self.reader_task.abort(); + self.events.push_back(AgentEvent::Exited(status.clone())); + Ok(Some(status)) } - }; - self.reader_task.abort(); - let _ = self.event_tx.send(AgentEvent::Exited(status.clone())); - Ok(Some(status)) + } } fn session_id(&self) -> Option { - self.session_id + self.thread_id .lock() - .expect("session id mutex poisoned") + .expect("thread id mutex poisoned") .clone() } } @@ -857,8 +972,3 @@ fn success_exit_status() -> ExitStatus { fn exit_status_from_code(code: i32) -> ExitStatus { std::os::unix::process::ExitStatusExt::from_raw(code << 8) } - -#[cfg(windows)] -fn exit_status_from_code(code: i32) -> ExitStatus { - std::os::windows::process::ExitStatusExt::from_raw(code as u32) -} diff --git a/src/lib.rs b/src/lib.rs index 61bcbb5..ed40b77 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -65,7 +65,7 @@ pub use agent::{ pub use agent_runtime::{ AgentEvent, AgentEventReceiver, AgentInput, AgentRuntime, AgentTurn, AgentTurnResult, CodingAgent, OneShotAgent, OneShotAgentConfig, ShutdownMode, StreamingAgent, - StreamingAgentConfig, build_cli_command, + StreamingAgentConfig, build_cli_command, supports_persistent_runtime, }; pub use app::audit::{AuditEvent, AuditResult, audit_middleware}; pub use 
app::auth::{AuthError, AuthState, Principal, auth_middleware, generate_api_key}; diff --git a/src/main.rs b/src/main.rs index 471825e..666c23c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -405,7 +405,7 @@ async fn requeue_pending_turns( Ok(()) } -fn build_streaming_runtime( +async fn build_streaming_runtime( town_path: &Path, cli_name: &str, cli_cmd: &str, @@ -420,6 +420,7 @@ fn build_streaming_runtime( env: runtime_env(agent_id, name), resume_session_id, }) + .await } async fn run_persistent_agent_loop( @@ -449,7 +450,8 @@ async fn run_persistent_agent_loop( agent_id, name, resume_session_id, - )?; + ) + .await?; let mut runtime_alive = true; let mut round: u32 = 0; let mut pending_turns: VecDeque = VecDeque::new(); @@ -621,7 +623,8 @@ async fn run_persistent_agent_loop( agent_id, name, resume_session_id, - )?; + ) + .await?; runtime = new_runtime; events = new_events; runtime_alive = true; @@ -937,7 +940,10 @@ tt task complete --result "summary" # Mark a task as done continue; } - let display_round = round + pending_turns.len() as u32 + 1; + let display_round = pending_turns + .front() + .map(|turn| turn.display_round) + .unwrap_or(round + 1); let actionable_section = format_actionable_section(channel, &actionable_messages).await; let prompt = format!( @@ -953,10 +959,16 @@ tt task complete --result "summary" # Mark a task as done output_file: town_path.join(format!(".tt/logs/{}_round_{}.log", name, display_round)), }; runtime.send(AgentInput::UrgentMessage(turn)).await?; - pending_turns.push_back(PendingTurn { - display_round, - actionable_messages, - }); + if let Some(existing_turn) = pending_turns.back_mut() { + existing_turn + .actionable_messages + .extend(actionable_messages); + } else { + pending_turns.push_back(PendingTurn { + display_round, + actionable_messages, + }); + } } } } @@ -3669,8 +3681,8 @@ async fn main() -> Result<()> { ); info!(" CLI: {} ({})", cli_name, cli_cmd); info!(" Idle timeout: {}s", idle_timeout_secs); - if config.agent.persistent 
{ - info!(" Runtime: persistent session"); + if config.agent.persistent && tinytown::supports_persistent_runtime(&cli_name) { + info!(" Runtime: persistent Codex app-server session"); run_persistent_agent_loop( &cli.town, config, @@ -3684,6 +3696,11 @@ async fn main() -> Result<()> { ) .await?; return Ok(()); + } else if config.agent.persistent { + info!( + " Runtime: persistent requested but unsupported for {}; falling back to one-shot", + cli_name + ); } info!(" Runtime: one-shot subprocess per turn"); diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index 84db344..53078c7 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -77,12 +77,12 @@ fn shell_quote(value: &str) -> String { format!("'{}'", value.replace('\'', "'\"'\"'")) } -fn write_fake_streaming_claude( +fn write_fake_codex_app_server( town_path: &std::path::Path, start_count_path: &std::path::Path, input_log_path: &std::path::Path, ) -> Result> { - let script_path = town_path.join("fake_claude_stream.py"); + let script_path = town_path.join("fake_codex_app_server.py"); std::fs::write( &script_path, r#"#!/usr/bin/env python3 @@ -91,76 +91,210 @@ import os import re import sys import time +import threading -resume_id = None -args = sys.argv[1:] -for idx, arg in enumerate(args): - if arg in ("--resume", "-r") and idx + 1 < len(args): - resume_id = args[idx + 1] +thread_id = "fake-thread-1" +next_turn_id = 0 +active_turn = None +state_lock = threading.Lock() +write_lock = threading.Lock() -session_id = resume_id or "fake-session-1" start_count_path = os.environ.get("TINYTOWN_FAKE_START_COUNT") if start_count_path: with open(start_count_path, "a", encoding="utf-8") as handle: handle.write("start\n") -print(json.dumps({"type": "system", "subtype": "init", "session_id": session_id}), flush=True) +def write_json(payload): + with write_lock: + sys.stdout.write(json.dumps(payload) + "\n") + sys.stdout.flush() + +def log_input(text): + input_log_path = 
os.environ.get("TINYTOWN_FAKE_INPUT_LOG") + if not input_log_path: + return + with open(input_log_path, "a", encoding="utf-8") as handle: + handle.write(text) + handle.write("\n---\n") + +def turn_worker(turn_id, initial_text, cancel_event, extras): + global active_turn + sleep_ms = 0 + match = re.search(r"SLEEP_MS=(\d+)", initial_text) + if match: + sleep_ms = int(match.group(1)) + + elapsed = 0 + while elapsed < sleep_ms: + if cancel_event.is_set(): + break + time.sleep(0.05) + elapsed += 50 + + if cancel_event.is_set(): + write_json({ + "method": "turn/completed", + "params": { + "threadId": thread_id, + "turn": {"id": turn_id, "status": "interrupted"} + } + }) + with state_lock: + active_turn = None + return + + merged_text = initial_text + "\n" + "\n".join(extras) + + if "TOOL_CALL=1" in merged_text: + write_json({ + "method": "item/completed", + "params": { + "threadId": thread_id, + "turnId": turn_id, + "item": { + "id": "tool-1", + "type": "commandExecution", + "command": "cat README.md", + "status": "completed" + } + } + }) + + summary = "handled urgent turn" if "URGENT" in merged_text else "handled turn" + write_json({ + "method": "item/agentMessage/delta", + "params": { + "threadId": thread_id, + "turnId": turn_id, + "itemId": "assistant-1", + "delta": summary + } + }) + write_json({ + "method": "turn/completed", + "params": { + "threadId": thread_id, + "turn": {"id": turn_id, "status": "completed"} + } + }) + + with state_lock: + active_turn = None for raw_line in sys.stdin: raw_line = raw_line.strip() if not raw_line: continue + payload = json.loads(raw_line) - if payload.get("type") != "user": + method = payload.get("method") + request_id = payload.get("id") + params = payload.get("params", {}) + + if method == "initialize": + write_json({ + "id": request_id, + "result": { + "codexHome": "/tmp/fake-codex-home", + "platformFamily": "unix", + "platformOs": "macos", + "userAgent": "fake-codex" + } + }) continue - text = 
payload["message"]["content"][0]["text"] - input_log_path = os.environ.get("TINYTOWN_FAKE_INPUT_LOG") - if input_log_path: - with open(input_log_path, "a", encoding="utf-8") as handle: - handle.write(text) - handle.write("\n---\n") - match = re.search(r"SLEEP_MS=(\d+)", text) - if match: - time.sleep(int(match.group(1)) / 1000.0) - - if "TOOL_CALL=1" in text: - print(json.dumps({ - "type": "stream_event", - "event": { - "type": "content_block_start", - "content_block": {"type": "tool_use", "name": "Read"} - }, - "session_id": session_id - }), flush=True) - print(json.dumps({ - "type": "stream_event", - "event": { - "type": "content_block_delta", - "delta": {"type": "input_json_delta", "partial_json": "{\"path\":\"README.md\"}"} - }, - "session_id": session_id - }), flush=True) - print(json.dumps({ - "type": "stream_event", - "event": {"type": "content_block_stop"}, - "session_id": session_id - }), flush=True) - - summary = "handled urgent turn" if "URGENT" in text else "handled turn" - print(json.dumps({ - "type": "stream_event", - "event": { - "type": "content_block_delta", - "delta": {"type": "text_delta", "text": summary} - }, - "session_id": session_id - }), flush=True) - print(json.dumps({ - "type": "stream_event", - "event": {"type": "message_stop"}, - "session_id": session_id - }), flush=True) + if method == "initialized": + continue + + if method == "thread/start": + write_json({ + "id": request_id, + "result": { + "approvalPolicy": "never", + "approvalsReviewer": "user", + "cwd": params.get("cwd", "."), + "model": params.get("model", "gpt-5.4-codex"), + "modelProvider": "openai", + "sandbox": {"mode": "danger-full-access"}, + "thread": {"id": thread_id} + } + }) + write_json({ + "method": "thread/started", + "params": {"thread": {"id": thread_id}} + }) + continue + + if method == "thread/resume": + write_json({ + "id": request_id, + "result": { + "approvalPolicy": "never", + "approvalsReviewer": "user", + "cwd": params.get("cwd", "."), + "model": 
params.get("model", "gpt-5.4-codex"), + "modelProvider": "openai", + "sandbox": {"mode": "danger-full-access"}, + "thread": {"id": params.get("threadId", thread_id)} + } + }) + write_json({ + "method": "thread/started", + "params": {"thread": {"id": params.get("threadId", thread_id)}} + }) + continue + + if method == "turn/start": + text = params["input"][0]["text"] + log_input(text) + with state_lock: + next_turn_id += 1 + turn_id = f"turn-{next_turn_id}" + cancel_event = threading.Event() + extras = [] + active_turn = { + "turn_id": turn_id, + "cancel_event": cancel_event, + "extras": extras, + } + write_json({ + "id": request_id, + "result": {"turn": {"id": turn_id, "status": "inProgress"}} + }) + write_json({ + "method": "turn/started", + "params": { + "threadId": thread_id, + "turn": {"id": turn_id, "status": "inProgress"} + } + }) + threading.Thread( + target=turn_worker, + args=(turn_id, text, cancel_event, extras), + daemon=True, + ).start() + continue + + if method == "turn/steer": + text = params["input"][0]["text"] + log_input(text) + with state_lock: + current_turn = active_turn + if current_turn is not None: + current_turn["extras"].append(text) + write_json({ + "id": request_id, + "result": {"turnId": params.get("expectedTurnId")} + }) + continue + + if method == "turn/interrupt": + with state_lock: + current_turn = active_turn + if current_turn is not None: + current_turn["cancel_event"].set() + write_json({"id": request_id, "result": {}}) + continue "#, )?; @@ -5283,20 +5417,20 @@ async fn test_agent_loop_persistent_runtime_reuses_single_streaming_process() -> Result<(), Box> { let town = create_test_town("agent-loop-persistent-reuse").await?; let town_path = town.config().root.clone(); - let start_count_path = town_path.join("fake_claude.starts"); - let input_log_path = town_path.join("fake_claude.inputs"); - let fake_command = write_fake_streaming_claude(&town_path, &start_count_path, &input_log_path)?; + let start_count_path = 
town_path.join("fake_codex.starts"); + let input_log_path = town_path.join("fake_codex.inputs"); + let fake_command = write_fake_codex_app_server(&town_path, &start_count_path, &input_log_path)?; let mut config = tinytown::Config::load(&town_path)?; config.agent.persistent = true; - config.default_cli = "claude".to_string(); + config.default_cli = "codex".to_string(); config.agent_clis.insert( - "claude".to_string(), - tinytown::agent::AgentCli::new("claude", &fake_command), + "codex".to_string(), + tinytown::agent::AgentCli::new("codex", &fake_command), ); config.save()?; - let handle = town.spawn_agent("persistent-worker", "claude").await?; + let handle = town.spawn_agent("persistent-worker", "codex").await?; let agent_id = handle.id(); town.channel() @@ -5321,7 +5455,17 @@ async fn test_agent_loop_persistent_runtime_reuses_single_streaming_process() .status() }); - tokio::time::sleep(Duration::from_millis(500)).await; + tokio::time::timeout(Duration::from_secs(10), async { + loop { + if let Ok(inputs) = std::fs::read_to_string(&input_log_path) + && inputs.contains("FIRST_TURN") + { + break; + } + tokio::time::sleep(Duration::from_millis(50)).await; + } + }) + .await?; town.channel() .send(&Message::new( AgentId::supervisor(), @@ -5356,7 +5500,7 @@ async fn test_agent_loop_persistent_runtime_reuses_single_streaming_process() .expect("persistent worker should still be registered"); assert_eq!(agent.state, AgentState::Stopped); assert_eq!(agent.rounds_completed, 2); - assert_eq!(agent.runtime_session_id.as_deref(), Some("fake-session-1")); + assert_eq!(agent.runtime_session_id.as_deref(), Some("fake-thread-1")); Ok(()) } @@ -5367,20 +5511,20 @@ async fn test_agent_loop_persistent_runtime_processes_urgent_message_mid_turn() -> Result<(), Box> { let town = create_test_town("agent-loop-persistent-urgent").await?; let town_path = town.config().root.clone(); - let start_count_path = town_path.join("fake_claude.starts"); - let input_log_path = 
town_path.join("fake_claude.inputs"); - let fake_command = write_fake_streaming_claude(&town_path, &start_count_path, &input_log_path)?; + let start_count_path = town_path.join("fake_codex.starts"); + let input_log_path = town_path.join("fake_codex.inputs"); + let fake_command = write_fake_codex_app_server(&town_path, &start_count_path, &input_log_path)?; let mut config = tinytown::Config::load(&town_path)?; config.agent.persistent = true; - config.default_cli = "claude".to_string(); + config.default_cli = "codex".to_string(); config.agent_clis.insert( - "claude".to_string(), - tinytown::agent::AgentCli::new("claude", &fake_command), + "codex".to_string(), + tinytown::agent::AgentCli::new("codex", &fake_command), ); config.save()?; - let handle = town.spawn_agent("urgent-worker", "claude").await?; + let handle = town.spawn_agent("urgent-worker", "codex").await?; let agent_id = handle.id(); town.channel() @@ -5401,7 +5545,7 @@ async fn test_agent_loop_persistent_runtime_processes_urgent_message_mid_turn() .arg("agent-loop") .arg("urgent-worker") .arg(agent_id.to_string()) - .arg("2") + .arg("1") .status() }); @@ -5439,7 +5583,7 @@ async fn test_agent_loop_persistent_runtime_processes_urgent_message_mid_turn() .await? 
.expect("urgent worker should still be registered"); assert_eq!(agent.state, AgentState::Stopped); - assert_eq!(agent.rounds_completed, 2); + assert_eq!(agent.rounds_completed, 1); Ok(()) } @@ -5450,20 +5594,20 @@ async fn test_agent_loop_persistent_runtime_stop_interrupts_live_turn() -> Result<(), Box> { let town = create_test_town("agent-loop-persistent-stop").await?; let town_path = town.config().root.clone(); - let start_count_path = town_path.join("fake_claude.starts"); - let input_log_path = town_path.join("fake_claude.inputs"); - let fake_command = write_fake_streaming_claude(&town_path, &start_count_path, &input_log_path)?; + let start_count_path = town_path.join("fake_codex.starts"); + let input_log_path = town_path.join("fake_codex.inputs"); + let fake_command = write_fake_codex_app_server(&town_path, &start_count_path, &input_log_path)?; let mut config = tinytown::Config::load(&town_path)?; config.agent.persistent = true; - config.default_cli = "claude".to_string(); + config.default_cli = "codex".to_string(); config.agent_clis.insert( - "claude".to_string(), - tinytown::agent::AgentCli::new("claude", &fake_command), + "codex".to_string(), + tinytown::agent::AgentCli::new("codex", &fake_command), ); config.save()?; - let handle = town.spawn_agent("stop-worker", "claude").await?; + let handle = town.spawn_agent("stop-worker", "codex").await?; let agent_id = handle.id(); town.channel() @@ -5515,20 +5659,20 @@ async fn test_agent_loop_persistent_runtime_emits_structured_agent_events() -> Result<(), Box> { let town = create_test_town("agent-loop-persistent-events").await?; let town_path = town.config().root.clone(); - let start_count_path = town_path.join("fake_claude.starts"); - let input_log_path = town_path.join("fake_claude.inputs"); - let fake_command = write_fake_streaming_claude(&town_path, &start_count_path, &input_log_path)?; + let start_count_path = town_path.join("fake_codex.starts"); + let input_log_path = town_path.join("fake_codex.inputs"); + 
let fake_command = write_fake_codex_app_server(&town_path, &start_count_path, &input_log_path)?; let mut config = tinytown::Config::load(&town_path)?; config.agent.persistent = true; - config.default_cli = "claude".to_string(); + config.default_cli = "codex".to_string(); config.agent_clis.insert( - "claude".to_string(), - tinytown::agent::AgentCli::new("claude", &fake_command), + "codex".to_string(), + tinytown::agent::AgentCli::new("codex", &fake_command), ); config.save()?; - let handle = town.spawn_agent("event-worker", "claude").await?; + let handle = town.spawn_agent("event-worker", "codex").await?; let agent_id = handle.id(); town.channel() From ba5dd39497abbe018a7a1ff3deb89eb4daab1511 Mon Sep 17 00:00:00 2001 From: Jeremy Plichta Date: Thu, 23 Apr 2026 16:32:34 -0600 Subject: [PATCH 3/3] fix: handle unexpected codex runtime exit --- src/agent_runtime.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/agent_runtime.rs b/src/agent_runtime.rs index 979373c..760a0c2 100644 --- a/src/agent_runtime.rs +++ b/src/agent_runtime.rs @@ -623,6 +623,10 @@ fn spawn_codex_reader( &request_state, "codex app-server closed before sending a response", ); + let _ = event_tx.send(AgentEvent::SessionError( + "Codex app-server stream closed unexpectedly".to_string(), + )); + let _ = event_tx.send(AgentEvent::Exited(exit_status_from_code(1))); }) }