diff --git a/index.ts b/index.ts index 8bcd8e34..e69357e6 100644 --- a/index.ts +++ b/index.ts @@ -13,6 +13,7 @@ import { createChatMessageHandler, createChatMessageTransformHandler, createCommandExecuteHandler, + createEventHandler, createSystemPromptHandler, createTextCompleteHandler, } from "./lib/hooks" @@ -57,7 +58,6 @@ const plugin: Plugin = (async (ctx) => { config, prompts, ), - "experimental.chat.messages.transform": createChatMessageTransformHandler( ctx.client, state, @@ -76,6 +76,7 @@ const plugin: Plugin = (async (ctx) => { ctx.directory, hostPermissions, ), + event: createEventHandler(state, logger), tool: { ...(config.compress.permission !== "deny" && { compress: diff --git a/lib/commands/compression-targets.ts b/lib/commands/compression-targets.ts index 887ad53e..59bc2555 100644 --- a/lib/commands/compression-targets.ts +++ b/lib/commands/compression-targets.ts @@ -5,6 +5,7 @@ export interface CompressionTarget { runId: number topic: string compressedTokens: number + durationMs: number grouped: boolean blocks: CompressionBlock[] } @@ -26,6 +27,7 @@ function buildTarget(blocks: CompressionBlock[]): CompressionTarget { runId: first.runId, topic: grouped ? first.batchTopic || first.topic : first.topic, compressedTokens: ordered.reduce((total, block) => total + block.compressedTokens, 0), + durationMs: ordered.reduce((total, block) => Math.max(total, block.durationMs), 0), grouped, blocks: ordered, } diff --git a/lib/commands/stats.ts b/lib/commands/stats.ts index 559c3859..bea2a6dc 100644 --- a/lib/commands/stats.ts +++ b/lib/commands/stats.ts @@ -9,6 +9,7 @@ import { sendIgnoredMessage } from "../ui/notification" import { formatTokenCount } from "../ui/utils" import { loadAllSessionStats, type AggregatedStats } from "../state/persistence" import { getCurrentParams } from "../token-utils" +import { getActiveCompressionTargets } from "./compression-targets" export interface StatsCommandContext { client: any @@ -20,8 +21,10 @@ export interface StatsCommandContext { function formatStatsMessage( sessionTokens: number, + sessionSummaryTokens: number, sessionTools: number, sessionMessages: number, + sessionDurationMs: number, allTime: AggregatedStats, ): string { const lines: string[] = [] @@ -30,11 +33,15 @@ function formatStatsMessage( lines.push("│ DCP Statistics │") lines.push("╰───────────────────────────────────────────────────────────╯") lines.push("") - lines.push("Session:") + lines.push("Compression:") lines.push("─".repeat(60)) - lines.push(` Tokens pruned: ~${formatTokenCount(sessionTokens)}`) - lines.push(` Tools pruned: ${sessionTools}`) - lines.push(` Messages pruned: ${sessionMessages}`) + lines.push( + ` Tokens in|out: ~${formatTokenCount(sessionTokens)} | ~${formatTokenCount(sessionSummaryTokens)}`, + ) + lines.push(` Ratio: ${formatCompressionRatio(sessionTokens, sessionSummaryTokens)}`) + lines.push(` Time: ${formatCompressionTime(sessionDurationMs)}`) + lines.push(` Messages: ${sessionMessages}`) + lines.push(` Tools: ${sessionTools}`) lines.push("") lines.push("All-time:") lines.push("─".repeat(60)) @@ -46,11 +53,55 @@ function formatStatsMessage( return lines.join("\n") } +function formatCompressionRatio(inputTokens: number, outputTokens: number): string { + if (inputTokens <= 0) { + return "0:1" + } + + if (outputTokens <= 0) { + return "∞:1" + } + + const ratio = Math.max(1, Math.round(inputTokens / outputTokens)) + return `${ratio}:1` +} + +function formatCompressionTime(ms: number): string { + const safeMs = Math.max(0, Math.round(ms)) + if (safeMs < 1000) { + return `${safeMs} ms` + } + + const totalSeconds = safeMs / 1000 + if (totalSeconds < 60) { + return `${totalSeconds.toFixed(1)} s` + } + + const wholeSeconds = Math.floor(totalSeconds) + const hours = Math.floor(wholeSeconds / 3600) + const minutes = Math.floor((wholeSeconds % 3600) / 60) + const seconds = wholeSeconds % 60 + + if (hours > 0) { + return `${hours}h ${minutes}m ${seconds}s` + } + + return `${minutes}m ${seconds}s` +} + export async function handleStatsCommand(ctx: StatsCommandContext): Promise { const { client, state, logger, sessionId, messages } = ctx // Session stats from in-memory state const sessionTokens = state.stats.totalPruneTokens + const sessionSummaryTokens = Array.from(state.prune.messages.blocksById.values()).reduce( + (total, block) => (block.active ? total + block.summaryTokens : total), + 0, + ) + const sessionDurationMs = getActiveCompressionTargets(state.prune.messages).reduce( + (total, target) => total + target.durationMs, + 0, + ) const prunedToolIds = new Set(state.prune.tools.keys()) for (const block of state.prune.messages.blocksById.values()) { @@ -72,15 +123,24 @@ export async function handleStatsCommand(ctx: StatsCommandContext): Promise { ctx.state.manualMode = ctx.state.manualMode ? "active" : false + applyPendingCompressionDurations(ctx.state) await saveSessionState(ctx.state, ctx.logger) const params = getCurrentParams(ctx.state, rawMessages, ctx.logger) diff --git a/lib/compress/range.ts b/lib/compress/range.ts index c2c587b6..cc2ceaa9 100644 --- a/lib/compress/range.ts +++ b/lib/compress/range.ts @@ -59,6 +59,10 @@ export function createCompressRangeTool(ctx: ToolContext): ReturnType + pendingByCallId: Map +} + +export function consumeCompressionStart(state: SessionState, callId: string): number | undefined { + const start = state.compressionTiming.startsByCallId.get(callId) + state.compressionTiming.startsByCallId.delete(callId) + return start +} + +export function resolveCompressionDuration( + startedAt: number | undefined, + eventTime: number | undefined, + partTime: { start?: unknown; end?: unknown } | undefined, +): number | undefined { + const runningAt = + typeof partTime?.start === "number" && Number.isFinite(partTime.start) + ? partTime.start + : eventTime + const pendingToRunningMs = + typeof startedAt === "number" && typeof runningAt === "number" + ? Math.max(0, runningAt - startedAt) + : undefined + + const toolStart = partTime?.start + const toolEnd = partTime?.end + const runtimeMs = + typeof toolStart === "number" && + Number.isFinite(toolStart) && + typeof toolEnd === "number" && + Number.isFinite(toolEnd) + ? Math.max(0, toolEnd - toolStart) + : undefined + + return typeof pendingToRunningMs === "number" ? pendingToRunningMs : runtimeMs +} + +export function applyPendingCompressionDurations(state: SessionState): number { + if (state.compressionTiming.pendingByCallId.size === 0) { + return 0 + } + + let updates = 0 + for (const [callId, entry] of state.compressionTiming.pendingByCallId) { + const applied = attachCompressionDuration( + state.prune.messages, + entry.callId, + entry.durationMs, + ) + if (applied > 0) { + updates += applied + state.compressionTiming.pendingByCallId.delete(callId) + } + } + + return updates +} diff --git a/lib/compress/types.ts b/lib/compress/types.ts index 883493dd..f0eb5d0c 100644 --- a/lib/compress/types.ts +++ b/lib/compress/types.ts @@ -103,5 +103,6 @@ export interface CompressionStateInput { mode: CompressionMode runId: number compressMessageId: string + compressCallId?: string summaryTokens: number } diff --git a/lib/hooks.ts b/lib/hooks.ts index 9474c91e..7f71265a 100644 --- a/lib/hooks.ts +++ b/lib/hooks.ts @@ -16,6 +16,11 @@ import { } from "./messages" import { renderSystemPrompt, type PromptStore } from "./prompts" import { buildProtectedToolsExtension } from "./prompts/extensions/system" +import { + applyPendingCompressionDurations, + consumeCompressionStart, + resolveCompressionDuration, +} from "./compress/timing" import { applyPendingManualTrigger, handleContextCommand, @@ -29,7 +34,7 @@ import { } from "./commands" import { type HostPermissionSnapshot } from "./host-permissions" import { compressPermission, syncCompressPermissionState } from "./compress-permission" -import { checkSession, ensureSessionInitialized, syncToolCache } from "./state" +import { checkSession, ensureSessionInitialized, saveSessionState, syncToolCache } from "./state" import { cacheSystemPromptTokens } from "./ui/utils" const INTERNAL_AGENT_SIGNATURES = [ @@ -266,6 +271,83 @@ export function createTextCompleteHandler() { } } +export function createEventHandler(state: SessionState, logger: Logger) { + return async (input: { event: any }) => { + const eventTime = + typeof input.event?.time === "number" && Number.isFinite(input.event.time) + ? input.event.time + : typeof input.event?.properties?.time === "number" && + Number.isFinite(input.event.properties.time) + ? input.event.properties.time + : undefined + + if (input.event.type !== "message.part.updated") { + return + } + + const part = input.event.properties?.part + if (part?.type !== "tool" || part.tool !== "compress") { + return + } + + if (part.state.status === "pending") { + if (typeof part.callID !== "string") { + return + } + + const startedAt = eventTime ?? Date.now() + if (state.compressionTiming.startsByCallId.has(part.callID)) { + return + } + state.compressionTiming.startsByCallId.set(part.callID, startedAt) + logger.debug("Recorded compression start", { + callID: part.callID, + startedAt, + }) + return + } + + if (part.state.status === "completed") { + if (typeof part.callID !== "string") { + return + } + + const start = consumeCompressionStart(state, part.callID) + const durationMs = resolveCompressionDuration(start, eventTime, part.state.time) + if (typeof durationMs !== "number") { + return + } + + state.compressionTiming.pendingByCallId.set(part.callID, { + callId: part.callID, + durationMs, + }) + + const updates = applyPendingCompressionDurations(state) + if (updates === 0) { + return + } + + await saveSessionState(state, logger) + + logger.info("Attached compression time to blocks", { + callID: part.callID, + blocks: updates, + durationMs, + }) + return + } + + if (part.state.status === "running") { + return + } + + if (typeof part.callID === "string") { + state.compressionTiming.startsByCallId.delete(part.callID) + } + } +} + export function createChatMessageHandler( state: SessionState, logger: Logger, diff --git a/lib/state/persistence.ts b/lib/state/persistence.ts index 2431eee3..87b774f9 100644 --- a/lib/state/persistence.ts +++ b/lib/state/persistence.ts @@ -10,6 +10,7 @@ import { homedir } from "os" import { join } from "path" import type { CompressionBlock, PrunedMessageEntry, SessionState, SessionStats } from "./types" import type { Logger } from "../logger" +import { serializePruneMessagesState } from "./utils" /** Prune state as stored on disk */ export interface PersistedPruneMessagesState { @@ -58,6 +59,23 @@ function getSessionFilePath(sessionId: string): string { return join(STORAGE_DIR, `${sessionId}.json`) } +async function writePersistedSessionState( + sessionId: string, + state: PersistedSessionState, + logger: Logger, +): Promise { + await ensureStorageDir() + + const filePath = getSessionFilePath(sessionId) + const content = JSON.stringify(state, null, 2) + await fs.writeFile(filePath, content, "utf-8") + + logger.info("Saved session state to disk", { + sessionId, + totalTokensSaved: state.stats.totalPruneTokens, + }) +} + export async function saveSessionState( sessionState: SessionState, logger: Logger, @@ -68,26 +86,11 @@ export async function saveSessionState( return } - await ensureStorageDir() - const state: PersistedSessionState = { sessionName: sessionName, prune: { tools: Object.fromEntries(sessionState.prune.tools), - messages: { - byMessageId: Object.fromEntries(sessionState.prune.messages.byMessageId), - blocksById: Object.fromEntries( - Array.from(sessionState.prune.messages.blocksById.entries()).map( - ([blockId, block]) => [String(blockId), block], - ), - ), - activeBlockIds: Array.from(sessionState.prune.messages.activeBlockIds), - activeByAnchorMessageId: Object.fromEntries( - sessionState.prune.messages.activeByAnchorMessageId, - ), - nextBlockId: sessionState.prune.messages.nextBlockId, - nextRunId: sessionState.prune.messages.nextRunId, - }, + messages: serializePruneMessagesState(sessionState.prune.messages), }, nudges: { contextLimitAnchors: Array.from(sessionState.nudges.contextLimitAnchors), @@ -98,14 +101,7 @@ export async function saveSessionState( lastUpdated: new Date().toISOString(), } - const filePath = getSessionFilePath(sessionState.sessionId) - const content = JSON.stringify(state, null, 2) - await fs.writeFile(filePath, content, "utf-8") - - logger.info("Saved session state to disk", { - sessionId: sessionState.sessionId, - totalTokensSaved: state.stats.totalPruneTokens, - }) + await writePersistedSessionState(sessionState.sessionId, state, logger) } catch (error: any) { logger.error("Failed to save session state", { sessionId: sessionState.sessionId, diff --git a/lib/state/state.ts b/lib/state/state.ts index c8a00ba1..71cb3aac 100644 --- a/lib/state/state.ts +++ b/lib/state/state.ts @@ -1,5 +1,6 @@ import type { SessionState, ToolParameterEntry, WithParts } from "./types" import type { Logger } from "../logger" +import { applyPendingCompressionDurations } from "../compress/timing" import { loadSessionState, saveSessionState } from "./persistence" import { isSubAgentSession, @@ -81,6 +82,10 @@ export function createSessionState(): SessionState { pruneTokenCounter: 0, totalPruneTokens: 0, }, + compressionTiming: { + startsByCallId: new Map(), + pendingByCallId: new Map(), + }, toolParameters: new Map(), subAgentResultCache: new Map(), toolIdList: [], @@ -177,4 +182,9 @@ export async function ensureSessionInitialized( pruneTokenCounter: persisted.stats?.pruneTokenCounter || 0, totalPruneTokens: persisted.stats?.totalPruneTokens || 0, } + + const applied = applyPendingCompressionDurations(state) + if (applied > 0) { + await saveSessionState(state, logger) + } } diff --git a/lib/state/types.ts b/lib/state/types.ts index 67f1e9e5..7b0b04da 100644 --- a/lib/state/types.ts +++ b/lib/state/types.ts @@ -1,3 +1,4 @@ +import type { CompressionTimingState } from "../compress/timing" import { Message, Part } from "@opencode-ai/sdk/v2" export interface WithParts { @@ -36,6 +37,7 @@ export interface CompressionBlock { deactivatedByUser: boolean compressedTokens: number summaryTokens: number + durationMs: number mode?: CompressionMode topic: string batchTopic?: string @@ -43,6 +45,7 @@ export interface CompressionBlock { endId: string anchorMessageId: string compressMessageId: string + compressCallId?: string includedBlockIds: number[] consumedBlockIds: number[] parentBlockIds: number[] @@ -96,6 +99,7 @@ export interface SessionState { prune: Prune nudges: Nudges stats: SessionStats + compressionTiming: CompressionTimingState toolParameters: Map subAgentResultCache: Map toolIdList: string[] diff --git a/lib/state/utils.ts b/lib/state/utils.ts index fcaaa290..a18552a2 100644 --- a/lib/state/utils.ts +++ b/lib/state/utils.ts @@ -20,12 +20,30 @@ export const isMessageCompacted = (state: SessionState, msg: WithParts): boolean } interface PersistedPruneMessagesState { - byMessageId?: Record - blocksById?: Record - activeBlockIds?: number[] - activeByAnchorMessageId?: Record - nextBlockId?: number - nextRunId?: number + byMessageId: Record + blocksById: Record + activeBlockIds: number[] + activeByAnchorMessageId: Record + nextBlockId: number + nextRunId: number +} + +export function serializePruneMessagesState( + messagesState: PruneMessagesState, +): PersistedPruneMessagesState { + return { + byMessageId: Object.fromEntries(messagesState.byMessageId), + blocksById: Object.fromEntries( + Array.from(messagesState.blocksById.entries()).map(([blockId, block]) => [ + String(blockId), + block, + ]), + ), + activeBlockIds: Array.from(messagesState.activeBlockIds), + activeByAnchorMessageId: Object.fromEntries(messagesState.activeByAnchorMessageId), + nextBlockId: messagesState.nextBlockId, + nextRunId: messagesState.nextRunId, + } } export async function isSubAgentSession(client: any, sessionID: string): Promise { @@ -178,6 +196,10 @@ export function loadPruneMessagesState( : typeof block.summary === "string" ? countTokens(block.summary) : 0, + durationMs: + typeof block.durationMs === "number" && Number.isFinite(block.durationMs) + ? Math.max(0, block.durationMs) + : 0, mode: block.mode === "range" || block.mode === "message" ? block.mode : undefined, topic: typeof block.topic === "string" ? block.topic : "", batchTopic: @@ -192,6 +214,8 @@ export function loadPruneMessagesState( typeof block.anchorMessageId === "string" ? block.anchorMessageId : "", compressMessageId: typeof block.compressMessageId === "string" ? block.compressMessageId : "", + compressCallId: + typeof block.compressCallId === "string" ? block.compressCallId : undefined, includedBlockIds: toNumberArray(block.includedBlockIds), consumedBlockIds: toNumberArray(block.consumedBlockIds), parentBlockIds: toNumberArray(block.parentBlockIds), diff --git a/tests/compress-message.test.ts b/tests/compress-message.test.ts index 18c6e410..ad0ab394 100644 --- a/tests/compress-message.test.ts +++ b/tests/compress-message.test.ts @@ -226,6 +226,55 @@ test("compress message mode batches individual message summaries", async () => { assert.match(blocks[1]?.summary || "", /task output body/) }) +test("compress message mode stores call id for later duration attachment", async () => { + const sessionID = `ses_message_compress_duration_${Date.now()}` + const rawMessages = buildMessages(sessionID) + const state = createSessionState() + const logger = new Logger(false) + + const tool = createCompressMessageTool({ + client: { + session: { + messages: async () => ({ data: rawMessages }), + get: async () => ({ data: { parentID: null } }), + }, + }, + state, + logger, + config: buildConfig(), + prompts: { + reload() {}, + getRuntimePrompts() { + return { compressMessage: "", compressRange: "" } + }, + }, + } as any) + + await tool.execute( + { + topic: "Batch stale notes", + content: [ + { + messageId: "m0002", + topic: "Code path note", + summary: "Captured the assistant's code-path findings.", + }, + ], + }, + { + ask: async () => {}, + metadata: () => {}, + sessionID, + messageID: "msg-compress-message", + callID: "call-1", + }, + ) + + const block = Array.from(state.prune.messages.blocksById.values())[0] + assert.equal(block?.compressCallId, "call-1") + assert.equal(block?.durationMs, 0) +}) + test("compress message mode does not partially apply when preparation fails", async () => { const sessionID = `ses_message_compress_prepare_fail_${Date.now()}` const rawMessages = buildMessages(sessionID) diff --git a/tests/compression-targets.test.ts b/tests/compression-targets.test.ts new file mode 100644 index 00000000..77027af1 --- /dev/null +++ b/tests/compression-targets.test.ts @@ -0,0 +1,78 @@ +import assert from "node:assert/strict" +import test from "node:test" +import { getActiveCompressionTargets } from "../lib/commands/compression-targets" +import { createSessionState, type CompressionBlock } from "../lib/state" + +function buildBlock( + blockId: number, + runId: number, + mode: "range" | "message", + durationMs: number, +): CompressionBlock { + return { + blockId, + runId, + active: true, + deactivatedByUser: false, + compressedTokens: 10, + summaryTokens: 5, + durationMs, + mode, + topic: `topic-${blockId}`, + batchTopic: mode === "message" ? `batch-${runId}` : `topic-${blockId}`, + startId: `m${blockId}`, + endId: `m${blockId}`, + anchorMessageId: `msg-${blockId}`, + compressMessageId: `origin-${runId}`, + includedBlockIds: [], + consumedBlockIds: [], + parentBlockIds: [], + directMessageIds: [`msg-${blockId}`], + directToolIds: [], + effectiveMessageIds: [`msg-${blockId}`], + effectiveToolIds: [], + createdAt: blockId, + summary: `summary-${blockId}`, + } +} + +test("active compression targets count a grouped message run once", () => { + const state = createSessionState() + const first = buildBlock(1, 10, "message", 225) + const second = buildBlock(2, 10, "message", 225) + const third = buildBlock(3, 11, "range", 80) + + state.prune.messages.blocksById.set(1, first) + state.prune.messages.blocksById.set(2, second) + state.prune.messages.blocksById.set(3, third) + state.prune.messages.activeBlockIds.add(1) + state.prune.messages.activeBlockIds.add(2) + state.prune.messages.activeBlockIds.add(3) + + const targets = getActiveCompressionTargets(state.prune.messages) + const totalDurationMs = targets.reduce((total, target) => total + target.durationMs, 0) + + assert.equal(targets.length, 2) + assert.equal(totalDurationMs, 305) +}) + +test("inactive grouped message runs no longer contribute compression time", () => { + const state = createSessionState() + const first = buildBlock(1, 10, "message", 225) + const second = buildBlock(2, 10, "message", 225) + const third = buildBlock(3, 11, "range", 80) + + first.active = false + second.active = false + + state.prune.messages.blocksById.set(1, first) + state.prune.messages.blocksById.set(2, second) + state.prune.messages.blocksById.set(3, third) + state.prune.messages.activeBlockIds.add(3) + + const targets = getActiveCompressionTargets(state.prune.messages) + const totalDurationMs = targets.reduce((total, target) => total + target.durationMs, 0) + + assert.equal(targets.length, 1) + assert.equal(totalDurationMs, 80) +}) diff --git a/tests/hooks-permission.test.ts b/tests/hooks-permission.test.ts index 9a13b7ba..238b0412 100644 --- a/tests/hooks-permission.test.ts +++ b/tests/hooks-permission.test.ts @@ -5,10 +5,16 @@ import { createChatMessageHandler, createChatMessageTransformHandler, createCommandExecuteHandler, + createEventHandler, createTextCompleteHandler, } from "../lib/hooks" import { Logger } from "../lib/logger" -import { createSessionState, type WithParts } from "../lib/state" +import { + createSessionState, + ensureSessionInitialized, + saveSessionState, + type WithParts, +} from "../lib/state" function buildConfig(permission: "allow" | "ask" | "deny" = "allow"): PluginConfig { return { @@ -152,3 +158,367 @@ test("text complete strips hallucinated metadata tags", async () => { assert.equal(output.text, "alpha omega") }) + +test("event hook attaches durations to matching blocks by call id", async () => { + const state = createSessionState() + state.sessionId = "session-1" + const handler = createEventHandler(state, new Logger(false)) + const originalNow = Date.now + Date.now = () => 100 + + try { + await handler({ + event: { + type: "message.part.updated", + properties: { + part: { + type: "tool", + tool: "compress", + callID: "call-1", + messageID: "message-1", + sessionID: "session-1", + state: { + status: "pending", + input: {}, + raw: "", + }, + }, + }, + }, + }) + + await handler({ + event: { + type: "message.part.updated", + properties: { + part: { + type: "tool", + tool: "compress", + callID: "call-2", + messageID: "message-1", + sessionID: "session-1", + state: { + status: "pending", + input: {}, + raw: "", + }, + }, + }, + }, + }) + + await handler({ + event: { + type: "message.part.updated", + properties: { + part: { + type: "tool", + tool: "compress", + callID: "call-1", + messageID: "message-1", + sessionID: "session-1", + state: { + status: "running", + input: {}, + time: { start: 325 }, + }, + }, + }, + }, + }) + + await handler({ + event: { + type: "message.part.updated", + properties: { + part: { + type: "tool", + tool: "compress", + callID: "call-2", + messageID: "message-1", + sessionID: "session-1", + state: { + status: "running", + input: {}, + time: { start: 410 }, + }, + }, + }, + }, + }) + state.prune.messages.blocksById.set(1, { + blockId: 1, + runId: 1, + active: true, + deactivatedByUser: false, + compressedTokens: 0, + summaryTokens: 0, + durationMs: 0, + mode: "message", + topic: "one", + batchTopic: "one", + startId: "m0001", + endId: "m0001", + anchorMessageId: "msg-a", + compressMessageId: "message-1", + compressCallId: "call-1", + includedBlockIds: [], + consumedBlockIds: [], + parentBlockIds: [], + directMessageIds: [], + directToolIds: [], + effectiveMessageIds: ["msg-a"], + effectiveToolIds: [], + createdAt: 1, + summary: "a", + }) + state.prune.messages.blocksById.set(2, { + blockId: 2, + runId: 2, + active: true, + deactivatedByUser: false, + compressedTokens: 0, + summaryTokens: 0, + durationMs: 0, + mode: "message", + topic: "two", + batchTopic: "two", + startId: "m0002", + endId: "m0002", + anchorMessageId: "msg-b", + compressMessageId: "message-1", + compressCallId: "call-2", + includedBlockIds: [], + consumedBlockIds: [], + parentBlockIds: [], + directMessageIds: [], + directToolIds: [], + effectiveMessageIds: ["msg-b"], + effectiveToolIds: [], + createdAt: 2, + summary: "b", + }) + + await handler({ + event: { + type: "message.part.updated", + properties: { + part: { + type: "tool", + tool: "compress", + callID: "call-2", + messageID: "message-1", + sessionID: "session-1", + state: { + status: "completed", + input: {}, + output: "done", + title: "", + metadata: {}, + time: { start: 410, end: 500 }, + }, + }, + }, + }, + }) + + await handler({ + event: { + type: "message.part.updated", + properties: { + part: { + type: "tool", + tool: "compress", + callID: "call-1", + messageID: "message-1", + sessionID: "session-1", + state: { + status: "completed", + input: {}, + output: "done", + title: "", + metadata: {}, + time: { start: 325, end: 500 }, + }, + }, + }, + }, + }) + } finally { + Date.now = originalNow + } + + assert.equal(state.prune.messages.blocksById.get(1)?.durationMs, 225) + assert.equal(state.prune.messages.blocksById.get(2)?.durationMs, 310) +}) + +test("event hook falls back to completed runtime when running duration missing", async () => { + const state = createSessionState() + state.sessionId = "session-1" + const handler = createEventHandler(state, new Logger(false)) + + state.prune.messages.blocksById.set(1, { + blockId: 1, + runId: 1, + active: true, + deactivatedByUser: false, + compressedTokens: 0, + summaryTokens: 0, + durationMs: 0, + mode: "message", + topic: "one", + batchTopic: "one", + startId: "m0001", + endId: "m0001", + anchorMessageId: "msg-a", + compressMessageId: "message-1", + compressCallId: "call-3", + includedBlockIds: [], + consumedBlockIds: [], + parentBlockIds: [], + directMessageIds: [], + directToolIds: [], + effectiveMessageIds: ["msg-a"], + effectiveToolIds: [], + createdAt: 1, + summary: "a", + }) + + await handler({ + event: { + type: "message.part.updated", + properties: { + part: { + type: "tool", + tool: "compress", + callID: "call-3", + messageID: "message-1", + sessionID: "session-1", + state: { + status: "completed", + input: {}, + output: "done", + title: "", + metadata: {}, + time: { start: 500, end: 940 }, + }, + }, + }, + }, + }) + + assert.equal(state.prune.messages.blocksById.get(1)?.durationMs, 440) +}) + +test("event hook queues duration updates until the matching session is loaded", async () => { + const logger = new Logger(false) + const targetSessionId = `session-target-${process.pid}-${Date.now()}` + const otherSessionId = `session-other-${process.pid}-${Date.now()}` + const persistedState = createSessionState() + persistedState.sessionId = targetSessionId + persistedState.prune.messages.blocksById.set(1, { + blockId: 1, + runId: 1, + active: true, + deactivatedByUser: false, + compressedTokens: 0, + summaryTokens: 0, + durationMs: 0, + mode: "message", + topic: "one", + batchTopic: "one", + startId: "m0001", + endId: "m0001", + anchorMessageId: "msg-a", + compressMessageId: "message-1", + compressCallId: "call-remote", + includedBlockIds: [], + consumedBlockIds: [], + parentBlockIds: [], + directMessageIds: [], + directToolIds: [], + effectiveMessageIds: ["msg-a"], + effectiveToolIds: [], + createdAt: 1, + summary: "a", + }) + await saveSessionState(persistedState, logger) + + const liveState = createSessionState() + liveState.sessionId = otherSessionId + const handler = createEventHandler(liveState, logger) + + await handler({ + event: { + type: "message.part.updated", + properties: { + sessionID: targetSessionId, + part: { + type: "tool", + tool: "compress", + callID: "call-remote", + messageID: "message-1", + state: { + status: "pending", + input: {}, + raw: "", + }, + }, + }, + time: 100, + }, + }) + + await handler({ + event: { + type: "message.part.updated", + properties: { + sessionID: targetSessionId, + part: { + type: "tool", + tool: "compress", + callID: "call-remote", + messageID: "message-1", + state: { + status: "completed", + input: {}, + output: "done", + title: "", + metadata: {}, + time: { start: 350, end: 500 }, + }, + }, + }, + }, + }) + + assert.equal(liveState.compressionTiming.pendingByCallId.has("call-remote"), true) + assert.equal(liveState.compressionTiming.startsByCallId.has("call-remote"), false) + + await ensureSessionInitialized( + { + session: { + get: async () => ({ data: { parentID: null } }), + }, + } as any, + liveState, + targetSessionId, + logger, + [ + { + info: { + id: "msg-user-1", + role: "user", + sessionID: targetSessionId, + agent: "assistant", + time: { created: 1 }, + } as WithParts["info"], + parts: [], + }, + ], + false, + ) + + assert.equal(liveState.prune.messages.blocksById.get(1)?.durationMs, 250) + assert.equal(liveState.compressionTiming.pendingByCallId.has("call-remote"), false) +})