diff --git a/crates/cli/src/alignment/mod.rs b/crates/cli/src/alignment/mod.rs index 48e5f2f1..11284ee6 100644 --- a/crates/cli/src/alignment/mod.rs +++ b/crates/cli/src/alignment/mod.rs @@ -110,6 +110,7 @@ pub(crate) struct SessionAlignmentState { aliases: HashMap, completed_aliases: HashMap, pending_subagents: HashMap, + hermes_task_sessions: HashMap>, } impl SessionAlignmentState { @@ -117,6 +118,7 @@ impl SessionAlignmentState { self.aliases.clear(); self.completed_aliases.clear(); self.pending_subagents.clear(); + self.hermes_task_sessions.clear(); } pub(crate) fn alias_for_session(&self, session_id: &str) -> Option { @@ -154,6 +156,8 @@ impl SessionAlignmentState { } pub(crate) fn route_event(&mut self, event: NormalizedEvent) -> NormalizedEvent { + self.record_hermes_task_session(&event); + let event = self.route_hermes_task_event(event); let (event, finished_alias) = route_event_through_alias(event, &self.aliases); let session_id = event.session_id().to_string(); if let Some(child_session_id) = finished_alias.as_ref() { @@ -228,6 +232,8 @@ impl SessionAlignmentState { self.pending_subagents.retain(|child_session_id, pending| { child_session_id != session_id && pending.parent_session_id() != session_id }); + self.hermes_task_sessions.remove(session_id); + prune_hermes_task_sessions(&mut self.hermes_task_sessions, session_id); } pub(crate) fn clear_for_ended_subagent(&mut self, parent_session_id: &str, subagent_id: &str) { @@ -241,9 +247,80 @@ impl SessionAlignmentState { && !(pending.parent_session_id() == parent_session_id && pending.event.session_id == subagent_id) }); + self.hermes_task_sessions + .retain(|session_id, _| session_id != subagent_id); + prune_hermes_task_sessions(&mut self.hermes_task_sessions, subagent_id); + } + + fn record_hermes_task_session(&mut self, event: &NormalizedEvent) { + if normalized_event_agent_kind(event) != AgentKind::Hermes { + return; + } + let Some(task_id) = hermes_task_id(event) else { + return; + }; + let session_id = event.session_id(); + if session_id == task_id { + return; + } + self.hermes_task_sessions + .entry(session_id.to_string()) + .or_default() + .insert(task_id, session_id.to_string()); + } + + fn route_hermes_task_event(&self, event: NormalizedEvent) -> NormalizedEvent { + let should_route = matches!( + event, + NormalizedEvent::ToolStarted(_) | NormalizedEvent::ToolEnded(_) + ) && normalized_event_agent_kind(&event) == AgentKind::Hermes; + if !should_route { + return event; + } + + let task_id = hermes_task_id(&event).unwrap_or_else(|| event.session_id().to_string()); + let session_scope = hermes_task_session_scope(&event); + let Some(session_id) = self.hermes_session_for_task(&task_id, session_scope.as_deref()) + else { + return event; + }; + route_hermes_task_session_event(event, task_id, session_id) + } + + fn hermes_session_for_task( + &self, + task_id: &str, + session_scope: Option<&str>, + ) -> Option { + if let Some(session_scope) = session_scope { + return self + .hermes_task_sessions + .get(session_scope) + .and_then(|tasks| tasks.get(task_id)) + .cloned(); + } + + let mut matches = self + .hermes_task_sessions + .values() + .filter_map(|tasks| tasks.get(task_id).cloned()); + let session_id = matches.next()?; + matches.next().is_none().then_some(session_id) } } +fn prune_hermes_task_sessions( + hermes_task_sessions: &mut HashMap>, + session_id: &str, +) { + hermes_task_sessions.values_mut().for_each(|tasks| { + tasks.retain(|task_id, mapped_session_id| { + task_id != session_id && mapped_session_id != session_id + }); + }); + hermes_task_sessions.retain(|_, tasks| !tasks.is_empty()); +} + // Resolves the session id for a gateway request in precedence order: // explicit NeMo Relay header, agent-native headers, agent-specific body fallbacks, then the // generic OpenAI-compatible `session_id` body field. Keeping the provider fallbacks behind one @@ -607,6 +684,142 @@ fn route_tool_event(event: &mut ToolEvent, alias: &SessionAlias, metadata: Value event.metadata = merge_metadata(event.metadata.clone(), metadata); } +fn route_hermes_task_session_event( + event: NormalizedEvent, + task_id: String, + session_id: String, +) -> NormalizedEvent { + let metadata = json!({ + "session_correlation_status": "task_session_alias", + "session_correlation_source": "hermes_task_id", + "hermes_task_id": task_id, + "hermes_session_id": session_id, + }); + match event { + NormalizedEvent::ToolStarted(mut event) => { + event.session_id = session_id; + event.metadata = merge_metadata(event.metadata, metadata); + NormalizedEvent::ToolStarted(event) + } + NormalizedEvent::ToolEnded(mut event) => { + event.session_id = session_id; + event.metadata = merge_metadata(event.metadata, metadata); + NormalizedEvent::ToolEnded(event) + } + event => event, + } +} + +fn normalized_event_agent_kind(event: &NormalizedEvent) -> AgentKind { + match event { + NormalizedEvent::AgentStarted(event) + | NormalizedEvent::AgentEnded(event) + | NormalizedEvent::TurnEnded(event) + | NormalizedEvent::PromptSubmitted(event) + | NormalizedEvent::Compaction(event) + | NormalizedEvent::Notification(event) + | NormalizedEvent::HookMark(event) => event.agent_kind, + NormalizedEvent::SubagentStarted(event) | NormalizedEvent::SubagentEnded(event) => { + event.agent_kind + } + NormalizedEvent::LlmHint(event) => event.agent_kind, + NormalizedEvent::LlmStarted(event) | NormalizedEvent::LlmEnded(event) => event.agent_kind, + NormalizedEvent::ToolStarted(event) | NormalizedEvent::ToolEnded(event) => event.agent_kind, + } +} + +fn hermes_task_id(event: &NormalizedEvent) -> Option { + match event { + NormalizedEvent::AgentStarted(event) + | NormalizedEvent::AgentEnded(event) + | NormalizedEvent::TurnEnded(event) + | NormalizedEvent::PromptSubmitted(event) + | NormalizedEvent::Compaction(event) + | NormalizedEvent::Notification(event) + | NormalizedEvent::HookMark(event) => { + task_id_from_payload_and_metadata(&event.payload, &event.metadata) + } + NormalizedEvent::SubagentStarted(event) | NormalizedEvent::SubagentEnded(event) => { + task_id_from_payload_and_metadata(&event.payload, &event.metadata) + } + NormalizedEvent::LlmHint(event) => { + task_id_from_payload_and_metadata(&event.payload, &event.metadata) + } + NormalizedEvent::LlmStarted(event) | NormalizedEvent::LlmEnded(event) => { + task_id_from_llm_event(event) + } + NormalizedEvent::ToolStarted(event) | NormalizedEvent::ToolEnded(event) => { + task_id_from_payload_and_metadata(&event.payload, &event.metadata) + } + } +} + +fn hermes_task_session_scope(event: &NormalizedEvent) -> Option { + match event { + NormalizedEvent::AgentStarted(event) + | NormalizedEvent::AgentEnded(event) + | NormalizedEvent::TurnEnded(event) + | NormalizedEvent::PromptSubmitted(event) + | NormalizedEvent::Compaction(event) + | NormalizedEvent::Notification(event) + | NormalizedEvent::HookMark(event) => { + session_scope_from_payload_and_metadata(&event.payload, &event.metadata) + } + NormalizedEvent::SubagentStarted(event) | NormalizedEvent::SubagentEnded(event) => { + session_scope_from_payload_and_metadata(&event.payload, &event.metadata) + } + NormalizedEvent::LlmHint(event) => { + session_scope_from_payload_and_metadata(&event.payload, &event.metadata) + } + NormalizedEvent::LlmStarted(event) | NormalizedEvent::LlmEnded(event) => { + session_scope_from_llm_event(event) + } + NormalizedEvent::ToolStarted(event) | NormalizedEvent::ToolEnded(event) => { + session_scope_from_payload_and_metadata(&event.payload, &event.metadata) + } + } +} + +fn task_id_from_llm_event(event: &LlmEvent) -> Option { + task_id_from_payload_and_metadata(&event.request, &event.metadata) + .or_else(|| task_id_from_payload_and_metadata(&event.response, &event.metadata)) +} + +fn task_id_from_payload_and_metadata(payload: &Value, metadata: &Value) -> Option { + json_string_at(payload, TASK_ID_PATHS).or_else(|| json_string_at(metadata, TASK_ID_PATHS)) +} + +fn session_scope_from_llm_event(event: &LlmEvent) -> Option { + session_scope_from_payload_and_metadata(&event.request, &event.metadata) + .or_else(|| session_scope_from_payload_and_metadata(&event.response, &event.metadata)) +} + +fn session_scope_from_payload_and_metadata(payload: &Value, metadata: &Value) -> Option { + json_string_at(payload, HERMES_SESSION_SCOPE_PATHS) + .or_else(|| json_string_at(metadata, HERMES_SESSION_SCOPE_PATHS)) +} + +const TASK_ID_PATHS: &[&[&str]] = &[ + &["task_id"], + &["taskId"], + &["extra", "task_id"], + &["extra", "taskId"], +]; + +const HERMES_SESSION_SCOPE_PATHS: &[&[&str]] = &[ + &["session_id"], + &["sessionId"], + &["session", "id"], + &["conversation_id"], + &["conversationId"], + &["parent_session_id"], + &["parentSessionId"], + &["extra", "session_id"], + &["extra", "sessionId"], + &["extra", "parent_session_id"], + &["extra", "parentSessionId"], +]; + fn request_user_task_text(payload: &Value) -> Option { payload .get("messages") diff --git a/crates/cli/src/launcher.rs b/crates/cli/src/launcher.rs index b9b3048c..acf5d268 100644 --- a/crates/cli/src/launcher.rs +++ b/crates/cli/src/launcher.rs @@ -19,7 +19,9 @@ use crate::config::{ ServerArgs, any_config_file_exists, resolve_run_config, }; use crate::error::CliError; -use crate::installer::{generated_hooks, hook_forward_command, merge_hooks, read_json_file}; +use crate::installer::{ + generated_hooks, hook_forward_command, merge_hermes_config, merge_hooks, read_json_file, +}; use crate::server; /// Runs a child coding-agent command behind an ephemeral local gateway. @@ -152,7 +154,9 @@ async fn execute_live_run( ) -> Result { let running_server = RunningGateway::start(listener, gateway_config); if let Err(error) = wait_for_health(gateway_url).await { + let restore = prepared.restore(); let _ = running_server.stop().await; + restore?; return Err(error); } let status = prepared.spawn_and_wait().await; @@ -240,6 +244,7 @@ struct PreparedRun { env: Vec<(String, String)>, temp_dirs: Vec, cursor_restore: Option, + hermes_restore: Option, notes: Vec, } @@ -249,6 +254,12 @@ struct CursorRestore { had_original: bool, } +struct HermesRestore { + path: PathBuf, + backup_path: Option, + had_original: bool, +} + struct RunningGateway { shutdown_tx: oneshot::Sender<()>, task: JoinHandle>, @@ -291,6 +302,7 @@ impl PreparedRun { env: vec![("NEMO_RELAY_GATEWAY_URL".into(), gateway_url.into())], temp_dirs: Vec::new(), cursor_restore: None, + hermes_restore: None, notes: Vec::new(), }; if let Some(path) = path_with_transparent_hook_dir() { @@ -314,7 +326,13 @@ impl PreparedRun { } } } - CodingAgent::Hermes => run.prepare_hermes(resolved.agents.hermes.hooks_path.as_deref()), + CodingAgent::Hermes => { + if dry_run { + run.prepare_hermes_dry(resolved.agents.hermes.hooks_path.as_deref())?; + } else { + run.prepare_hermes(resolved.agents.hermes.hooks_path.as_deref())?; + } + } } Ok(run) } @@ -447,18 +465,36 @@ impl PreparedRun { Ok(()) } - // Surfaces where hermes' shell hooks live so users know what `nemo-relay config hermes` wrote. - // Hermes reads hooks from .hermes/config.yaml on its own; this launcher only exports the live - // gateway URL via NEMO_RELAY_GATEWAY_URL so installed hooks reach the ephemeral gateway. - fn prepare_hermes(&mut self, hooks_path: Option<&std::path::Path>) { - let note = match hooks_path { - Some(path) => format!( - "Hermes hooks at {} — re-run `nemo-relay config hermes` to refresh.", - path.display() - ), - None => "Hermes hooks not yet installed — run `nemo-relay config hermes` once so hermes traces under this gateway.".into(), - }; - self.notes.push(note); + // Hermes discovers hooks from `.hermes/config.yaml` instead of command-line flags. For + // transparent runs, temporarily merge gateway hook-forward entries into the configured Hermes + // hook file, then restore it after the child exits. + fn prepare_hermes(&mut self, hooks_path: Option<&std::path::Path>) -> Result<(), CliError> { + let path = hermes_hooks_path(hooks_path)?; + let (had_original, backup_path) = backup_existing_hermes_hooks(&path)?; + write_merged_hermes_hooks(&path)?; + self.env.push(("HERMES_ACCEPT_HOOKS".into(), "1".into())); + self.notes.push(format!( + "temporarily merged NeMo Relay hooks into {}", + path.display() + )); + self.hermes_restore = Some(HermesRestore { + path, + backup_path, + had_original, + }); + Ok(()) + } + + // Records the Hermes hook file that would be patched during a real run without touching the + // filesystem, preserving dry-run as an inspection-only operation. + fn prepare_hermes_dry(&mut self, hooks_path: Option<&std::path::Path>) -> Result<(), CliError> { + let path = hermes_hooks_path(hooks_path)?; + self.env.push(("HERMES_ACCEPT_HOOKS".into(), "1".into())); + self.notes.push(format!( + "would temporarily merge NeMo Relay hooks into {}", + path.display() + )); + Ok(()) } // Spawns the prepared child process with injected environment and waits for its exit status. @@ -473,34 +509,28 @@ impl PreparedRun { child.wait().await.map_err(CliError::from) } - // Removes temporary directories and restores Cursor hook files after the child exits. Restore + // Removes temporary directories and restores patched hook files after the child exits. Restore // errors are surfaced after the child status is collected so cleanup problems are not hidden. fn restore(&self) -> Result<(), CliError> { for dir in &self.temp_dirs { let _ = std::fs::remove_dir_all(dir); } - let Some(cursor) = &self.cursor_restore else { - return Ok(()); - }; - match (&cursor.backup_path, cursor.had_original) { - (Some(backup), true) => { - std::fs::copy(backup, &cursor.path).map_err(|error| { - CliError::Launch(format!( - "failed to restore Cursor hooks from {}: {error}", - backup.display() - )) - })?; - let _ = std::fs::remove_file(backup); - } - (_, false) if cursor.path.exists() => { - std::fs::remove_file(&cursor.path).map_err(|error| { - CliError::Launch(format!( - "failed to remove temporary Cursor hooks {}: {error}", - cursor.path.display() - )) - })?; - } - _ => {} + + if let Some(cursor) = &self.cursor_restore { + restore_hook_file( + &cursor.path, + cursor.backup_path.as_deref(), + cursor.had_original, + "Cursor", + )?; + } + if let Some(hermes) = &self.hermes_restore { + restore_hook_file( + &hermes.path, + hermes.backup_path.as_deref(), + hermes.had_original, + "Hermes", + )?; } Ok(()) } @@ -815,6 +845,17 @@ fn backup_existing_cursor_hooks(path: &Path) -> Result<(bool, Option), Ok((true, Some(backup))) } +// Backs up an existing Hermes hook config before run-mode patching. +fn backup_existing_hermes_hooks(path: &Path) -> Result<(bool, Option), CliError> { + let had_original = path.exists(); + if !had_original { + return Ok((false, None)); + } + let backup = path.with_extension(format!("yaml.nemo-relay-run.bak.{}", timestamp()?)); + std::fs::copy(path, &backup)?; + Ok((true, Some(backup))) +} + // Creates the Cursor hooks parent directory when needed, merges generated gateway hooks with any // existing hook file, and writes the patched JSON used for this transparent run. fn write_merged_cursor_hooks(path: &Path) -> Result<(), CliError> { @@ -837,6 +878,74 @@ fn write_merged_cursor_hooks(path: &Path) -> Result<(), CliError> { Ok(()) } +// Creates the Hermes config parent directory when needed, merges generated gateway hooks with any +// existing YAML config, and writes the patched YAML used for this transparent run. +fn write_merged_hermes_hooks(path: &Path) -> Result<(), CliError> { + if let Some(parent) = path.parent() { + std::fs::create_dir_all(parent)?; + } + let existing = match std::fs::read_to_string(path) { + Ok(raw) => raw, + Err(error) if error.kind() == std::io::ErrorKind::NotFound => String::new(), + Err(error) => return Err(CliError::Io(error)), + }; + let contents = merge_hermes_config( + &existing, + generated_hooks( + CodingAgent::Hermes, + &hook_forward_command(&transparent_hook_executable(), CodingAgent::Hermes), + ), + )?; + std::fs::write(path, contents)?; + Ok(()) +} + +// Chooses the Hermes hook file that transparent run should patch. If setup recorded a specific +// path, reuse it; otherwise fall back to the Hermes home config file that Hermes itself reads. +fn hermes_hooks_path(configured: Option<&Path>) -> Result { + if let Some(path) = configured { + return Ok(path.to_path_buf()); + } + if let Some(home) = std::env::var_os("HERMES_HOME").filter(|value| !value.is_empty()) { + return Ok(PathBuf::from(home).join("config.yaml")); + } + let home = std::env::var_os("HOME") + .or_else(|| std::env::var_os("USERPROFILE")) + .ok_or_else(|| { + CliError::Launch("could not resolve home directory for Hermes hooks".into()) + })?; + Ok(PathBuf::from(home).join(".hermes").join("config.yaml")) +} + +fn restore_hook_file( + path: &Path, + backup_path: Option<&Path>, + had_original: bool, + label: &str, +) -> Result<(), CliError> { + match (backup_path, had_original) { + (Some(backup), true) => { + std::fs::copy(backup, path).map_err(|error| { + CliError::Launch(format!( + "failed to restore {label} hooks from {}: {error}", + backup.display() + )) + })?; + let _ = std::fs::remove_file(backup); + } + (_, false) if path.exists() => { + std::fs::remove_file(path).map_err(|error| { + CliError::Launch(format!( + "failed to remove temporary {label} hooks {}: {error}", + path.display() + )) + })?; + } + _ => {} + } + Ok(()) +} + // Converts JSON hook groups into inline TOML arrays for Codex `--config` flags. The function // preserves matchers when present and assumes generated hook groups contain one command hook. fn hook_groups_toml(value: &Value) -> String { diff --git a/crates/cli/src/session.rs b/crates/cli/src/session.rs index e61cbc37..dc5ed526 100644 --- a/crates/cli/src/session.rs +++ b/crates/cli/src/session.rs @@ -113,7 +113,7 @@ struct Session { // duplicate end hook does not reopen or mark an already-closed worker. completed_subagents: HashSet, llms: HashMap, - tools: HashMap, + tools: HashMap, pending_llm_hints: Vec, pending_tool_hints: Vec, // Maps stable user-task text from confidently owned LLM requests to the subagent that owns @@ -126,6 +126,22 @@ struct Session { config: SessionConfig, } +#[derive(Debug, Clone)] +struct ActiveTool { + handle: ToolHandle, + name: String, + arguments: Value, + owner_subagent_id: Option, +} + +impl std::ops::Deref for ActiveTool { + type Target = ToolHandle; + + fn deref(&self) -> &Self::Target { + &self.handle + } +} + #[derive(Debug, Clone)] struct PendingLlmHint { hint: LlmHintEvent, @@ -1178,7 +1194,11 @@ impl Session { // Ends all active tool calls with a synthetic close result before ending their containing scopes. // Draining first avoids holding mutable map state while the runtime emits lifecycle events. fn close_active_tools(&mut self, reason: &str) -> Result<(), CliError> { - let active_tools: Vec<_> = self.tools.drain().map(|(_, handle)| handle).collect(); + let active_tools: Vec<_> = self + .tools + .drain() + .map(|(_, active)| active.handle) + .collect(); for handle in active_tools { tool_call_end( ToolCallEndParams::builder() @@ -1486,6 +1506,9 @@ impl Session { } else { event.arguments }; + let active_tool_arguments = arguments.clone(); + let active_tool_name = event.tool_name.clone(); + let active_tool_owner_subagent_id = owner.subagent_id.clone(); tool_conditional_execution(event.tool_name.as_str(), &arguments)?; let metadata = tool_correlation_metadata( event.metadata, @@ -1504,7 +1527,15 @@ impl Session { .tool_call_id(event.tool_call_id.clone()) .build(), )?; - self.tools.insert(event.tool_call_id, handle); + self.tools.insert( + event.tool_call_id, + ActiveTool { + handle, + name: active_tool_name, + arguments: active_tool_arguments, + owner_subagent_id: active_tool_owner_subagent_id, + }, + ); Ok(()) } @@ -1517,7 +1548,7 @@ impl Session { .subagent_id .clone() .filter(|subagent_id| self.subagents.contains_key(subagent_id)); - let handle = match self.tools.remove(&event.tool_call_id) { + let handle = match self.remove_tool_handle_for_event(&event) { Some(handle) => handle, None => { let owner = self.resolve_tool_owner(&event); @@ -1567,6 +1598,52 @@ impl Session { Ok(()) } + // Hermes pre/post tool hooks can disagree on call IDs: pre hooks may omit the provider id + // while post hooks carry the final chat-completions tool id. When the ID misses but exactly + // one active tool owned by the same subagent/root scope has the same name and arguments, close + // that start instead of synthesizing a second zero-duration span. + fn remove_tool_handle_for_event(&mut self, event: &ToolEvent) -> Option { + if let Some(active) = self.tools.remove(&event.tool_call_id) { + return Some(active.handle); + } + let owner_subagent_id = self.tool_event_owner_subagent_id(event); + let key = self.matching_active_tool_key(event, owner_subagent_id.as_deref())?; + self.tools.remove(&key).map(|active| active.handle) + } + + fn matching_active_tool_key( + &self, + event: &ToolEvent, + owner_subagent_id: Option<&str>, + ) -> Option { + if event.arguments.is_null() { + return None; + } + let matches = self + .tools + .iter() + .filter_map(|(key, active)| { + (owner_subagent_id + .is_none_or(|owner| active.owner_subagent_id.as_deref() == Some(owner)) + && active.name == event.tool_name + && active.arguments == event.arguments) + .then_some(key.clone()) + }) + .collect::>(); + (matches.len() == 1).then(|| matches[0].clone()) + } + + fn tool_event_owner_subagent_id(&self, event: &ToolEvent) -> Option { + if let Some(subagent_id) = &event.subagent_id + && self.subagents.contains_key(subagent_id) + { + return Some(subagent_id.clone()); + } + self.matching_tool_hint_index(event) + .and_then(|index| self.pending_tool_hints[index].hint.subagent_id.clone()) + .filter(|subagent_id| self.subagents.contains_key(subagent_id)) + } + // Emits a mark event after ensuring the turn scope exists. Generic and unknown hooks use this // path so unsupported agent events remain visible without changing scope structure. fn mark(&mut self, name: &str, event_payload: SessionEvent) -> Result<(), CliError> { diff --git a/crates/cli/tests/coverage/adapters_tests.rs b/crates/cli/tests/coverage/adapters_tests.rs index e8bfb598..aa5208d7 100644 --- a/crates/cli/tests/coverage/adapters_tests.rs +++ b/crates/cli/tests/coverage/adapters_tests.rs @@ -315,6 +315,7 @@ fn drops_uncorrelatable_hermes_pre_tool_call() { json!({ "hook_event_name": "pre_tool_call", "task_id": "task-1", + "tool_call_id": "toolcall-1", "tool_name": "terminal", "tool_input": { "command": "pwd" } }), diff --git a/crates/cli/tests/coverage/alignment_tests.rs b/crates/cli/tests/coverage/alignment_tests.rs index 8806e0dd..763f683e 100644 --- a/crates/cli/tests/coverage/alignment_tests.rs +++ b/crates/cli/tests/coverage/alignment_tests.rs @@ -76,6 +76,41 @@ fn tool_event(session_id: &str, event_name: &str) -> ToolEvent { } } +fn hermes_llm_event(session_id: &str, task_id: &str) -> NormalizedEvent { + NormalizedEvent::LlmStarted(LlmEvent { + session_id: session_id.into(), + agent_kind: AgentKind::Hermes, + event_name: "pre_api_request".into(), + api_call_id: format!("{session_id}:{task_id}:1"), + provider: "custom".into(), + model_name: Some("qwen".into()), + request: json!({ "extra": { "task_id": task_id } }), + response: Value::Null, + metadata: json!({ "event_metadata": "pre_api_request" }), + }) +} + +fn hermes_tool_event(task_id: &str, session_scope: Option<&str>) -> NormalizedEvent { + let mut payload = json!({ "extra": { "task_id": task_id } }); + if let Some(session_scope) = session_scope { + payload["extra"]["parent_session_id"] = json!(session_scope); + } + + NormalizedEvent::ToolStarted(ToolEvent { + session_id: task_id.into(), + agent_kind: AgentKind::Hermes, + event_name: "pre_tool_call".into(), + tool_call_id: format!("{task_id}:tool-1"), + tool_name: "read_file".into(), + subagent_id: None, + arguments: json!({ "path": "README.md" }), + result: Value::Null, + status: None, + payload, + metadata: json!({ "event_metadata": "pre_tool_call" }), + }) +} + fn aliases() -> HashMap { HashMap::from([( "child".into(), @@ -87,6 +122,45 @@ fn aliases() -> HashMap { )]) } +#[test] +fn hermes_task_session_routing_is_scoped_by_parent_session() { + let mut state = SessionAlignmentState::default(); + + state.route_event(hermes_llm_event("hermes-a", "task-1")); + state.route_event(hermes_llm_event("hermes-b", "task-1")); + + let routed_a = state.route_event(hermes_tool_event("task-1", Some("hermes-a"))); + let NormalizedEvent::ToolStarted(routed_a) = routed_a else { + panic!("expected routed Hermes tool event"); + }; + assert_eq!(routed_a.session_id, "hermes-a"); + assert_eq!(routed_a.metadata["hermes_task_id"], json!("task-1")); + assert_eq!(routed_a.metadata["hermes_session_id"], json!("hermes-a")); + + let routed_b = state.route_event(hermes_tool_event("task-1", Some("hermes-b"))); + let NormalizedEvent::ToolStarted(routed_b) = routed_b else { + panic!("expected routed Hermes tool event"); + }; + assert_eq!(routed_b.session_id, "hermes-b"); + assert_eq!(routed_b.metadata["hermes_task_id"], json!("task-1")); + assert_eq!(routed_b.metadata["hermes_session_id"], json!("hermes-b")); +} + +#[test] +fn hermes_task_session_routing_leaves_ambiguous_unscoped_task_event_unchanged() { + let mut state = SessionAlignmentState::default(); + + state.route_event(hermes_llm_event("hermes-a", "task-1")); + state.route_event(hermes_llm_event("hermes-b", "task-1")); + + let routed = state.route_event(hermes_tool_event("task-1", None)); + let NormalizedEvent::ToolStarted(routed) = routed else { + panic!("expected Hermes tool event"); + }; + assert_eq!(routed.session_id, "task-1"); + assert!(routed.metadata.get("hermes_session_id").is_none()); +} + #[test] fn gateway_session_id_uses_explicit_claude_then_codex_fallbacks() { let mut headers = HeaderMap::new(); diff --git a/crates/cli/tests/coverage/launcher_tests.rs b/crates/cli/tests/coverage/launcher_tests.rs index c079e227..c7fc25f9 100644 --- a/crates/cli/tests/coverage/launcher_tests.rs +++ b/crates/cli/tests/coverage/launcher_tests.rs @@ -366,9 +366,20 @@ fn cursor_patching_can_be_disabled() { #[test] fn prepares_hermes_hook_environment() { + let _guard = current_dir_lock().lock().unwrap(); + let temp = tempfile::tempdir().unwrap(); + let previous = std::env::current_dir().unwrap(); + std::env::set_current_dir(temp.path()).unwrap(); + let hooks_path = temp.path().join("hermes-home/config.yaml"); let resolved = ResolvedConfig { gateway: GatewayConfig::default(), - agents: AgentConfigs::default(), + agents: AgentConfigs { + hermes: AgentCommandConfig { + command: None, + hooks_path: Some(hooks_path.clone()), + }, + ..AgentConfigs::default() + }, }; let prepared = PreparedRun::new( CodingAgent::Hermes, @@ -385,12 +396,64 @@ fn prepares_hermes_hook_environment() { "http://127.0.0.1:1234".into() ))); assert!( - !prepared + prepared .env - .iter() - .any(|(name, _)| name == "HERMES_ACCEPT_HOOKS") + .contains(&("HERMES_ACCEPT_HOOKS".into(), "1".into())) + ); + assert_eq!( + prepared + .hermes_restore + .as_ref() + .map(|restore| &restore.path), + Some(&hooks_path) + ); + let hooks = std::fs::read_to_string(&hooks_path).unwrap(); + assert!(hooks.contains("hook-forward hermes")); + assert!(prepared.notes[0].contains("temporarily merged")); + + prepared.restore().unwrap(); + assert!(!hooks_path.exists()); + std::env::set_current_dir(previous).unwrap(); +} + +#[test] +fn hermes_patch_restore_restores_original_file() { + let _guard = current_dir_lock().lock().unwrap(); + let temp = tempfile::tempdir().unwrap(); + let previous = std::env::current_dir().unwrap(); + std::env::set_current_dir(temp.path()).unwrap(); + let hooks_path = temp.path().join("hermes-home/config.yaml"); + std::fs::create_dir_all(hooks_path.parent().unwrap()).unwrap(); + let original = "hooks:\n PreToolUse: []\n"; + std::fs::write(&hooks_path, original).unwrap(); + let resolved = ResolvedConfig { + gateway: GatewayConfig::default(), + agents: AgentConfigs { + hermes: AgentCommandConfig { + command: None, + hooks_path: Some(hooks_path.clone()), + }, + ..AgentConfigs::default() + }, + }; + + let prepared = PreparedRun::new( + CodingAgent::Hermes, + vec!["hermes".into(), "chat".into()], + "http://s", + &resolved, + false, + ) + .unwrap(); + + assert!( + std::fs::read_to_string(&hooks_path) + .unwrap() + .contains("hook-forward hermes") ); - assert!(prepared.notes[0].contains("nemo-relay config hermes")); + prepared.restore().unwrap(); + assert_eq!(std::fs::read_to_string(&hooks_path).unwrap(), original); + std::env::set_current_dir(previous).unwrap(); } #[test] @@ -548,6 +611,7 @@ fn cursor_restore_reports_failed_backup_restore() { backup_path: Some(temp.path().join("missing-backup.json")), had_original: true, }), + hermes_restore: None, notes: vec![], }; @@ -570,6 +634,7 @@ fn cursor_restore_reports_failed_temporary_hook_removal() { backup_path: None, had_original: false, }), + hermes_restore: None, notes: vec![], }; @@ -589,6 +654,7 @@ fn cursor_restore_noops_when_original_was_declared_without_backup() { backup_path: None, had_original: true, }), + hermes_restore: None, notes: vec![], }; @@ -706,6 +772,52 @@ async fn wait_for_health_reports_unready_gateway() { assert!(error.contains("gateway did not become ready")); } +#[tokio::test] +async fn execute_live_run_restores_hermes_hooks_when_health_check_fails() { + let temp = tempfile::tempdir().unwrap(); + let hooks_path = temp.path().join("hermes-home/config.yaml"); + std::fs::create_dir_all(hooks_path.parent().unwrap()).unwrap(); + let original = "hooks:\n PreToolUse: []\n"; + std::fs::write(&hooks_path, original).unwrap(); + let resolved = ResolvedConfig { + gateway: GatewayConfig::default(), + agents: AgentConfigs { + hermes: AgentCommandConfig { + command: None, + hooks_path: Some(hooks_path.clone()), + }, + ..AgentConfigs::default() + }, + }; + let prepared = PreparedRun::new( + CodingAgent::Hermes, + vec!["hermes".into(), "chat".into()], + "http://127.0.0.1:1234", + &resolved, + false, + ) + .unwrap(); + assert!( + std::fs::read_to_string(&hooks_path) + .unwrap() + .contains("hook-forward hermes") + ); + + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let error = execute_live_run( + listener, + GatewayConfig::default(), + "http://127.0.0.1:1", + prepared, + ) + .await + .unwrap_err() + .to_string(); + + assert!(error.contains("gateway did not become ready")); + assert_eq!(std::fs::read_to_string(&hooks_path).unwrap(), original); +} + #[cfg(unix)] fn make_executable(path: &Path) { use std::os::unix::fs::PermissionsExt; diff --git a/crates/cli/tests/coverage/session_tests.rs b/crates/cli/tests/coverage/session_tests.rs index 6a1f9be7..c848dd96 100644 --- a/crates/cli/tests/coverage/session_tests.rs +++ b/crates/cli/tests/coverage/session_tests.rs @@ -1582,6 +1582,102 @@ async fn hermes_turn_end_snapshots_atif_without_boundary_system_step() { ); } +#[tokio::test] +async fn hermes_task_id_tool_hooks_reuse_api_session() { + let config = session_test_config(); + let manager = SessionManager::new(config); + let headers = HeaderMap::new(); + + for payload in [ + json!({ + "hook_event_name": "on_session_start", + "session_id": "hermes-main" + }), + json!({ + "hook_event_name": "pre_api_request", + "session_id": "hermes-main", + "extra": { + "task_id": "task-1", + "api_call_count": 1, + "provider": "custom", + "model": "qwen", + "request": { + "body": { + "model": "qwen", + "messages": [ + { "role": "user", "content": "read file" } + ] + } + } + } + }), + ] { + let outcome = crate::adapters::hermes::adapt(payload, &headers); + manager + .apply_events(&headers, outcome.events) + .await + .unwrap(); + } + + let pre_tool = crate::adapters::hermes::adapt( + json!({ + "hook_event_name": "pre_tool_call", + "session_id": "hermes-main", + "tool_name": "read_file", + "tool_input": { "path": "README.md" }, + "extra": { + "task_id": "task-1", + "tool_call_id": "tool-1" + } + }), + &headers, + ); + manager + .apply_events(&headers, pre_tool.events) + .await + .unwrap(); + + { + let sessions = manager.inner.lock().await; + assert!(sessions.contains_key("hermes-main")); + assert!( + !sessions.contains_key("task-1"), + "Hermes tool hooks keyed by task_id should not create a duplicate session" + ); + let session = sessions.get("hermes-main").unwrap(); + assert!( + !session.tools.is_empty(), + "pre_tool_call should open an active tool before post_tool_call runs" + ); + } + + let post_tool = crate::adapters::hermes::adapt( + json!({ + "hook_event_name": "post_tool_call", + "session_id": "hermes-main", + "tool_name": "read_file", + "tool_input": { "path": "README.md" }, + "tool_response": { "content": "hello" }, + "extra": { + "task_id": "task-1", + "tool_call_id": "provider-tool-1" + } + }), + &headers, + ); + manager + .apply_events(&headers, post_tool.events) + .await + .unwrap(); + + let sessions = manager.inner.lock().await; + let session = sessions.get("hermes-main").unwrap(); + assert!( + session.tools.is_empty(), + "post_tool_call should close the matching pre_tool_call even when call IDs differ" + ); +} + #[tokio::test] async fn hermes_orphan_subagent_stop_exports_readable_mark_with_lineage() { let _guard = OBSERVABILITY_PLUGIN_TEST_LOCK.lock().await; @@ -3020,6 +3116,139 @@ async fn claude_agent_tool_async_launch_keeps_subagent_open_for_later_hooks() { ); } +#[tokio::test] +async fn active_tool_name_args_fallback_requires_matching_subagent_owner() { + let manager = SessionManager::new(session_test_config()); + let session_id = "tool-owner-fallback"; + let same_args = json!({ "file_path": "README.md" }); + + manager + .apply_events( + &HeaderMap::new(), + vec![ + NormalizedEvent::AgentStarted(session_event(session_id, "SessionStart")), + NormalizedEvent::SubagentStarted(SubagentEvent { + session_id: session_id.into(), + agent_kind: AgentKind::ClaudeCode, + event_name: "SubagentStart".into(), + subagent_id: "worker-1".into(), + payload: json!({}), + metadata: json!({}), + }), + NormalizedEvent::SubagentStarted(SubagentEvent { + session_id: session_id.into(), + agent_kind: AgentKind::ClaudeCode, + event_name: "SubagentStart".into(), + subagent_id: "worker-2".into(), + payload: json!({}), + metadata: json!({}), + }), + NormalizedEvent::ToolStarted(ToolEvent { + session_id: session_id.into(), + agent_kind: AgentKind::ClaudeCode, + event_name: "PreToolUse".into(), + tool_call_id: "worker-1-pre".into(), + tool_name: "Read".into(), + subagent_id: Some("worker-1".into()), + arguments: same_args.clone(), + result: Value::Null, + status: None, + payload: json!({}), + metadata: json!({}), + }), + NormalizedEvent::ToolStarted(ToolEvent { + session_id: session_id.into(), + agent_kind: AgentKind::ClaudeCode, + event_name: "PreToolUse".into(), + tool_call_id: "worker-2-pre".into(), + tool_name: "Read".into(), + subagent_id: Some("worker-2".into()), + arguments: same_args.clone(), + result: Value::Null, + status: None, + payload: json!({}), + metadata: json!({}), + }), + NormalizedEvent::ToolEnded(ToolEvent { + session_id: session_id.into(), + agent_kind: AgentKind::ClaudeCode, + event_name: "PostToolUse".into(), + tool_call_id: "provider-worker-1".into(), + tool_name: "Read".into(), + subagent_id: Some("worker-1".into()), + arguments: same_args, + result: json!({ "ok": true }), + status: Some("success".into()), + payload: json!({}), + metadata: json!({}), + }), + ], + ) + .await + .unwrap(); + + let sessions = manager.inner.lock().await; + let tools = &sessions.get(session_id).unwrap().tools; + assert!(!tools.contains_key("worker-1-pre")); + assert!(tools.contains_key("worker-2-pre")); + assert!(!tools.contains_key("provider-worker-1")); +} + +#[tokio::test] +async fn active_tool_name_args_fallback_uses_unique_global_match_without_owner() { + let manager = SessionManager::new(session_test_config()); + let session_id = "tool-global-fallback"; + let same_args = json!({ "file_path": "README.md" }); + + manager + .apply_events( + &HeaderMap::new(), + vec![ + NormalizedEvent::AgentStarted(session_event(session_id, "SessionStart")), + NormalizedEvent::SubagentStarted(SubagentEvent { + session_id: session_id.into(), + agent_kind: AgentKind::ClaudeCode, + event_name: "SubagentStart".into(), + subagent_id: "worker-1".into(), + payload: json!({}), + metadata: json!({}), + }), + NormalizedEvent::ToolStarted(ToolEvent { + session_id: session_id.into(), + agent_kind: AgentKind::ClaudeCode, + event_name: "PreToolUse".into(), + tool_call_id: "worker-1-pre".into(), + tool_name: "Read".into(), + subagent_id: Some("worker-1".into()), + arguments: same_args.clone(), + result: Value::Null, + status: None, + payload: json!({}), + metadata: json!({}), + }), + NormalizedEvent::ToolEnded(ToolEvent { + session_id: session_id.into(), + agent_kind: AgentKind::ClaudeCode, + event_name: "PostToolUse".into(), + tool_call_id: "provider-worker-1".into(), + tool_name: "Read".into(), + subagent_id: None, + arguments: same_args, + result: json!({ "ok": true }), + status: Some("success".into()), + payload: json!({}), + metadata: json!({}), + }), + ], + ) + .await + .unwrap(); + + let sessions = manager.inner.lock().await; + let tools = &sessions.get(session_id).unwrap().tools; + assert!(tools.is_empty()); +} + #[tokio::test] async fn agent_end_closes_active_tools_and_duplicate_starts_are_ignored() { let manager = SessionManager::new(session_test_config()); diff --git a/crates/core/src/observability/atif.rs b/crates/core/src/observability/atif.rs index a09af56b..27615f4c 100644 --- a/crates/core/src/observability/atif.rs +++ b/crates/core/src/observability/atif.rs @@ -1335,7 +1335,10 @@ fn llm_request_signature(input: &Json) -> String { } fn llm_response_signature(output: &Json) -> String { - json_to_string(&extract_llm_response_message(output)) + json_to_string(&serde_json::json!({ + "message": extract_llm_response_message(output), + "tool_calls": extract_tool_calls(output), + })) } fn llm_request_correlation_keys(start: &Event, end: &Event) -> HashSet { diff --git a/crates/core/tests/unit/atif_tests.rs b/crates/core/tests/unit/atif_tests.rs index 0dffda5d..090eae10 100644 --- a/crates/core/tests/unit/atif_tests.rs +++ b/crates/core/tests/unit/atif_tests.rs @@ -2061,6 +2061,95 @@ fn test_exporter_dedupes_overlapping_hook_and_gateway_llm_spans() { assert_eq!(metrics.extra.as_ref().unwrap()["total_tokens"], json!(10)); } +#[test] +fn test_exporter_keeps_tool_only_llm_spans_with_different_tool_calls() { + let exporter = AtifExporter::new("session-1".to_string(), make_agent_info()); + let base = base_timestamp(); + let parent_uuid = Uuid::now_v7(); + let first_uuid = Uuid::now_v7(); + let second_uuid = Uuid::now_v7(); + let request = json!({ + "messages": [{"role": "user", "content": "Use tools"}], + "model": "test-model" + }); + + let mut first_start = event_builder(first_uuid, EventType::Start) + .name("anthropic.messages") + .parent_uuid(parent_uuid) + .scope_type(ScopeType::Llm) + .model_name("test-model") + .input(request.clone()) + .build(); + let mut first_end = event_builder(first_uuid, EventType::End) + .name("anthropic.messages") + .parent_uuid(parent_uuid) + .scope_type(ScopeType::Llm) + .model_name("test-model") + .output(json!({ + "type": "message", + "content": [{ + "type": "tool_use", + "id": "toolu-read", + "name": "read_file", + "input": { "path": "README.md" } + }] + })) + .build(); + let mut second_start = event_builder(second_uuid, EventType::Start) + .name("anthropic.messages") + .parent_uuid(parent_uuid) + .scope_type(ScopeType::Llm) + .model_name("test-model") + .input(request) + .build(); + let mut second_end = event_builder(second_uuid, EventType::End) + .name("anthropic.messages") + .parent_uuid(parent_uuid) + .scope_type(ScopeType::Llm) + .model_name("test-model") + .output(json!({ + "type": "message", + "content": [{ + "type": "tool_use", + "id": "toolu-search", + "name": "web_search", + "input": { "query": "release notes" } + }] + })) + .build(); + + for (idx, event) in [ + &mut first_start, + &mut second_start, + &mut first_end, + &mut second_end, + ] + .into_iter() + .enumerate() + { + set_event_timestamp(event, base + chrono::Duration::milliseconds(idx as i64)); + } + + { + let mut state = exporter.state.lock().unwrap(); + state + .events + .extend([first_start, second_start, first_end, second_end]); + } + + let trajectory = exporter.export().unwrap(); + let tool_call_ids = trajectory + .steps + .iter() + .filter_map(|step| step.tool_calls.as_deref()) + .flat_map(|calls| calls.iter().map(|call| call.tool_call_id.as_str())) + .collect::>(); + + assert!(tool_call_ids.contains("toolu-read")); + assert!(tool_call_ids.contains("toolu-search")); + assert_eq!(tool_call_ids.len(), 2); +} + #[test] fn test_exporter_prefers_gateway_span_over_non_exact_hook_summary() { let exporter = AtifExporter::new("session-1".to_string(), make_agent_info()); diff --git a/python/tests/test_scope_local.py b/python/tests/test_scope_local.py index 7fd26a28..7e661b44 100644 --- a/python/tests/test_scope_local.py +++ b/python/tests/test_scope_local.py @@ -70,6 +70,7 @@ def my_tool(args): scope_local.register_tool_sanitize_request(handle, "sl_sanitizer", 1, sanitizer) scope_local.register_subscriber(handle, "sl_sanitizer_sub", lambda e: events.append(e)) result = await tools.execute("sanitized_tool", {"input": "data"}, my_tool) + subscribers.flush() # Sanitize guardrails are observability-only: they do NOT modify args # flowing through the execution pipeline.