diff --git a/docs/supported-integrations/openclaw-plugin.mdx b/docs/supported-integrations/openclaw-plugin.mdx index c828886f..32f1006c 100644 --- a/docs/supported-integrations/openclaw-plugin.mdx +++ b/docs/supported-integrations/openclaw-plugin.mdx @@ -268,7 +268,7 @@ features such as adaptive hints require a managed execution path. | `after_tool_call` | Replays successful tool calls as tool spans; blocked tools emit marks. | | `agent_end` | Emits an agent lifecycle mark, flushes recorded assistant-turn LLM spans, and preserves the final assistant answer as the session output. | | `before_agent_finalize` | Preserves the last assistant message as fallback session output and emits a lifecycle mark without mutating the finalization payload. | -| `subagent_spawned` / `subagent_ended` | Emits subagent lifecycle marks under the best available parent or child session. | +| `subagent_spawned` / `subagent_ended` | Emits subagent lifecycle marks and nests child subagent session scopes under the requester session when stable lineage is available. | ## LLM Replay Fidelity diff --git a/integrations/openclaw/src/hook-replay/llm.ts b/integrations/openclaw/src/hook-replay/llm.ts index c74729e0..f8b33a0a 100644 --- a/integrations/openclaw/src/hook-replay/llm.ts +++ b/integrations/openclaw/src/hook-replay/llm.ts @@ -28,7 +28,7 @@ import { evictExpiredCorrelationRecords, ensureSession, insertBoundedRecord, - resolveSessionKey, + resolveSessionOwnerKey, type LlmInputRecord, type ModelCallRecord, type PendingLlmOutputRecord, @@ -47,12 +47,14 @@ export function recordLlmInput( ctx: PluginHookAgentContext, ): void { evictExpiredReplayRecords(manager); + const observedAtMicros = nowMicros(); const session = ensureSession(manager, { sessionId: event.sessionId, sessionKey: ctx.sessionKey, runId: event.runId, agentId: ctx.agentId, source: 'lazy_session', + timestamp: observedAtMicros, }); if (!session) { return; @@ -76,7 +78,7 @@ export function recordLlmInput( const pending = shiftOldest( manager.state.llmOutputsPendingInput, key, - (record) => record.sessionKey === session.sessionId, + (record) => record.sessionOwnerKey === session.ownerKey, ); if (!pending) { return; @@ -105,12 +107,14 @@ export function recordLlmOutput( ctx: PluginHookAgentContext, ): void { evictExpiredReplayRecords(manager); + const observedAtMicros = nowMicros(); const session = ensureSession(manager, { sessionId: event.sessionId, sessionKey: ctx.sessionKey, runId: event.runId, agentId: ctx.agentId, source: 'lazy_session', + timestamp: observedAtMicros, }); if (!session) { return; @@ -118,11 +122,11 @@ export function recordLlmOutput( const key = llmKey(event); if (hasTrajectoryReplay(session, event.runId)) { - shiftOldest(manager.state.llmInputs, key, (record) => record.sessionKey === session.sessionId); + shiftOldest(manager.state.llmInputs, key, (record) => record.sessionOwnerKey === session.ownerKey); return; } - const input = shiftOldest(manager.state.llmInputs, key, (record) => record.sessionKey === session.sessionId); + const input = shiftOldest(manager.state.llmInputs, key, (record) => record.sessionOwnerKey === session.ownerKey); if (input) { replayLlmOutput({ manager, @@ -135,7 +139,7 @@ export function recordLlmOutput( } const pending: PendingLlmOutputRecord = { - sessionKey: session.sessionId, + sessionOwnerKey: session.ownerKey, sessionId: event.sessionId, runId: event.runId, provider: event.provider, @@ -191,7 +195,7 @@ export function recordBeforeMessageWrite( if (provider && model && (assistantTexts.length > 0 || assistantToolCalls.length > 0 || usage !== undefined)) { session.assistantMessageWrites ??= []; session.assistantMessageWrites.push({ - sessionKey: session.sessionId, + sessionOwnerKey: session.ownerKey, provider, model, assistantTexts, @@ -239,7 +243,7 @@ export function recordModelCallStarted( manager.state.modelCallsByCallId, modelTimingKey(event), { - sessionKey: session.sessionId, + sessionOwnerKey: session.ownerKey, sessionId: session.sessionId, runId: event.runId, callId: event.callId, @@ -281,7 +285,7 @@ export function recordModelCallEnded( const record = existing ?? ({ - sessionKey: session.sessionId, + sessionOwnerKey: session.ownerKey, sessionId: session.sessionId, runId: event.runId, callId: event.callId, @@ -303,7 +307,7 @@ export function recordModelCallEnded( insertBoundedRecord( manager.state.modelTimingsByLlmKey, modelTimingLlmKey({ - sessionId: session.sessionId, + sessionId: session.ownerKey, runId: event.runId, provider: event.provider, model: event.model, @@ -325,7 +329,7 @@ export function replayPendingLlmOutputsForSession( for (const [key, records] of [...manager.state.llmOutputsPendingInput]) { const remaining: PendingLlmOutputRecord[] = []; for (const record of records) { - if (record.sessionKey !== session.sessionId) { + if (record.sessionOwnerKey !== session.ownerKey) { remaining.push(record); continue; } @@ -389,7 +393,7 @@ export function replayAgentEndMessages( export function emitUnpairedModelCallTimingMarks(manager: SessionManager, session: SessionState): void { for (const records of manager.state.modelCallsByCallId.values()) { for (const record of records) { - if (record.sessionKey !== session.sessionId || record.consumed || record.endedAtMs !== undefined) { + if (record.sessionOwnerKey !== session.ownerKey || record.consumed || record.endedAtMs !== undefined) { continue; } emitModelTimingMark(manager, session, 'openclaw.model_call_timing_unpaired', record); @@ -400,7 +404,7 @@ export function emitUnpairedModelCallTimingMarks(manager: SessionManager, sessio const unpairedEnded: ModelCallRecord[] = []; for (const records of manager.state.modelTimingsByLlmKey.values()) { for (const record of records) { - if (record.sessionKey !== session.sessionId || record.consumed) { + if (record.sessionOwnerKey !== session.ownerKey || record.consumed) { continue; } unpairedEnded.push(record); @@ -494,7 +498,7 @@ function replayExpiredPendingOutput(manager: SessionManager, key: string, record if (!removeRecord(manager.state.llmOutputsPendingInput, key, record)) { return; } - const session = manager.state.sessions.get(record.sessionKey); + const session = manager.state.sessions.get(record.sessionOwnerKey); if (!session) { manager.state.counters.skippedEvents += 1; return; @@ -616,7 +620,7 @@ function replayAssistantMessageWrites( }, ctx, input: { - sessionKey: session.sessionId, + sessionOwnerKey: session.ownerKey, sessionId: session.sessionId, runId, provider: record.provider, @@ -645,13 +649,13 @@ function consumeNextTimingCandidate( input: { runId?: string | undefined; provider: string; model: string }, ): ModelCallRecord | undefined { const key = modelTimingLlmKey({ - sessionId: session.sessionId, + sessionId: session.ownerKey, runId: input.runId, provider: input.provider, model: input.model, }); const records = manager.state.modelTimingsByLlmKey.get(key) ?? []; - const candidate = records.find((record) => record.sessionKey === session.sessionId && !record.consumed); + const candidate = records.find((record) => record.sessionOwnerKey === session.ownerKey && !record.consumed); if (!candidate) { return undefined; } @@ -666,13 +670,13 @@ function consumeTimingCandidate( event: PluginHookLlmOutputEvent, ): ModelCallRecord | undefined { const key = modelTimingLlmKey({ - sessionId: session.sessionId, + sessionId: session.ownerKey, runId: event.runId, provider: event.provider, model: event.model, }); const candidates = (manager.state.modelTimingsByLlmKey.get(key) ?? []).filter( - (record) => record.sessionKey === session.sessionId && !record.consumed, + (record) => record.sessionOwnerKey === session.ownerKey && !record.consumed, ); if (candidates.length === 1) { const candidate = candidates[0]; @@ -771,7 +775,7 @@ function emitModelTimingSummaryMark(manager: SessionManager, session: SessionSta /** Convert an OpenClaw llm_input event into the buffered request record. */ function createInputRecord(session: SessionState, event: PluginHookLlmInputEvent): LlmInputRecord { return { - sessionKey: session.sessionId, + sessionOwnerKey: session.ownerKey, sessionId: event.sessionId, runId: event.runId, provider: event.provider, @@ -790,7 +794,7 @@ function existingSessionForMessageWrite( event: PluginHookBeforeMessageWriteEvent, ctx: PluginHookBeforeMessageWriteContext, ): SessionState | undefined { - const key = resolveSessionKey(manager.state, { + const key = resolveSessionOwnerKey(manager.state, { sessionKey: event.sessionKey ?? ctx.sessionKey, }); if (key === undefined) { @@ -807,7 +811,7 @@ function existingSessionForMessageWrite( /** Build a minimal request placeholder when only an llm_output hook is available. */ function placeholderInputRecord(record: PendingLlmOutputRecord): LlmInputRecord { return { - sessionKey: record.sessionKey, + sessionOwnerKey: record.sessionOwnerKey, sessionId: record.sessionId, runId: record.runId, provider: record.provider, @@ -1185,7 +1189,7 @@ function incrementHookLlmOutputReplayCount(session: SessionState, runId: string /** Build the per-session run key used for trajectory de-duplication. */ function trajectoryRunKey(session: SessionState, runId?: string): string { - return runId ?? session.sessionId; + return runId ?? session.ownerKey; } /** Normalize provider usage into NeMo Relay token and cost fields. */ @@ -1291,7 +1295,7 @@ function latestUnendedRecord( } for (let index = records.length - 1; index >= 0; index -= 1) { const record = records[index]; - if (record?.sessionKey === session.sessionId && record.endedAtMs === undefined) { + if (record?.sessionOwnerKey === session.ownerKey && record.endedAtMs === undefined) { return record; } } diff --git a/integrations/openclaw/src/hook-replay/session.ts b/integrations/openclaw/src/hook-replay/session.ts index 8b761d44..215a4012 100644 --- a/integrations/openclaw/src/hook-replay/session.ts +++ b/integrations/openclaw/src/hook-replay/session.ts @@ -9,7 +9,10 @@ * those identifiers and owns the root `openclaw.session` scope lifecycle. */ import type { NemoRelayHookBackendConfig } from '../config.js'; -import { evictExpiredRecords, tupleKey as tupleKeyFromCorrelation } from './correlation.js'; +import { + evictExpiredRecords, + tupleKey as tupleKeyFromCorrelation, +} from './correlation.js'; import type { PluginHookAgentContext, PluginHookLlmOutputEvent, @@ -30,12 +33,17 @@ export type SessionLookupInput = { export type EnsureSessionInput = SessionLookupInput & { agentId?: string | undefined; + parentHandle?: ReturnType | undefined; + scopeRole?: 'subagent' | undefined; source: 'session_start' | 'lazy_session'; resumedFrom?: string | undefined; timestamp?: number | undefined; + deferRootOpen?: boolean | undefined; }; export type SessionState = { + /** Immutable internal owner key used for replay buffers and alias lookups. */ + ownerKey: string; sessionId: string; sessionKey?: string; agentId?: string; @@ -52,10 +60,15 @@ export type SessionState = { assistantMessageWrites?: AssistantMessageRecord[]; stack: ReturnType; rootHandle?: ReturnType; + scopeRole?: 'subagent'; + pendingRootOpen?: boolean; + pendingRootTimestampMicros?: number; + pendingRootRunId?: string; + pendingCapturedEmits?: Array<{ label: string; emit: () => void }>; }; export type PendingLlmOutputRecord = { - sessionKey: string; + sessionOwnerKey: string; sessionId: string; runId: string; provider: string; @@ -67,7 +80,7 @@ export type PendingLlmOutputRecord = { }; export type LlmInputRecord = { - sessionKey: string; + sessionOwnerKey: string; sessionId: string; runId: string; provider: string; @@ -81,7 +94,7 @@ export type LlmInputRecord = { }; export type AssistantMessageRecord = { - sessionKey: string; + sessionOwnerKey: string; provider: string; model: string; assistantTexts: string[]; @@ -94,7 +107,7 @@ export type AssistantMessageRecord = { }; export type ModelCallRecord = { - sessionKey: string; + sessionOwnerKey: string; runId: string; callId: string; provider: string; @@ -145,8 +158,32 @@ export type SessionManager = { replayPendingLlmOutputsForSession: (session: SessionState, options: { allowPlaceholderRequest: boolean }) => void; emitUnpairedModelCallTimingMarks: (session: SessionState) => void; logBoundedWarn: (key: string, message: string) => void; + resolveSessionRootContext?: (input: EnsureSessionInput) => Partial | undefined; }; +/** Merge lineage/root-context defaults without letting undefined hook fields erase them. */ +function mergeEnsureSessionInput( + input: EnsureSessionInput, + context: Partial | undefined, +): EnsureSessionInput { + if (!context) { + return input; + } + + const merged: Record = { ...context, ...input }; + for (const [key, value] of Object.entries(context)) { + if (merged[key] === undefined && value !== undefined) { + merged[key] = value; + } + } + return merged as EnsureSessionInput; +} + +/** Return the session key that belongs to the session itself, not a requester alias. */ +function ownSessionKey(input: SessionLookupInput): string | undefined { + return input.sessionKey ?? input.childSessionKey; +} + /** Return all keys that may identify an existing OpenClaw session. */ export function lookupSessionKeys(input: SessionLookupInput): string[] { return [input.sessionId, input.sessionKey, input.requesterSessionKey, input.childSessionKey, input.runId].filter( @@ -154,15 +191,15 @@ export function lookupSessionKeys(input: SessionLookupInput): string[] { ); } -/** Return keys that should alias to a canonical session once it is known. */ +/** Return keys that should alias to one stable internal session owner. */ export function aliasSessionKeys(input: SessionLookupInput): string[] { - return [input.sessionId, input.sessionKey, input.requesterSessionKey, input.runId].filter( + return [input.sessionId, input.sessionKey, input.childSessionKey, input.requesterSessionKey, input.runId].filter( (value): value is string => typeof value === 'string' && value.length > 0, ); } -/** Resolve a hook's session identity to the canonical session id used in replay state. */ -export function resolveSessionKey(state: HookReplayBackendState, input: SessionLookupInput): string | undefined { +/** Resolve a hook's session identity to the stable owner key used in replay state. */ +export function resolveSessionOwnerKey(state: HookReplayBackendState, input: SessionLookupInput): string | undefined { for (const key of lookupSessionKeys(input)) { const canonical = state.sessionAliases.get(key); if (canonical) { @@ -173,14 +210,14 @@ export function resolveSessionKey(state: HookReplayBackendState, input: SessionL return input.sessionId ?? input.sessionKey ?? input.childSessionKey ?? input.runId; } -/** Remember equivalent hook identifiers so later events attach to the same root span. */ +/** Remember equivalent hook identifiers so later events attach to the same owner and root span. */ export function rememberSessionAliases( state: HookReplayBackendState, session: SessionState, input: SessionLookupInput, ): void { for (const alias of aliasSessionKeys(input)) { - state.sessionAliases.set(alias, session.sessionId); + state.sessionAliases.set(alias, session.ownerKey); } } @@ -205,55 +242,68 @@ export function createHookReplayState(): HookReplayBackendState { /** Return an existing session or lazily create a root session scope for replay. */ export function ensureSession(manager: SessionManager, input: EnsureSessionInput): SessionState | undefined { - const key = resolveSessionKey(manager.state, input); - if (!key) { + const resolvedInput = mergeEnsureSessionInput(input, manager.resolveSessionRootContext?.(input)); + const ownerKey = resolveSessionOwnerKey(manager.state, resolvedInput); + if (!ownerKey) { manager.state.counters.skippedEvents += 1; manager.logBoundedWarn('missing-session-key', 'nemo-relay skipped replay because no session/run key was available'); return undefined; } - const existing = manager.state.sessions.get(key); + const existing = manager.state.sessions.get(ownerKey); if (existing) { - enrichSession(existing, input); - rememberSessionAliases(manager.state, existing, input); + enrichSession(existing, resolvedInput); + rememberSessionAliases(manager.state, existing, resolvedInput); + if (!existing.rootHandle && resolvedInput.deferRootOpen !== true) { + materializeSessionRoot(manager, existing, resolvedInput); + } return existing; } - const canonicalSessionId = input.sessionId ?? key; - const aliased = manager.state.sessions.get(canonicalSessionId); - if (aliased) { - enrichSession(aliased, input); - rememberSessionAliases(manager.state, aliased, input); - return aliased; - } - const stack = manager.nf.createScopeStack(); const session: SessionState = { - sessionId: canonicalSessionId, - source: input.source, + ownerKey, + sessionId: resolvedInput.sessionId ?? ownerKey, + source: resolvedInput.source, stack, }; - if (input.sessionKey !== undefined) { - session.sessionKey = input.sessionKey; + const sessionKey = ownSessionKey(resolvedInput); + if (sessionKey !== undefined) { + session.sessionKey = sessionKey; } - if (input.agentId !== undefined) { - session.agentId = input.agentId; + if (resolvedInput.agentId !== undefined) { + session.agentId = resolvedInput.agentId; } - if (input.resumedFrom !== undefined) { - session.resumedFrom = input.resumedFrom; + if (resolvedInput.resumedFrom !== undefined) { + session.resumedFrom = resolvedInput.resumedFrom; + } + if (resolvedInput.scopeRole !== undefined) { + session.scopeRole = resolvedInput.scopeRole; } - openSessionRoot(manager, session, input); - manager.state.sessions.set(session.sessionId, session); - rememberSessionAliases(manager.state, session, input); + if (resolvedInput.deferRootOpen === true) { + session.pendingRootOpen = true; + if (resolvedInput.timestamp !== undefined) { + session.pendingRootTimestampMicros = resolvedInput.timestamp; + } + if (resolvedInput.runId !== undefined) { + session.pendingRootRunId = resolvedInput.runId; + } + session.pendingCapturedEmits = []; + } else { + materializeSessionRoot(manager, session, resolvedInput); + } + manager.state.sessions.set(session.ownerKey, session); + rememberSessionAliases(manager.state, session, resolvedInput); return session; } /** Fill stable session identifiers from later hooks without clobbering established values. */ function enrichSession(session: SessionState, input: EnsureSessionInput): void { - if (session.sessionKey === undefined && input.sessionKey !== undefined) { - session.sessionKey = input.sessionKey; + const sessionKey = ownSessionKey(input); + if (session.sessionKey === undefined && sessionKey !== undefined) { + session.sessionKey = sessionKey; } if (session.agentId === undefined && input.agentId !== undefined) { session.agentId = input.agentId; @@ -261,6 +311,33 @@ function enrichSession(session: SessionState, input: EnsureSessionInput): void { if (session.resumedFrom === undefined && input.resumedFrom !== undefined) { session.resumedFrom = input.resumedFrom; } + if (session.scopeRole === undefined && input.scopeRole !== undefined) { + session.scopeRole = input.scopeRole; + } + if (!session.rootHandle && input.sessionId !== undefined) { + session.sessionId = input.sessionId; + } + if (!session.rootHandle && session.source === 'lazy_session' && input.source === 'session_start') { + session.source = 'session_start'; + } + if (!session.rootHandle && input.timestamp !== undefined) { + if (input.source === 'session_start' || session.pendingRootTimestampMicros === undefined) { + session.pendingRootTimestampMicros = input.timestamp; + } + } + if (!session.rootHandle && session.pendingRootRunId === undefined && input.runId !== undefined) { + session.pendingRootRunId = input.runId; + } +} + +/** Queue an emit callback until a deferred session root can open with honest lineage. */ +export function queueCapturedEmit(session: SessionState, label: string, emit: () => void): boolean { + if (session.rootHandle || session.pendingRootOpen !== true) { + return false; + } + + (session.pendingCapturedEmits ??= []).push({ label, emit }); + return true; } /** Flush pending LLM output/timing state before the root session closes. */ @@ -294,7 +371,7 @@ export function closeSessionRoot( /** Remove a closed session from active replay state. */ export function deleteSession(state: HookReplayBackendState, session: SessionState): void { - state.sessions.delete(session.sessionId); + state.sessions.delete(session.ownerKey); } /** Insert a correlation record while bounding retained entries per key. */ @@ -320,14 +397,23 @@ export function evictExpiredCorrelationRecords(state: HookReplayBackendState, no evictExpiredRecords(state.modelTimingsByLlmKey, nowMs, ttlMs); } -/** Open the root NeMo Relay scope for one OpenClaw session and emit session_start. */ -function openSessionRoot(manager: SessionManager, session: SessionState, input: EnsureSessionInput): void { +/** Open a deferred or new root session scope and flush queued child emissions. */ +export function materializeSessionRoot(manager: SessionManager, session: SessionState, input: EnsureSessionInput): void { + if (session.rootHandle) { + return; + } + + enrichSession(session, input); + + const timestampMicros = input.timestamp ?? session.pendingRootTimestampMicros ?? null; + const rootRunId = input.runId ?? session.pendingRootRunId; + const data: JsonRecord = { sessionId: session.sessionId, source: session.source, ...(session.sessionKey === undefined ? {} : { sessionKey: session.sessionKey }), ...(session.agentId === undefined ? {} : { agentId: session.agentId }), - ...(input.runId === undefined ? {} : { runId: input.runId }), + ...(rootRunId === undefined ? {} : { runId: rootRunId }), ...(session.resumedFrom === undefined ? {} : { resumedFrom: session.resumedFrom }), }; const metadata = toJsonRecord({ @@ -336,30 +422,44 @@ function openSessionRoot(manager: SessionManager, session: SessionState, input: sessionId: session.sessionId, sessionKey: session.sessionKey, agentId: session.agentId, - runId: input.runId, + runId: rootRunId, + nemo_relay_scope_role: session.scopeRole, }); - manager.emitCapturedUnderSession('session_start', session, () => { + const previousStack = manager.nf.currentScopeStack(); + try { + manager.nf.setThreadScopeStack(session.stack); session.rootHandle = manager.nf.pushScope( 'openclaw.session', agentScopeType(manager.nf), - null, + input.parentHandle ?? null, null, data, metadata, data, - input.timestamp ?? null, + timestampMicros, ); - manager.nf.event('openclaw.session_start', session.rootHandle, data, metadata, input.timestamp ?? null); + manager.nf.event('openclaw.session_start', session.rootHandle, data, metadata, timestampMicros); manager.state.counters.marksEmitted += 1; - }); + } finally { + manager.nf.setThreadScopeStack(previousStack); + } + + delete session.pendingRootOpen; + delete session.pendingRootTimestampMicros; + delete session.pendingRootRunId; + const pendingEmits = session.pendingCapturedEmits ?? []; + delete session.pendingCapturedEmits; + for (const pending of pendingEmits) { + manager.emitCapturedUnderSession(pending.label, session, pending.emit); + } } /** Cancel timers that would otherwise replay late LLM outputs after session close. */ function cancelPendingLlmOutputTimers(state: HookReplayBackendState, session: SessionState): void { for (const records of state.llmOutputsPendingInput.values()) { for (const record of records) { - if (record.sessionKey === session.sessionId && record.timer) { + if (record.sessionOwnerKey === session.ownerKey && record.timer) { clearTimeout(record.timer); record.timer = undefined; } @@ -369,22 +469,22 @@ function cancelPendingLlmOutputTimers(state: HookReplayBackendState, session: Se /** Remove all correlation records and aliases owned by a closed session. */ function evictSessionCorrelationRecords(state: HookReplayBackendState, session: SessionState): void { - evictFromRecordMap(state.llmInputs, session.sessionId); - evictFromRecordMap(state.llmOutputsPendingInput, session.sessionId); - evictFromRecordMap(state.modelCallsByCallId, session.sessionId); - evictFromRecordMap(state.modelTimingsByLlmKey, session.sessionId); + evictFromRecordMap(state.llmInputs, session.ownerKey); + evictFromRecordMap(state.llmOutputsPendingInput, session.ownerKey); + evictFromRecordMap(state.modelCallsByCallId, session.ownerKey); + evictFromRecordMap(state.modelTimingsByLlmKey, session.ownerKey); - for (const [alias, canonical] of state.sessionAliases) { - if (canonical === session.sessionId || alias === session.sessionId) { + for (const [alias, ownerKey] of state.sessionAliases) { + if (ownerKey === session.ownerKey) { state.sessionAliases.delete(alias); } } } /** Drop records for one session from a single keyed correlation map. */ -function evictFromRecordMap(map: Map, sessionKey: string): void { +function evictFromRecordMap(map: Map, ownerKey: string): void { for (const [key, records] of map) { - const retained = records.filter((record) => record.sessionKey !== sessionKey); + const retained = records.filter((record) => record.sessionOwnerKey !== ownerKey); if (retained.length === 0) { map.delete(key); } else { diff --git a/integrations/openclaw/src/hook-replay/tool.ts b/integrations/openclaw/src/hook-replay/tool.ts index 1e1a21c9..acae9391 100644 --- a/integrations/openclaw/src/hook-replay/tool.ts +++ b/integrations/openclaw/src/hook-replay/tool.ts @@ -22,12 +22,14 @@ export async function guardBeforeToolCall( event: PluginHookBeforeToolCallEvent, ctx: PluginHookToolContext, ): Promise { + const observedAtMicros = nowMicros(); const session = ensureSession(manager, { sessionId: ctx.sessionId, sessionKey: ctx.sessionKey, runId: event.runId ?? ctx.runId, agentId: ctx.agentId, source: 'lazy_session', + timestamp: observedAtMicros, }); const args = toJsonValue(event.params ?? {}); @@ -50,12 +52,15 @@ export function replayAfterToolCall( event: PluginHookAfterToolCallEvent, ctx: PluginHookToolContext, ): void { + const endMicros = nowMicros(); + const sessionTimestamp = startMicrosFromDuration(endMicros, event.durationMs) ?? endMicros; const session = ensureSession(manager, { sessionId: ctx.sessionId, sessionKey: ctx.sessionKey, runId: event.runId ?? ctx.runId, agentId: ctx.agentId, source: 'lazy_session', + timestamp: sessionTimestamp, }); const blockedDetails = blockedToolDetails(event, { runId: event.runId ?? ctx.runId }); @@ -76,6 +81,7 @@ export function replayAfterToolCall( runId: event.runId ?? ctx.runId, toolCallId: event.toolCallId ?? ctx.toolCallId, }), + timestamp: endMicros, }); }); return; @@ -85,7 +91,6 @@ export function replayAfterToolCall( return; } - const endMicros = nowMicros(); const metadata = toJsonRecord({ source: 'openclaw.after_tool_call', runId: event.runId ?? ctx.runId, diff --git a/integrations/openclaw/src/hooks-backend.ts b/integrations/openclaw/src/hooks-backend.ts index 09cca479..e72f5c6c 100644 --- a/integrations/openclaw/src/hooks-backend.ts +++ b/integrations/openclaw/src/hooks-backend.ts @@ -10,7 +10,7 @@ */ import type { NemoRelayHookBackendConfig } from './config.js'; import { emitMark, toJsonRecord } from './hook-replay/marks.js'; -import { llmKey } from './hook-replay/correlation.js'; +import { llmKey, nowMicros } from './hook-replay/correlation.js'; import { emitUnpairedModelCallTimingMarks, recordBeforeMessageWrite, @@ -28,7 +28,9 @@ import { closeSessionRoot, deleteSession, ensureSession, - resolveSessionKey, + materializeSessionRoot, + queueCapturedEmit, + resolveSessionOwnerKey, type HookReplayBackendState, type SessionLookupInput, type SessionState, @@ -66,6 +68,14 @@ export type HookReplayBackendOptions = { agentVersion: string; }; +type PendingSubagentLineage = { + childSessionKey: string; + requesterSessionKey: string; + runId?: string; + agentId?: string; + observedAtMs: number; +}; + /** Replays OpenClaw public hook events into NeMo Relay scopes, spans, and marks. */ export class HookReplayBackend { private readonly nf: NemoRelayRuntimeModule; @@ -74,6 +84,8 @@ export class HookReplayBackend { private readonly agentVersion: string; private readonly stateValue = createHookReplayState(); private readonly warningCounts = new Map(); + private readonly pendingSubagentLineageByChildSessionKey = new Map(); + private readonly pendingSubagentChildKeyByRunId = new Map(); constructor(options: HookReplayBackendOptions) { this.nf = options.nf; @@ -95,14 +107,18 @@ export class HookReplayBackend { /** Open or alias an explicit OpenClaw session root. */ onSessionStart(event: PluginHookSessionStartEvent, ctx: PluginHookSessionContext): void { - this.ensureSession({ + const observedAtMicros = nowMicros(); + const session = this.ensureSession({ sessionId: event.sessionId, sessionKey: event.sessionKey ?? ctx.sessionKey, agentId: ctx.agentId, source: 'session_start', resumedFrom: event.resumedFrom, + timestamp: observedAtMicros, }); + this.promoteDeferredSubagentSessionsForRequester(session?.sessionKey ?? event.sessionKey ?? ctx.sessionKey); + // ensureSession opens the root scope and emits openclaw.session_start for both explicit and lazy sessions. } @@ -169,12 +185,14 @@ export class HookReplayBackend { /** Finalize one agent run, replaying message-write trajectory when needed. */ onAgentEnd(event: PluginHookAgentEndEvent, ctx: PluginHookAgentContext): void { + const observedAtMicros = nowMicros(); const session = this.ensureSession({ sessionId: ctx.sessionId, sessionKey: ctx.sessionKey, runId: event.runId ?? ctx.runId, agentId: ctx.agentId, source: 'lazy_session', + timestamp: observedAtMicros, }); if (!session) { @@ -196,17 +214,20 @@ export class HookReplayBackend { durationMs: event.durationMs, messageCount: event.messages.length, }), + observedAtMicros, ); } /** Remember the last assistant text before OpenClaw finalizes the response. */ onBeforeAgentFinalize(event: PluginHookBeforeAgentFinalizeEvent, ctx: PluginHookAgentContext): void { + const observedAtMicros = nowMicros(); const session = this.ensureSession({ sessionId: event.sessionId, sessionKey: event.sessionKey ?? ctx.sessionKey, runId: event.runId ?? ctx.runId, agentId: ctx.agentId, source: 'lazy_session', + timestamp: observedAtMicros, }); if (!session) { @@ -234,21 +255,24 @@ export class HookReplayBackend { stopHookActive: event.stopHookActive, messageCount: event.messages?.length, }), + observedAtMicros, ); } /** Attach subagent spawn metadata to the requester session when possible. */ onSubagentSpawned(event: PluginHookSubagentSpawnedEvent, ctx: PluginHookSubagentContext): void { + const observedAtMicros = nowMicros(); + this.trackPendingSubagentLineage(event, ctx, Math.trunc(observedAtMicros / 1000)); + const requesterSession = this.ensureRequesterSessionAnchor(ctx.requesterSessionKey, observedAtMicros); + const childSessionKey = ctx.childSessionKey ?? event.childSessionKey; const session = + requesterSession ?? this.ensureSession({ - requesterSessionKey: ctx.requesterSessionKey, - source: 'lazy_session', - }) ?? - this.ensureSession({ - childSessionKey: ctx.childSessionKey ?? event.childSessionKey, + childSessionKey, runId: ctx.runId ?? event.runId, agentId: event.agentId, source: 'lazy_session', + timestamp: observedAtMicros, }); if (!session) { @@ -266,26 +290,36 @@ export class HookReplayBackend { mode: event.mode, threadRequested: event.threadRequested, }), + observedAtMicros, ); + + if (!ctx.requesterSessionKey || requesterSession?.rootHandle) { + this.promoteDeferredSubagentSession(event.childSessionKey); + } } /** Attach subagent completion metadata to the requester or child session. */ onSubagentEnded(event: PluginHookSubagentEndedEvent, ctx: PluginHookSubagentContext): void { + const observedAtMicros = nowMicros(); const session = this.ensureSession({ requesterSessionKey: ctx.requesterSessionKey, source: 'lazy_session', + timestamp: observedAtMicros, }) ?? this.ensureSession({ childSessionKey: ctx.childSessionKey ?? event.targetSessionKey, runId: ctx.runId ?? event.runId, source: 'lazy_session', + timestamp: observedAtMicros, }); if (!session) { return; } + this.materializeDeferredSessionRoot(session); + this.emitSessionMark( 'openclaw.subagent_ended', session, @@ -300,6 +334,7 @@ export class HookReplayBackend { sendFarewell: event.sendFarewell, accountId: event.accountId, }), + observedAtMicros, ); } @@ -310,12 +345,12 @@ export class HookReplayBackend { /** Close one session selected by a runtime lifecycle cleanup hook. */ async cleanupSession(input: SessionLookupInput & { reason: string }): Promise { - const key = resolveSessionKey(this.stateValue, input); - if (!key) { + const ownerKey = resolveSessionOwnerKey(this.stateValue, input); + if (!ownerKey) { return; } - const session = this.stateValue.sessions.get(key); + const session = this.stateValue.sessions.get(ownerKey); if (!session) { return; } @@ -356,6 +391,10 @@ export class HookReplayBackend { /** Emit spans/marks under the stored session scope stack and ATIF capture window. */ emitCapturedUnderSession(label: string, session: SessionState, emit: () => void): void { + if (queueCapturedEmit(session, label, emit)) { + return; + } + this.safeReplay(label, session, () => { const previousStack = this.nf.currentScopeStack(); try { @@ -384,16 +423,18 @@ export class HookReplayBackend { /** Drain, close, export, and delete one session. */ private async closeSession(session: SessionState, summary: JsonRecord, metadata?: JsonRecord): Promise { + this.materializeDeferredSessionRoot(session); drainSession(this.sessionManager(), session); closeSessionRoot(this.sessionManager(), session, summary, session.finalOutput ?? summary, metadata); this.flushSubscriberDelivery('session_close'); + this.forgetPendingSubagentLineage(session); deleteSession(this.stateValue, session); } /** Emit a session-level OpenClaw lifecycle mark. */ - private emitSessionMark(name: string, session: SessionState, data: JsonRecord): void { + private emitSessionMark(name: string, session: SessionState, data: JsonRecord, timestampMicros?: number): void { this.emitCapturedUnderSession(name, session, () => { - emitMark({ + const params: Parameters[0] = { nf: this.nf, state: this.stateValue, session, @@ -407,7 +448,13 @@ export class HookReplayBackend { agentId: session.agentId, runId: typeof data.runId === 'string' ? data.runId : undefined, }), - }); + }; + + if (timestampMicros !== undefined) { + params.timestamp = timestampMicros; + } + + emitMark(params); }); } @@ -444,9 +491,202 @@ export class HookReplayBackend { this.replayPendingLlmOutputsForSession(session, options), emitUnpairedModelCallTimingMarks: (session: SessionState) => this.emitUnpairedModelCallTimingMarks(session), logBoundedWarn: (key: string, message: string) => this.logBoundedWarn(key, message), + resolveSessionRootContext: (input: Parameters[1]) => this.resolveSessionRootContext(input), }; } + /** Prefer nested child scopes only when the hook surface provides real subagent lineage. */ + private resolveSessionRootContext(input: Parameters[1]): Partial[1]> | undefined { + this.pruneExpiredPendingSubagentLineage(microsToMs(input.timestamp) ?? Date.now()); + const lineage = this.resolvePendingSubagentLineage(input); + if (lineage) { + const parentSession = this.resolveTrackedSession({ requesterSessionKey: lineage.requesterSessionKey }); + return { + childSessionKey: input.childSessionKey ?? lineage.childSessionKey, + runId: input.runId ?? lineage.runId, + agentId: input.agentId ?? lineage.agentId, + scopeRole: 'subagent', + parentHandle: parentSession?.rootHandle, + deferRootOpen: input.deferRootOpen ?? (parentSession?.rootHandle ? false : true), + }; + } + + if (this.isDocumentedSubagentSessionKey(input.sessionKey ?? input.childSessionKey)) { + return { + scopeRole: 'subagent', + deferRootOpen: input.deferRootOpen ?? true, + }; + } + + return undefined; + } + + /** Track stable parent/child lineage from subagent hooks until child session hooks can use it. */ + private trackPendingSubagentLineage( + event: PluginHookSubagentSpawnedEvent, + ctx: PluginHookSubagentContext, + observedAtMs: number, + ): void { + this.pruneExpiredPendingSubagentLineage(observedAtMs); + const requesterSessionKey = ctx.requesterSessionKey?.trim(); + const childSessionKey = (ctx.childSessionKey ?? event.childSessionKey)?.trim(); + if (!requesterSessionKey || !childSessionKey) { + return; + } + + this.pendingSubagentLineageByChildSessionKey.set(childSessionKey, { + childSessionKey, + requesterSessionKey, + runId: ctx.runId ?? event.runId, + agentId: event.agentId, + observedAtMs, + }); + if (ctx.runId ?? event.runId) { + this.pendingSubagentChildKeyByRunId.set(ctx.runId ?? event.runId, childSessionKey); + } + } + + /** Resolve the requester session if it exists, or seed a deferred lazy root placeholder for later promotion. */ + private ensureRequesterSessionAnchor(requesterSessionKey: string | undefined, timestampMicros?: number): SessionState | undefined { + const trimmedRequesterSessionKey = requesterSessionKey?.trim(); + if (!trimmedRequesterSessionKey) { + return undefined; + } + + return ( + this.resolveTrackedSession({ requesterSessionKey: trimmedRequesterSessionKey }) ?? + this.ensureSession({ + sessionKey: trimmedRequesterSessionKey, + requesterSessionKey: trimmedRequesterSessionKey, + source: 'lazy_session', + timestamp: timestampMicros, + deferRootOpen: true, + }) + ); + } + + /** Open a deferred child session root once the requester scope is known. */ + private promoteDeferredSubagentSession(childSessionKey: string): void { + const session = this.resolveTrackedSession({ sessionKey: childSessionKey, childSessionKey }); + if (!session) { + return; + } + + this.materializeDeferredSessionRoot(session); + } + + /** Promote any deferred child sessions waiting on a requester root once it exists. */ + private promoteDeferredSubagentSessionsForRequester(requesterSessionKey: string | undefined): void { + const trimmedRequesterSessionKey = requesterSessionKey?.trim(); + if (!trimmedRequesterSessionKey) { + return; + } + + const requesterSession = this.resolveTrackedSession({ requesterSessionKey: trimmedRequesterSessionKey }); + if (!requesterSession?.rootHandle) { + return; + } + + for (const lineage of this.pendingSubagentLineageByChildSessionKey.values()) { + if (lineage.requesterSessionKey === trimmedRequesterSessionKey) { + this.promoteDeferredSubagentSession(lineage.childSessionKey); + } + } + } + + /** Materialize one deferred session root with nested lineage when available. */ + private materializeDeferredSessionRoot(session: SessionState): void { + if (session.rootHandle) { + return; + } + + this.pruneExpiredPendingSubagentLineage(Date.now()); + const lineage = this.resolvePendingSubagentLineage({ + sessionId: session.sessionId, + sessionKey: session.sessionKey, + }); + const parentSession = + lineage === undefined + ? undefined + : this.ensureRequesterSessionAnchor(lineage.requesterSessionKey, session.pendingRootTimestampMicros); + if (parentSession && !parentSession.rootHandle) { + materializeSessionRoot(this.sessionManager(), parentSession, { + sessionId: parentSession.sessionId, + sessionKey: parentSession.sessionKey ?? lineage?.requesterSessionKey, + source: parentSession.source, + resumedFrom: parentSession.resumedFrom, + timestamp: parentSession.pendingRootTimestampMicros, + }); + } + const parentHandle = parentSession?.rootHandle; + + materializeSessionRoot(this.sessionManager(), session, { + sessionId: session.sessionId, + sessionKey: session.sessionKey, + runId: lineage?.runId, + agentId: session.agentId ?? lineage?.agentId, + source: session.source, + resumedFrom: session.resumedFrom, + scopeRole: session.scopeRole, + parentHandle, + }); + } + + /** Resolve stable subagent lineage from child session key first, then run id as a fallback. */ + private resolvePendingSubagentLineage(input: SessionLookupInput): PendingSubagentLineage | undefined { + const childSessionKey = [input.sessionKey, input.childSessionKey] + .find((value): value is string => this.isDocumentedSubagentSessionKey(value)) + ?.trim(); + if (childSessionKey) { + return this.pendingSubagentLineageByChildSessionKey.get(childSessionKey); + } + + const runChildSessionKey = + typeof input.runId === 'string' && input.runId.length > 0 ? this.pendingSubagentChildKeyByRunId.get(input.runId) : undefined; + return runChildSessionKey === undefined ? undefined : this.pendingSubagentLineageByChildSessionKey.get(runChildSessionKey); + } + + /** Drop stale pending lineage entries so abandoned subagent spawns do not accumulate indefinitely. */ + private pruneExpiredPendingSubagentLineage(nowMs: number): void { + const ttlMs = this.config.correlation.recordTtlMs; + for (const [childSessionKey, lineage] of this.pendingSubagentLineageByChildSessionKey) { + if (nowMs - lineage.observedAtMs > ttlMs) { + this.pendingSubagentLineageByChildSessionKey.delete(childSessionKey); + } + } + + for (const [runId, childSessionKey] of this.pendingSubagentChildKeyByRunId) { + if (!this.pendingSubagentLineageByChildSessionKey.has(childSessionKey)) { + this.pendingSubagentChildKeyByRunId.delete(runId); + } + } + } + + /** Resolve one session through the same alias map used by the replay state. */ + private resolveTrackedSession(input: SessionLookupInput): SessionState | undefined { + const ownerKey = resolveSessionOwnerKey(this.stateValue, input); + return ownerKey === undefined ? undefined : this.stateValue.sessions.get(ownerKey); + } + + /** Free lineage bookkeeping once the child session is closed. */ + private forgetPendingSubagentLineage(session: SessionState): void { + const childSessionKey = session.sessionKey; + if (childSessionKey) { + this.pendingSubagentLineageByChildSessionKey.delete(childSessionKey); + } + + for (const [runId, trackedChildSessionKey] of this.pendingSubagentChildKeyByRunId) { + if (trackedChildSessionKey === childSessionKey) { + this.pendingSubagentChildKeyByRunId.delete(runId); + } + } + } + + /** Match the documented native subagent session key shape without depending on private OpenClaw internals. */ + private isDocumentedSubagentSessionKey(value?: string): value is string { + return typeof value === 'string' && /^agent:[^:]+:subagent:/.test(value); + } + /** Log one warning per key to avoid noisy repeated hook failures. */ private logBoundedWarn(key: string, message: string): void { const count = this.warningCounts.get(key) ?? 0; @@ -459,12 +699,16 @@ export class HookReplayBackend { export { llmKey }; -/** Expose session-key resolution for tests without exporting the full session module. */ -export function resolveBackendSessionKey( +/** Expose owner-key resolution for tests without exporting the full session module. */ +export function resolveBackendSessionOwnerKey( state: HookReplayBackendState, - input: Parameters[1], + input: Parameters[1], ): string | undefined { - return resolveSessionKey(state, input); + return resolveSessionOwnerKey(state, input); +} + +function microsToMs(timestampMicros: number | undefined): number | undefined { + return timestampMicros === undefined ? undefined : Math.trunc(timestampMicros / 1000); } /** Build the lifecycle summary stored as the session_end mark payload. */ diff --git a/integrations/openclaw/test/hooks-backend.test.ts b/integrations/openclaw/test/hooks-backend.test.ts index 06f3bfd3..2537ff47 100644 --- a/integrations/openclaw/test/hooks-backend.test.ts +++ b/integrations/openclaw/test/hooks-backend.test.ts @@ -221,6 +221,38 @@ describe('HookReplayBackend', () => { assertNoOverclaimedHookMetadata(nf.calls.event[1]?.metadata); }); + it('backdates a lazy session root to tool start when after_tool_call is the first hook', () => { + const nf = createNemoRelayRuntime(); + const backend = createBackend(nf); + + withMockedNow([1000], () => { + backend.onAfterToolCall( + { + toolName: 'read_file', + params: { path: '/workspace/file.txt' }, + toolCallId: 'tool-call-1', + runId: 'run-1', + result: { text: 'ok' }, + durationMs: 7, + }, + { + runId: 'run-1', + sessionId: 'session-1', + sessionKey: 'session-key-1', + toolCallId: 'tool-call-1', + }, + ); + }); + + assert.equal(nf.calls.pushScope.length, 1); + assert.equal(nf.calls.pushScope[0]?.timestamp, 993_000); + assert.deepEqual( + nf.calls.event.map((event) => event.name), + ['openclaw.session_start'], + ); + assert.equal(nf.calls.event[0]?.timestamp, 993_000); + }); + it('safe replay restores the previous scope stack and fails open', () => { const nf = createNemoRelayRuntime(); const backend = createBackend(nf); @@ -388,24 +420,527 @@ describe('HookReplayBackend', () => { assert.equal(nf.calls.event[1]?.handle, nf.calls.event[0]?.handle); }); - it('uses child session key as a lazy-session fallback without aliasing it away', () => { + it('nests child session activity under the requester when subagent_spawned arrives first', () => { const nf = createNemoRelayRuntime(); const backend = createBackend(nf); - backend.onSubagentSpawned( + withMockedNow([1000, 2000, 3000, 4000], () => { + backend.onSubagentSpawned( + { + childSessionKey: 'agent:child-agent:subagent:child-key', + agentId: 'child-agent', + mode: 'run', + threadRequested: false, + runId: 'child-run', + }, + { + requesterSessionKey: 'parent-key', + childSessionKey: 'agent:child-agent:subagent:child-key', + runId: 'child-run', + }, + ); + backend.onSessionStart( + { sessionId: 'parent-session', sessionKey: 'parent-key' }, + { sessionId: 'parent-session', sessionKey: 'parent-key' }, + ); + backend.onSessionStart( + { sessionId: 'child-session', sessionKey: 'agent:child-agent:subagent:child-key' }, + { + sessionId: 'child-session', + sessionKey: 'agent:child-agent:subagent:child-key', + }, + ); + backend.onAgentEnd( + { + runId: 'child-run', + messages: [{ role: 'assistant', provider: 'openai', model: 'gpt', content: 'Child answer.' }], + success: true, + }, + { + runId: 'child-run', + sessionId: 'child-session', + sessionKey: 'agent:child-agent:subagent:child-key', + agentId: 'child-agent', + }, + ); + }); + + assert.equal(nf.calls.pushScope.length, 2); + assert.equal(nf.calls.pushScope[1]?.parentHandle, nf.calls.pushScope[0]?.handle); + assert.equal(nf.calls.pushScope[1]?.timestamp, 3000 * 1000); + assert.deepEqual(nf.calls.pushScope[1]?.metadata, { + source: 'openclaw.session_start', + hook_event_name: 'session_start', + sessionId: 'child-session', + sessionKey: 'agent:child-agent:subagent:child-key', + agentId: 'child-agent', + runId: 'child-run', + nemo_relay_scope_role: 'subagent', + }); + + const spawnMark = nf.calls.event.find((event) => event.name === 'openclaw.subagent_spawned'); + const childAgentEnd = nf.calls.event.find((event) => event.name === 'openclaw.agent_end'); + assert.equal(spawnMark?.handle, nf.calls.pushScope[0]?.handle); + assert.equal(childAgentEnd?.handle, nf.calls.pushScope[1]?.handle); + assert.equal(spawnMark?.timestamp, 1000 * 1000); + assert.equal(childAgentEnd?.timestamp, 4000 * 1000); + }); + + it('keeps spawn-first child activity deferred until the requester root exists', () => { + const nf = createNemoRelayRuntime(); + const backend = createBackend(nf); + + withMockedNow([1000, 2000, 3000, 4000], () => { + backend.onSubagentSpawned( + { + childSessionKey: 'agent:child-agent:subagent:child-key', + agentId: 'child-agent', + mode: 'run', + threadRequested: false, + runId: 'child-run', + }, + { + requesterSessionKey: 'parent-key', + childSessionKey: 'agent:child-agent:subagent:child-key', + runId: 'child-run', + }, + ); + backend.onSessionStart( + { sessionId: 'child-session', sessionKey: 'agent:child-agent:subagent:child-key' }, + { + sessionId: 'child-session', + sessionKey: 'agent:child-agent:subagent:child-key', + }, + ); + backend.onAgentEnd( + { + runId: 'child-run', + messages: [{ role: 'assistant', provider: 'openai', model: 'gpt', content: 'Child answer.' }], + success: true, + }, + { + runId: 'child-run', + sessionId: 'child-session', + sessionKey: 'agent:child-agent:subagent:child-key', + agentId: 'child-agent', + }, + ); + backend.onSessionStart( + { sessionId: 'parent-session', sessionKey: 'parent-key' }, + { sessionId: 'parent-session', sessionKey: 'parent-key' }, + ); + }); + + assert.equal(nf.calls.pushScope.length, 2); + assert.equal(nf.calls.pushScope[1]?.parentHandle, nf.calls.pushScope[0]?.handle); + assert.equal(nf.calls.pushScope[0]?.timestamp, 4000 * 1000); + assert.equal(nf.calls.pushScope[1]?.timestamp, 2000 * 1000); + assert.deepEqual(nf.calls.pushScope[1]?.metadata, { + source: 'openclaw.session_start', + hook_event_name: 'session_start', + sessionId: 'child-session', + sessionKey: 'agent:child-agent:subagent:child-key', + agentId: 'child-agent', + runId: 'child-run', + nemo_relay_scope_role: 'subagent', + }); + + const spawnMark = nf.calls.event.find((event) => event.name === 'openclaw.subagent_spawned'); + const childAgentEnd = nf.calls.event.find((event) => event.name === 'openclaw.agent_end'); + assert.equal(spawnMark?.handle, nf.calls.pushScope[0]?.handle); + assert.equal(spawnMark?.timestamp, 1000 * 1000); + assert.equal(childAgentEnd?.handle, nf.calls.pushScope[1]?.handle); + assert.equal(childAgentEnd?.timestamp, 3000 * 1000); + }); + + it('keeps a spawn-first child nested when the child closes before requester session_start', async () => { + const nf = createNemoRelayRuntime(); + const backend = createBackend(nf); + + withMockedNow([1000, 2000, 3000, 4000], () => { + backend.onSubagentSpawned( + { + childSessionKey: 'agent:child-agent:subagent:child-key', + agentId: 'child-agent', + mode: 'run', + threadRequested: false, + runId: 'child-run', + }, + { + requesterSessionKey: 'parent-key', + childSessionKey: 'agent:child-agent:subagent:child-key', + runId: 'child-run', + }, + ); + backend.onSessionStart( + { sessionId: 'child-session', sessionKey: 'agent:child-agent:subagent:child-key' }, + { + sessionId: 'child-session', + sessionKey: 'agent:child-agent:subagent:child-key', + }, + ); + }); + + await withMockedNow([3000], () => + backend.onSessionEnd( + { sessionId: 'child-session', sessionKey: 'agent:child-agent:subagent:child-key', messageCount: 1, reason: 'idle' }, + { sessionId: 'child-session', sessionKey: 'agent:child-agent:subagent:child-key' }, + ), + ); + + withMockedNow([4000], () => { + backend.onSessionStart( + { sessionId: 'parent-session', sessionKey: 'parent-key' }, + { sessionId: 'parent-session', sessionKey: 'parent-key' }, + ); + }); + + assert.equal(nf.calls.pushScope.length, 2); + assert.equal(nf.calls.pushScope[1]?.parentHandle, nf.calls.pushScope[0]?.handle); + assert.equal(nf.calls.pushScope[0]?.timestamp, 1000 * 1000); + assert.equal(nf.calls.pushScope[1]?.timestamp, 2000 * 1000); + assert.equal(nf.calls.popScope.length, 1); + + const sessionStartEvents = nf.calls.event.filter((event) => event.name === 'openclaw.session_start'); + const spawnMark = nf.calls.event.find((event) => event.name === 'openclaw.subagent_spawned'); + const childSessionEnd = nf.calls.event.find( + (event) => event.name === 'openclaw.session_end' && event.handle === nf.calls.pushScope[1]?.handle, + ); + assert.equal(sessionStartEvents.length, 2); + assert.equal(spawnMark?.handle, nf.calls.pushScope[0]?.handle); + assert.ok(childSessionEnd); + }); + + it('keeps session_start-first child activity on the existing child scope after subagent_spawned', () => { + const nf = createNemoRelayRuntime(); + const backend = createBackend(nf); + + withMockedNow([1000, 2000, 3000, 4000], () => { + backend.onSessionStart( + { sessionId: 'parent-session', sessionKey: 'parent-key' }, + { sessionId: 'parent-session', sessionKey: 'parent-key' }, + ); + backend.onSessionStart( + { sessionId: 'child-session', sessionKey: 'agent:child-agent:subagent:child-key' }, + { + sessionId: 'child-session', + sessionKey: 'agent:child-agent:subagent:child-key', + }, + ); + backend.onSubagentSpawned( + { + childSessionKey: 'agent:child-agent:subagent:child-key', + agentId: 'child-agent', + mode: 'run', + threadRequested: false, + runId: 'child-run', + }, + { + requesterSessionKey: 'parent-key', + childSessionKey: 'agent:child-agent:subagent:child-key', + runId: 'child-run', + }, + ); + backend.onAgentEnd( + { + runId: 'child-run', + messages: [{ role: 'assistant', provider: 'openai', model: 'gpt', content: 'Child answer.' }], + success: true, + }, + { + runId: 'child-run', + }, + ); + }); + + assert.equal(nf.calls.pushScope.length, 2); + assert.equal(nf.calls.pushScope[1]?.parentHandle, nf.calls.pushScope[0]?.handle); + assert.equal(nf.calls.pushScope[1]?.timestamp, 2000 * 1000); + assert.deepEqual(nf.calls.pushScope[1]?.metadata, { + source: 'openclaw.session_start', + hook_event_name: 'session_start', + sessionId: 'child-session', + sessionKey: 'agent:child-agent:subagent:child-key', + agentId: 'child-agent', + runId: 'child-run', + nemo_relay_scope_role: 'subagent', + }); + + const childAgentEndEvents = nf.calls.event.filter( + (event) => event.name === 'openclaw.agent_end' && event.handle === nf.calls.pushScope[1]?.handle, + ); + assert.equal(childAgentEndEvents.length, 1); + assert.equal(childAgentEndEvents[0]?.timestamp, 4000 * 1000); + }); + + it('expires stale pending subagent lineage before a late child session_start can reuse it', () => { + const nf = createNemoRelayRuntime(); + const backend = createBackend(nf, createLogger(), { + config: parseConfig({ correlation: { recordTtlMs: 1 } }), + }); + + withMockedNow([1000, 1000, 1002], () => { + backend.onSessionStart( + { sessionId: 'parent-session', sessionKey: 'parent-key' }, + { sessionId: 'parent-session', sessionKey: 'parent-key' }, + ); + backend.onSubagentSpawned( + { + childSessionKey: 'agent:child-agent:subagent:child-key', + agentId: 'child-agent', + mode: 'run', + threadRequested: false, + runId: 'child-run', + }, + { + requesterSessionKey: 'parent-key', + childSessionKey: 'agent:child-agent:subagent:child-key', + runId: 'child-run', + }, + ); + backend.onSessionStart( + { sessionId: 'child-session', sessionKey: 'agent:child-agent:subagent:child-key' }, + { + sessionId: 'child-session', + sessionKey: 'agent:child-agent:subagent:child-key', + }, + ); + }); + + assert.equal(nf.calls.pushScope.length, 1); + assert.deepEqual( + nf.calls.event.map((event) => event.name), + ['openclaw.session_start', 'openclaw.subagent_spawned'], + ); + }); + + it('reconciles a deferred child session with later session_start before lineage promotion', () => { + const nf = createNemoRelayRuntime(); + const backend = createBackend(nf); + + withMockedNow([1000, 2000, 3000, 4000], () => { + backend.onSessionStart( + { sessionId: 'parent-session', sessionKey: 'parent-key' }, + { sessionId: 'parent-session', sessionKey: 'parent-key' }, + ); + backend.onAgentEnd( + { + runId: 'child-run', + messages: [{ role: 'assistant', provider: 'openai', model: 'gpt', content: 'Child answer.' }], + success: true, + }, + { + runId: 'child-run', + sessionKey: 'agent:child-agent:subagent:child-key', + agentId: 'child-agent', + }, + ); + backend.onSessionStart( + { sessionId: 'child-session', sessionKey: 'agent:child-agent:subagent:child-key' }, + { + sessionId: 'child-session', + sessionKey: 'agent:child-agent:subagent:child-key', + agentId: 'child-agent', + }, + ); + backend.onSubagentSpawned( + { + childSessionKey: 'agent:child-agent:subagent:child-key', + agentId: 'child-agent', + mode: 'run', + threadRequested: false, + runId: 'child-run', + }, + { + requesterSessionKey: 'parent-key', + childSessionKey: 'agent:child-agent:subagent:child-key', + runId: 'child-run', + }, + ); + backend.onAgentEnd( + { + runId: 'child-run', + messages: [{ role: 'assistant', provider: 'openai', model: 'gpt', content: 'Follow-up answer.' }], + success: true, + }, + { + runId: 'child-run', + agentId: 'child-agent', + }, + ); + }); + + assert.equal(nf.calls.pushScope.length, 2); + assert.equal(nf.calls.pushScope[1]?.parentHandle, nf.calls.pushScope[0]?.handle); + assert.equal(nf.calls.pushScope[1]?.timestamp, 3000 * 1000); + + const childSessionStartIndex = nf.calls.event.findIndex((event) => event.name === 'openclaw.session_start' && event.handle === nf.calls.pushScope[1]?.handle); + const childAgentEndIndex = nf.calls.event.findIndex((event) => event.name === 'openclaw.agent_end' && event.handle === nf.calls.pushScope[1]?.handle); + const spawnMarkIndex = nf.calls.event.findIndex((event) => event.name === 'openclaw.subagent_spawned'); + const childAgentEndEvents = nf.calls.event.filter( + (event) => event.name === 'openclaw.agent_end' && event.handle === nf.calls.pushScope[1]?.handle, + ); + assert.ok(childSessionStartIndex >= 0); + assert.ok(childAgentEndIndex > childSessionStartIndex); + assert.ok(spawnMarkIndex >= 0); + assert.equal(childAgentEndEvents.length, 2); + assert.equal(nf.calls.event[spawnMarkIndex]?.handle, nf.calls.pushScope[0]?.handle); + assert.equal(nf.calls.event[childSessionStartIndex]?.timestamp, 3000 * 1000); + assert.equal(nf.calls.event[childAgentEndIndex]?.timestamp, 2000 * 1000); + assert.equal(nf.calls.event[spawnMarkIndex]?.timestamp, 4000 * 1000); + }); + + it('materializes a child root with the child session key when only child lineage is available', () => { + const nf = createNemoRelayRuntime(); + const backend = createBackend(nf); + + withMockedNow([1000], () => { + backend.onSubagentEnded( + { + targetSessionKey: 'agent:child-agent:subagent:child-key', + targetKind: 'subagent', + reason: 'completed', + runId: 'child-run', + }, + { + childSessionKey: 'agent:child-agent:subagent:child-key', + runId: 'child-run', + }, + ); + }); + + assert.equal(nf.calls.pushScope.length, 1); + assert.equal(nf.calls.pushScope[0]?.parentHandle, null); + assert.equal(nf.calls.pushScope[0]?.timestamp, 1000 * 1000); + assert.deepEqual(nf.calls.pushScope[0]?.metadata, { + source: 'openclaw.lazy_session', + sessionId: 'agent:child-agent:subagent:child-key', + sessionKey: 'agent:child-agent:subagent:child-key', + runId: 'child-run', + nemo_relay_scope_role: 'subagent', + }); + assert.deepEqual( + nf.calls.event.map((event) => event.name), + ['openclaw.session_start', 'openclaw.subagent_ended'], + ); + assert.equal(nf.calls.event[0]?.timestamp, 1000 * 1000); + assert.equal(nf.calls.event[1]?.timestamp, 1000 * 1000); + assert.equal(nf.calls.event[1]?.handle, nf.calls.pushScope[0]?.handle); + }); + + it('keeps deferred child model timing correlated when explicit session_start arrives later', async () => { + const nf = createNemoRelayRuntime(); + const backend = createBackend(nf); + + withMockedNow([1000, 2000, 3000, 4000, 5000, 6000], () => { + backend.onSessionStart( + { sessionId: 'parent-session', sessionKey: 'parent-key' }, + { sessionId: 'parent-session', sessionKey: 'parent-key' }, + ); + backend.onModelCallEnded( + { + runId: 'child-run', + callId: 'call-1', + sessionKey: 'agent:child-agent:subagent:child-key', + provider: 'openai', + model: 'gpt', + durationMs: 12, + outcome: 'completed', + }, + { + runId: 'child-run', + sessionKey: 'agent:child-agent:subagent:child-key', + agentId: 'child-agent', + }, + ); + backend.onSessionStart( + { sessionId: 'child-session', sessionKey: 'agent:child-agent:subagent:child-key' }, + { + sessionId: 'child-session', + sessionKey: 'agent:child-agent:subagent:child-key', + agentId: 'child-agent', + }, + ); + backend.onSubagentSpawned( + { + childSessionKey: 'agent:child-agent:subagent:child-key', + agentId: 'child-agent', + mode: 'run', + threadRequested: false, + runId: 'child-run', + }, + { + requesterSessionKey: 'parent-key', + childSessionKey: 'agent:child-agent:subagent:child-key', + runId: 'child-run', + }, + ); + backend.onLlmInput( + { + runId: 'child-run', + sessionId: 'child-session', + provider: 'openai', + model: 'gpt', + prompt: 'hello', + historyMessages: [], + imagesCount: 0, + }, + { + runId: 'child-run', + sessionId: 'child-session', + sessionKey: 'agent:child-agent:subagent:child-key', + agentId: 'child-agent', + }, + ); + backend.onLlmOutput( + { + runId: 'child-run', + sessionId: 'child-session', + provider: 'openai', + model: 'gpt', + assistantTexts: ['child answer'], + }, + { + runId: 'child-run', + sessionId: 'child-session', + sessionKey: 'agent:child-agent:subagent:child-key', + agentId: 'child-agent', + }, + ); + }); + + assert.equal(nf.calls.llmCall.length, 1); + assert.deepEqual(nf.calls.llmCall[0]?.metadata, { + source: 'openclaw.llm_output', + runId: 'child-run', + sessionId: 'child-session', + sessionKey: 'agent:child-agent:subagent:child-key', + agentId: 'child-agent', + provider: 'openai', + model: 'gpt', + callId: 'call-1', + }); + + await backend.onSessionEnd( { - childSessionKey: 'child-key', + sessionId: 'child-session', + sessionKey: 'agent:child-agent:subagent:child-key', + messageCount: 1, + reason: 'idle', + }, + { + sessionId: 'child-session', + sessionKey: 'agent:child-agent:subagent:child-key', agentId: 'child-agent', - mode: 'run', - threadRequested: false, - runId: 'child-run', }, - { childSessionKey: 'child-key', runId: 'child-run' }, ); - assert.ok(backend.state().sessions.get('child-key')); - assert.equal(backend.state().sessionAliases.get('child-run'), 'child-key'); - assert.equal(backend.state().sessionAliases.get('child-key'), undefined); + assert.equal(backend.state().modelCallsByCallId.size, 0); + assert.equal(backend.state().modelTimingsByLlmKey.size, 0); + assert.equal( + nf.calls.event.some((event) => event.name === 'openclaw.model_call_timing_unpaired'), + false, + ); }); it('normalizes circular replay payloads before NAPI boundaries', () => { @@ -451,9 +986,29 @@ describe('HookReplayBackend', () => { type TestNemoRelayRuntime = NemoRelayRuntimeModule & { previousStack: { id: 'previous' }; calls: { - pushScope: Array<{ name: string; scopeType: number; data: unknown; metadata: unknown; input: unknown }>; - popScope: Array<{ handle: unknown; output: unknown }>; - event: Array<{ name: string; handle: unknown; data: unknown; metadata: unknown }>; + pushScope: Array<{ + name: string; + scopeType: number; + handle: unknown; + parentHandle: unknown; + data: unknown; + metadata: unknown; + input: unknown; + timestamp: unknown; + }>; + popScope: Array<{ handle: unknown; output: unknown; timestamp: unknown }>; + event: Array<{ name: string; handle: unknown; data: unknown; metadata: unknown; timestamp: unknown }>; + llmCall: Array<{ + name: string; + handle: unknown; + request: unknown; + parentHandle: unknown; + data: unknown; + metadata: unknown; + modelName: unknown; + timestamp: unknown; + }>; + llmCallEnd: Array<{ handle: unknown; response: unknown; data: unknown; metadata: unknown; timestamp: unknown }>; setThreadScopeStack: unknown[]; toolConditionalExecution: Array<{ name: string; args: unknown }>; }; @@ -497,6 +1052,8 @@ function createNemoRelayRuntime(): TestNemoRelayRuntime { pushScope: [], popScope: [], event: [], + llmCall: [], + llmCallEnd: [], setThreadScopeStack: [], toolConditionalExecution: [], }; @@ -510,18 +1067,29 @@ function createNemoRelayRuntime(): TestNemoRelayRuntime { currentScopeStack: () => previousStack as unknown as ReturnType, setThreadScopeStack: (stack) => calls.setThreadScopeStack.push(stack), pushScope: (...args: Parameters) => { - const [name, scopeType, , , data, metadata, input] = args; + const [name, scopeType, parentHandle, , data, metadata, input, timestamp] = args; const handle = { id: `scope-${nextScopeId++}` }; - calls.pushScope.push({ name, scopeType, data, metadata, input }); + calls.pushScope.push({ name, scopeType, handle, parentHandle, data, metadata, input, timestamp }); return handle as unknown as ReturnType; }, - popScope: (handle, output) => calls.popScope.push({ handle, output }), + popScope: (...args: Parameters) => { + const [handle, output, timestamp] = args; + calls.popScope.push({ handle, output, timestamp }); + }, event: (...args: Parameters) => { - const [name, handle, data, metadata] = args; - calls.event.push({ name, handle, data, metadata }); + const [name, handle, data, metadata, timestamp] = args; + calls.event.push({ name, handle, data, metadata, timestamp }); + }, + llmCall: (...args: Parameters) => { + const [name, request, parentHandle, , data, metadata, modelName, timestamp] = args; + const handle = { id: `llm-${nextScopeId++}` }; + calls.llmCall.push({ name, handle, request, parentHandle, data, metadata, modelName, timestamp }); + return handle as unknown as ReturnType; + }, + llmCallEnd: (...args: Parameters) => { + const [handle, response, data, metadata, timestamp] = args; + calls.llmCallEnd.push({ handle, response, data, metadata, timestamp }); }, - llmCall: () => ({}) as unknown as ReturnType, - llmCallEnd: () => {}, toolCall: () => ({}) as unknown as ReturnType, toolCallEnd: () => {}, toolConditionalExecution: async (name, args) => { @@ -540,3 +1108,14 @@ function assertNoOverclaimedHookMetadata(metadata: unknown): void { assert.equal('gateway_route' in record, false); assert.equal('correlation' in record, false); } + +function withMockedNow(values: number[], fn: () => T): T { + const originalNow = Date.now; + let index = 0; + Date.now = () => values[Math.min(index++, values.length - 1)] ?? originalNow(); + try { + return fn(); + } finally { + Date.now = originalNow; + } +} diff --git a/integrations/openclaw/test/llm-replay.test.ts b/integrations/openclaw/test/llm-replay.test.ts index dbc1bd2e..7bf7b4cd 100644 --- a/integrations/openclaw/test/llm-replay.test.ts +++ b/integrations/openclaw/test/llm-replay.test.ts @@ -943,7 +943,7 @@ describe('LLM replay', () => { const nf = createNemoRelayRuntime(); const backend = createBackend(nf, { recordTtlMs: 1 }); const stalePendingOutput = { - sessionKey: 'session-1', + sessionOwnerKey: 'session-1', sessionId: 'session-1', runId: 'old-run', provider: 'openai', @@ -956,7 +956,7 @@ describe('LLM replay', () => { backend.state().llmInputs.set('stale-input', [ { - sessionKey: 'session-1', + sessionOwnerKey: 'session-1', sessionId: 'session-1', runId: 'old-run', provider: 'openai', @@ -970,7 +970,7 @@ describe('LLM replay', () => { backend.state().llmOutputsPendingInput.set('stale-output', [stalePendingOutput]); backend.state().modelTimingsByLlmKey.set('stale-timing', [ { - sessionKey: 'session-1', + sessionOwnerKey: 'session-1', sessionId: 'session-1', runId: 'old-run', callId: 'old-call',