diff --git a/src/app/mcp/tools.rs b/src/app/mcp/tools.rs index 65c7081..a319cfc 100644 --- a/src/app/mcp/tools.rs +++ b/src/app/mcp/tools.rs @@ -1493,7 +1493,12 @@ pub fn mission_reject_tool(state: Arc) -> Tool { }; let storage = mission_storage(&state); match storage.get_mission(mission_id).await { - Ok(Some(_)) => {} + Ok(Some(mut mission)) => { + mission.clear_help_request_state(); + if let Err(e) = storage.save_mission(&mission).await { + return Ok(error_response(e.to_string())); + } + } Ok(None) => { return Ok(error_response(format!( "Mission {} not found", @@ -1551,7 +1556,7 @@ pub fn mission_pause_tool(state: Arc) -> Tool { Err(msg) => return Ok(error_response(msg)), }; let storage = mission_storage(&state); - let Some(mission) = (match storage.get_mission(mission_id).await { + let Some(mut mission) = (match storage.get_mission(mission_id).await { Ok(mission) => mission, Err(e) => return Ok(error_response(e.to_string())), }) else { @@ -1578,9 +1583,21 @@ pub fn mission_pause_tool(state: Arc) -> Tool { "mcp", "pause requested via mission.pause", ); + mission.clear_help_request_state(); + if let Err(e) = storage.save_mission(&mission).await { + return Ok(error_response(e.to_string())); + } if let Err(e) = storage.save_control_message(¬e).await { return Ok(error_response(e.to_string())); } + if let Err(e) = crate::mission::retire_help_requests_for_mission( + state.town.channel(), + mission_id, + ) + .await + { + return Ok(error_response(e.to_string())); + } let dispatcher = crate::mission::MissionDispatcher::new( storage.clone(), state.town.channel().clone(), @@ -1614,7 +1631,7 @@ pub fn mission_resume_tool(state: Arc) -> Tool { Err(msg) => return Ok(error_response(msg)), }; let storage = mission_storage(&state); - let Some(mission) = (match storage.get_mission(mission_id).await { + let Some(mut mission) = (match storage.get_mission(mission_id).await { Ok(mission) => mission, Err(e) => return Ok(error_response(e.to_string())), }) else { @@ -1654,9 +1671,21 @@ pub fn mission_resume_tool(state: Arc) -> Tool { "mcp", "resume requested via mission.resume", ); + mission.clear_help_request_state(); + if let Err(e) = storage.save_mission(&mission).await { + return Ok(error_response(e.to_string())); + } if let Err(e) = storage.save_control_message(¬e).await { return Ok(error_response(e.to_string())); } + if let Err(e) = crate::mission::retire_help_requests_for_mission( + state.town.channel(), + mission_id, + ) + .await + { + return Ok(error_response(e.to_string())); + } let dispatcher = crate::mission::MissionDispatcher::new( storage.clone(), state.town.channel().clone(), @@ -1798,10 +1827,22 @@ pub fn mission_note_tool(state: Arc) -> Tool { if let Err(e) = storage.save_control_message(¬e).await { return Ok(error_response(e.to_string())); } + let retired = match crate::mission::retire_help_requests_for_mission( + state.town.channel(), + mission_id, + ) + .await + { + Ok(retired) => retired, + Err(e) => return Ok(error_response(e.to_string())), + }; if let Err(e) = storage .log_event( mission_id, - &format!("Operator note queued via MCP: {}", input.message), + &format!( + "Operator note queued via MCP: {} (retired {} stale help request(s))", + input.message, retired + ), ) .await { @@ -1954,6 +1995,7 @@ pub fn mission_stop_tool(state: Arc) -> Tool { } else { mission.block("Stopped by user"); } + mission.clear_help_request_state(); if let Err(e) = storage.save_mission(&mission).await { return Ok(error_response(e.to_string())); @@ -1961,10 +2003,22 @@ pub fn mission_stop_tool(state: Arc) -> Tool { if let Err(e) = storage.remove_active(mission_id).await { return Ok(error_response(e.to_string())); } + let retired = match crate::mission::retire_help_requests_for_mission( + state.town.channel(), + mission_id, + ) + .await + { + Ok(retired) => retired, + Err(e) => return Ok(error_response(e.to_string())), + }; if let Err(e) = storage .log_event( mission_id, - &format!("Mission stopped via MCP (force={})", input.force), + &format!( + "Mission stopped via MCP (force={}) and retired {} stale help request(s)", + input.force, retired + ), ) .await { diff --git a/src/app/server.rs b/src/app/server.rs index ced2686..c9c2e7e 100644 --- a/src/app/server.rs +++ b/src/app/server.rs @@ -1181,6 +1181,7 @@ async fn stop_mission( } else { mission.block("Stopped by user"); } + mission.clear_help_request_state(); storage .save_mission(&mission) @@ -1190,10 +1191,16 @@ async fn stop_mission( .remove_active(id) .await .map_err(|e| ProblemDetails::internal_error(&e.to_string()))?; + let retired = crate::mission::retire_help_requests_for_mission(state.town.channel(), id) + .await + .map_err(|e| ProblemDetails::internal_error(&e.to_string()))?; let _ = storage .log_event( id, - &format!("Mission stopped via API (force={})", req.force), + &format!( + "Mission stopped via API (force={}) and retired {} stale help request(s)", + req.force, retired + ), ) .await; @@ -1226,6 +1233,7 @@ async fn resume_mission( } mission.start(); + mission.clear_help_request_state(); storage .save_mission(&mission) .await @@ -1234,7 +1242,18 @@ async fn resume_mission( .add_active(id) .await .map_err(|e| ProblemDetails::internal_error(&e.to_string()))?; - let _ = storage.log_event(id, "Mission resumed via API").await; + let retired = crate::mission::retire_help_requests_for_mission(state.town.channel(), id) + .await + .map_err(|e| ProblemDetails::internal_error(&e.to_string()))?; + let _ = storage + .log_event( + id, + &format!( + "Mission resumed via API (retired {} stale help request(s))", + retired + ), + ) + .await; Ok(Json(serde_json::json!({ "id": id.to_string(), diff --git a/src/channel.rs b/src/channel.rs index 0315b81..3b235e9 100644 --- a/src/channel.rs +++ b/src/channel.rs @@ -533,6 +533,49 @@ impl Channel { Ok(agents.into_iter().find(|a| a.name == name)) } + /// Resolve a user-supplied agent reference. + /// + /// Accepts a full UUID, short UUID, canonical name, nickname/display name, + /// or full display label. Returns `Ok(None)` when there is no match. + pub async fn resolve_agent_ref(&self, raw: &str) -> Result> { + let raw = raw.trim(); + if raw.is_empty() { + return Ok(None); + } + + if let Ok(agent_id) = raw.parse::() { + return self.get_agent_state(agent_id).await; + } + + let raw_lower = raw.to_ascii_lowercase(); + let agents = self.list_agents().await?; + let matches: Vec<_> = agents + .into_iter() + .filter(|agent| { + agent.name.eq_ignore_ascii_case(raw) + || agent.display_name().eq_ignore_ascii_case(raw) + || agent.display_label().eq_ignore_ascii_case(raw) + || agent.id.short_id().eq_ignore_ascii_case(&raw_lower) + }) + .collect(); + + match matches.as_slice() { + [] => Ok(None), + [agent] => Ok(Some(agent.clone())), + _ => { + let labels = matches + .iter() + .map(|agent| format!("{} ({})", agent.display_label(), agent.id.short_id())) + .collect::>() + .join(", "); + Err(crate::Error::Config(format!( + "Agent reference '{}' is ambiguous: {}", + raw, labels + ))) + } + } + } + /// Delete an agent from Redis. pub async fn delete_agent(&self, agent_id: AgentId) -> Result<()> { let mut conn = self.conn.clone(); @@ -948,6 +991,43 @@ impl Channel { Ok(messages) } + /// Remove inbox messages matching a predicate while preserving order. + pub async fn remove_inbox_messages_matching( + &self, + agent_id: AgentId, + mut predicate: F, + ) -> Result + where + F: FnMut(&Message) -> bool, + { + let mut conn = self.conn.clone(); + let inbox_key = self.inbox_key(agent_id); + let items: Vec = conn.lrange(&inbox_key, 0, -1).await?; + if items.is_empty() { + return Ok(0); + } + + let mut kept = Vec::with_capacity(items.len()); + let mut removed = 0usize; + for item in items { + match serde_json::from_str::(&item) { + Ok(message) if predicate(&message) => removed += 1, + _ => kept.push(item), + } + } + + if removed == 0 { + return Ok(0); + } + + let _: () = conn.del(&inbox_key).await?; + if !kept.is_empty() { + let _: () = conn.rpush(&inbox_key, kept).await?; + } + + Ok(removed) + } + /// Move a message to another agent's inbox. pub async fn move_message_to_inbox(&self, message: &Message, to_agent: AgentId) -> Result<()> { // Create a new message with updated recipient diff --git a/src/main.rs b/src/main.rs index 1b17d1d..5fb2783 100644 --- a/src/main.rs +++ b/src/main.rs @@ -11,7 +11,7 @@ use clap::{Parser, Subcommand}; use tracing::{info, warn}; use tracing_subscriber::EnvFilter; -use tinytown::{GlobalConfig, Result, Task, Town, plan}; +use tinytown::{AgentState, GlobalConfig, Result, Task, Town, plan}; const TT_AGENT_ID_ENV: &str = "TINYTOWN_AGENT_ID"; const TT_AGENT_NAME_ENV: &str = "TINYTOWN_AGENT_NAME"; @@ -1900,6 +1900,10 @@ async fn main() -> Result<()> { if agents.is_empty() { info!("No agents. Run 'tt spawn ' to create one."); } else { + let prunable = agents + .iter() + .filter(|agent| matches!(agent.state, AgentState::Stopped | AgentState::Error)) + .count(); info!("Agents:"); for agent in agents { info!( @@ -1909,6 +1913,12 @@ async fn main() -> Result<()> { agent.state ); } + if prunable > 0 { + info!( + " Note: {} terminal agent(s) are still listed. Run 'tt prune' to remove them from Redis.", + prunable + ); + } } } @@ -1932,6 +1942,10 @@ async fn main() -> Result<()> { let agents = town.list_agents().await; info!("🤖 Agents: {}", agents.len()); + let prunable_agents = agents + .iter() + .filter(|agent| matches!(agent.state, AgentState::Stopped | AgentState::Error)) + .count(); // Fetch tasks once before the agent loop to avoid N+1 Redis calls let all_tasks = town.channel().list_tasks().await.unwrap_or_default(); @@ -2093,6 +2107,13 @@ async fn main() -> Result<()> { } } + if prunable_agents > 0 { + info!( + " Note: {} terminal agent(s) are still listed. Run 'tt prune' to remove them from Redis.", + prunable_agents + ); + } + // Task summary section (reuse pre-fetched all_tasks) let tasks = &all_tasks; let backlog_count = town.channel().backlog_len().await.unwrap_or(0); @@ -4917,10 +4938,22 @@ Now, help the user orchestrate their project! } mission.start(); + mission.clear_help_request_state(); storage.save_mission(&mission).await?; storage.add_active(mission_id).await?; + let retired = tinytown::mission::retire_help_requests_for_mission( + town.channel(), + mission_id, + ) + .await?; storage - .log_event(mission_id, "Mission resumed via CLI") + .log_event( + mission_id, + &format!( + "Mission resumed via CLI (retired {} stale help request(s))", + retired + ), + ) .await?; info!("â–ļī¸ Mission {} resumed", run_id); @@ -4969,17 +5002,27 @@ Now, help the user orchestrate their project! let mission_id: MissionId = run_id .parse() .map_err(|_| tinytown::Error::Config("Invalid mission ID".into()))?; - let Some(_mission) = storage.get_mission(mission_id).await? else { + let Some(mut mission) = storage.get_mission(mission_id).await? else { info!("❌ Mission {} not found", run_id); return Ok(()); }; let note = MissionControlMessage::new(mission_id, "conductor", message.clone()); + mission.clear_help_request_state(); + storage.save_mission(&mission).await?; storage.save_control_message(¬e).await?; + let retired = tinytown::mission::retire_help_requests_for_mission( + town.channel(), + mission_id, + ) + .await?; storage .log_event( mission_id, - &format!("Conductor note queued for dispatcher: {}", message), + &format!( + "Conductor note queued for dispatcher: {} (retired {} stale help request(s))", + message, retired + ), ) .await?; info!("📝 Queued dispatcher note for mission {}", run_id); @@ -5000,11 +5043,23 @@ Now, help the user orchestrate their project! } else { mission.block("Stopped by user"); } + mission.clear_help_request_state(); storage.save_mission(&mission).await?; storage.remove_active(mission_id).await?; + let retired = tinytown::mission::retire_help_requests_for_mission( + town.channel(), + mission_id, + ) + .await?; storage - .log_event(mission_id, &format!("Mission stopped (force={})", force)) + .log_event( + mission_id, + &format!( + "Mission stopped (force={}) and retired {} stale help request(s)", + force, retired + ), + ) .await?; info!("âšī¸ Mission {} stopped", run_id); diff --git a/src/mission/dispatcher.rs b/src/mission/dispatcher.rs index 6796992..9a47c12 100644 --- a/src/mission/dispatcher.rs +++ b/src/mission/dispatcher.rs @@ -513,24 +513,11 @@ impl MissionDispatcher { } async fn resolve_agent_ref(&self, raw: &str) -> Result> { - if let Ok(id) = raw.parse::() { - return Ok(Some(id)); - } - - let raw_lower = raw.to_ascii_lowercase(); - let agents = self.channel.list_agents().await?; - let mut matches = agents.into_iter().filter(|agent| { - agent.name.eq_ignore_ascii_case(raw) - || agent.display_label().eq_ignore_ascii_case(raw) - || agent.id.short_id().eq_ignore_ascii_case(&raw_lower) - }); - - let first = matches.next().map(|agent| agent.id); - if matches.next().is_some() { - Ok(None) - } else { - Ok(first) - } + Ok(self + .channel + .resolve_agent_ref(raw) + .await? + .map(|agent| agent.id)) } async fn resolve_work_item_ref( diff --git a/src/mission/mod.rs b/src/mission/mod.rs index 838a90c..60b08b3 100644 --- a/src/mission/mod.rs +++ b/src/mission/mod.rs @@ -63,6 +63,11 @@ pub mod storage; pub mod types; pub mod watch; +use crate::agent::AgentId; +use crate::channel::Channel; +use crate::error::Result; +use crate::message::{Message, MessageType}; + // Re-export commonly used types pub use bootstrap::{build_mission_work_items, parse_issue_ref}; pub use compiler::{MissionManifest, ParsedIssue, WorkGraph, WorkGraphCompiler}; @@ -82,3 +87,25 @@ pub use watch::{ PrCheckResult, ReviewComment, ReviewState, WatchEngine, WatchEngineConfig, WatchEngineTickResult, WatchTickResult, }; + +/// Returns true when a conductor inbox message is a dispatcher help request for `mission_id`. +#[must_use] +pub fn is_help_request_message_for_mission(message: &Message, mission_id: MissionId) -> bool { + matches!( + &message.msg_type, + MessageType::Query { question } + if question.contains(&format!("[Mission Help Needed] Mission {}", mission_id)) + ) +} + +/// Remove stale dispatcher help-request prompts for a mission from the conductor mailbox. +pub async fn retire_help_requests_for_mission( + channel: &Channel, + mission_id: MissionId, +) -> Result { + channel + .remove_inbox_messages_matching(AgentId::supervisor(), |message| { + is_help_request_message_for_mission(message, mission_id) + }) + .await +} diff --git a/src/mission/types.rs b/src/mission/types.rs index d089af3..71b78c9 100644 --- a/src/mission/types.rs +++ b/src/mission/types.rs @@ -465,6 +465,14 @@ impl MissionRun { self.updated_at = now; } + /// Clear any remembered dispatcher help-request state after operator action. + pub fn clear_help_request_state(&mut self) { + self.dispatcher_last_help_request_at = None; + self.dispatcher_last_help_request_reason = None; + self.dispatcher_help_request_attempts = 0; + self.updated_at = Utc::now(); + } + /// Transition to completed state. pub fn complete(&mut self) { self.state = MissionState::Completed; diff --git a/src/town.rs b/src/town.rs index dcecb53..5172f03 100644 --- a/src/town.rs +++ b/src/town.rs @@ -553,18 +553,17 @@ impl Town { /// Get a handle to an existing agent. pub async fn agent(&self, name: &str) -> Result { - // Look up agent in Redis (persisted across process restarts) - if let Some(agent) = self.channel.get_agent_by_name(name).await? { + let normalized = name.trim().to_lowercase(); + if normalized == "supervisor" || normalized == "conductor" { return Ok(AgentHandle { - id: agent.id, + id: AgentId::supervisor(), channel: self.channel.clone(), }); } - let normalized = name.trim().to_lowercase(); - if normalized == "supervisor" || normalized == "conductor" { + if let Some(agent) = self.channel.resolve_agent_ref(name).await? { return Ok(AgentHandle { - id: AgentId::supervisor(), + id: agent.id, channel: self.channel.clone(), }); } diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index 930cc69..2365d1f 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -154,6 +154,25 @@ async fn test_supervisor_aliases_resolve_without_spawned_agent() Ok(()) } +#[tokio::test] +async fn test_agent_lookup_accepts_short_id_nickname_and_display_label() +-> Result<(), Box> { + let town = create_test_town("agent-ref-resolution-test").await?; + let handle = town.spawn_agent("worker141", "claude").await?; + + let mut agent = town.channel().get_agent_state(handle.id()).await?.unwrap(); + agent.nickname = Some("Piper".into()); + agent.role_id = Some("worker".into()); + agent.state = AgentState::Stopped; + town.channel().set_agent_state(&agent).await?; + + assert_eq!(town.agent("Piper").await?.id(), handle.id()); + assert_eq!(town.agent(&handle.id().short_id()).await?.id(), handle.id()); + assert_eq!(town.agent("Piper [worker]").await?.id(), handle.id()); + + Ok(()) +} + /// Test that reserved supervisor/conductor mailbox names cannot be spawned as agents. #[tokio::test] async fn test_reserved_supervisor_names_cannot_be_spawned() -> Result<(), Box> @@ -4179,6 +4198,67 @@ async fn test_mission_dispatcher_help_request_backoff() -> Result<(), Box Result<(), Box> { + let town = create_test_town("mission-help-retire").await?; + let mission_a = tinytown::mission::MissionId::new(); + let mission_b = tinytown::mission::MissionId::new(); + + let message_a = Message::new( + AgentId::supervisor(), + AgentId::supervisor(), + MessageType::Query { + question: format!( + "[Mission Help Needed] Mission {}\n\nMission A blocked", + mission_a + ), + }, + ); + let message_b = Message::new( + AgentId::supervisor(), + AgentId::supervisor(), + MessageType::Query { + question: format!( + "[Mission Help Needed] Mission {}\n\nMission B blocked", + mission_b + ), + }, + ); + let other = Message::new( + AgentId::supervisor(), + AgentId::supervisor(), + MessageType::Query { + question: "ordinary operator question".into(), + }, + ); + + town.channel().send(&message_a).await?; + town.channel().send(&message_b).await?; + town.channel().send(&other).await?; + + let removed = + tinytown::mission::retire_help_requests_for_mission(town.channel(), mission_a).await?; + assert_eq!(removed, 1); + + let inbox = town.channel().peek_inbox(AgentId::supervisor(), 10).await?; + assert_eq!(inbox.len(), 2); + assert!(!inbox.iter().any(|message| { + tinytown::mission::is_help_request_message_for_mission(message, mission_a) + })); + assert!(inbox.iter().any(|message| { + tinytown::mission::is_help_request_message_for_mission(message, mission_b) + })); + assert!(inbox.iter().any(|message| { + matches!( + &message.msg_type, + MessageType::Query { question } if question == "ordinary operator question" + ) + })); + + Ok(()) +} + /// Test that the dispatcher consumes conductor notes from the control channel. #[tokio::test] async fn test_mission_dispatcher_processes_conductor_note() -> Result<(), Box>