From 30e7f10a480f9e04e8cb3ac383f1bc4376b28612 Mon Sep 17 00:00:00 2001 From: Yuchen Zhang Date: Tue, 2 Jun 2026 12:21:02 -0700 Subject: [PATCH 1/9] Add NAT telemetry support for Hermes Relay runs Temporarily patch and restore Hermes hook configuration during transparent Relay runs so Hermes lifecycle events reach the local gateway without requiring a permanent user config change. Correlate Hermes task and tool hook events back to the owning session, including fallback matching when provider tool IDs differ between pre and post hooks. Add a Python NatTelemetryExporter that writes canonical ATOF JSONL for NAT ingestion, with README and tests. Signed-off-by: Yuchen Zhang --- crates/cli/src/alignment/mod.rs | 127 ++++++++++++++ crates/cli/src/launcher.rs | 181 ++++++++++++++++---- crates/cli/src/session.rs | 63 ++++++- crates/cli/tests/coverage/launcher_tests.rs | 76 +++++++- crates/cli/tests/coverage/session_tests.rs | 71 ++++++++ python/nemo_relay/README.md | 21 +++ python/nemo_relay/__init__.py | 7 +- python/nemo_relay/nat_exporter.py | 79 +++++++++ python/tests/test_nat_exporter.py | 41 +++++ 9 files changed, 619 insertions(+), 47 deletions(-) create mode 100644 python/nemo_relay/nat_exporter.py create mode 100644 python/tests/test_nat_exporter.py diff --git a/crates/cli/src/alignment/mod.rs b/crates/cli/src/alignment/mod.rs index 48e5f2f1..611af429 100644 --- a/crates/cli/src/alignment/mod.rs +++ b/crates/cli/src/alignment/mod.rs @@ -110,6 +110,7 @@ pub(crate) struct SessionAlignmentState { aliases: HashMap, completed_aliases: HashMap, pending_subagents: HashMap, + hermes_task_sessions: HashMap, } impl SessionAlignmentState { @@ -117,6 +118,7 @@ impl SessionAlignmentState { self.aliases.clear(); self.completed_aliases.clear(); self.pending_subagents.clear(); + self.hermes_task_sessions.clear(); } pub(crate) fn alias_for_session(&self, session_id: &str) -> Option { @@ -154,6 +156,8 @@ impl SessionAlignmentState { } pub(crate) fn route_event(&mut self, event: NormalizedEvent) -> NormalizedEvent { + self.record_hermes_task_session(&event); + let event = self.route_hermes_task_event(event); let (event, finished_alias) = route_event_through_alias(event, &self.aliases); let session_id = event.session_id().to_string(); if let Some(child_session_id) = finished_alias.as_ref() { @@ -228,6 +232,10 @@ impl SessionAlignmentState { self.pending_subagents.retain(|child_session_id, pending| { child_session_id != session_id && pending.parent_session_id() != session_id }); + self.hermes_task_sessions + .retain(|task_id, mapped_session_id| { + task_id != session_id && mapped_session_id != session_id + }); } pub(crate) fn clear_for_ended_subagent(&mut self, parent_session_id: &str, subagent_id: &str) { @@ -241,6 +249,39 @@ impl SessionAlignmentState { && !(pending.parent_session_id() == parent_session_id && pending.event.session_id == subagent_id) }); + self.hermes_task_sessions + .retain(|_, mapped_session_id| mapped_session_id != subagent_id); + } + + fn record_hermes_task_session(&mut self, event: &NormalizedEvent) { + if normalized_event_agent_kind(event) != AgentKind::Hermes { + return; + } + let Some(task_id) = hermes_task_id(event) else { + return; + }; + let session_id = event.session_id(); + if session_id == task_id { + return; + } + self.hermes_task_sessions + .insert(task_id, session_id.to_string()); + } + + fn route_hermes_task_event(&self, event: NormalizedEvent) -> NormalizedEvent { + let should_route = matches!( + event, + NormalizedEvent::ToolStarted(_) | NormalizedEvent::ToolEnded(_) + ) && normalized_event_agent_kind(&event) == AgentKind::Hermes; + if !should_route { + return event; + } + + let task_id = event.session_id().to_string(); + let Some(session_id) = self.hermes_task_sessions.get(&task_id).cloned() else { + return event; + }; + route_hermes_task_session_event(event, task_id, session_id) } } @@ -607,6 +648,92 @@ fn route_tool_event(event: &mut ToolEvent, alias: &SessionAlias, metadata: Value event.metadata = merge_metadata(event.metadata.clone(), metadata); } +fn route_hermes_task_session_event( + event: NormalizedEvent, + task_id: String, + session_id: String, +) -> NormalizedEvent { + let metadata = json!({ + "session_correlation_status": "task_session_alias", + "session_correlation_source": "hermes_task_id", + "hermes_task_id": task_id, + "hermes_session_id": session_id, + }); + match event { + NormalizedEvent::ToolStarted(mut event) => { + event.session_id = session_id; + event.metadata = merge_metadata(event.metadata, metadata); + NormalizedEvent::ToolStarted(event) + } + NormalizedEvent::ToolEnded(mut event) => { + event.session_id = session_id; + event.metadata = merge_metadata(event.metadata, metadata); + NormalizedEvent::ToolEnded(event) + } + event => event, + } +} + +fn normalized_event_agent_kind(event: &NormalizedEvent) -> AgentKind { + match event { + NormalizedEvent::AgentStarted(event) + | NormalizedEvent::AgentEnded(event) + | NormalizedEvent::TurnEnded(event) + | NormalizedEvent::PromptSubmitted(event) + | NormalizedEvent::Compaction(event) + | NormalizedEvent::Notification(event) + | NormalizedEvent::HookMark(event) => event.agent_kind, + NormalizedEvent::SubagentStarted(event) | NormalizedEvent::SubagentEnded(event) => { + event.agent_kind + } + NormalizedEvent::LlmHint(event) => event.agent_kind, + NormalizedEvent::LlmStarted(event) | NormalizedEvent::LlmEnded(event) => event.agent_kind, + NormalizedEvent::ToolStarted(event) | NormalizedEvent::ToolEnded(event) => event.agent_kind, + } +} + +fn hermes_task_id(event: &NormalizedEvent) -> Option { + match event { + NormalizedEvent::AgentStarted(event) + | NormalizedEvent::AgentEnded(event) + | NormalizedEvent::TurnEnded(event) + | NormalizedEvent::PromptSubmitted(event) + | NormalizedEvent::Compaction(event) + | NormalizedEvent::Notification(event) + | NormalizedEvent::HookMark(event) => { + task_id_from_payload_and_metadata(&event.payload, &event.metadata) + } + NormalizedEvent::SubagentStarted(event) | NormalizedEvent::SubagentEnded(event) => { + task_id_from_payload_and_metadata(&event.payload, &event.metadata) + } + NormalizedEvent::LlmHint(event) => { + task_id_from_payload_and_metadata(&event.payload, &event.metadata) + } + NormalizedEvent::LlmStarted(event) | NormalizedEvent::LlmEnded(event) => { + task_id_from_llm_event(event) + } + NormalizedEvent::ToolStarted(event) | NormalizedEvent::ToolEnded(event) => { + task_id_from_payload_and_metadata(&event.payload, &event.metadata) + } + } +} + +fn task_id_from_llm_event(event: &LlmEvent) -> Option { + task_id_from_payload_and_metadata(&event.request, &event.metadata) + .or_else(|| task_id_from_payload_and_metadata(&event.response, &event.metadata)) +} + +fn task_id_from_payload_and_metadata(payload: &Value, metadata: &Value) -> Option { + json_string_at(payload, TASK_ID_PATHS).or_else(|| json_string_at(metadata, TASK_ID_PATHS)) +} + +const TASK_ID_PATHS: &[&[&str]] = &[ + &["task_id"], + &["taskId"], + &["extra", "task_id"], + &["extra", "taskId"], +]; + fn request_user_task_text(payload: &Value) -> Option { payload .get("messages") diff --git a/crates/cli/src/launcher.rs b/crates/cli/src/launcher.rs index b9b3048c..9d535e0c 100644 --- a/crates/cli/src/launcher.rs +++ b/crates/cli/src/launcher.rs @@ -19,7 +19,9 @@ use crate::config::{ ServerArgs, any_config_file_exists, resolve_run_config, }; use crate::error::CliError; -use crate::installer::{generated_hooks, hook_forward_command, merge_hooks, read_json_file}; +use crate::installer::{ + generated_hooks, hook_forward_command, merge_hermes_config, merge_hooks, read_json_file, +}; use crate::server; /// Runs a child coding-agent command behind an ephemeral local gateway. @@ -240,6 +242,7 @@ struct PreparedRun { env: Vec<(String, String)>, temp_dirs: Vec, cursor_restore: Option, + hermes_restore: Option, notes: Vec, } @@ -249,6 +252,12 @@ struct CursorRestore { had_original: bool, } +struct HermesRestore { + path: PathBuf, + backup_path: Option, + had_original: bool, +} + struct RunningGateway { shutdown_tx: oneshot::Sender<()>, task: JoinHandle>, @@ -291,6 +300,7 @@ impl PreparedRun { env: vec![("NEMO_RELAY_GATEWAY_URL".into(), gateway_url.into())], temp_dirs: Vec::new(), cursor_restore: None, + hermes_restore: None, notes: Vec::new(), }; if let Some(path) = path_with_transparent_hook_dir() { @@ -314,7 +324,13 @@ impl PreparedRun { } } } - CodingAgent::Hermes => run.prepare_hermes(resolved.agents.hermes.hooks_path.as_deref()), + CodingAgent::Hermes => { + if dry_run { + run.prepare_hermes_dry(resolved.agents.hermes.hooks_path.as_deref())?; + } else { + run.prepare_hermes(resolved.agents.hermes.hooks_path.as_deref())?; + } + } } Ok(run) } @@ -447,18 +463,36 @@ impl PreparedRun { Ok(()) } - // Surfaces where hermes' shell hooks live so users know what `nemo-relay config hermes` wrote. - // Hermes reads hooks from .hermes/config.yaml on its own; this launcher only exports the live - // gateway URL via NEMO_RELAY_GATEWAY_URL so installed hooks reach the ephemeral gateway. - fn prepare_hermes(&mut self, hooks_path: Option<&std::path::Path>) { - let note = match hooks_path { - Some(path) => format!( - "Hermes hooks at {} — re-run `nemo-relay config hermes` to refresh.", - path.display() - ), - None => "Hermes hooks not yet installed — run `nemo-relay config hermes` once so hermes traces under this gateway.".into(), - }; - self.notes.push(note); + // Hermes discovers hooks from `.hermes/config.yaml` instead of command-line flags. For + // transparent runs, temporarily merge gateway hook-forward entries into the configured Hermes + // hook file, then restore it after the child exits. + fn prepare_hermes(&mut self, hooks_path: Option<&std::path::Path>) -> Result<(), CliError> { + let path = hermes_hooks_path(hooks_path)?; + let (had_original, backup_path) = backup_existing_hermes_hooks(&path)?; + write_merged_hermes_hooks(&path)?; + self.env.push(("HERMES_ACCEPT_HOOKS".into(), "1".into())); + self.notes.push(format!( + "temporarily merged NeMo Relay hooks into {}", + path.display() + )); + self.hermes_restore = Some(HermesRestore { + path, + backup_path, + had_original, + }); + Ok(()) + } + + // Records the Hermes hook file that would be patched during a real run without touching the + // filesystem, preserving dry-run as an inspection-only operation. + fn prepare_hermes_dry(&mut self, hooks_path: Option<&std::path::Path>) -> Result<(), CliError> { + let path = hermes_hooks_path(hooks_path)?; + self.env.push(("HERMES_ACCEPT_HOOKS".into(), "1".into())); + self.notes.push(format!( + "would temporarily merge NeMo Relay hooks into {}", + path.display() + )); + Ok(()) } // Spawns the prepared child process with injected environment and waits for its exit status. @@ -473,34 +507,28 @@ impl PreparedRun { child.wait().await.map_err(CliError::from) } - // Removes temporary directories and restores Cursor hook files after the child exits. Restore + // Removes temporary directories and restores patched hook files after the child exits. Restore // errors are surfaced after the child status is collected so cleanup problems are not hidden. fn restore(&self) -> Result<(), CliError> { for dir in &self.temp_dirs { let _ = std::fs::remove_dir_all(dir); } - let Some(cursor) = &self.cursor_restore else { - return Ok(()); - }; - match (&cursor.backup_path, cursor.had_original) { - (Some(backup), true) => { - std::fs::copy(backup, &cursor.path).map_err(|error| { - CliError::Launch(format!( - "failed to restore Cursor hooks from {}: {error}", - backup.display() - )) - })?; - let _ = std::fs::remove_file(backup); - } - (_, false) if cursor.path.exists() => { - std::fs::remove_file(&cursor.path).map_err(|error| { - CliError::Launch(format!( - "failed to remove temporary Cursor hooks {}: {error}", - cursor.path.display() - )) - })?; - } - _ => {} + + if let Some(cursor) = &self.cursor_restore { + restore_hook_file( + &cursor.path, + cursor.backup_path.as_deref(), + cursor.had_original, + "Cursor", + )?; + } + if let Some(hermes) = &self.hermes_restore { + restore_hook_file( + &hermes.path, + hermes.backup_path.as_deref(), + hermes.had_original, + "Hermes", + )?; } Ok(()) } @@ -815,6 +843,17 @@ fn backup_existing_cursor_hooks(path: &Path) -> Result<(bool, Option), Ok((true, Some(backup))) } +// Backs up an existing Hermes hook config before run-mode patching. +fn backup_existing_hermes_hooks(path: &Path) -> Result<(bool, Option), CliError> { + let had_original = path.exists(); + if !had_original { + return Ok((false, None)); + } + let backup = path.with_extension(format!("yaml.nemo-relay-run.bak.{}", timestamp()?)); + std::fs::copy(path, &backup)?; + Ok((true, Some(backup))) +} + // Creates the Cursor hooks parent directory when needed, merges generated gateway hooks with any // existing hook file, and writes the patched JSON used for this transparent run. fn write_merged_cursor_hooks(path: &Path) -> Result<(), CliError> { @@ -837,6 +876,74 @@ fn write_merged_cursor_hooks(path: &Path) -> Result<(), CliError> { Ok(()) } +// Creates the Hermes config parent directory when needed, merges generated gateway hooks with any +// existing YAML config, and writes the patched YAML used for this transparent run. +fn write_merged_hermes_hooks(path: &Path) -> Result<(), CliError> { + if let Some(parent) = path.parent() { + std::fs::create_dir_all(parent)?; + } + let existing = match std::fs::read_to_string(path) { + Ok(raw) => raw, + Err(error) if error.kind() == std::io::ErrorKind::NotFound => String::new(), + Err(error) => return Err(CliError::Io(error)), + }; + let contents = merge_hermes_config( + &existing, + generated_hooks( + CodingAgent::Hermes, + &hook_forward_command(&transparent_hook_executable(), CodingAgent::Hermes), + ), + )?; + std::fs::write(path, contents)?; + Ok(()) +} + +// Chooses the Hermes hook file that transparent run should patch. If setup recorded a specific +// path, reuse it; otherwise fall back to the Hermes home config file that Hermes itself reads. +fn hermes_hooks_path(configured: Option<&Path>) -> Result { + if let Some(path) = configured { + return Ok(path.to_path_buf()); + } + if let Some(home) = std::env::var_os("HERMES_HOME").filter(|value| !value.is_empty()) { + return Ok(PathBuf::from(home).join("config.yaml")); + } + let home = std::env::var_os("HOME") + .or_else(|| std::env::var_os("USERPROFILE")) + .ok_or_else(|| { + CliError::Launch("could not resolve home directory for Hermes hooks".into()) + })?; + Ok(PathBuf::from(home).join(".hermes").join("config.yaml")) +} + +fn restore_hook_file( + path: &Path, + backup_path: Option<&Path>, + had_original: bool, + label: &str, +) -> Result<(), CliError> { + match (backup_path, had_original) { + (Some(backup), true) => { + std::fs::copy(backup, path).map_err(|error| { + CliError::Launch(format!( + "failed to restore {label} hooks from {}: {error}", + backup.display() + )) + })?; + let _ = std::fs::remove_file(backup); + } + (_, false) if path.exists() => { + std::fs::remove_file(path).map_err(|error| { + CliError::Launch(format!( + "failed to remove temporary {label} hooks {}: {error}", + path.display() + )) + })?; + } + _ => {} + } + Ok(()) +} + // Converts JSON hook groups into inline TOML arrays for Codex `--config` flags. The function // preserves matchers when present and assumes generated hook groups contain one command hook. fn hook_groups_toml(value: &Value) -> String { diff --git a/crates/cli/src/session.rs b/crates/cli/src/session.rs index e61cbc37..4abda4cc 100644 --- a/crates/cli/src/session.rs +++ b/crates/cli/src/session.rs @@ -113,7 +113,7 @@ struct Session { // duplicate end hook does not reopen or mark an already-closed worker. completed_subagents: HashSet, llms: HashMap, - tools: HashMap, + tools: HashMap, pending_llm_hints: Vec, pending_tool_hints: Vec, // Maps stable user-task text from confidently owned LLM requests to the subagent that owns @@ -126,6 +126,21 @@ struct Session { config: SessionConfig, } +#[derive(Debug, Clone)] +struct ActiveTool { + handle: ToolHandle, + name: String, + arguments: Value, +} + +impl std::ops::Deref for ActiveTool { + type Target = ToolHandle; + + fn deref(&self) -> &Self::Target { + &self.handle + } +} + #[derive(Debug, Clone)] struct PendingLlmHint { hint: LlmHintEvent, @@ -1178,7 +1193,11 @@ impl Session { // Ends all active tool calls with a synthetic close result before ending their containing scopes. // Draining first avoids holding mutable map state while the runtime emits lifecycle events. fn close_active_tools(&mut self, reason: &str) -> Result<(), CliError> { - let active_tools: Vec<_> = self.tools.drain().map(|(_, handle)| handle).collect(); + let active_tools: Vec<_> = self + .tools + .drain() + .map(|(_, active)| active.handle) + .collect(); for handle in active_tools { tool_call_end( ToolCallEndParams::builder() @@ -1486,6 +1505,8 @@ impl Session { } else { event.arguments }; + let active_tool_arguments = arguments.clone(); + let active_tool_name = event.tool_name.clone(); tool_conditional_execution(event.tool_name.as_str(), &arguments)?; let metadata = tool_correlation_metadata( event.metadata, @@ -1504,7 +1525,14 @@ impl Session { .tool_call_id(event.tool_call_id.clone()) .build(), )?; - self.tools.insert(event.tool_call_id, handle); + self.tools.insert( + event.tool_call_id, + ActiveTool { + handle, + name: active_tool_name, + arguments: active_tool_arguments, + }, + ); Ok(()) } @@ -1517,7 +1545,7 @@ impl Session { .subagent_id .clone() .filter(|subagent_id| self.subagents.contains_key(subagent_id)); - let handle = match self.tools.remove(&event.tool_call_id) { + let handle = match self.remove_tool_handle_for_event(&event) { Some(handle) => handle, None => { let owner = self.resolve_tool_owner(&event); @@ -1567,6 +1595,33 @@ impl Session { Ok(()) } + // Hermes pre/post tool hooks can disagree on call IDs: pre hooks may omit the provider id + // while post hooks carry the final chat-completions tool id. When the ID misses but exactly + // one active tool has the same name and arguments, close that start instead of synthesizing a + // second zero-duration span. + fn remove_tool_handle_for_event(&mut self, event: &ToolEvent) -> Option { + if let Some(active) = self.tools.remove(&event.tool_call_id) { + return Some(active.handle); + } + let key = self.matching_active_tool_key(event)?; + self.tools.remove(&key).map(|active| active.handle) + } + + fn matching_active_tool_key(&self, event: &ToolEvent) -> Option { + if event.arguments.is_null() { + return None; + } + let matches = self + .tools + .iter() + .filter_map(|(key, active)| { + (active.name == event.tool_name && active.arguments == event.arguments) + .then_some(key.clone()) + }) + .collect::>(); + (matches.len() == 1).then(|| matches[0].clone()) + } + // Emits a mark event after ensuring the turn scope exists. Generic and unknown hooks use this // path so unsupported agent events remain visible without changing scope structure. fn mark(&mut self, name: &str, event_payload: SessionEvent) -> Result<(), CliError> { diff --git a/crates/cli/tests/coverage/launcher_tests.rs b/crates/cli/tests/coverage/launcher_tests.rs index c079e227..0d5d69a9 100644 --- a/crates/cli/tests/coverage/launcher_tests.rs +++ b/crates/cli/tests/coverage/launcher_tests.rs @@ -366,9 +366,20 @@ fn cursor_patching_can_be_disabled() { #[test] fn prepares_hermes_hook_environment() { + let _guard = current_dir_lock().lock().unwrap(); + let temp = tempfile::tempdir().unwrap(); + let previous = std::env::current_dir().unwrap(); + std::env::set_current_dir(temp.path()).unwrap(); + let hooks_path = temp.path().join("hermes-home/config.yaml"); let resolved = ResolvedConfig { gateway: GatewayConfig::default(), - agents: AgentConfigs::default(), + agents: AgentConfigs { + hermes: AgentCommandConfig { + command: None, + hooks_path: Some(hooks_path.clone()), + }, + ..AgentConfigs::default() + }, }; let prepared = PreparedRun::new( CodingAgent::Hermes, @@ -385,12 +396,64 @@ fn prepares_hermes_hook_environment() { "http://127.0.0.1:1234".into() ))); assert!( - !prepared + prepared .env - .iter() - .any(|(name, _)| name == "HERMES_ACCEPT_HOOKS") + .contains(&("HERMES_ACCEPT_HOOKS".into(), "1".into())) ); - assert!(prepared.notes[0].contains("nemo-relay config hermes")); + assert_eq!( + prepared + .hermes_restore + .as_ref() + .map(|restore| &restore.path), + Some(&hooks_path) + ); + let hooks = std::fs::read_to_string(&hooks_path).unwrap(); + assert!(hooks.contains("hook-forward hermes")); + assert!(prepared.notes[0].contains("temporarily merged")); + + prepared.restore().unwrap(); + assert!(!hooks_path.exists()); + std::env::set_current_dir(previous).unwrap(); +} + +#[test] +fn hermes_patch_restore_restores_original_file() { + let _guard = current_dir_lock().lock().unwrap(); + let temp = tempfile::tempdir().unwrap(); + let previous = std::env::current_dir().unwrap(); + std::env::set_current_dir(temp.path()).unwrap(); + let hooks_path = temp.path().join("hermes-home/config.yaml"); + std::fs::create_dir_all(hooks_path.parent().unwrap()).unwrap(); + let original = "hooks:\n PreToolUse: []\n"; + std::fs::write(&hooks_path, original).unwrap(); + let resolved = ResolvedConfig { + gateway: GatewayConfig::default(), + agents: AgentConfigs { + hermes: AgentCommandConfig { + command: None, + hooks_path: Some(hooks_path.clone()), + }, + ..AgentConfigs::default() + }, + }; + + let prepared = PreparedRun::new( + CodingAgent::Hermes, + vec!["hermes".into(), "chat".into()], + "http://s", + &resolved, + false, + ) + .unwrap(); + + assert!( + std::fs::read_to_string(&hooks_path) + .unwrap() + .contains("hook-forward hermes") + ); + prepared.restore().unwrap(); + assert_eq!(std::fs::read_to_string(&hooks_path).unwrap(), original); + std::env::set_current_dir(previous).unwrap(); } #[test] @@ -548,6 +611,7 @@ fn cursor_restore_reports_failed_backup_restore() { backup_path: Some(temp.path().join("missing-backup.json")), had_original: true, }), + hermes_restore: None, notes: vec![], }; @@ -570,6 +634,7 @@ fn cursor_restore_reports_failed_temporary_hook_removal() { backup_path: None, had_original: false, }), + hermes_restore: None, notes: vec![], }; @@ -589,6 +654,7 @@ fn cursor_restore_noops_when_original_was_declared_without_backup() { backup_path: None, had_original: true, }), + hermes_restore: None, notes: vec![], }; diff --git a/crates/cli/tests/coverage/session_tests.rs b/crates/cli/tests/coverage/session_tests.rs index 6a1f9be7..40dd2157 100644 --- a/crates/cli/tests/coverage/session_tests.rs +++ b/crates/cli/tests/coverage/session_tests.rs @@ -1582,6 +1582,77 @@ async fn hermes_turn_end_snapshots_atif_without_boundary_system_step() { ); } +#[tokio::test] +async fn hermes_task_id_tool_hooks_reuse_api_session() { + let config = session_test_config(); + let manager = SessionManager::new(config); + let headers = HeaderMap::new(); + + for payload in [ + json!({ + "hook_event_name": "on_session_start", + "session_id": "hermes-main" + }), + json!({ + "hook_event_name": "pre_api_request", + "session_id": "hermes-main", + "extra": { + "task_id": "task-1", + "api_call_count": 1, + "provider": "custom", + "model": "qwen", + "request": { + "body": { + "model": "qwen", + "messages": [ + { "role": "user", "content": "read file" } + ] + } + } + } + }), + json!({ + "hook_event_name": "pre_tool_call", + "session_id": "", + "tool_name": "read_file", + "tool_input": { "path": "README.md" }, + "extra": { + "task_id": "task-1", + "tool_call_id": "tool-1" + } + }), + json!({ + "hook_event_name": "post_tool_call", + "session_id": "", + "tool_name": "read_file", + "tool_input": { "path": "README.md" }, + "tool_response": { "content": "hello" }, + "extra": { + "task_id": "task-1", + "tool_call_id": "provider-tool-1" + } + }), + ] { + let outcome = crate::adapters::hermes::adapt(payload, &headers); + manager + .apply_events(&headers, outcome.events) + .await + .unwrap(); + } + + let sessions = manager.inner.lock().await; + assert!(sessions.contains_key("hermes-main")); + assert!( + !sessions.contains_key("task-1"), + "Hermes tool hooks keyed by task_id should not create a duplicate session" + ); + let session = sessions.get("hermes-main").unwrap(); + assert!( + session.tools.is_empty(), + "post_tool_call should close the matching pre_tool_call even when call IDs differ" + ); +} + #[tokio::test] async fn hermes_orphan_subagent_stop_exports_readable_mark_with_lineage() { let _guard = OBSERVABILITY_PLUGIN_TEST_LOCK.lock().await; diff --git a/python/nemo_relay/README.md b/python/nemo_relay/README.md index 6f81c80c..aaf95ac1 100644 --- a/python/nemo_relay/README.md +++ b/python/nemo_relay/README.md @@ -185,9 +185,30 @@ The public package modules are: - `nemo_relay.plugin` - `nemo_relay.adaptive` - `nemo_relay.observability` +- `nemo_relay.nat_exporter` - `nemo_relay.typed` - `nemo_relay.codecs` +### NeMo Agent Toolkit Export + +Use `NatTelemetryExporter` when a Python host process needs to hand Relay +events to NVIDIA NeMo Agent Toolkit. It registers as a normal Relay subscriber +and writes canonical ATOF JSONL that NeMo Agent Toolkit can replay into its +telemetry stream. + +```python +import nemo_relay + +exporter = nemo_relay.NatTelemetryExporter("relay-events.jsonl") +exporter.register("nat") +try: + with nemo_relay.scope.scope("agent", nemo_relay.ScopeType.Agent): + nemo_relay.scope.event("ready") +finally: + exporter.deregister("nat") + exporter.shutdown() +``` + ### Integrations - `nemo_relay.integrations.langchain` diff --git a/python/nemo_relay/__init__.py b/python/nemo_relay/__init__.py index 3e27a98b..013cef22 100644 --- a/python/nemo_relay/__init__.py +++ b/python/nemo_relay/__init__.py @@ -24,7 +24,8 @@ - native runtime types such as ``ScopeHandle``, ``ToolHandle``, ``LLMHandle``, ``LLMRequest``, ``ScopeType``, and the lifecycle event classes - observability helpers such as ``AtifExporter``, ``AtofExporter``, - ``OpenTelemetrySubscriber``, and ``OpenInferenceSubscriber`` + ``OpenTelemetrySubscriber``, ``OpenInferenceSubscriber``, and + ``NatTelemetryExporter`` - JSON and callback type aliases used by middleware, typed wrappers, and plugin-facing configuration helpers @@ -188,6 +189,7 @@ async def main(): guardrails, intercepts, llm, + nat_exporter, observability, plugin, scope, @@ -196,6 +198,7 @@ async def main(): tools, typed, ) +from nemo_relay.nat_exporter import NatTelemetryExporter # noqa: E402 _scope_stack_var: contextvars.ContextVar[ScopeStack] = contextvars.ContextVar("scope_stack") @@ -425,6 +428,7 @@ def worker() -> None: "llm", "guardrails", "intercepts", + "nat_exporter", "subscribers", "scope_local", "codecs", @@ -457,6 +461,7 @@ def worker() -> None: "AtofExporterMode", "AtofExporterConfig", "AtofExporter", + "NatTelemetryExporter", "OpenInferenceConfig", "OpenInferenceSubscriber", "OpenTelemetryConfig", diff --git a/python/nemo_relay/nat_exporter.py b/python/nemo_relay/nat_exporter.py new file mode 100644 index 00000000..0a184f49 --- /dev/null +++ b/python/nemo_relay/nat_exporter.py @@ -0,0 +1,79 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""NAT-oriented NeMo Relay event exporter. + +The exporter writes canonical ATOF event JSONL that NVIDIA NeMo Agent Toolkit +can replay into its intermediate-step telemetry stream. +""" + +from __future__ import annotations + +import json +import threading +from pathlib import Path +from typing import Literal + +from nemo_relay import subscribers + + +class NatTelemetryExporter: + """Write Relay events as JSONL for NeMo Agent Toolkit ingestion.""" + + def __init__(self, path: str | Path, *, mode: Literal["append", "overwrite"] = "overwrite") -> None: + self._path = Path(path) + self._lock = threading.Lock() + self._closed = False + self._path.parent.mkdir(parents=True, exist_ok=True) + if mode == "overwrite": + self._path.write_text("") + elif mode == "append": + self._path.touch() + else: + raise ValueError("mode must be 'append' or 'overwrite'") + + @property + def path(self) -> str: + """Return the JSONL output path.""" + return str(self._path) + + def subscriber(self, event) -> None: + """Subscriber callback that writes one event per JSONL line.""" + with self._lock: + if self._closed: + return + with self._path.open("a") as output: + output.write(_event_to_json(event)) + output.write("\n") + + def register(self, name: str) -> None: + """Register this exporter as a global Relay subscriber.""" + subscribers.register(name, self.subscriber) + + def deregister(self, name: str) -> bool: + """Deregister a previously registered exporter subscriber.""" + return subscribers.deregister(name) + + def force_flush(self) -> None: + """Wait for queued Relay subscriber callbacks to finish.""" + subscribers.flush() + + def shutdown(self) -> None: + """Flush queued callbacks and reject future writes.""" + self.force_flush() + with self._lock: + self._closed = True + + def __repr__(self) -> str: + return f"" + + +def _event_to_json(event) -> str: + if hasattr(event, "to_json"): + return event.to_json() + if hasattr(event, "to_dict"): + return json.dumps(event.to_dict(), separators=(",", ":")) + return json.dumps(event, separators=(",", ":")) + + +__all__ = ["NatTelemetryExporter"] diff --git a/python/tests/test_nat_exporter.py b/python/tests/test_nat_exporter.py new file mode 100644 index 00000000..2f547b75 --- /dev/null +++ b/python/tests/test_nat_exporter.py @@ -0,0 +1,41 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""Tests for the NAT telemetry exporter shim.""" + +import json +from uuid import uuid4 + +import nemo_relay +from nemo_relay import ScopeType, scope, subscribers +from nemo_relay.nat_exporter import NatTelemetryExporter + + +def test_nat_telemetry_exporter_writes_relay_event_jsonl(tmp_path): + exporter = NatTelemetryExporter(tmp_path / "relay-events.jsonl") + subscriber_name = f"nat_exporter_{uuid4().hex}" + + exporter.register(subscriber_name) + try: + handle = scope.push("nat_exporter_root", ScopeType.Agent, data={"input": True}) + try: + scope.event("nat_exporter_mark", handle=handle, data={"step": 1}) + finally: + scope.pop(handle, output={"done": True}) + exporter.force_flush() + finally: + exporter.deregister(subscriber_name) + exporter.shutdown() + subscribers.deregister(subscriber_name) + + lines = [json.loads(line) for line in (tmp_path / "relay-events.jsonl").read_text().splitlines()] + + assert [line["kind"] for line in lines] == ["scope", "mark", "scope"] + assert lines[0]["name"] == "nat_exporter_root" + assert lines[1]["data"] == {"step": 1} + assert lines[2]["scope_category"] == "end" + assert " Date: Tue, 2 Jun 2026 20:37:11 -0700 Subject: [PATCH 2/9] Remove redundant NAT telemetry exporter Signed-off-by: Yuchen Zhang --- python/nemo_relay/README.md | 21 -------- python/nemo_relay/__init__.py | 7 +-- python/nemo_relay/nat_exporter.py | 79 ------------------------------- python/tests/test_nat_exporter.py | 41 ---------------- 4 files changed, 1 insertion(+), 147 deletions(-) delete mode 100644 python/nemo_relay/nat_exporter.py delete mode 100644 python/tests/test_nat_exporter.py diff --git a/python/nemo_relay/README.md b/python/nemo_relay/README.md index aaf95ac1..6f81c80c 100644 --- a/python/nemo_relay/README.md +++ b/python/nemo_relay/README.md @@ -185,30 +185,9 @@ The public package modules are: - `nemo_relay.plugin` - `nemo_relay.adaptive` - `nemo_relay.observability` -- `nemo_relay.nat_exporter` - `nemo_relay.typed` - `nemo_relay.codecs` -### NeMo Agent Toolkit Export - -Use `NatTelemetryExporter` when a Python host process needs to hand Relay -events to NVIDIA NeMo Agent Toolkit. It registers as a normal Relay subscriber -and writes canonical ATOF JSONL that NeMo Agent Toolkit can replay into its -telemetry stream. - -```python -import nemo_relay - -exporter = nemo_relay.NatTelemetryExporter("relay-events.jsonl") -exporter.register("nat") -try: - with nemo_relay.scope.scope("agent", nemo_relay.ScopeType.Agent): - nemo_relay.scope.event("ready") -finally: - exporter.deregister("nat") - exporter.shutdown() -``` - ### Integrations - `nemo_relay.integrations.langchain` diff --git a/python/nemo_relay/__init__.py b/python/nemo_relay/__init__.py index 013cef22..3e27a98b 100644 --- a/python/nemo_relay/__init__.py +++ b/python/nemo_relay/__init__.py @@ -24,8 +24,7 @@ - native runtime types such as ``ScopeHandle``, ``ToolHandle``, ``LLMHandle``, ``LLMRequest``, ``ScopeType``, and the lifecycle event classes - observability helpers such as ``AtifExporter``, ``AtofExporter``, - ``OpenTelemetrySubscriber``, ``OpenInferenceSubscriber``, and - ``NatTelemetryExporter`` + ``OpenTelemetrySubscriber``, and ``OpenInferenceSubscriber`` - JSON and callback type aliases used by middleware, typed wrappers, and plugin-facing configuration helpers @@ -189,7 +188,6 @@ async def main(): guardrails, intercepts, llm, - nat_exporter, observability, plugin, scope, @@ -198,7 +196,6 @@ async def main(): tools, typed, ) -from nemo_relay.nat_exporter import NatTelemetryExporter # noqa: E402 _scope_stack_var: contextvars.ContextVar[ScopeStack] = contextvars.ContextVar("scope_stack") @@ -428,7 +425,6 @@ def worker() -> None: "llm", "guardrails", "intercepts", - "nat_exporter", "subscribers", "scope_local", "codecs", @@ -461,7 +457,6 @@ def worker() -> None: "AtofExporterMode", "AtofExporterConfig", "AtofExporter", - "NatTelemetryExporter", "OpenInferenceConfig", "OpenInferenceSubscriber", "OpenTelemetryConfig", diff --git a/python/nemo_relay/nat_exporter.py b/python/nemo_relay/nat_exporter.py deleted file mode 100644 index 0a184f49..00000000 --- a/python/nemo_relay/nat_exporter.py +++ /dev/null @@ -1,79 +0,0 @@ -# SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. -# SPDX-License-Identifier: Apache-2.0 - -"""NAT-oriented NeMo Relay event exporter. - -The exporter writes canonical ATOF event JSONL that NVIDIA NeMo Agent Toolkit -can replay into its intermediate-step telemetry stream. -""" - -from __future__ import annotations - -import json -import threading -from pathlib import Path -from typing import Literal - -from nemo_relay import subscribers - - -class NatTelemetryExporter: - """Write Relay events as JSONL for NeMo Agent Toolkit ingestion.""" - - def __init__(self, path: str | Path, *, mode: Literal["append", "overwrite"] = "overwrite") -> None: - self._path = Path(path) - self._lock = threading.Lock() - self._closed = False - self._path.parent.mkdir(parents=True, exist_ok=True) - if mode == "overwrite": - self._path.write_text("") - elif mode == "append": - self._path.touch() - else: - raise ValueError("mode must be 'append' or 'overwrite'") - - @property - def path(self) -> str: - """Return the JSONL output path.""" - return str(self._path) - - def subscriber(self, event) -> None: - """Subscriber callback that writes one event per JSONL line.""" - with self._lock: - if self._closed: - return - with self._path.open("a") as output: - output.write(_event_to_json(event)) - output.write("\n") - - def register(self, name: str) -> None: - """Register this exporter as a global Relay subscriber.""" - subscribers.register(name, self.subscriber) - - def deregister(self, name: str) -> bool: - """Deregister a previously registered exporter subscriber.""" - return subscribers.deregister(name) - - def force_flush(self) -> None: - """Wait for queued Relay subscriber callbacks to finish.""" - subscribers.flush() - - def shutdown(self) -> None: - """Flush queued callbacks and reject future writes.""" - self.force_flush() - with self._lock: - self._closed = True - - def __repr__(self) -> str: - return f"" - - -def _event_to_json(event) -> str: - if hasattr(event, "to_json"): - return event.to_json() - if hasattr(event, "to_dict"): - return json.dumps(event.to_dict(), separators=(",", ":")) - return json.dumps(event, separators=(",", ":")) - - -__all__ = ["NatTelemetryExporter"] diff --git a/python/tests/test_nat_exporter.py b/python/tests/test_nat_exporter.py deleted file mode 100644 index 2f547b75..00000000 --- a/python/tests/test_nat_exporter.py +++ /dev/null @@ -1,41 +0,0 @@ -# SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. -# SPDX-License-Identifier: Apache-2.0 - -"""Tests for the NAT telemetry exporter shim.""" - -import json -from uuid import uuid4 - -import nemo_relay -from nemo_relay import ScopeType, scope, subscribers -from nemo_relay.nat_exporter import NatTelemetryExporter - - -def test_nat_telemetry_exporter_writes_relay_event_jsonl(tmp_path): - exporter = NatTelemetryExporter(tmp_path / "relay-events.jsonl") - subscriber_name = f"nat_exporter_{uuid4().hex}" - - exporter.register(subscriber_name) - try: - handle = scope.push("nat_exporter_root", ScopeType.Agent, data={"input": True}) - try: - scope.event("nat_exporter_mark", handle=handle, data={"step": 1}) - finally: - scope.pop(handle, output={"done": True}) - exporter.force_flush() - finally: - exporter.deregister(subscriber_name) - exporter.shutdown() - subscribers.deregister(subscriber_name) - - lines = [json.loads(line) for line in (tmp_path / "relay-events.jsonl").read_text().splitlines()] - - assert [line["kind"] for line in lines] == ["scope", "mark", "scope"] - assert lines[0]["name"] == "nat_exporter_root" - assert lines[1]["data"] == {"step": 1} - assert lines[2]["scope_category"] == "end" - assert " Date: Tue, 2 Jun 2026 20:57:21 -0700 Subject: [PATCH 3/9] Scope Hermes task session routing Signed-off-by: Yuchen Zhang --- crates/cli/src/alignment/mod.rs | 102 +++++++++++++++++-- crates/cli/tests/coverage/alignment_tests.rs | 74 ++++++++++++++ 2 files changed, 168 insertions(+), 8 deletions(-) diff --git a/crates/cli/src/alignment/mod.rs b/crates/cli/src/alignment/mod.rs index 611af429..11284ee6 100644 --- a/crates/cli/src/alignment/mod.rs +++ b/crates/cli/src/alignment/mod.rs @@ -110,7 +110,7 @@ pub(crate) struct SessionAlignmentState { aliases: HashMap, completed_aliases: HashMap, pending_subagents: HashMap, - hermes_task_sessions: HashMap, + hermes_task_sessions: HashMap>, } impl SessionAlignmentState { @@ -232,10 +232,8 @@ impl SessionAlignmentState { self.pending_subagents.retain(|child_session_id, pending| { child_session_id != session_id && pending.parent_session_id() != session_id }); - self.hermes_task_sessions - .retain(|task_id, mapped_session_id| { - task_id != session_id && mapped_session_id != session_id - }); + self.hermes_task_sessions.remove(session_id); + prune_hermes_task_sessions(&mut self.hermes_task_sessions, session_id); } pub(crate) fn clear_for_ended_subagent(&mut self, parent_session_id: &str, subagent_id: &str) { @@ -250,7 +248,8 @@ impl SessionAlignmentState { && pending.event.session_id == subagent_id) }); self.hermes_task_sessions - .retain(|_, mapped_session_id| mapped_session_id != subagent_id); + .retain(|session_id, _| session_id != subagent_id); + prune_hermes_task_sessions(&mut self.hermes_task_sessions, subagent_id); } fn record_hermes_task_session(&mut self, event: &NormalizedEvent) { @@ -265,6 +264,8 @@ impl SessionAlignmentState { return; } self.hermes_task_sessions + .entry(session_id.to_string()) + .or_default() .insert(task_id, session_id.to_string()); } @@ -277,12 +278,47 @@ impl SessionAlignmentState { return event; } - let task_id = event.session_id().to_string(); - let Some(session_id) = self.hermes_task_sessions.get(&task_id).cloned() else { + let task_id = hermes_task_id(&event).unwrap_or_else(|| event.session_id().to_string()); + let session_scope = hermes_task_session_scope(&event); + let Some(session_id) = self.hermes_session_for_task(&task_id, session_scope.as_deref()) + else { return event; }; route_hermes_task_session_event(event, task_id, session_id) } + + fn hermes_session_for_task( + &self, + task_id: &str, + session_scope: Option<&str>, + ) -> Option { + if let Some(session_scope) = session_scope { + return self + .hermes_task_sessions + .get(session_scope) + .and_then(|tasks| tasks.get(task_id)) + .cloned(); + } + + let mut matches = self + .hermes_task_sessions + .values() + .filter_map(|tasks| tasks.get(task_id).cloned()); + let session_id = matches.next()?; + matches.next().is_none().then_some(session_id) + } +} + +fn prune_hermes_task_sessions( + hermes_task_sessions: &mut HashMap>, + session_id: &str, +) { + hermes_task_sessions.values_mut().for_each(|tasks| { + tasks.retain(|task_id, mapped_session_id| { + task_id != session_id && mapped_session_id != session_id + }); + }); + hermes_task_sessions.retain(|_, tasks| !tasks.is_empty()); } // Resolves the session id for a gateway request in precedence order: @@ -718,6 +754,32 @@ fn hermes_task_id(event: &NormalizedEvent) -> Option { } } +fn hermes_task_session_scope(event: &NormalizedEvent) -> Option { + match event { + NormalizedEvent::AgentStarted(event) + | NormalizedEvent::AgentEnded(event) + | NormalizedEvent::TurnEnded(event) + | NormalizedEvent::PromptSubmitted(event) + | NormalizedEvent::Compaction(event) + | NormalizedEvent::Notification(event) + | NormalizedEvent::HookMark(event) => { + session_scope_from_payload_and_metadata(&event.payload, &event.metadata) + } + NormalizedEvent::SubagentStarted(event) | NormalizedEvent::SubagentEnded(event) => { + session_scope_from_payload_and_metadata(&event.payload, &event.metadata) + } + NormalizedEvent::LlmHint(event) => { + session_scope_from_payload_and_metadata(&event.payload, &event.metadata) + } + NormalizedEvent::LlmStarted(event) | NormalizedEvent::LlmEnded(event) => { + session_scope_from_llm_event(event) + } + NormalizedEvent::ToolStarted(event) | NormalizedEvent::ToolEnded(event) => { + session_scope_from_payload_and_metadata(&event.payload, &event.metadata) + } + } +} + fn task_id_from_llm_event(event: &LlmEvent) -> Option { task_id_from_payload_and_metadata(&event.request, &event.metadata) .or_else(|| task_id_from_payload_and_metadata(&event.response, &event.metadata)) @@ -727,6 +789,16 @@ fn task_id_from_payload_and_metadata(payload: &Value, metadata: &Value) -> Optio json_string_at(payload, TASK_ID_PATHS).or_else(|| json_string_at(metadata, TASK_ID_PATHS)) } +fn session_scope_from_llm_event(event: &LlmEvent) -> Option { + session_scope_from_payload_and_metadata(&event.request, &event.metadata) + .or_else(|| session_scope_from_payload_and_metadata(&event.response, &event.metadata)) +} + +fn session_scope_from_payload_and_metadata(payload: &Value, metadata: &Value) -> Option { + json_string_at(payload, HERMES_SESSION_SCOPE_PATHS) + .or_else(|| json_string_at(metadata, HERMES_SESSION_SCOPE_PATHS)) +} + const TASK_ID_PATHS: &[&[&str]] = &[ &["task_id"], &["taskId"], @@ -734,6 +806,20 @@ const TASK_ID_PATHS: &[&[&str]] = &[ &["extra", "taskId"], ]; +const HERMES_SESSION_SCOPE_PATHS: &[&[&str]] = &[ + &["session_id"], + &["sessionId"], + &["session", "id"], + &["conversation_id"], + &["conversationId"], + &["parent_session_id"], + &["parentSessionId"], + &["extra", "session_id"], + &["extra", "sessionId"], + &["extra", "parent_session_id"], + &["extra", "parentSessionId"], +]; + fn request_user_task_text(payload: &Value) -> Option { payload .get("messages") diff --git a/crates/cli/tests/coverage/alignment_tests.rs b/crates/cli/tests/coverage/alignment_tests.rs index 8806e0dd..763f683e 100644 --- a/crates/cli/tests/coverage/alignment_tests.rs +++ b/crates/cli/tests/coverage/alignment_tests.rs @@ -76,6 +76,41 @@ fn tool_event(session_id: &str, event_name: &str) -> ToolEvent { } } +fn hermes_llm_event(session_id: &str, task_id: &str) -> NormalizedEvent { + NormalizedEvent::LlmStarted(LlmEvent { + session_id: session_id.into(), + agent_kind: AgentKind::Hermes, + event_name: "pre_api_request".into(), + api_call_id: format!("{session_id}:{task_id}:1"), + provider: "custom".into(), + model_name: Some("qwen".into()), + request: json!({ "extra": { "task_id": task_id } }), + response: Value::Null, + metadata: json!({ "event_metadata": "pre_api_request" }), + }) +} + +fn hermes_tool_event(task_id: &str, session_scope: Option<&str>) -> NormalizedEvent { + let mut payload = json!({ "extra": { "task_id": task_id } }); + if let Some(session_scope) = session_scope { + payload["extra"]["parent_session_id"] = json!(session_scope); + } + + NormalizedEvent::ToolStarted(ToolEvent { + session_id: task_id.into(), + agent_kind: AgentKind::Hermes, + event_name: "pre_tool_call".into(), + tool_call_id: format!("{task_id}:tool-1"), + tool_name: "read_file".into(), + subagent_id: None, + arguments: json!({ "path": "README.md" }), + result: Value::Null, + status: None, + payload, + metadata: json!({ "event_metadata": "pre_tool_call" }), + }) +} + fn aliases() -> HashMap { HashMap::from([( "child".into(), @@ -87,6 +122,45 @@ fn aliases() -> HashMap { )]) } +#[test] +fn hermes_task_session_routing_is_scoped_by_parent_session() { + let mut state = SessionAlignmentState::default(); + + state.route_event(hermes_llm_event("hermes-a", "task-1")); + state.route_event(hermes_llm_event("hermes-b", "task-1")); + + let routed_a = state.route_event(hermes_tool_event("task-1", Some("hermes-a"))); + let NormalizedEvent::ToolStarted(routed_a) = routed_a else { + panic!("expected routed Hermes tool event"); + }; + assert_eq!(routed_a.session_id, "hermes-a"); + assert_eq!(routed_a.metadata["hermes_task_id"], json!("task-1")); + assert_eq!(routed_a.metadata["hermes_session_id"], json!("hermes-a")); + + let routed_b = state.route_event(hermes_tool_event("task-1", Some("hermes-b"))); + let NormalizedEvent::ToolStarted(routed_b) = routed_b else { + panic!("expected routed Hermes tool event"); + }; + assert_eq!(routed_b.session_id, "hermes-b"); + assert_eq!(routed_b.metadata["hermes_task_id"], json!("task-1")); + assert_eq!(routed_b.metadata["hermes_session_id"], json!("hermes-b")); +} + +#[test] +fn hermes_task_session_routing_leaves_ambiguous_unscoped_task_event_unchanged() { + let mut state = SessionAlignmentState::default(); + + state.route_event(hermes_llm_event("hermes-a", "task-1")); + state.route_event(hermes_llm_event("hermes-b", "task-1")); + + let routed = state.route_event(hermes_tool_event("task-1", None)); + let NormalizedEvent::ToolStarted(routed) = routed else { + panic!("expected Hermes tool event"); + }; + assert_eq!(routed.session_id, "task-1"); + assert!(routed.metadata.get("hermes_session_id").is_none()); +} + #[test] fn gateway_session_id_uses_explicit_claude_then_codex_fallbacks() { let mut headers = HeaderMap::new(); From 57e484f18aa68ef49a7c1663bdfe61737dac52b9 Mon Sep 17 00:00:00 2001 From: Yuchen Zhang Date: Tue, 2 Jun 2026 21:01:54 -0700 Subject: [PATCH 4/9] Restore Hermes hooks after health failure Signed-off-by: Yuchen Zhang --- crates/cli/src/launcher.rs | 2 + crates/cli/tests/coverage/launcher_tests.rs | 46 +++++++++++++++++++++ 2 files changed, 48 insertions(+) diff --git a/crates/cli/src/launcher.rs b/crates/cli/src/launcher.rs index 9d535e0c..acf5d268 100644 --- a/crates/cli/src/launcher.rs +++ b/crates/cli/src/launcher.rs @@ -154,7 +154,9 @@ async fn execute_live_run( ) -> Result { let running_server = RunningGateway::start(listener, gateway_config); if let Err(error) = wait_for_health(gateway_url).await { + let restore = prepared.restore(); let _ = running_server.stop().await; + restore?; return Err(error); } let status = prepared.spawn_and_wait().await; diff --git a/crates/cli/tests/coverage/launcher_tests.rs b/crates/cli/tests/coverage/launcher_tests.rs index 0d5d69a9..c7fc25f9 100644 --- a/crates/cli/tests/coverage/launcher_tests.rs +++ b/crates/cli/tests/coverage/launcher_tests.rs @@ -772,6 +772,52 @@ async fn wait_for_health_reports_unready_gateway() { assert!(error.contains("gateway did not become ready")); } +#[tokio::test] +async fn execute_live_run_restores_hermes_hooks_when_health_check_fails() { + let temp = tempfile::tempdir().unwrap(); + let hooks_path = temp.path().join("hermes-home/config.yaml"); + std::fs::create_dir_all(hooks_path.parent().unwrap()).unwrap(); + let original = "hooks:\n PreToolUse: []\n"; + std::fs::write(&hooks_path, original).unwrap(); + let resolved = ResolvedConfig { + gateway: GatewayConfig::default(), + agents: AgentConfigs { + hermes: AgentCommandConfig { + command: None, + hooks_path: Some(hooks_path.clone()), + }, + ..AgentConfigs::default() + }, + }; + let prepared = PreparedRun::new( + CodingAgent::Hermes, + vec!["hermes".into(), "chat".into()], + "http://127.0.0.1:1234", + &resolved, + false, + ) + .unwrap(); + assert!( + std::fs::read_to_string(&hooks_path) + .unwrap() + .contains("hook-forward hermes") + ); + + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let error = execute_live_run( + listener, + GatewayConfig::default(), + "http://127.0.0.1:1", + prepared, + ) + .await + .unwrap_err() + .to_string(); + + assert!(error.contains("gateway did not become ready")); + assert_eq!(std::fs::read_to_string(&hooks_path).unwrap(), original); +} + #[cfg(unix)] fn make_executable(path: &Path) { use std::os::unix::fs::PermissionsExt; From ad9e65a837d76f9b43ee4427367115d085c5e701 Mon Sep 17 00:00:00 2001 From: Yuchen Zhang Date: Tue, 2 Jun 2026 21:05:24 -0700 Subject: [PATCH 5/9] Scope active tool fallback by owner Signed-off-by: Yuchen Zhang --- crates/cli/src/session.rs | 31 +++++++-- crates/cli/tests/coverage/session_tests.rs | 78 ++++++++++++++++++++++ 2 files changed, 104 insertions(+), 5 deletions(-) diff --git a/crates/cli/src/session.rs b/crates/cli/src/session.rs index 4abda4cc..1b3cbb2d 100644 --- a/crates/cli/src/session.rs +++ b/crates/cli/src/session.rs @@ -131,6 +131,7 @@ struct ActiveTool { handle: ToolHandle, name: String, arguments: Value, + owner_subagent_id: Option, } impl std::ops::Deref for ActiveTool { @@ -1507,6 +1508,7 @@ impl Session { }; let active_tool_arguments = arguments.clone(); let active_tool_name = event.tool_name.clone(); + let active_tool_owner_subagent_id = owner.subagent_id.clone(); tool_conditional_execution(event.tool_name.as_str(), &arguments)?; let metadata = tool_correlation_metadata( event.metadata, @@ -1531,6 +1533,7 @@ impl Session { handle, name: active_tool_name, arguments: active_tool_arguments, + owner_subagent_id: active_tool_owner_subagent_id, }, ); Ok(()) @@ -1597,17 +1600,22 @@ impl Session { // Hermes pre/post tool hooks can disagree on call IDs: pre hooks may omit the provider id // while post hooks carry the final chat-completions tool id. When the ID misses but exactly - // one active tool has the same name and arguments, close that start instead of synthesizing a - // second zero-duration span. + // one active tool owned by the same subagent/root scope has the same name and arguments, close + // that start instead of synthesizing a second zero-duration span. fn remove_tool_handle_for_event(&mut self, event: &ToolEvent) -> Option { if let Some(active) = self.tools.remove(&event.tool_call_id) { return Some(active.handle); } - let key = self.matching_active_tool_key(event)?; + let owner_subagent_id = self.tool_event_owner_subagent_id(event); + let key = self.matching_active_tool_key(event, owner_subagent_id.as_deref())?; self.tools.remove(&key).map(|active| active.handle) } - fn matching_active_tool_key(&self, event: &ToolEvent) -> Option { + fn matching_active_tool_key( + &self, + event: &ToolEvent, + owner_subagent_id: Option<&str>, + ) -> Option { if event.arguments.is_null() { return None; } @@ -1615,13 +1623,26 @@ impl Session { .tools .iter() .filter_map(|(key, active)| { - (active.name == event.tool_name && active.arguments == event.arguments) + (active.owner_subagent_id.as_deref() == owner_subagent_id + && active.name == event.tool_name + && active.arguments == event.arguments) .then_some(key.clone()) }) .collect::>(); (matches.len() == 1).then(|| matches[0].clone()) } + fn tool_event_owner_subagent_id(&self, event: &ToolEvent) -> Option { + if let Some(subagent_id) = &event.subagent_id + && self.subagents.contains_key(subagent_id) + { + return Some(subagent_id.clone()); + } + self.matching_tool_hint_index(event) + .and_then(|index| self.pending_tool_hints[index].hint.subagent_id.clone()) + .filter(|subagent_id| self.subagents.contains_key(subagent_id)) + } + // Emits a mark event after ensuring the turn scope exists. Generic and unknown hooks use this // path so unsupported agent events remain visible without changing scope structure. fn mark(&mut self, name: &str, event_payload: SessionEvent) -> Result<(), CliError> { diff --git a/crates/cli/tests/coverage/session_tests.rs b/crates/cli/tests/coverage/session_tests.rs index 40dd2157..483be3d9 100644 --- a/crates/cli/tests/coverage/session_tests.rs +++ b/crates/cli/tests/coverage/session_tests.rs @@ -3091,6 +3091,84 @@ async fn claude_agent_tool_async_launch_keeps_subagent_open_for_later_hooks() { ); } +#[tokio::test] +async fn active_tool_name_args_fallback_requires_matching_subagent_owner() { + let manager = SessionManager::new(session_test_config()); + let session_id = "tool-owner-fallback"; + let same_args = json!({ "file_path": "README.md" }); + + manager + .apply_events( + &HeaderMap::new(), + vec![ + NormalizedEvent::AgentStarted(session_event(session_id, "SessionStart")), + NormalizedEvent::SubagentStarted(SubagentEvent { + session_id: session_id.into(), + agent_kind: AgentKind::ClaudeCode, + event_name: "SubagentStart".into(), + subagent_id: "worker-1".into(), + payload: json!({}), + metadata: json!({}), + }), + NormalizedEvent::SubagentStarted(SubagentEvent { + session_id: session_id.into(), + agent_kind: AgentKind::ClaudeCode, + event_name: "SubagentStart".into(), + subagent_id: "worker-2".into(), + payload: json!({}), + metadata: json!({}), + }), + NormalizedEvent::ToolStarted(ToolEvent { + session_id: session_id.into(), + agent_kind: AgentKind::ClaudeCode, + event_name: "PreToolUse".into(), + tool_call_id: "worker-1-pre".into(), + tool_name: "Read".into(), + subagent_id: Some("worker-1".into()), + arguments: same_args.clone(), + result: Value::Null, + status: None, + payload: json!({}), + metadata: json!({}), + }), + NormalizedEvent::ToolStarted(ToolEvent { + session_id: session_id.into(), + agent_kind: AgentKind::ClaudeCode, + event_name: "PreToolUse".into(), + tool_call_id: "worker-2-pre".into(), + tool_name: "Read".into(), + subagent_id: Some("worker-2".into()), + arguments: same_args.clone(), + result: Value::Null, + status: None, + payload: json!({}), + metadata: json!({}), + }), + NormalizedEvent::ToolEnded(ToolEvent { + session_id: session_id.into(), + agent_kind: AgentKind::ClaudeCode, + event_name: "PostToolUse".into(), + tool_call_id: "provider-worker-1".into(), + tool_name: "Read".into(), + subagent_id: Some("worker-1".into()), + arguments: same_args, + result: json!({ "ok": true }), + status: Some("success".into()), + payload: json!({}), + metadata: json!({}), + }), + ], + ) + .await + .unwrap(); + + let sessions = manager.inner.lock().await; + let tools = &sessions.get(session_id).unwrap().tools; + assert!(!tools.contains_key("worker-1-pre")); + assert!(tools.contains_key("worker-2-pre")); + assert!(!tools.contains_key("provider-worker-1")); +} + #[tokio::test] async fn agent_end_closes_active_tools_and_duplicate_starts_are_ignored() { let manager = SessionManager::new(session_test_config()); From c6c0a244a7ff78b9914423f01b7cfc1f433082d4 Mon Sep 17 00:00:00 2001 From: Yuchen Zhang Date: Tue, 2 Jun 2026 21:08:43 -0700 Subject: [PATCH 6/9] Make Hermes mismatched tool-id test non-vacuous Signed-off-by: Yuchen Zhang --- crates/cli/tests/coverage/session_tests.rs | 53 ++++++++++++++++------ 1 file changed, 39 insertions(+), 14 deletions(-) diff --git a/crates/cli/tests/coverage/session_tests.rs b/crates/cli/tests/coverage/session_tests.rs index 483be3d9..5c3c6fda 100644 --- a/crates/cli/tests/coverage/session_tests.rs +++ b/crates/cli/tests/coverage/session_tests.rs @@ -1611,9 +1611,18 @@ async fn hermes_task_id_tool_hooks_reuse_api_session() { } } }), + ] { + let outcome = crate::adapters::hermes::adapt(payload, &headers); + manager + .apply_events(&headers, outcome.events) + .await + .unwrap(); + } + + let pre_tool = crate::adapters::hermes::adapt( json!({ "hook_event_name": "pre_tool_call", - "session_id": "", + "session_id": "hermes-main", "tool_name": "read_file", "tool_input": { "path": "README.md" }, "extra": { @@ -1621,9 +1630,31 @@ async fn hermes_task_id_tool_hooks_reuse_api_session() { "tool_call_id": "tool-1" } }), + &headers, + ); + manager + .apply_events(&headers, pre_tool.events) + .await + .unwrap(); + + { + let sessions = manager.inner.lock().await; + assert!(sessions.contains_key("hermes-main")); + assert!( + !sessions.contains_key("task-1"), + "Hermes tool hooks keyed by task_id should not create a duplicate session" + ); + let session = sessions.get("hermes-main").unwrap(); + assert!( + !session.tools.is_empty(), + "pre_tool_call should open an active tool before post_tool_call runs" + ); + } + + let post_tool = crate::adapters::hermes::adapt( json!({ "hook_event_name": "post_tool_call", - "session_id": "", + "session_id": "hermes-main", "tool_name": "read_file", "tool_input": { "path": "README.md" }, "tool_response": { "content": "hello" }, @@ -1632,20 +1663,14 @@ async fn hermes_task_id_tool_hooks_reuse_api_session() { "tool_call_id": "provider-tool-1" } }), - ] { - let outcome = crate::adapters::hermes::adapt(payload, &headers); - manager - .apply_events(&headers, outcome.events) - .await - .unwrap(); - } + &headers, + ); + manager + .apply_events(&headers, post_tool.events) + .await + .unwrap(); let sessions = manager.inner.lock().await; - assert!(sessions.contains_key("hermes-main")); - assert!( - !sessions.contains_key("task-1"), - "Hermes tool hooks keyed by task_id should not create a duplicate session" - ); let session = sessions.get("hermes-main").unwrap(); assert!( session.tools.is_empty(), From ea872220fe751f30c6ac21f507fc0a75003a7b43 Mon Sep 17 00:00:00 2001 From: Yuchen Zhang Date: Wed, 3 Jun 2026 09:18:27 -0700 Subject: [PATCH 7/9] Address Relay telemetry review follow-ups Signed-off-by: Yuchen Zhang --- crates/cli/src/session.rs | 3 +- crates/cli/tests/coverage/adapters_tests.rs | 1 + crates/cli/tests/coverage/session_tests.rs | 55 ++++++++++++ crates/core/src/observability/atif.rs | 5 +- crates/core/tests/unit/atif_tests.rs | 89 +++++++++++++++++++ integrations/openclaw/src/hook-replay/llm.ts | 2 + integrations/openclaw/test/llm-replay.test.ts | 32 +++++++ 7 files changed, 185 insertions(+), 2 deletions(-) diff --git a/crates/cli/src/session.rs b/crates/cli/src/session.rs index 1b3cbb2d..dc5ed526 100644 --- a/crates/cli/src/session.rs +++ b/crates/cli/src/session.rs @@ -1623,7 +1623,8 @@ impl Session { .tools .iter() .filter_map(|(key, active)| { - (active.owner_subagent_id.as_deref() == owner_subagent_id + (owner_subagent_id + .is_none_or(|owner| active.owner_subagent_id.as_deref() == Some(owner)) && active.name == event.tool_name && active.arguments == event.arguments) .then_some(key.clone()) diff --git a/crates/cli/tests/coverage/adapters_tests.rs b/crates/cli/tests/coverage/adapters_tests.rs index e8bfb598..aa5208d7 100644 --- a/crates/cli/tests/coverage/adapters_tests.rs +++ b/crates/cli/tests/coverage/adapters_tests.rs @@ -315,6 +315,7 @@ fn drops_uncorrelatable_hermes_pre_tool_call() { json!({ "hook_event_name": "pre_tool_call", "task_id": "task-1", + "tool_call_id": "toolcall-1", "tool_name": "terminal", "tool_input": { "command": "pwd" } }), diff --git a/crates/cli/tests/coverage/session_tests.rs b/crates/cli/tests/coverage/session_tests.rs index 5c3c6fda..c848dd96 100644 --- a/crates/cli/tests/coverage/session_tests.rs +++ b/crates/cli/tests/coverage/session_tests.rs @@ -3194,6 +3194,61 @@ async fn active_tool_name_args_fallback_requires_matching_subagent_owner() { assert!(!tools.contains_key("provider-worker-1")); } +#[tokio::test] +async fn active_tool_name_args_fallback_uses_unique_global_match_without_owner() { + let manager = SessionManager::new(session_test_config()); + let session_id = "tool-global-fallback"; + let same_args = json!({ "file_path": "README.md" }); + + manager + .apply_events( + &HeaderMap::new(), + vec![ + NormalizedEvent::AgentStarted(session_event(session_id, "SessionStart")), + NormalizedEvent::SubagentStarted(SubagentEvent { + session_id: session_id.into(), + agent_kind: AgentKind::ClaudeCode, + event_name: "SubagentStart".into(), + subagent_id: "worker-1".into(), + payload: json!({}), + metadata: json!({}), + }), + NormalizedEvent::ToolStarted(ToolEvent { + session_id: session_id.into(), + agent_kind: AgentKind::ClaudeCode, + event_name: "PreToolUse".into(), + tool_call_id: "worker-1-pre".into(), + tool_name: "Read".into(), + subagent_id: Some("worker-1".into()), + arguments: same_args.clone(), + result: Value::Null, + status: None, + payload: json!({}), + metadata: json!({}), + }), + NormalizedEvent::ToolEnded(ToolEvent { + session_id: session_id.into(), + agent_kind: AgentKind::ClaudeCode, + event_name: "PostToolUse".into(), + tool_call_id: "provider-worker-1".into(), + tool_name: "Read".into(), + subagent_id: None, + arguments: same_args, + result: json!({ "ok": true }), + status: Some("success".into()), + payload: json!({}), + metadata: json!({}), + }), + ], + ) + .await + .unwrap(); + + let sessions = manager.inner.lock().await; + let tools = &sessions.get(session_id).unwrap().tools; + assert!(tools.is_empty()); +} + #[tokio::test] async fn agent_end_closes_active_tools_and_duplicate_starts_are_ignored() { let manager = SessionManager::new(session_test_config()); diff --git a/crates/core/src/observability/atif.rs b/crates/core/src/observability/atif.rs index a09af56b..27615f4c 100644 --- a/crates/core/src/observability/atif.rs +++ b/crates/core/src/observability/atif.rs @@ -1335,7 +1335,10 @@ fn llm_request_signature(input: &Json) -> String { } fn llm_response_signature(output: &Json) -> String { - json_to_string(&extract_llm_response_message(output)) + json_to_string(&serde_json::json!({ + "message": extract_llm_response_message(output), + "tool_calls": extract_tool_calls(output), + })) } fn llm_request_correlation_keys(start: &Event, end: &Event) -> HashSet { diff --git a/crates/core/tests/unit/atif_tests.rs b/crates/core/tests/unit/atif_tests.rs index 0dffda5d..090eae10 100644 --- a/crates/core/tests/unit/atif_tests.rs +++ b/crates/core/tests/unit/atif_tests.rs @@ -2061,6 +2061,95 @@ fn test_exporter_dedupes_overlapping_hook_and_gateway_llm_spans() { assert_eq!(metrics.extra.as_ref().unwrap()["total_tokens"], json!(10)); } +#[test] +fn test_exporter_keeps_tool_only_llm_spans_with_different_tool_calls() { + let exporter = AtifExporter::new("session-1".to_string(), make_agent_info()); + let base = base_timestamp(); + let parent_uuid = Uuid::now_v7(); + let first_uuid = Uuid::now_v7(); + let second_uuid = Uuid::now_v7(); + let request = json!({ + "messages": [{"role": "user", "content": "Use tools"}], + "model": "test-model" + }); + + let mut first_start = event_builder(first_uuid, EventType::Start) + .name("anthropic.messages") + .parent_uuid(parent_uuid) + .scope_type(ScopeType::Llm) + .model_name("test-model") + .input(request.clone()) + .build(); + let mut first_end = event_builder(first_uuid, EventType::End) + .name("anthropic.messages") + .parent_uuid(parent_uuid) + .scope_type(ScopeType::Llm) + .model_name("test-model") + .output(json!({ + "type": "message", + "content": [{ + "type": "tool_use", + "id": "toolu-read", + "name": "read_file", + "input": { "path": "README.md" } + }] + })) + .build(); + let mut second_start = event_builder(second_uuid, EventType::Start) + .name("anthropic.messages") + .parent_uuid(parent_uuid) + .scope_type(ScopeType::Llm) + .model_name("test-model") + .input(request) + .build(); + let mut second_end = event_builder(second_uuid, EventType::End) + .name("anthropic.messages") + .parent_uuid(parent_uuid) + .scope_type(ScopeType::Llm) + .model_name("test-model") + .output(json!({ + "type": "message", + "content": [{ + "type": "tool_use", + "id": "toolu-search", + "name": "web_search", + "input": { "query": "release notes" } + }] + })) + .build(); + + for (idx, event) in [ + &mut first_start, + &mut second_start, + &mut first_end, + &mut second_end, + ] + .into_iter() + .enumerate() + { + set_event_timestamp(event, base + chrono::Duration::milliseconds(idx as i64)); + } + + { + let mut state = exporter.state.lock().unwrap(); + state + .events + .extend([first_start, second_start, first_end, second_end]); + } + + let trajectory = exporter.export().unwrap(); + let tool_call_ids = trajectory + .steps + .iter() + .filter_map(|step| step.tool_calls.as_deref()) + .flat_map(|calls| calls.iter().map(|call| call.tool_call_id.as_str())) + .collect::>(); + + assert!(tool_call_ids.contains("toolu-read")); + assert!(tool_call_ids.contains("toolu-search")); + assert_eq!(tool_call_ids.len(), 2); +} + #[test] fn test_exporter_prefers_gateway_span_over_non_exact_hook_summary() { let exporter = AtifExporter::new("session-1".to_string(), make_agent_info()); diff --git a/integrations/openclaw/src/hook-replay/llm.ts b/integrations/openclaw/src/hook-replay/llm.ts index 63eccef6..d5df5c10 100644 --- a/integrations/openclaw/src/hook-replay/llm.ts +++ b/integrations/openclaw/src/hook-replay/llm.ts @@ -1196,6 +1196,8 @@ function mapUsage(usage: unknown): Record | undefined { numberField(usage, 'outputTokens'); const cacheRead = numberField(usage, 'cacheRead') ?? + numberField(usage, 'cached_tokens') ?? + numberField(usage, 'cachedTokens') ?? numberField(usage, 'cache_read_tokens') ?? numberField(usage, 'cache_read_input_tokens') ?? nestedNumberField(usage, 'input_tokens_details', 'cached_tokens') ?? diff --git a/integrations/openclaw/test/llm-replay.test.ts b/integrations/openclaw/test/llm-replay.test.ts index a68a67e8..df331871 100644 --- a/integrations/openclaw/test/llm-replay.test.ts +++ b/integrations/openclaw/test/llm-replay.test.ts @@ -283,6 +283,38 @@ describe('LLM replay', () => { }); }); + it('preserves canonical cached token usage fields', () => { + for (const [field, value] of [ + ['cached_tokens', 7], + ['cachedTokens', 11], + ] as const) { + const nf = createNemoRelayRuntime(); + const backend = createBackend(nf); + + backend.onLlmInput(llmInput(), { runId: `run-${field}`, sessionId: `session-${field}` }); + backend.onLlmOutput( + { + ...llmOutput(), + usage: { + input: 2, + output: 3, + [field]: value, + }, + }, + { runId: `run-${field}`, sessionId: `session-${field}` }, + ); + + const response = nf.calls.llmCallEnd[0]?.response as ReplayResponse; + assert.deepEqual(response.usage, { + prompt_tokens: 2, + completion_tokens: 3, + cached_tokens: value, + cache_read_tokens: value, + total_tokens: 5, + }); + } + }); + it('does not derive impossible prompt tokens from inconsistent usage totals', () => { const nf = createNemoRelayRuntime(); const backend = createBackend(nf); From 42c1ef97676deb33d32ad6406565fe12e5a592e6 Mon Sep 17 00:00:00 2001 From: Yuchen Zhang Date: Wed, 3 Jun 2026 09:36:11 -0700 Subject: [PATCH 8/9] Remove OpenClaw cache metric follow-up Signed-off-by: Yuchen Zhang --- integrations/openclaw/src/hook-replay/llm.ts | 2 -- integrations/openclaw/test/llm-replay.test.ts | 32 ------------------- 2 files changed, 34 deletions(-) diff --git a/integrations/openclaw/src/hook-replay/llm.ts b/integrations/openclaw/src/hook-replay/llm.ts index d5df5c10..63eccef6 100644 --- a/integrations/openclaw/src/hook-replay/llm.ts +++ b/integrations/openclaw/src/hook-replay/llm.ts @@ -1196,8 +1196,6 @@ function mapUsage(usage: unknown): Record | undefined { numberField(usage, 'outputTokens'); const cacheRead = numberField(usage, 'cacheRead') ?? - numberField(usage, 'cached_tokens') ?? - numberField(usage, 'cachedTokens') ?? numberField(usage, 'cache_read_tokens') ?? numberField(usage, 'cache_read_input_tokens') ?? nestedNumberField(usage, 'input_tokens_details', 'cached_tokens') ?? diff --git a/integrations/openclaw/test/llm-replay.test.ts b/integrations/openclaw/test/llm-replay.test.ts index df331871..a68a67e8 100644 --- a/integrations/openclaw/test/llm-replay.test.ts +++ b/integrations/openclaw/test/llm-replay.test.ts @@ -283,38 +283,6 @@ describe('LLM replay', () => { }); }); - it('preserves canonical cached token usage fields', () => { - for (const [field, value] of [ - ['cached_tokens', 7], - ['cachedTokens', 11], - ] as const) { - const nf = createNemoRelayRuntime(); - const backend = createBackend(nf); - - backend.onLlmInput(llmInput(), { runId: `run-${field}`, sessionId: `session-${field}` }); - backend.onLlmOutput( - { - ...llmOutput(), - usage: { - input: 2, - output: 3, - [field]: value, - }, - }, - { runId: `run-${field}`, sessionId: `session-${field}` }, - ); - - const response = nf.calls.llmCallEnd[0]?.response as ReplayResponse; - assert.deepEqual(response.usage, { - prompt_tokens: 2, - completion_tokens: 3, - cached_tokens: value, - cache_read_tokens: value, - total_tokens: 5, - }); - } - }); - it('does not derive impossible prompt tokens from inconsistent usage totals', () => { const nf = createNemoRelayRuntime(); const backend = createBackend(nf); From f2c1000c952fe51ca6349d82f6c04942222597d9 Mon Sep 17 00:00:00 2001 From: Yuchen Zhang Date: Wed, 3 Jun 2026 11:36:12 -0700 Subject: [PATCH 9/9] Flush scope-local subscriber events in test Signed-off-by: Yuchen Zhang --- python/tests/test_scope_local.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python/tests/test_scope_local.py b/python/tests/test_scope_local.py index 7fd26a28..7e661b44 100644 --- a/python/tests/test_scope_local.py +++ b/python/tests/test_scope_local.py @@ -70,6 +70,7 @@ def my_tool(args): scope_local.register_tool_sanitize_request(handle, "sl_sanitizer", 1, sanitizer) scope_local.register_subscriber(handle, "sl_sanitizer_sub", lambda e: events.append(e)) result = await tools.execute("sanitized_tool", {"input": "data"}, my_tool) + subscribers.flush() # Sanitize guardrails are observability-only: they do NOT modify args # flowing through the execution pipeline.