From 9c6b979d62f14846e4ce8a9e6de8d6010e972efa Mon Sep 17 00:00:00 2001 From: Jeremy Plichta Date: Wed, 22 Apr 2026 11:38:17 -0600 Subject: [PATCH 1/5] fix(mission): reconcile redirected work ownership --- src/mission/dispatcher.rs | 171 ++++++++++++++++++- src/mission/scheduler.rs | 325 ++++++++++++++++++++++++++++++++++++- src/mission/types.rs | 6 + tests/integration_tests.rs | 177 +++++++++++++++++++- 4 files changed, 676 insertions(+), 3 deletions(-) diff --git a/src/mission/dispatcher.rs b/src/mission/dispatcher.rs index f419880..bd56c3e 100644 --- a/src/mission/dispatcher.rs +++ b/src/mission/dispatcher.rs @@ -20,7 +20,7 @@ use crate::error::Result; use crate::message::{Message, MessageType}; use crate::mission::scheduler::{MissionScheduler, SchedulerTickResult}; use crate::mission::storage::MissionStorage; -use crate::mission::types::{MissionId, MissionState, WatchStatus}; +use crate::mission::types::{MissionId, MissionState, WatchStatus, WorkItemId}; use crate::mission::watch::{GitHubClient, WatchEngine, WatchEngineTickResult}; /// Dispatcher configuration. @@ -154,6 +154,7 @@ impl MissionDispatcher { .find(|result| result.mission_id == *mission_id) .is_some_and(|result| { result.state_changed + || !result.reconciled.is_empty() || !result.promoted.is_empty() || !result.assigned.is_empty() || !result.completed.is_empty() @@ -278,6 +279,80 @@ impl MissionDispatcher { ) .await?; } + } else if let Some(directive) = parse_redirect_directive(body) { + let work_item_id = match self + .resolve_work_item_ref(mission_id, directive.work_item_ref.as_deref()) + .await? + { + Some(id) => id, + None => { + self.storage + .log_event( + mission_id, + &format!( + "Dispatcher ignored redirect directive from {}: could not resolve work item in '{}'", + message.sender, body + ), + ) + .await?; + message.mark_processed(); + self.storage.save_control_message(&message).await?; + continue; + } + }; + let agent_id = match self.resolve_agent_ref(&directive.agent_ref).await? { + Some(id) => id, + None => { + self.storage + .log_event( + mission_id, + &format!( + "Dispatcher ignored redirect directive from {}: unknown agent '{}' in '{}'", + message.sender, directive.agent_ref, body + ), + ) + .await?; + message.mark_processed(); + self.storage.save_control_message(&message).await?; + continue; + } + }; + + if self + .scheduler + .redirect_work_item( + mission_id, + work_item_id, + agent_id, + directive.reason.as_deref(), + ) + .await? + { + if mission.state.can_resume() { + mission.start(); + mission.set_next_wake_at(None); + } + progressed = true; + self.storage + .log_event( + mission_id, + &format!( + "Dispatcher redirected work item {} to {} from {}", + work_item_id, directive.agent_ref, message.sender + ), + ) + .await?; + } else { + self.storage + .log_event( + mission_id, + &format!( + "Dispatcher ignored redirect directive from {}: failed to redirect '{}'", + message.sender, body + ), + ) + .await?; + } } else if lower.starts_with("pause") || lower.starts_with("hold") { if mission.state.can_pause() { mission.block(format!("Paused by {}: {}", message.sender, body)); @@ -436,4 +511,98 @@ impl MissionDispatcher { .await?; Ok(()) } + + async fn resolve_agent_ref(&self, raw: &str) -> Result> { + if let Ok(id) = raw.parse::() { + return Ok(Some(id)); + } + + let raw_lower = raw.to_ascii_lowercase(); + let agents = self.channel.list_agents().await?; + let mut matches = agents.into_iter().filter(|agent| { + agent.name.eq_ignore_ascii_case(raw) + || agent.display_label().eq_ignore_ascii_case(raw) + || agent.id.short_id().eq_ignore_ascii_case(&raw_lower) + }); + + let first = matches.next().map(|agent| agent.id); + if matches.next().is_some() { + Ok(None) + } else { + Ok(first) + } + } + + async fn resolve_work_item_ref( + &self, + mission_id: MissionId, + raw: Option<&str>, + ) -> Result> { + let work_items = self.storage.list_work_items(mission_id).await?; + let active_items: Vec<_> = work_items + .iter() + .filter(|item| item.status != crate::mission::WorkStatus::Done) + .collect(); + + let Some(raw) = raw else { + return Ok((active_items.len() == 1).then_some(active_items[0].id)); + }; + + if let Ok(id) = raw.parse::() { + return Ok(Some(id)); + } + + let matches: Vec<_> = active_items + .iter() + .filter(|item| item.id.short_id().eq_ignore_ascii_case(raw)) + .map(|item| item.id) + .collect(); + + if matches.len() == 1 { + Ok(Some(matches[0])) + } else { + Ok(None) + } + } +} + +#[derive(Debug, Clone)] +struct RedirectDirective { + work_item_ref: Option, + agent_ref: String, + reason: Option, +} + +fn parse_redirect_directive(body: &str) -> Option { + let parts: Vec<_> = body.split_whitespace().collect(); + let verb = parts.first()?.to_ascii_lowercase(); + if !matches!( + verb.as_str(), + "redirect" | "reassign" | "reroute" | "rebind" + ) { + return None; + } + + let (work_item_ref, to_idx) = if parts.get(1)?.eq_ignore_ascii_case("to") { + (None, 1usize) + } else { + (Some(parts.get(1)?.to_string()), 2usize) + }; + + if !parts.get(to_idx)?.eq_ignore_ascii_case("to") { + return None; + } + + let agent_ref = parts.get(to_idx + 1)?.to_string(); + let reason = if parts.len() > to_idx + 2 { + Some(parts[to_idx + 2..].join(" ")) + } else { + None + }; + + Some(RedirectDirective { + work_item_ref, + agent_ref, + reason, + }) } diff --git a/src/mission/scheduler.rs b/src/mission/scheduler.rs index f5cd052..be3bbdc 100644 --- a/src/mission/scheduler.rs +++ b/src/mission/scheduler.rs @@ -30,7 +30,7 @@ use crate::mission::types::{ MissionId, MissionRun, MissionState, TriggerAction, WatchItem, WatchKind, WatchStatus, WorkItem, WorkItemId, WorkKind, WorkStatus, }; -use crate::task::Task; +use crate::task::{Task, TaskState}; // ==================== Configuration ==================== @@ -62,6 +62,8 @@ impl Default for SchedulerConfig { pub struct MissionTickResult { /// Mission ID pub mission_id: MissionId, + /// Work items whose ownership/status was reconciled against live mission tasks. + pub reconciled: Vec, /// Work items promoted to ready pub promoted: Vec, /// Work items assigned to agents @@ -238,6 +240,12 @@ impl MissionScheduler { let mut work_items = self.storage.list_work_items(mission_id).await?; let watches = self.storage.list_watch_items(mission_id).await?; + // Step -1: reconcile stale assignees against the current mission-task set. + let reconciled = self + .reconcile_work_item_assignments(&mut work_items, &mission, agents) + .await?; + result.reconciled = reconciled; + // Step 0: finalize previously submitted work once review/watch gates are clear. let completed = self.complete_waiting_items(&mut work_items, &mission, &watches); for id in &completed { @@ -334,6 +342,119 @@ impl MissionScheduler { Ok(result) } + async fn reconcile_work_item_assignments( + &self, + work_items: &mut [WorkItem], + mission: &MissionRun, + agents: &[Agent], + ) -> Result> { + let tasks = self.channel.list_tasks().await?; + let agents_by_id: HashMap = + agents.iter().map(|agent| (agent.id, agent)).collect(); + let mut tasks_by_work_item: HashMap> = HashMap::new(); + + for task in tasks.iter().filter(|task| !task.state.is_terminal()) { + let Some((task_mission_id, work_item_id, _)) = mission_task_binding(&task.tags) else { + continue; + }; + if task_mission_id != mission.id { + continue; + } + tasks_by_work_item + .entry(work_item_id) + .or_default() + .push(task); + } + + let mut reconciled = Vec::new(); + + for item in work_items.iter_mut() { + if item.status == WorkStatus::Done { + continue; + } + + let mut changed = false; + if let Some(tasks) = tasks_by_work_item.get(&item.id) + && let Some(task) = select_authoritative_mission_task(tasks) + { + if let Some(agent_id) = task.task.assigned_to + && item.assigned_to != Some(agent_id) + { + item.assigned_to = Some(agent_id); + item.updated_at = Utc::now(); + self.storage + .log_event( + mission.id, + &format!( + "Rebound work item '{}' to live mission task owner {}", + item.title, agent_id + ), + ) + .await?; + changed = true; + } + + if matches!(task.kind, MissionTaskKind::Work | MissionTaskKind::Fix) { + let desired = task_work_status(task.task.state); + if item.status != desired && task_can_drive_execution_state(task.task.state) { + item.status = desired; + item.updated_at = Utc::now(); + self.storage + .log_event( + mission.id, + &format!( + "Updated work item '{}' to {:?} from live mission task", + item.title, desired + ), + ) + .await?; + changed = true; + } + } + } else if let Some(agent_id) = item.assigned_to { + let stale_assignee = agents_by_id + .get(&agent_id) + .is_none_or(|agent| agent.state.is_terminal()); + if stale_assignee { + if matches!(item.status, WorkStatus::Assigned | WorkStatus::Running) { + item.assigned_to = None; + item.mark_ready(); + self.storage + .log_event( + mission.id, + &format!( + "Released stale assignee {} from work item '{}'", + agent_id, item.title + ), + ) + .await?; + changed = true; + } else if item.status == WorkStatus::Blocked { + item.assigned_to = None; + item.updated_at = Utc::now(); + self.storage + .log_event( + mission.id, + &format!( + "Cleared stale blocked assignee {} from work item '{}'", + agent_id, item.title + ), + ) + .await?; + changed = true; + } + } + } + + if changed { + self.storage.save_work_item(item).await?; + reconciled.push(item.id); + } + } + + Ok(reconciled) + } + // ==================== Ready Queue Promotion ==================== /// Promote work items from Pending to Ready when dependencies are satisfied. @@ -861,6 +982,98 @@ impl MissionScheduler { Ok(WorkItemCompletion::Completed) } + /// Redirect a work item to a specific agent after an operator correction. + pub async fn redirect_work_item( + &self, + mission_id: MissionId, + work_item_id: WorkItemId, + agent_id: AgentId, + reason: Option<&str>, + ) -> Result { + let Some(mission) = self.storage.get_mission(mission_id).await? else { + return Ok(false); + }; + let Some(mut item) = self.storage.get_work_item(mission_id, work_item_id).await? else { + return Ok(false); + }; + + let agents = self.channel.list_agents().await?; + let Some(agent) = agents.iter().find(|candidate| candidate.id == agent_id) else { + return Ok(false); + }; + + let tasks = self.channel.list_tasks().await?; + let mut cancelled = 0usize; + let mut target_task_exists = false; + for mut task in tasks { + let Some((task_mission_id, task_work_item_id, _)) = mission_task_binding(&task.tags) + else { + continue; + }; + if task_mission_id != mission_id + || task_work_item_id != work_item_id + || task.state.is_terminal() + { + continue; + } + + if task.assigned_to == Some(agent_id) { + target_task_exists = true; + continue; + } + + task.cancel(format!( + "Superseded by dispatcher redirect to {}{}", + agent.name, + reason + .filter(|value| !value.trim().is_empty()) + .map(|value| format!(" ({})", value.trim())) + .unwrap_or_default() + )); + self.channel.set_task(&task).await?; + cancelled += 1; + } + + if !target_task_exists { + let mut task = redirected_task(&mission, &item, agent_id, reason); + let task_id = task.id; + task.assign(agent_id); + self.channel.set_task(&task).await?; + self.channel + .send(&Message::new( + AgentId::supervisor(), + agent_id, + MessageType::TaskAssign { + task_id: task_id.to_string(), + }, + )) + .await?; + } + + item.assign(agent_id); + self.storage.save_work_item(&item).await?; + self.storage + .log_event( + mission_id, + &format!( + "Redirected work item '{}' to '{}'{}{}", + item.title, + agent.name, + if cancelled > 0 { + format!("; cancelled {} superseded mission task(s)", cancelled) + } else { + String::new() + }, + reason + .filter(|value| !value.trim().is_empty()) + .map(|value| format!(" ({})", value.trim())) + .unwrap_or_default() + ), + ) + .await?; + Ok(true) + } + /// Mark a work item as blocked. #[instrument(skip(self))] pub async fn block_work_item( @@ -1105,6 +1318,116 @@ impl MissionScheduler { } } +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] +enum MissionTaskKind { + Review, + Fix, + Work, +} + +fn mission_task_binding(tags: &[String]) -> Option<(MissionId, WorkItemId, MissionTaskKind)> { + let mission_id = tags + .iter() + .find_map(|tag| tag.strip_prefix("mission:")) + .and_then(|value| value.parse().ok())?; + let work_item_id = tags + .iter() + .find_map(|tag| tag.strip_prefix("work-item:")) + .and_then(|value| value.parse().ok())?; + let kind = if tags.iter().any(|tag| tag == "mission-review-task") { + MissionTaskKind::Review + } else if tags.iter().any(|tag| tag == "mission-fix-task") { + MissionTaskKind::Fix + } else if tags.iter().any(|tag| tag == "mission-work-item") { + MissionTaskKind::Work + } else { + return None; + }; + Some((mission_id, work_item_id, kind)) +} + +fn select_authoritative_mission_task<'a>(tasks: &[&'a Task]) -> Option> { + tasks + .iter() + .filter_map(|task| { + let (_, _, kind) = mission_task_binding(&task.tags)?; + Some(MissionTrackedTask { task, kind }) + }) + .max_by_key(|tracked| { + ( + match tracked.kind { + MissionTaskKind::Work => 3u8, + MissionTaskKind::Fix => 2u8, + MissionTaskKind::Review => 1u8, + }, + match tracked.task.state { + TaskState::Running => 2u8, + TaskState::Assigned => 1u8, + _ => 0u8, + }, + tracked.task.updated_at, + ) + }) +} + +#[derive(Debug, Clone, Copy)] +struct MissionTrackedTask<'a> { + task: &'a Task, + kind: MissionTaskKind, +} + +fn task_can_drive_execution_state(state: TaskState) -> bool { + matches!(state, TaskState::Assigned | TaskState::Running) +} + +fn task_work_status(state: TaskState) -> WorkStatus { + match state { + TaskState::Running => WorkStatus::Running, + _ => WorkStatus::Assigned, + } +} + +fn redirected_task( + mission: &MissionRun, + item: &WorkItem, + agent_id: AgentId, + reason: Option<&str>, +) -> Task { + let intro = if item.artifact_refs.is_empty() { + "[Mission Redirect]" + } else { + "[Mission Fix Redirect]" + }; + let artifact_hint = if item.artifact_refs.is_empty() { + "No artifact refs recorded yet.".to_string() + } else { + format!( + "Latest artifacts/refs:\n- {}", + item.artifact_refs.join("\n- ") + ) + }; + let reason_hint = reason + .filter(|value| !value.trim().is_empty()) + .map(|value| format!("Operator correction: {}\n\n", value.trim())) + .unwrap_or_default(); + let task_kind = if item.artifact_refs.is_empty() { + ("mission-work-item", "mission-task:work") + } else { + ("mission-fix-task", "mission-task:fix") + }; + + Task::new(format!( + "{} {}\n\nMission: {}\nWork item: {}\nAssigned by dispatcher redirect to {}\n\n{}{}", + intro, item.title, mission.id, item.id, agent_id, reason_hint, artifact_hint + )) + .with_tags([ + task_kind.0.to_string(), + task_kind.1.to_string(), + format!("mission:{}", mission.id), + format!("work-item:{}", item.id), + ]) +} + fn extract_pr_refs(artifacts: &[String]) -> Vec { let mut refs = Vec::new(); let pr_url = diff --git a/src/mission/types.rs b/src/mission/types.rs index f99d6a9..d089af3 100644 --- a/src/mission/types.rs +++ b/src/mission/types.rs @@ -78,6 +78,12 @@ impl WorkItemId { pub fn from_uuid(uuid: Uuid) -> Self { Self(uuid) } + + /// Return the first 4 hex characters of the UUID for compact display. + #[must_use] + pub fn short_id(&self) -> String { + self.0.to_string().chars().take(4).collect() + } } impl Default for WorkItemId { diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index 719c1f3..6d806ca 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -3264,7 +3264,8 @@ async fn test_mission_dispatcher_creates_fix_task_from_failing_watch() assert_ne!(updated_watch.status, tinytown::mission::WatchStatus::Active); let updated_item = storage.get_work_item(mission.id, item.id).await?.unwrap(); - assert_eq!(updated_item.status, WorkStatus::Blocked); + assert_eq!(updated_item.status, WorkStatus::Assigned); + assert_eq!(updated_item.assigned_to, Some(worker.id())); drop(town); cleanup_redis(&temp_dir); @@ -3785,6 +3786,71 @@ async fn test_mission_dispatcher_mergeability_respects_reviewer_gate() Ok(()) } +/// Test that the scheduler follows a live redirected mission task instead of a stale assignee. +#[tokio::test] +async fn test_mission_scheduler_rebinds_blocked_item_to_live_task() +-> Result<(), Box> { + use tinytown::mission::{ + MissionRun, MissionScheduler, MissionStorage, ObjectiveRef, WorkItem, WorkKind, WorkStatus, + }; + + let temp_dir = TempDir::new()?; + let town_name = unique_town_name("mission-scheduler-rebind-live-task"); + let town = Town::init(temp_dir.path(), &town_name).await?; + let tester = town.spawn_agent("tester", "claude").await?; + let worker = town.spawn_agent("backend-worker", "claude").await?; + + let mut tester_state = Agent::new("tester", "claude", AgentType::Worker); + tester_state.id = tester.id(); + tester_state.state = AgentState::Idle; + town.channel().set_agent_state(&tester_state).await?; + + let mut worker_state = Agent::new("backend-worker", "claude", AgentType::Worker); + worker_state.id = worker.id(); + worker_state.state = AgentState::Working; + town.channel().set_agent_state(&worker_state).await?; + + let storage = MissionStorage::new(town.channel().conn().clone(), &town_name); + let mut mission = MissionRun::new(vec![ObjectiveRef::Doc { + path: "test.md".into(), + }]); + mission.start(); + storage.save_mission(&mission).await?; + storage.add_active(mission.id).await?; + + let mut item = WorkItem::new(mission.id, "Implement auth", WorkKind::Implement); + item.assign(tester.id()); + item.block(); + item.record_artifacts(["task:stale-review"]); + storage.save_work_item(&item).await?; + + let mut redirected_task = Task::new("[Mission Fix Redirect] Implement auth").with_tags([ + "mission-fix-task".to_string(), + "mission-task:fix".to_string(), + format!("mission:{}", mission.id), + format!("work-item:{}", item.id), + ]); + redirected_task.assign(worker.id()); + redirected_task.start(); + town.channel().set_task(&redirected_task).await?; + + let scheduler = MissionScheduler::with_defaults(storage.clone(), town.channel().clone()); + let tick = scheduler.tick().await?; + assert!( + tick.missions + .iter() + .any(|mission_tick| mission_tick.reconciled.contains(&item.id)) + ); + + let updated = storage.get_work_item(mission.id, item.id).await?.unwrap(); + assert_eq!(updated.assigned_to, Some(worker.id())); + assert_eq!(updated.status, WorkStatus::Running); + + drop(town); + cleanup_redis(&temp_dir); + Ok(()) +} + /// Test that scheduler completion only applies to blocked waiting items. #[tokio::test] async fn test_mission_scheduler_does_not_finalize_running_items_with_artifacts() @@ -4005,6 +4071,115 @@ async fn test_mission_dispatcher_processes_conductor_note() -> Result<(), Box Result<(), Box> { + use tinytown::mission::{ + DispatcherConfig, MissionControlMessage, MissionDispatcher, MissionRun, MissionState, + MissionStorage, MockGitHubClient, ObjectiveRef, WorkItem, WorkKind, WorkStatus, + }; + + let temp_dir = TempDir::new()?; + let town_name = unique_town_name("mission-dispatch-redirect"); + let town = Town::init(temp_dir.path(), &town_name).await?; + let tester = town.spawn_agent("tester", "claude").await?; + let worker = town.spawn_agent("backend-worker", "claude").await?; + + let mut tester_state = Agent::new("tester", "claude", AgentType::Worker); + tester_state.id = tester.id(); + tester_state.state = AgentState::Idle; + town.channel().set_agent_state(&tester_state).await?; + + let mut worker_state = Agent::new("backend-worker", "claude", AgentType::Worker); + worker_state.id = worker.id(); + worker_state.state = AgentState::Idle; + town.channel().set_agent_state(&worker_state).await?; + + let storage = MissionStorage::new(town.channel().conn().clone(), &town_name); + let mut mission = MissionRun::new(vec![ObjectiveRef::Doc { + path: "test.md".into(), + }]); + mission.start(); + mission.blocked_reason = Some("Waiting on stale reviewer assignment".into()); + storage.save_mission(&mission).await?; + storage.add_active(mission.id).await?; + + let mut item = WorkItem::new(mission.id, "Implement auth", WorkKind::Implement); + item.assign(tester.id()); + item.block(); + item.record_artifacts(["task:stale-review"]); + storage.save_work_item(&item).await?; + + let mut stale_task = Task::new("[Mission Work Item] stale owner").with_tags([ + "mission-work-item".to_string(), + "mission-task:work".to_string(), + format!("mission:{}", mission.id), + format!("work-item:{}", item.id), + ]); + stale_task.assign(tester.id()); + let stale_task_id = stale_task.id; + town.channel().set_task(&stale_task).await?; + + let note = MissionControlMessage::new( + mission.id, + "conductor", + format!( + "redirect {} to backend-worker tester was the wrong lane", + item.id + ), + ); + storage.save_control_message(¬e).await?; + + let dispatcher = MissionDispatcher::new( + storage.clone(), + town.channel().clone(), + MockGitHubClient::new(), + DispatcherConfig { + tick_interval_secs: 1, + lock_ttl_secs: 30, + ..DispatcherConfig::default() + }, + ); + dispatcher.tick(Some(mission.id)).await?; + + let updated = storage.get_work_item(mission.id, item.id).await?.unwrap(); + assert_eq!(updated.assigned_to, Some(worker.id())); + assert_eq!(updated.status, WorkStatus::Assigned); + + let updated_mission = storage.get_mission(mission.id).await?.unwrap(); + assert_eq!(updated_mission.state, MissionState::Running); + assert!(updated_mission.blocked_reason.is_none()); + + let stale_task = town.channel().get_task(stale_task_id).await?.unwrap(); + assert_eq!(stale_task.state, TaskState::Cancelled); + + let replacement_tasks: Vec<_> = town + .channel() + .list_tasks() + .await? + .into_iter() + .filter(|task| { + task.id != stale_task_id + && !task.state.is_terminal() + && task.assigned_to == Some(worker.id()) + && task + .tags + .iter() + .any(|tag| tag == &format!("mission:{}", mission.id)) + && task + .tags + .iter() + .any(|tag| tag == &format!("work-item:{}", item.id)) + }) + .collect(); + assert_eq!(replacement_tasks.len(), 1); + + drop(town); + cleanup_redis(&temp_dir); + Ok(()) +} + /// Test that invalid resume notes cannot revive failed missions. #[tokio::test] async fn test_mission_dispatcher_ignores_resume_note_for_failed_mission() From 8d2d675bb9dc0debe01b884dbfd0f638cc6532aa Mon Sep 17 00:00:00 2001 From: Jeremy Plichta Date: Wed, 22 Apr 2026 12:25:30 -0600 Subject: [PATCH 2/5] fix(mission): harden work-item routing --- src/mission/compiler.rs | 38 ++++++++- src/mission/scheduler.rs | 167 +++++++++++++++++++++++++++++++++---- tests/integration_tests.rs | 52 ++++++++++++ 3 files changed, 238 insertions(+), 19 deletions(-) diff --git a/src/mission/compiler.rs b/src/mission/compiler.rs index 83fdd49..231cc28 100644 --- a/src/mission/compiler.rs +++ b/src/mission/compiler.rs @@ -219,16 +219,16 @@ impl WorkGraphCompiler { /// Infer owner role from issue labels or content. #[must_use] - pub fn infer_owner_role(&self, title: &str, body: &str) -> Option { + pub fn infer_owner_role(&self, title: &str, body: &str, kind: WorkKind) -> Option { let text = format!("{} {}", title, body).to_lowercase(); if text.contains("backend") || text.contains("api") || text.contains("server") { Some("backend".to_string()) } else if text.contains("frontend") || text.contains("ui") || text.contains("web") { Some("frontend".to_string()) - } else if text.contains("test") || text.contains("qa") { + } else if matches!(kind, WorkKind::Test) || text.contains("qa owner") { Some("tester".to_string()) - } else if text.contains("review") { + } else if matches!(kind, WorkKind::Review) { Some("reviewer".to_string()) } else if text.contains("devops") || text.contains("infrastructure") @@ -252,7 +252,7 @@ impl WorkGraphCompiler { ) -> ParsedIssue { let depends_on = self.parse_dependencies(&body); let kind = self.infer_work_kind(&title, &body); - let owner_role = self.infer_owner_role(&title, &body); + let owner_role = self.infer_owner_role(&title, &body, kind); ParsedIssue { number, @@ -516,6 +516,36 @@ mod tests { ); } + #[test] + fn test_infer_owner_role_is_kind_aware_for_implement_work() { + let compiler = WorkGraphCompiler::new(); + + assert_eq!( + compiler.infer_owner_role( + "Implement auth flow", + "Also add integration tests for login and signup", + WorkKind::Implement + ), + None + ); + assert_eq!( + compiler + .infer_owner_role( + "Implement auth API", + "Add integration tests for login and signup", + WorkKind::Implement + ) + .as_deref(), + Some("backend") + ); + assert_eq!( + compiler + .infer_owner_role("Add integration tests", "", WorkKind::Test) + .as_deref(), + Some("tester") + ); + } + #[test] fn test_compile_simple() { let compiler = WorkGraphCompiler::new(); diff --git a/src/mission/scheduler.rs b/src/mission/scheduler.rs index be3bbdc..66adf11 100644 --- a/src/mission/scheduler.rs +++ b/src/mission/scheduler.rs @@ -121,6 +121,18 @@ pub struct AgentMatchScore { pub load_penalty: u32, } +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum AgentLane { + Backend, + Frontend, + Tester, + Reviewer, + Devops, + Runner, + Researcher, + Generalist, +} + impl AgentMatchScore { /// Create a new match score. #[must_use] @@ -673,29 +685,17 @@ impl MissionScheduler { item: &WorkItem, current_assignments: &[(WorkItemId, AgentId)], ) -> AgentMatchScore { + let lane = self.agent_lane(agent); // Base score: role matching let base_score = if let Some(ref owner_role) = item.owner_role { let role_lower = owner_role.to_lowercase(); if self.agent_matches_role(agent, &role_lower) { 100 // Exact role match - } else if self.is_reviewer_agent(agent) { - // Reviewer can do review work at full score, other work at penalty - if item.kind == WorkKind::Review { - 100 - } else { - 25 - } } else { - 50 // Generic worker + self.mismatch_fallback_score(item, &role_lower, lane) } } else { - // No role specified - any worker is fine - if self.is_reviewer_agent(agent) { - // Prefer non-reviewers for unspecified work - 40 - } else { - 60 - } + self.kind_fallback_score(item, lane) }; // Load penalty: reduce score for agents already assigned this tick @@ -708,6 +708,96 @@ impl MissionScheduler { AgentMatchScore::new(base_score, load_penalty) } + fn kind_fallback_score(&self, item: &WorkItem, lane: AgentLane) -> u32 { + match item.kind { + WorkKind::Implement => match lane { + AgentLane::Backend + | AgentLane::Frontend + | AgentLane::Devops + | AgentLane::Generalist => 60, + AgentLane::Researcher | AgentLane::Runner => 35, + AgentLane::Tester => 5, + AgentLane::Reviewer => 0, + }, + WorkKind::Test => match lane { + AgentLane::Tester => 85, + AgentLane::Backend + | AgentLane::Frontend + | AgentLane::Devops + | AgentLane::Generalist => 45, + AgentLane::Runner | AgentLane::Researcher => 30, + AgentLane::Reviewer => 10, + }, + WorkKind::Review => match lane { + AgentLane::Reviewer => 100, + AgentLane::Researcher => 35, + AgentLane::Generalist => 20, + _ => 5, + }, + WorkKind::Design => match lane { + AgentLane::Researcher => 75, + AgentLane::Backend + | AgentLane::Frontend + | AgentLane::Devops + | AgentLane::Generalist => 55, + AgentLane::Reviewer => 30, + AgentLane::Runner => 20, + AgentLane::Tester => 10, + }, + WorkKind::MergeGate => match lane { + AgentLane::Runner => 80, + AgentLane::Reviewer => 50, + AgentLane::Generalist + | AgentLane::Backend + | AgentLane::Frontend + | AgentLane::Devops => 35, + AgentLane::Researcher => 20, + AgentLane::Tester => 10, + }, + WorkKind::Followup => match lane { + AgentLane::Generalist + | AgentLane::Backend + | AgentLane::Frontend + | AgentLane::Devops + | AgentLane::Researcher => 50, + AgentLane::Runner => 35, + AgentLane::Tester => 20, + AgentLane::Reviewer => 10, + }, + } + } + + fn mismatch_fallback_score(&self, item: &WorkItem, owner_role: &str, lane: AgentLane) -> u32 { + match owner_role { + "backend" | "frontend" | "devops" => match lane { + AgentLane::Generalist => 55, + AgentLane::Backend | AgentLane::Frontend | AgentLane::Devops => 35, + AgentLane::Researcher | AgentLane::Runner => 20, + AgentLane::Tester => { + if matches!(item.kind, WorkKind::Implement) { + 5 + } else { + 15 + } + } + AgentLane::Reviewer => 0, + }, + "tester" | "test" => match lane { + AgentLane::Generalist => 45, + AgentLane::Backend | AgentLane::Frontend | AgentLane::Devops => 30, + AgentLane::Runner | AgentLane::Researcher => 20, + AgentLane::Reviewer => 5, + AgentLane::Tester => 100, + }, + "reviewer" | "review" => match lane { + AgentLane::Researcher => 20, + AgentLane::Generalist => 10, + _ => 0, + }, + _ => self.kind_fallback_score(item, lane), + } + } + /// Check if an agent matches a role. /// /// Uses `agent.role_id` when available for an exact match. Falls back to @@ -758,6 +848,53 @@ impl MissionScheduler { agent_name.contains("review") || agent_name.contains("audit") } + fn agent_lane(&self, agent: &Agent) -> AgentLane { + if let Some(ref role_id) = agent.role_id { + match role_id.to_lowercase().as_str() { + "backend" => return AgentLane::Backend, + "frontend" => return AgentLane::Frontend, + "tester" | "test" => return AgentLane::Tester, + "reviewer" | "review" | "audit" => return AgentLane::Reviewer, + "devops" | "infra" => return AgentLane::Devops, + crate::agent::roles::RUNNER => return AgentLane::Runner, + crate::agent::roles::RESEARCHER => return AgentLane::Researcher, + crate::agent::roles::DEFAULT | crate::agent::roles::WORKER => { + return AgentLane::Generalist; + } + _ => {} + } + } + + let agent_name = agent.name.to_lowercase(); + if agent_name.contains("review") || agent_name.contains("audit") { + AgentLane::Reviewer + } else if agent_name.contains("test") || agent_name.contains("qa") { + AgentLane::Tester + } else if agent_name.contains("backend") + || agent_name.contains("api") + || agent_name.contains("server") + { + AgentLane::Backend + } else if agent_name.contains("frontend") + || agent_name.contains("ui") + || agent_name.contains("web") + || agent_name.contains("client") + { + AgentLane::Frontend + } else if agent_name.contains("devops") + || agent_name.contains("infra") + || agent_name.contains("deploy") + { + AgentLane::Devops + } else if agent_name.contains("runner") || agent_name.contains("ci") { + AgentLane::Runner + } else if agent_name.contains("research") || agent_name.contains("explore") { + AgentLane::Researcher + } else { + AgentLane::Generalist + } + } + // ==================== Reviewer Gate ==================== /// Check if a work item requires reviewer approval before completion. diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index 6d806ca..cee45df 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -2872,6 +2872,58 @@ async fn test_mission_scheduler_assigns_persisted_tasks() -> Result<(), Box Result<(), Box> { + use tinytown::mission::{ + MissionRun, MissionScheduler, MissionStorage, ObjectiveRef, WorkItem, WorkKind, + }; + + let town = create_test_town("mission-scheduler-avoid-tester").await?; + let backend_handle = town.spawn_agent("backend-worker", "claude").await?; + let tester_handle = town.spawn_agent("tester", "claude").await?; + + let mut backend = Agent::new("backend-worker", "claude", AgentType::Worker); + backend.id = backend_handle.id(); + backend.state = AgentState::Idle; + town.channel().set_agent_state(&backend).await?; + + let mut tester = Agent::new("tester", "claude", AgentType::Worker); + tester.id = tester_handle.id(); + tester.state = AgentState::Idle; + town.channel().set_agent_state(&tester).await?; + + let storage = MissionStorage::new(town.channel().conn().clone(), town.channel().town_name()); + + let mut mission = MissionRun::new(vec![ObjectiveRef::Issue { + owner: "owner".into(), + repo: "repo".into(), + number: 42, + }]); + mission.policy.reviewer_required = false; + mission.start(); + storage.save_mission(&mission).await?; + storage.add_active(mission.id).await?; + + let mut work_item = WorkItem::new(mission.id, "Implement auth flow", WorkKind::Implement); + work_item.mark_ready(); + let work_item_id = work_item.id; + storage.save_work_item(&work_item).await?; + + let scheduler = MissionScheduler::with_defaults(storage.clone(), town.channel().clone()); + let result = scheduler.tick().await?; + assert_eq!(result.total_assigned, 1); + + let updated = storage + .get_work_item(mission.id, work_item_id) + .await? + .expect("work item should exist"); + assert_eq!(updated.assigned_to, Some(backend_handle.id())); + + Ok(()) +} + /// Test MissionStorage WatchItem operations. #[tokio::test] async fn test_mission_storage_watch_items() -> Result<(), Box> { From 3001489e068d65cc853bc5ea57932e4271d461b7 Mon Sep 17 00:00:00 2001 From: Jeremy Plichta Date: Wed, 22 Apr 2026 13:07:04 -0600 Subject: [PATCH 3/5] fix(mission): address bugbot regressions --- src/mission/dispatcher.rs | 2 +- src/mission/scheduler.rs | 25 +++++++- tests/integration_tests.rs | 119 +++++++++++++++++++++++++++++++++++++ 3 files changed, 142 insertions(+), 4 deletions(-) diff --git a/src/mission/dispatcher.rs b/src/mission/dispatcher.rs index bd56c3e..6796992 100644 --- a/src/mission/dispatcher.rs +++ b/src/mission/dispatcher.rs @@ -545,7 +545,7 @@ impl MissionDispatcher { .collect(); let Some(raw) = raw else { - return Ok((active_items.len() == 1).then_some(active_items[0].id)); + return Ok((active_items.len() == 1).then(|| active_items[0].id)); }; if let Ok(id) = raw.parse::() { diff --git a/src/mission/scheduler.rs b/src/mission/scheduler.rs index 66adf11..ab4dccb 100644 --- a/src/mission/scheduler.rs +++ b/src/mission/scheduler.rs @@ -133,6 +133,15 @@ enum AgentLane { Generalist, } +fn normalize_role_alias(role: &str) -> &str { + match role { + "test" => "tester", + "review" | "audit" => "reviewer", + "infra" => "devops", + other => other, + } +} + impl AgentMatchScore { /// Create a new match score. #[must_use] @@ -768,7 +777,7 @@ impl MissionScheduler { } fn mismatch_fallback_score(&self, item: &WorkItem, owner_role: &str, lane: AgentLane) -> u32 { - match owner_role { + match normalize_role_alias(owner_role) { "backend" | "frontend" | "devops" => match lane { AgentLane::Generalist => 55, AgentLane::Backend | AgentLane::Frontend | AgentLane::Devops => 35, @@ -790,6 +799,7 @@ impl MissionScheduler { AgentLane::Tester => 100, }, "reviewer" | "review" => match lane { + AgentLane::Reviewer => 100, AgentLane::Researcher => 20, AgentLane::Generalist => 10, _ => 0, @@ -805,12 +815,12 @@ impl MissionScheduler { fn agent_matches_role(&self, agent: &Agent, role: &str) -> bool { // Prefer explicit role_id when set if let Some(ref role_id) = agent.role_id { - return role_id.to_lowercase() == role; + return normalize_role_alias(&role_id.to_lowercase()) == normalize_role_alias(role); } // Fallback: name-based substring matching let agent_name = agent.name.to_lowercase(); - match role { + match normalize_role_alias(role) { "backend" => { agent_name.contains("backend") || agent_name.contains("api") @@ -1719,4 +1729,13 @@ mod tests { ); } } + + #[test] + fn test_normalize_role_alias() { + assert_eq!(normalize_role_alias("test"), "tester"); + assert_eq!(normalize_role_alias("review"), "reviewer"); + assert_eq!(normalize_role_alias("audit"), "reviewer"); + assert_eq!(normalize_role_alias("infra"), "devops"); + assert_eq!(normalize_role_alias("backend"), "backend"); + } } diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index cee45df..449a9b0 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -2924,6 +2924,60 @@ async fn test_mission_scheduler_avoids_assigning_implement_work_to_tester() Ok(()) } +/// Test that review work still routes to reviewer-role aliases. +#[tokio::test] +async fn test_mission_scheduler_assigns_review_work_to_review_role_alias() +-> Result<(), Box> { + use tinytown::mission::{ + MissionRun, MissionScheduler, MissionStorage, ObjectiveRef, WorkItem, WorkKind, + }; + + let town = create_test_town("mission-scheduler-review-alias").await?; + let reviewer_handle = town.spawn_agent("susan", "claude").await?; + let generalist_handle = town.spawn_agent("worker", "claude").await?; + + let mut reviewer = Agent::new("susan", "claude", AgentType::Worker); + reviewer.id = reviewer_handle.id(); + reviewer.role_id = Some("review".into()); + reviewer.state = AgentState::Idle; + town.channel().set_agent_state(&reviewer).await?; + + let mut generalist = Agent::new("worker", "claude", AgentType::Worker); + generalist.id = generalist_handle.id(); + generalist.state = AgentState::Idle; + town.channel().set_agent_state(&generalist).await?; + + let storage = MissionStorage::new(town.channel().conn().clone(), town.channel().town_name()); + + let mut mission = MissionRun::new(vec![ObjectiveRef::Issue { + owner: "owner".into(), + repo: "repo".into(), + number: 43, + }]); + mission.policy.reviewer_required = false; + mission.start(); + storage.save_mission(&mission).await?; + storage.add_active(mission.id).await?; + + let mut work_item = + WorkItem::new(mission.id, "Review auth flow", WorkKind::Review).with_owner_role("reviewer"); + work_item.mark_ready(); + let work_item_id = work_item.id; + storage.save_work_item(&work_item).await?; + + let scheduler = MissionScheduler::with_defaults(storage.clone(), town.channel().clone()); + let result = scheduler.tick().await?; + assert_eq!(result.total_assigned, 1); + + let updated = storage + .get_work_item(mission.id, work_item_id) + .await? + .expect("work item should exist"); + assert_eq!(updated.assigned_to, Some(reviewer_handle.id())); + + Ok(()) +} + /// Test MissionStorage WatchItem operations. #[tokio::test] async fn test_mission_storage_watch_items() -> Result<(), Box> { @@ -4232,6 +4286,71 @@ async fn test_mission_dispatcher_redirect_note_rebinds_work_item() Ok(()) } +/// Test that redirect notes without a work item ref do not panic when all work is already done. +#[tokio::test] +async fn test_mission_dispatcher_redirect_note_without_active_items_does_not_panic() +-> Result<(), Box> { + use tinytown::mission::{ + DispatcherConfig, MissionControlMessage, MissionDispatcher, MissionRun, MissionStorage, + MockGitHubClient, ObjectiveRef, WorkItem, WorkKind, + }; + + let temp_dir = TempDir::new()?; + let town_name = unique_town_name("mission-dispatch-empty-redirect"); + let town = Town::init(temp_dir.path(), &town_name).await?; + let worker = town.spawn_agent("backend-worker", "claude").await?; + + let mut worker_state = Agent::new("backend-worker", "claude", AgentType::Worker); + worker_state.id = worker.id(); + worker_state.state = AgentState::Idle; + town.channel().set_agent_state(&worker_state).await?; + + let storage = MissionStorage::new(town.channel().conn().clone(), &town_name); + let mut mission = MissionRun::new(vec![ObjectiveRef::Doc { + path: "test.md".into(), + }]); + mission.start(); + storage.save_mission(&mission).await?; + storage.add_active(mission.id).await?; + + let mut item = WorkItem::new(mission.id, "Implement auth", WorkKind::Implement); + item.complete(vec![]); + storage.save_work_item(&item).await?; + + let note = MissionControlMessage::new(mission.id, "conductor", "redirect to backend-worker"); + let note_id = note.id.clone(); + storage.save_control_message(¬e).await?; + + let dispatcher = MissionDispatcher::new( + storage.clone(), + town.channel().clone(), + MockGitHubClient::new(), + DispatcherConfig { + tick_interval_secs: 1, + lock_ttl_secs: 30, + ..DispatcherConfig::default() + }, + ); + dispatcher.tick(Some(mission.id)).await?; + + let messages = storage.list_control_messages(mission.id).await?; + let processed = messages + .into_iter() + .find(|message| message.id == note_id) + .expect("control note should exist"); + assert!(processed.processed_at.is_some()); + + let events = storage.get_events(mission.id, 10).await?; + assert!(events.iter().any(|event| { + event.contains("ignored redirect directive") + && event.contains("could not resolve work item") + })); + + drop(town); + cleanup_redis(&temp_dir); + Ok(()) +} + /// Test that invalid resume notes cannot revive failed missions. #[tokio::test] async fn test_mission_dispatcher_ignores_resume_note_for_failed_mission() From 9632938ca6b040b3aaaa4605e6f92dc2f6619eb1 Mon Sep 17 00:00:00 2001 From: Jeremy Plichta Date: Wed, 22 Apr 2026 14:13:33 -0600 Subject: [PATCH 4/5] fix(mission): prefer reviewer lanes for review work --- src/mission/scheduler.rs | 8 ++++-- tests/integration_tests.rs | 55 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 61 insertions(+), 2 deletions(-) diff --git a/src/mission/scheduler.rs b/src/mission/scheduler.rs index ab4dccb..f44a701 100644 --- a/src/mission/scheduler.rs +++ b/src/mission/scheduler.rs @@ -777,6 +777,10 @@ impl MissionScheduler { } fn mismatch_fallback_score(&self, item: &WorkItem, owner_role: &str, lane: AgentLane) -> u32 { + if matches!(item.kind, WorkKind::Review) && matches!(lane, AgentLane::Reviewer) { + return 100; + } + match normalize_role_alias(owner_role) { "backend" | "frontend" | "devops" => match lane { AgentLane::Generalist => 55, @@ -791,14 +795,14 @@ impl MissionScheduler { } AgentLane::Reviewer => 0, }, - "tester" | "test" => match lane { + "tester" => match lane { AgentLane::Generalist => 45, AgentLane::Backend | AgentLane::Frontend | AgentLane::Devops => 30, AgentLane::Runner | AgentLane::Researcher => 20, AgentLane::Reviewer => 5, AgentLane::Tester => 100, }, - "reviewer" | "review" => match lane { + "reviewer" => match lane { AgentLane::Reviewer => 100, AgentLane::Researcher => 20, AgentLane::Generalist => 10, diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index 449a9b0..930cc69 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -2978,6 +2978,61 @@ async fn test_mission_scheduler_assigns_review_work_to_review_role_alias() Ok(()) } +/// Test that review work with backend-oriented wording still prefers a reviewer lane. +#[tokio::test] +async fn test_mission_scheduler_assigns_backend_review_work_to_reviewer() +-> Result<(), Box> { + use tinytown::mission::{ + MissionRun, MissionScheduler, MissionStorage, ObjectiveRef, WorkItem, WorkKind, + }; + + let town = create_test_town("mission-scheduler-backend-review").await?; + let reviewer_handle = town.spawn_agent("auditor", "claude").await?; + let backend_handle = town.spawn_agent("backend-worker", "claude").await?; + + let mut reviewer = Agent::new("auditor", "claude", AgentType::Worker); + reviewer.id = reviewer_handle.id(); + reviewer.role_id = Some("audit".into()); + reviewer.state = AgentState::Idle; + town.channel().set_agent_state(&reviewer).await?; + + let mut backend = Agent::new("backend-worker", "claude", AgentType::Worker); + backend.id = backend_handle.id(); + backend.role_id = Some("backend".into()); + backend.state = AgentState::Idle; + town.channel().set_agent_state(&backend).await?; + + let storage = MissionStorage::new(town.channel().conn().clone(), town.channel().town_name()); + + let mut mission = MissionRun::new(vec![ObjectiveRef::Issue { + owner: "owner".into(), + repo: "repo".into(), + number: 44, + }]); + mission.policy.reviewer_required = false; + mission.start(); + storage.save_mission(&mission).await?; + storage.add_active(mission.id).await?; + + let mut work_item = WorkItem::new(mission.id, "Review backend API", WorkKind::Review) + .with_owner_role("backend"); + work_item.mark_ready(); + let work_item_id = work_item.id; + storage.save_work_item(&work_item).await?; + + let scheduler = MissionScheduler::with_defaults(storage.clone(), town.channel().clone()); + let result = scheduler.tick().await?; + assert_eq!(result.total_assigned, 1); + + let updated = storage + .get_work_item(mission.id, work_item_id) + .await? + .expect("work item should exist"); + assert_eq!(updated.assigned_to, Some(reviewer_handle.id())); + + Ok(()) +} + /// Test MissionStorage WatchItem operations. #[tokio::test] async fn test_mission_storage_watch_items() -> Result<(), Box> { From b64a3d43130690826ded1ca9e2fef8dd8d39daff Mon Sep 17 00:00:00 2001 From: Jeremy Plichta Date: Wed, 22 Apr 2026 14:19:50 -0600 Subject: [PATCH 5/5] fix(mission): prefer reviewers for review work --- src/mission/scheduler.rs | 25 ++++++++++++------------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/src/mission/scheduler.rs b/src/mission/scheduler.rs index f44a701..2c1da33 100644 --- a/src/mission/scheduler.rs +++ b/src/mission/scheduler.rs @@ -696,16 +696,19 @@ impl MissionScheduler { ) -> AgentMatchScore { let lane = self.agent_lane(agent); // Base score: role matching - let base_score = if let Some(ref owner_role) = item.owner_role { - let role_lower = owner_role.to_lowercase(); - if self.agent_matches_role(agent, &role_lower) { - 100 // Exact role match + let base_score = + if matches!(item.kind, WorkKind::Review) && matches!(lane, AgentLane::Reviewer) { + 120 + } else if let Some(ref owner_role) = item.owner_role { + let role_lower = owner_role.to_lowercase(); + if self.agent_matches_role(agent, &role_lower) { + 100 // Exact role match + } else { + self.mismatch_fallback_score(item, &role_lower, lane) + } } else { - self.mismatch_fallback_score(item, &role_lower, lane) - } - } else { - self.kind_fallback_score(item, lane) - }; + self.kind_fallback_score(item, lane) + }; // Load penalty: reduce score for agents already assigned this tick let concurrent_count = current_assignments @@ -777,10 +780,6 @@ impl MissionScheduler { } fn mismatch_fallback_score(&self, item: &WorkItem, owner_role: &str, lane: AgentLane) -> u32 { - if matches!(item.kind, WorkKind::Review) && matches!(lane, AgentLane::Reviewer) { - return 100; - } - match normalize_role_alias(owner_role) { "backend" | "frontend" | "devops" => match lane { AgentLane::Generalist => 55,