Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 59 additions & 5 deletions src/app/mcp/tools.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1493,7 +1493,12 @@ pub fn mission_reject_tool(state: Arc<McpState>) -> Tool {
};
let storage = mission_storage(&state);
match storage.get_mission(mission_id).await {
Ok(Some(_)) => {}
Ok(Some(mut mission)) => {
mission.clear_help_request_state();
if let Err(e) = storage.save_mission(&mission).await {
return Ok(error_response(e.to_string()));
}
}
Ok(None) => {
return Ok(error_response(format!(
"Mission {} not found",
Expand Down Expand Up @@ -1551,7 +1556,7 @@ pub fn mission_pause_tool(state: Arc<McpState>) -> Tool {
Err(msg) => return Ok(error_response(msg)),
};
let storage = mission_storage(&state);
let Some(mission) = (match storage.get_mission(mission_id).await {
let Some(mut mission) = (match storage.get_mission(mission_id).await {
Ok(mission) => mission,
Err(e) => return Ok(error_response(e.to_string())),
}) else {
Expand All @@ -1578,9 +1583,21 @@ pub fn mission_pause_tool(state: Arc<McpState>) -> Tool {
"mcp",
"pause requested via mission.pause",
);
mission.clear_help_request_state();
if let Err(e) = storage.save_mission(&mission).await {
return Ok(error_response(e.to_string()));
}
if let Err(e) = storage.save_control_message(&note).await {
return Ok(error_response(e.to_string()));
}
if let Err(e) = crate::mission::retire_help_requests_for_mission(
state.town.channel(),
mission_id,
)
.await
{
return Ok(error_response(e.to_string()));
}
let dispatcher = crate::mission::MissionDispatcher::new(
storage.clone(),
state.town.channel().clone(),
Expand Down Expand Up @@ -1614,7 +1631,7 @@ pub fn mission_resume_tool(state: Arc<McpState>) -> Tool {
Err(msg) => return Ok(error_response(msg)),
};
let storage = mission_storage(&state);
let Some(mission) = (match storage.get_mission(mission_id).await {
let Some(mut mission) = (match storage.get_mission(mission_id).await {
Ok(mission) => mission,
Err(e) => return Ok(error_response(e.to_string())),
}) else {
Expand Down Expand Up @@ -1654,9 +1671,21 @@ pub fn mission_resume_tool(state: Arc<McpState>) -> Tool {
"mcp",
"resume requested via mission.resume",
);
mission.clear_help_request_state();
if let Err(e) = storage.save_mission(&mission).await {
return Ok(error_response(e.to_string()));
}
if let Err(e) = storage.save_control_message(&note).await {
return Ok(error_response(e.to_string()));
}
if let Err(e) = crate::mission::retire_help_requests_for_mission(
state.town.channel(),
mission_id,
)
.await
{
return Ok(error_response(e.to_string()));
}
let dispatcher = crate::mission::MissionDispatcher::new(
storage.clone(),
state.town.channel().clone(),
Expand Down Expand Up @@ -1798,10 +1827,22 @@ pub fn mission_note_tool(state: Arc<McpState>) -> Tool {
if let Err(e) = storage.save_control_message(&note).await {
return Ok(error_response(e.to_string()));
}
let retired = match crate::mission::retire_help_requests_for_mission(
state.town.channel(),
mission_id,
)
.await
{
Ok(retired) => retired,
Err(e) => return Ok(error_response(e.to_string())),
};
if let Err(e) = storage
.log_event(
mission_id,
&format!("Operator note queued via MCP: {}", input.message),
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MCP note handler omits clear_help_request_state unlike other paths

Medium Severity

The MCP mission.note handler discards the loaded mission with Ok(Some(_)) => {} and never calls clear_help_request_state(), while every other operator acknowledgment path (CLI note, MCP pause/resume/stop, API resume/stop, and MCP reject) does call it and saves the mission. This means operator notes sent via MCP won't reset the dispatcher's remembered help-request state, so the dispatcher may re-send stale help requests to the conductor inbox even though the operator already responded.

Additional Locations (1)
Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit 10ae7f5. Configure here.

&format!(
"Operator note queued via MCP: {} (retired {} stale help request(s))",
input.message, retired
),
)
.await
{
Expand Down Expand Up @@ -1954,17 +1995,30 @@ pub fn mission_stop_tool(state: Arc<McpState>) -> Tool {
} else {
mission.block("Stopped by user");
}
mission.clear_help_request_state();

if let Err(e) = storage.save_mission(&mission).await {
return Ok(error_response(e.to_string()));
}
if let Err(e) = storage.remove_active(mission_id).await {
return Ok(error_response(e.to_string()));
}
let retired = match crate::mission::retire_help_requests_for_mission(
state.town.channel(),
mission_id,
)
.await
{
Ok(retired) => retired,
Err(e) => return Ok(error_response(e.to_string())),
};
if let Err(e) = storage
.log_event(
mission_id,
&format!("Mission stopped via MCP (force={})", input.force),
&format!(
"Mission stopped via MCP (force={}) and retired {} stale help request(s)",
input.force, retired
),
)
.await
{
Expand Down
23 changes: 21 additions & 2 deletions src/app/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1181,6 +1181,7 @@ async fn stop_mission(
} else {
mission.block("Stopped by user");
}
mission.clear_help_request_state();

storage
.save_mission(&mission)
Expand All @@ -1190,10 +1191,16 @@ async fn stop_mission(
.remove_active(id)
.await
.map_err(|e| ProblemDetails::internal_error(&e.to_string()))?;
let retired = crate::mission::retire_help_requests_for_mission(state.town.channel(), id)
.await
.map_err(|e| ProblemDetails::internal_error(&e.to_string()))?;
let _ = storage
.log_event(
id,
&format!("Mission stopped via API (force={})", req.force),
&format!(
"Mission stopped via API (force={}) and retired {} stale help request(s)",
req.force, retired
),
)
.await;

Expand Down Expand Up @@ -1226,6 +1233,7 @@ async fn resume_mission(
}

mission.start();
mission.clear_help_request_state();
storage
.save_mission(&mission)
.await
Expand All @@ -1234,7 +1242,18 @@ async fn resume_mission(
.add_active(id)
.await
.map_err(|e| ProblemDetails::internal_error(&e.to_string()))?;
let _ = storage.log_event(id, "Mission resumed via API").await;
let retired = crate::mission::retire_help_requests_for_mission(state.town.channel(), id)
.await
.map_err(|e| ProblemDetails::internal_error(&e.to_string()))?;
let _ = storage
.log_event(
id,
&format!(
"Mission resumed via API (retired {} stale help request(s))",
retired
),
)
.await;

Ok(Json(serde_json::json!({
"id": id.to_string(),
Expand Down
80 changes: 80 additions & 0 deletions src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,49 @@ impl Channel {
Ok(agents.into_iter().find(|a| a.name == name))
}

/// Resolve a user-supplied agent reference.
///
/// Accepts a full UUID, short UUID, canonical name, nickname/display name,
/// or full display label. Returns `Ok(None)` when there is no match.
pub async fn resolve_agent_ref(&self, raw: &str) -> Result<Option<crate::agent::Agent>> {
let raw = raw.trim();
if raw.is_empty() {
return Ok(None);
}

if let Ok(agent_id) = raw.parse::<AgentId>() {
return self.get_agent_state(agent_id).await;
}

let raw_lower = raw.to_ascii_lowercase();
let agents = self.list_agents().await?;
let matches: Vec<_> = agents
.into_iter()
.filter(|agent| {
agent.name.eq_ignore_ascii_case(raw)
|| agent.display_name().eq_ignore_ascii_case(raw)
|| agent.display_label().eq_ignore_ascii_case(raw)
|| agent.id.short_id().eq_ignore_ascii_case(&raw_lower)
})
.collect();

match matches.as_slice() {
[] => Ok(None),
[agent] => Ok(Some(agent.clone())),
_ => {
let labels = matches
.iter()
.map(|agent| format!("{} ({})", agent.display_label(), agent.id.short_id()))
.collect::<Vec<_>>()
.join(", ");
Err(crate::Error::Config(format!(
"Agent reference '{}' is ambiguous: {}",
raw, labels
)))
}
}
}

/// Delete an agent from Redis.
pub async fn delete_agent(&self, agent_id: AgentId) -> Result<()> {
let mut conn = self.conn.clone();
Expand Down Expand Up @@ -948,6 +991,43 @@ impl Channel {
Ok(messages)
}

/// Remove inbox messages matching a predicate while preserving order.
pub async fn remove_inbox_messages_matching<F>(
&self,
agent_id: AgentId,
mut predicate: F,
) -> Result<usize>
where
F: FnMut(&Message) -> bool,
{
let mut conn = self.conn.clone();
let inbox_key = self.inbox_key(agent_id);
let items: Vec<String> = conn.lrange(&inbox_key, 0, -1).await?;
if items.is_empty() {
return Ok(0);
}

let mut kept = Vec::with_capacity(items.len());
let mut removed = 0usize;
for item in items {
match serde_json::from_str::<Message>(&item) {
Ok(message) if predicate(&message) => removed += 1,
_ => kept.push(item),
}
}

if removed == 0 {
return Ok(0);
}

let _: () = conn.del(&inbox_key).await?;
if !kept.is_empty() {
let _: () = conn.rpush(&inbox_key, kept).await?;
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Non-atomic inbox read-delete-write loses concurrent messages

Medium Severity

remove_inbox_messages_matching reads the full inbox via lrange, then deletes the key and re-pushes kept messages. This is not atomic—any message pushed to the inbox between the lrange and the del will be silently lost. Since the dispatcher can send help requests to the conductor inbox concurrently with an operator action that triggers this cleanup, the race window is real even if small.

Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit 10ae7f5. Configure here.


Ok(removed)
}

/// Move a message to another agent's inbox.
pub async fn move_message_to_inbox(&self, message: &Message, to_agent: AgentId) -> Result<()> {
// Create a new message with updated recipient
Expand Down
Loading
Loading