Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
213 changes: 213 additions & 0 deletions crates/cli/src/alignment/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,13 +110,15 @@ pub(crate) struct SessionAlignmentState {
aliases: HashMap<String, SessionAlias>,
completed_aliases: HashMap<String, SessionAlias>,
pending_subagents: HashMap<String, PendingSubagentStart>,
hermes_task_sessions: HashMap<String, HashMap<String, String>>,
}

impl SessionAlignmentState {
pub(crate) fn clear(&mut self) {
self.aliases.clear();
self.completed_aliases.clear();
self.pending_subagents.clear();
self.hermes_task_sessions.clear();
}

pub(crate) fn alias_for_session(&self, session_id: &str) -> Option<SessionAlias> {
Expand Down Expand Up @@ -154,6 +156,8 @@ impl SessionAlignmentState {
}

pub(crate) fn route_event(&mut self, event: NormalizedEvent) -> NormalizedEvent {
self.record_hermes_task_session(&event);
let event = self.route_hermes_task_event(event);
let (event, finished_alias) = route_event_through_alias(event, &self.aliases);
let session_id = event.session_id().to_string();
if let Some(child_session_id) = finished_alias.as_ref() {
Expand Down Expand Up @@ -228,6 +232,8 @@ impl SessionAlignmentState {
self.pending_subagents.retain(|child_session_id, pending| {
child_session_id != session_id && pending.parent_session_id() != session_id
});
self.hermes_task_sessions.remove(session_id);
prune_hermes_task_sessions(&mut self.hermes_task_sessions, session_id);
}

pub(crate) fn clear_for_ended_subagent(&mut self, parent_session_id: &str, subagent_id: &str) {
Expand All @@ -241,9 +247,80 @@ impl SessionAlignmentState {
&& !(pending.parent_session_id() == parent_session_id
&& pending.event.session_id == subagent_id)
});
self.hermes_task_sessions
.retain(|session_id, _| session_id != subagent_id);
prune_hermes_task_sessions(&mut self.hermes_task_sessions, subagent_id);
}

fn record_hermes_task_session(&mut self, event: &NormalizedEvent) {
if normalized_event_agent_kind(event) != AgentKind::Hermes {
return;
}
let Some(task_id) = hermes_task_id(event) else {
return;
};
let session_id = event.session_id();
if session_id == task_id {
return;
}
self.hermes_task_sessions
.entry(session_id.to_string())
.or_default()
.insert(task_id, session_id.to_string());
}

fn route_hermes_task_event(&self, event: NormalizedEvent) -> NormalizedEvent {
let should_route = matches!(
event,
NormalizedEvent::ToolStarted(_) | NormalizedEvent::ToolEnded(_)
) && normalized_event_agent_kind(&event) == AgentKind::Hermes;
if !should_route {
return event;
}

let task_id = hermes_task_id(&event).unwrap_or_else(|| event.session_id().to_string());
let session_scope = hermes_task_session_scope(&event);
let Some(session_id) = self.hermes_session_for_task(&task_id, session_scope.as_deref())
else {
return event;
};
route_hermes_task_session_event(event, task_id, session_id)
}

fn hermes_session_for_task(
&self,
task_id: &str,
session_scope: Option<&str>,
) -> Option<String> {
if let Some(session_scope) = session_scope {
return self
.hermes_task_sessions
.get(session_scope)
.and_then(|tasks| tasks.get(task_id))
.cloned();
}

let mut matches = self
.hermes_task_sessions
.values()
.filter_map(|tasks| tasks.get(task_id).cloned());
let session_id = matches.next()?;
matches.next().is_none().then_some(session_id)
}
}

fn prune_hermes_task_sessions(
hermes_task_sessions: &mut HashMap<String, HashMap<String, String>>,
session_id: &str,
) {
hermes_task_sessions.values_mut().for_each(|tasks| {
tasks.retain(|task_id, mapped_session_id| {
task_id != session_id && mapped_session_id != session_id
});
});
hermes_task_sessions.retain(|_, tasks| !tasks.is_empty());
}

// Resolves the session id for a gateway request in precedence order:
// explicit NeMo Relay header, agent-native headers, agent-specific body fallbacks, then the
// generic OpenAI-compatible `session_id` body field. Keeping the provider fallbacks behind one
Expand Down Expand Up @@ -607,6 +684,142 @@ fn route_tool_event(event: &mut ToolEvent, alias: &SessionAlias, metadata: Value
event.metadata = merge_metadata(event.metadata.clone(), metadata);
}

fn route_hermes_task_session_event(
event: NormalizedEvent,
task_id: String,
session_id: String,
) -> NormalizedEvent {
let metadata = json!({
"session_correlation_status": "task_session_alias",
"session_correlation_source": "hermes_task_id",
"hermes_task_id": task_id,
"hermes_session_id": session_id,
});
match event {
NormalizedEvent::ToolStarted(mut event) => {
event.session_id = session_id;
event.metadata = merge_metadata(event.metadata, metadata);
NormalizedEvent::ToolStarted(event)
}
NormalizedEvent::ToolEnded(mut event) => {
event.session_id = session_id;
event.metadata = merge_metadata(event.metadata, metadata);
NormalizedEvent::ToolEnded(event)
}
event => event,
}
}

fn normalized_event_agent_kind(event: &NormalizedEvent) -> AgentKind {
match event {
NormalizedEvent::AgentStarted(event)
| NormalizedEvent::AgentEnded(event)
| NormalizedEvent::TurnEnded(event)
| NormalizedEvent::PromptSubmitted(event)
| NormalizedEvent::Compaction(event)
| NormalizedEvent::Notification(event)
| NormalizedEvent::HookMark(event) => event.agent_kind,
NormalizedEvent::SubagentStarted(event) | NormalizedEvent::SubagentEnded(event) => {
event.agent_kind
}
NormalizedEvent::LlmHint(event) => event.agent_kind,
NormalizedEvent::LlmStarted(event) | NormalizedEvent::LlmEnded(event) => event.agent_kind,
NormalizedEvent::ToolStarted(event) | NormalizedEvent::ToolEnded(event) => event.agent_kind,
}
}

fn hermes_task_id(event: &NormalizedEvent) -> Option<String> {
match event {
NormalizedEvent::AgentStarted(event)
| NormalizedEvent::AgentEnded(event)
| NormalizedEvent::TurnEnded(event)
| NormalizedEvent::PromptSubmitted(event)
| NormalizedEvent::Compaction(event)
| NormalizedEvent::Notification(event)
| NormalizedEvent::HookMark(event) => {
task_id_from_payload_and_metadata(&event.payload, &event.metadata)
}
NormalizedEvent::SubagentStarted(event) | NormalizedEvent::SubagentEnded(event) => {
task_id_from_payload_and_metadata(&event.payload, &event.metadata)
}
NormalizedEvent::LlmHint(event) => {
task_id_from_payload_and_metadata(&event.payload, &event.metadata)
}
NormalizedEvent::LlmStarted(event) | NormalizedEvent::LlmEnded(event) => {
task_id_from_llm_event(event)
}
NormalizedEvent::ToolStarted(event) | NormalizedEvent::ToolEnded(event) => {
task_id_from_payload_and_metadata(&event.payload, &event.metadata)
}
}
}

fn hermes_task_session_scope(event: &NormalizedEvent) -> Option<String> {
match event {
NormalizedEvent::AgentStarted(event)
| NormalizedEvent::AgentEnded(event)
| NormalizedEvent::TurnEnded(event)
| NormalizedEvent::PromptSubmitted(event)
| NormalizedEvent::Compaction(event)
| NormalizedEvent::Notification(event)
| NormalizedEvent::HookMark(event) => {
session_scope_from_payload_and_metadata(&event.payload, &event.metadata)
}
NormalizedEvent::SubagentStarted(event) | NormalizedEvent::SubagentEnded(event) => {
session_scope_from_payload_and_metadata(&event.payload, &event.metadata)
}
NormalizedEvent::LlmHint(event) => {
session_scope_from_payload_and_metadata(&event.payload, &event.metadata)
}
NormalizedEvent::LlmStarted(event) | NormalizedEvent::LlmEnded(event) => {
session_scope_from_llm_event(event)
}
NormalizedEvent::ToolStarted(event) | NormalizedEvent::ToolEnded(event) => {
session_scope_from_payload_and_metadata(&event.payload, &event.metadata)
}
}
}

fn task_id_from_llm_event(event: &LlmEvent) -> Option<String> {
task_id_from_payload_and_metadata(&event.request, &event.metadata)
.or_else(|| task_id_from_payload_and_metadata(&event.response, &event.metadata))
}

fn task_id_from_payload_and_metadata(payload: &Value, metadata: &Value) -> Option<String> {
json_string_at(payload, TASK_ID_PATHS).or_else(|| json_string_at(metadata, TASK_ID_PATHS))
}

fn session_scope_from_llm_event(event: &LlmEvent) -> Option<String> {
session_scope_from_payload_and_metadata(&event.request, &event.metadata)
.or_else(|| session_scope_from_payload_and_metadata(&event.response, &event.metadata))
}

fn session_scope_from_payload_and_metadata(payload: &Value, metadata: &Value) -> Option<String> {
json_string_at(payload, HERMES_SESSION_SCOPE_PATHS)
.or_else(|| json_string_at(metadata, HERMES_SESSION_SCOPE_PATHS))
}

const TASK_ID_PATHS: &[&[&str]] = &[
&["task_id"],
&["taskId"],
&["extra", "task_id"],
&["extra", "taskId"],
];

const HERMES_SESSION_SCOPE_PATHS: &[&[&str]] = &[
&["session_id"],
&["sessionId"],
&["session", "id"],
&["conversation_id"],
&["conversationId"],
&["parent_session_id"],
&["parentSessionId"],
&["extra", "session_id"],
&["extra", "sessionId"],
&["extra", "parent_session_id"],
&["extra", "parentSessionId"],
];

fn request_user_task_text(payload: &Value) -> Option<String> {
payload
.get("messages")
Expand Down
Loading
Loading