Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/supported-integrations/openclaw-plugin.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ features such as adaptive hints require a managed execution path.
| `after_tool_call` | Replays successful tool calls as tool spans; blocked tools emit marks. |
| `agent_end` | Emits an agent lifecycle mark, flushes recorded assistant-turn LLM spans, and preserves the final assistant answer as the session output. |
| `before_agent_finalize` | Preserves the last assistant message as fallback session output and emits a lifecycle mark without mutating the finalization payload. |
| `subagent_spawned` / `subagent_ended` | Emits subagent lifecycle marks under the best available parent or child session. |
| `subagent_spawned` / `subagent_ended` | Emits subagent lifecycle marks and nests child subagent session scopes under the requester session when stable lineage is available. |

## LLM Replay Fidelity

Expand Down
50 changes: 27 additions & 23 deletions integrations/openclaw/src/hook-replay/llm.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import {
evictExpiredCorrelationRecords,
ensureSession,
insertBoundedRecord,
resolveSessionKey,
resolveSessionOwnerKey,
type LlmInputRecord,
type ModelCallRecord,
type PendingLlmOutputRecord,
Expand All @@ -47,12 +47,14 @@ export function recordLlmInput(
ctx: PluginHookAgentContext,
): void {
evictExpiredReplayRecords(manager);
const observedAtMicros = nowMicros();
const session = ensureSession(manager, {
sessionId: event.sessionId,
sessionKey: ctx.sessionKey,
runId: event.runId,
agentId: ctx.agentId,
source: 'lazy_session',
timestamp: observedAtMicros,
});
if (!session) {
return;
Expand All @@ -76,7 +78,7 @@ export function recordLlmInput(
const pending = shiftOldest(
manager.state.llmOutputsPendingInput,
key,
(record) => record.sessionKey === session.sessionId,
(record) => record.sessionOwnerKey === session.ownerKey,
);
if (!pending) {
return;
Expand Down Expand Up @@ -105,24 +107,26 @@ export function recordLlmOutput(
ctx: PluginHookAgentContext,
): void {
evictExpiredReplayRecords(manager);
const observedAtMicros = nowMicros();
const session = ensureSession(manager, {
sessionId: event.sessionId,
sessionKey: ctx.sessionKey,
runId: event.runId,
agentId: ctx.agentId,
source: 'lazy_session',
timestamp: observedAtMicros,
});
if (!session) {
return;
}

const key = llmKey(event);
if (hasTrajectoryReplay(session, event.runId)) {
shiftOldest(manager.state.llmInputs, key, (record) => record.sessionKey === session.sessionId);
shiftOldest(manager.state.llmInputs, key, (record) => record.sessionOwnerKey === session.ownerKey);
return;
}

const input = shiftOldest(manager.state.llmInputs, key, (record) => record.sessionKey === session.sessionId);
const input = shiftOldest(manager.state.llmInputs, key, (record) => record.sessionOwnerKey === session.ownerKey);
if (input) {
replayLlmOutput({
manager,
Expand All @@ -135,7 +139,7 @@ export function recordLlmOutput(
}

const pending: PendingLlmOutputRecord = {
sessionKey: session.sessionId,
sessionOwnerKey: session.ownerKey,
sessionId: event.sessionId,
runId: event.runId,
provider: event.provider,
Expand Down Expand Up @@ -191,7 +195,7 @@ export function recordBeforeMessageWrite(
if (provider && model && (assistantTexts.length > 0 || assistantToolCalls.length > 0 || usage !== undefined)) {
session.assistantMessageWrites ??= [];
session.assistantMessageWrites.push({
sessionKey: session.sessionId,
sessionOwnerKey: session.ownerKey,
provider,
model,
assistantTexts,
Expand Down Expand Up @@ -239,7 +243,7 @@ export function recordModelCallStarted(
manager.state.modelCallsByCallId,
modelTimingKey(event),
{
sessionKey: session.sessionId,
sessionOwnerKey: session.ownerKey,
sessionId: session.sessionId,
runId: event.runId,
callId: event.callId,
Expand Down Expand Up @@ -281,7 +285,7 @@ export function recordModelCallEnded(
const record =
existing ??
({
sessionKey: session.sessionId,
sessionOwnerKey: session.ownerKey,
sessionId: session.sessionId,
runId: event.runId,
callId: event.callId,
Expand All @@ -303,7 +307,7 @@ export function recordModelCallEnded(
insertBoundedRecord(
manager.state.modelTimingsByLlmKey,
modelTimingLlmKey({
sessionId: session.sessionId,
sessionId: session.ownerKey,
runId: event.runId,
provider: event.provider,
model: event.model,
Expand All @@ -325,7 +329,7 @@ export function replayPendingLlmOutputsForSession(
for (const [key, records] of [...manager.state.llmOutputsPendingInput]) {
const remaining: PendingLlmOutputRecord[] = [];
for (const record of records) {
if (record.sessionKey !== session.sessionId) {
if (record.sessionOwnerKey !== session.ownerKey) {
remaining.push(record);
continue;
}
Expand Down Expand Up @@ -389,7 +393,7 @@ export function replayAgentEndMessages(
export function emitUnpairedModelCallTimingMarks(manager: SessionManager, session: SessionState): void {
for (const records of manager.state.modelCallsByCallId.values()) {
for (const record of records) {
if (record.sessionKey !== session.sessionId || record.consumed || record.endedAtMs !== undefined) {
if (record.sessionOwnerKey !== session.ownerKey || record.consumed || record.endedAtMs !== undefined) {
continue;
}
emitModelTimingMark(manager, session, 'openclaw.model_call_timing_unpaired', record);
Expand All @@ -400,7 +404,7 @@ export function emitUnpairedModelCallTimingMarks(manager: SessionManager, sessio
const unpairedEnded: ModelCallRecord[] = [];
for (const records of manager.state.modelTimingsByLlmKey.values()) {
for (const record of records) {
if (record.sessionKey !== session.sessionId || record.consumed) {
if (record.sessionOwnerKey !== session.ownerKey || record.consumed) {
continue;
}
unpairedEnded.push(record);
Expand Down Expand Up @@ -494,7 +498,7 @@ function replayExpiredPendingOutput(manager: SessionManager, key: string, record
if (!removeRecord(manager.state.llmOutputsPendingInput, key, record)) {
return;
}
const session = manager.state.sessions.get(record.sessionKey);
const session = manager.state.sessions.get(record.sessionOwnerKey);
if (!session) {
manager.state.counters.skippedEvents += 1;
return;
Expand Down Expand Up @@ -616,7 +620,7 @@ function replayAssistantMessageWrites(
},
ctx,
input: {
sessionKey: session.sessionId,
sessionOwnerKey: session.ownerKey,
sessionId: session.sessionId,
runId,
provider: record.provider,
Expand Down Expand Up @@ -645,13 +649,13 @@ function consumeNextTimingCandidate(
input: { runId?: string | undefined; provider: string; model: string },
): ModelCallRecord | undefined {
const key = modelTimingLlmKey({
sessionId: session.sessionId,
sessionId: session.ownerKey,
runId: input.runId,
provider: input.provider,
model: input.model,
});
const records = manager.state.modelTimingsByLlmKey.get(key) ?? [];
const candidate = records.find((record) => record.sessionKey === session.sessionId && !record.consumed);
const candidate = records.find((record) => record.sessionOwnerKey === session.ownerKey && !record.consumed);
if (!candidate) {
return undefined;
}
Expand All @@ -666,13 +670,13 @@ function consumeTimingCandidate(
event: PluginHookLlmOutputEvent,
): ModelCallRecord | undefined {
const key = modelTimingLlmKey({
sessionId: session.sessionId,
sessionId: session.ownerKey,
runId: event.runId,
provider: event.provider,
model: event.model,
});
const candidates = (manager.state.modelTimingsByLlmKey.get(key) ?? []).filter(
(record) => record.sessionKey === session.sessionId && !record.consumed,
(record) => record.sessionOwnerKey === session.ownerKey && !record.consumed,
);
if (candidates.length === 1) {
const candidate = candidates[0];
Expand Down Expand Up @@ -771,7 +775,7 @@ function emitModelTimingSummaryMark(manager: SessionManager, session: SessionSta
/** Convert an OpenClaw llm_input event into the buffered request record. */
function createInputRecord(session: SessionState, event: PluginHookLlmInputEvent): LlmInputRecord {
return {
sessionKey: session.sessionId,
sessionOwnerKey: session.ownerKey,
sessionId: event.sessionId,
runId: event.runId,
provider: event.provider,
Expand All @@ -790,7 +794,7 @@ function existingSessionForMessageWrite(
event: PluginHookBeforeMessageWriteEvent,
ctx: PluginHookBeforeMessageWriteContext,
): SessionState | undefined {
const key = resolveSessionKey(manager.state, {
const key = resolveSessionOwnerKey(manager.state, {
sessionKey: event.sessionKey ?? ctx.sessionKey,
});
if (key === undefined) {
Expand All @@ -807,7 +811,7 @@ function existingSessionForMessageWrite(
/** Build a minimal request placeholder when only an llm_output hook is available. */
function placeholderInputRecord(record: PendingLlmOutputRecord): LlmInputRecord {
return {
sessionKey: record.sessionKey,
sessionOwnerKey: record.sessionOwnerKey,
sessionId: record.sessionId,
runId: record.runId,
provider: record.provider,
Expand Down Expand Up @@ -1185,7 +1189,7 @@ function incrementHookLlmOutputReplayCount(session: SessionState, runId: string

/** Build the per-session run key used for trajectory de-duplication. */
function trajectoryRunKey(session: SessionState, runId?: string): string {
return runId ?? session.sessionId;
return runId ?? session.ownerKey;
}

/** Normalize provider usage into NeMo Relay token and cost fields. */
Expand Down Expand Up @@ -1291,7 +1295,7 @@ function latestUnendedRecord(
}
for (let index = records.length - 1; index >= 0; index -= 1) {
const record = records[index];
if (record?.sessionKey === session.sessionId && record.endedAtMs === undefined) {
if (record?.sessionOwnerKey === session.ownerKey && record.endedAtMs === undefined) {
return record;
}
}
Expand Down
Loading