Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 34 additions & 4 deletions src/mission/compiler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,16 +219,16 @@ impl WorkGraphCompiler {

/// Infer owner role from issue labels or content.
#[must_use]
pub fn infer_owner_role(&self, title: &str, body: &str) -> Option<String> {
pub fn infer_owner_role(&self, title: &str, body: &str, kind: WorkKind) -> Option<String> {
let text = format!("{} {}", title, body).to_lowercase();

if text.contains("backend") || text.contains("api") || text.contains("server") {
Some("backend".to_string())
} else if text.contains("frontend") || text.contains("ui") || text.contains("web") {
Some("frontend".to_string())
} else if text.contains("test") || text.contains("qa") {
} else if matches!(kind, WorkKind::Test) || text.contains("qa owner") {
Some("tester".to_string())
} else if text.contains("review") {
} else if matches!(kind, WorkKind::Review) {
Some("reviewer".to_string())
} else if text.contains("devops")
|| text.contains("infrastructure")
Expand All @@ -252,7 +252,7 @@ impl WorkGraphCompiler {
) -> ParsedIssue {
let depends_on = self.parse_dependencies(&body);
let kind = self.infer_work_kind(&title, &body);
let owner_role = self.infer_owner_role(&title, &body);
let owner_role = self.infer_owner_role(&title, &body, kind);

ParsedIssue {
number,
Expand Down Expand Up @@ -516,6 +516,36 @@ mod tests {
);
}

#[test]
fn test_infer_owner_role_is_kind_aware_for_implement_work() {
let compiler = WorkGraphCompiler::new();

assert_eq!(
compiler.infer_owner_role(
"Implement auth flow",
"Also add integration tests for login and signup",
WorkKind::Implement
),
None
);
assert_eq!(
compiler
.infer_owner_role(
"Implement auth API",
"Add integration tests for login and signup",
WorkKind::Implement
)
.as_deref(),
Some("backend")
);
assert_eq!(
compiler
.infer_owner_role("Add integration tests", "", WorkKind::Test)
.as_deref(),
Some("tester")
);
}

#[test]
fn test_compile_simple() {
let compiler = WorkGraphCompiler::new();
Expand Down
171 changes: 170 additions & 1 deletion src/mission/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use crate::error::Result;
use crate::message::{Message, MessageType};
use crate::mission::scheduler::{MissionScheduler, SchedulerTickResult};
use crate::mission::storage::MissionStorage;
use crate::mission::types::{MissionId, MissionState, WatchStatus};
use crate::mission::types::{MissionId, MissionState, WatchStatus, WorkItemId};
use crate::mission::watch::{GitHubClient, WatchEngine, WatchEngineTickResult};

/// Dispatcher configuration.
Expand Down Expand Up @@ -154,6 +154,7 @@ impl<G: GitHubClient> MissionDispatcher<G> {
.find(|result| result.mission_id == *mission_id)
.is_some_and(|result| {
result.state_changed
|| !result.reconciled.is_empty()
|| !result.promoted.is_empty()
|| !result.assigned.is_empty()
|| !result.completed.is_empty()
Expand Down Expand Up @@ -278,6 +279,80 @@ impl<G: GitHubClient> MissionDispatcher<G> {
)
.await?;
}
} else if let Some(directive) = parse_redirect_directive(body) {
let work_item_id = match self
.resolve_work_item_ref(mission_id, directive.work_item_ref.as_deref())
.await?
{
Some(id) => id,
None => {
self.storage
.log_event(
mission_id,
&format!(
"Dispatcher ignored redirect directive from {}: could not resolve work item in '{}'",
message.sender, body
),
)
.await?;
message.mark_processed();
self.storage.save_control_message(&message).await?;
continue;
}
};
let agent_id = match self.resolve_agent_ref(&directive.agent_ref).await? {
Some(id) => id,
None => {
self.storage
.log_event(
mission_id,
&format!(
"Dispatcher ignored redirect directive from {}: unknown agent '{}' in '{}'",
message.sender, directive.agent_ref, body
),
)
.await?;
message.mark_processed();
self.storage.save_control_message(&message).await?;
continue;
}
};

if self
.scheduler
.redirect_work_item(
mission_id,
work_item_id,
agent_id,
directive.reason.as_deref(),
)
.await?
{
if mission.state.can_resume() {
mission.start();
mission.set_next_wake_at(None);
}
progressed = true;
self.storage
.log_event(
mission_id,
&format!(
"Dispatcher redirected work item {} to {} from {}",
work_item_id, directive.agent_ref, message.sender
),
)
.await?;
} else {
self.storage
.log_event(
mission_id,
&format!(
"Dispatcher ignored redirect directive from {}: failed to redirect '{}'",
message.sender, body
),
)
.await?;
}
} else if lower.starts_with("pause") || lower.starts_with("hold") {
if mission.state.can_pause() {
mission.block(format!("Paused by {}: {}", message.sender, body));
Expand Down Expand Up @@ -436,4 +511,98 @@ impl<G: GitHubClient> MissionDispatcher<G> {
.await?;
Ok(())
}

async fn resolve_agent_ref(&self, raw: &str) -> Result<Option<AgentId>> {
if let Ok(id) = raw.parse::<AgentId>() {
return Ok(Some(id));
}

let raw_lower = raw.to_ascii_lowercase();
let agents = self.channel.list_agents().await?;
let mut matches = agents.into_iter().filter(|agent| {
agent.name.eq_ignore_ascii_case(raw)
|| agent.display_label().eq_ignore_ascii_case(raw)
|| agent.id.short_id().eq_ignore_ascii_case(&raw_lower)
});

let first = matches.next().map(|agent| agent.id);
if matches.next().is_some() {
Ok(None)
} else {
Ok(first)
}
}

async fn resolve_work_item_ref(
&self,
mission_id: MissionId,
raw: Option<&str>,
) -> Result<Option<WorkItemId>> {
let work_items = self.storage.list_work_items(mission_id).await?;
let active_items: Vec<_> = work_items
.iter()
.filter(|item| item.status != crate::mission::WorkStatus::Done)
.collect();

let Some(raw) = raw else {
return Ok((active_items.len() == 1).then(|| active_items[0].id));
};

if let Ok(id) = raw.parse::<WorkItemId>() {
return Ok(Some(id));
}

let matches: Vec<_> = active_items
.iter()
.filter(|item| item.id.short_id().eq_ignore_ascii_case(raw))
.map(|item| item.id)
.collect();

if matches.len() == 1 {
Ok(Some(matches[0]))
} else {
Ok(None)
}
}
}

#[derive(Debug, Clone)]
struct RedirectDirective {
work_item_ref: Option<String>,
agent_ref: String,
reason: Option<String>,
}

fn parse_redirect_directive(body: &str) -> Option<RedirectDirective> {
let parts: Vec<_> = body.split_whitespace().collect();
let verb = parts.first()?.to_ascii_lowercase();
if !matches!(
verb.as_str(),
"redirect" | "reassign" | "reroute" | "rebind"
) {
return None;
}

let (work_item_ref, to_idx) = if parts.get(1)?.eq_ignore_ascii_case("to") {
(None, 1usize)
} else {
(Some(parts.get(1)?.to_string()), 2usize)
};

if !parts.get(to_idx)?.eq_ignore_ascii_case("to") {
return None;
}

let agent_ref = parts.get(to_idx + 1)?.to_string();
let reason = if parts.len() > to_idx + 2 {
Some(parts[to_idx + 2..].join(" "))
} else {
None
};

Some(RedirectDirective {
work_item_ref,
agent_ref,
reason,
})
}
Loading
Loading