From b5d608f96aed3c78a02bdbb0597a5f344145b85f Mon Sep 17 00:00:00 2001 From: Spoon <212802214+spoons-and-mirrors@users.noreply.github.com> Date: Mon, 30 Mar 2026 20:48:11 +0200 Subject: [PATCH 01/14] feat: add compression time to `/dcp stats` refactor refactor: natural time display fix: format check fix: stabilize compression timing correlation refactor: simplify compression timing fix: restore compression timing durability fix: attach compression timing to blocks fix: make compression timing robust --- index.ts | 2 + lib/commands/compression-targets.ts | 2 + lib/commands/stats.ts | 39 ++- lib/compress/message.ts | 6 + lib/compress/range.ts | 6 + lib/compress/state.ts | 37 +++ lib/compress/types.ts | 2 + lib/hooks.ts | 132 +++++++++- lib/state/state.ts | 4 +- lib/state/types.ts | 9 + lib/state/utils.ts | 6 + tests/compress-message.test.ts | 122 ++++++++++ tests/compression-targets.test.ts | 78 ++++++ tests/hooks-permission.test.ts | 357 ++++++++++++++++++++++++++++ 14 files changed, 799 insertions(+), 3 deletions(-) create mode 100644 tests/compression-targets.test.ts diff --git a/index.ts b/index.ts index 8bcd8e34..0e58d1db 100644 --- a/index.ts +++ b/index.ts @@ -13,6 +13,7 @@ import { createChatMessageHandler, createChatMessageTransformHandler, createCommandExecuteHandler, + createEventHandler, createSystemPromptHandler, createTextCompleteHandler, } from "./lib/hooks" @@ -68,6 +69,7 @@ const plugin: Plugin = (async (ctx) => { ) as any, "chat.message": createChatMessageHandler(state, logger, config, hostPermissions), "experimental.text.complete": createTextCompleteHandler(), + event: createEventHandler(state, logger), "command.execute.before": createCommandExecuteHandler( ctx.client, state, diff --git a/lib/commands/compression-targets.ts b/lib/commands/compression-targets.ts index 887ad53e..59bc2555 100644 --- a/lib/commands/compression-targets.ts +++ b/lib/commands/compression-targets.ts @@ -5,6 +5,7 @@ export interface CompressionTarget { runId: number topic: string compressedTokens: number + durationMs: number grouped: boolean blocks: CompressionBlock[] } @@ -26,6 +27,7 @@ function buildTarget(blocks: CompressionBlock[]): CompressionTarget { runId: first.runId, topic: grouped ? first.batchTopic || first.topic : first.topic, compressedTokens: ordered.reduce((total, block) => total + block.compressedTokens, 0), + durationMs: ordered.reduce((total, block) => Math.max(total, block.durationMs), 0), grouped, blocks: ordered, } diff --git a/lib/commands/stats.ts b/lib/commands/stats.ts index 559c3859..86d0ea50 100644 --- a/lib/commands/stats.ts +++ b/lib/commands/stats.ts @@ -9,6 +9,7 @@ import { sendIgnoredMessage } from "../ui/notification" import { formatTokenCount } from "../ui/utils" import { loadAllSessionStats, type AggregatedStats } from "../state/persistence" import { getCurrentParams } from "../token-utils" +import { getActiveCompressionTargets } from "./compression-targets" export interface StatsCommandContext { client: any @@ -22,6 +23,7 @@ function formatStatsMessage( sessionTokens: number, sessionTools: number, sessionMessages: number, + sessionDurationMs: number, allTime: AggregatedStats, ): string { const lines: string[] = [] @@ -35,6 +37,7 @@ function formatStatsMessage( lines.push(` Tokens pruned: ~${formatTokenCount(sessionTokens)}`) lines.push(` Tools pruned: ${sessionTools}`) lines.push(` Messages pruned: ${sessionMessages}`) + lines.push(` Compression time: ${formatCompressionTime(sessionDurationMs)}`) lines.push("") lines.push("All-time:") lines.push("─".repeat(60)) @@ -46,11 +49,38 @@ function formatStatsMessage( return lines.join("\n") } +function formatCompressionTime(ms: number): string { + const safeMs = Math.max(0, Math.round(ms)) + if (safeMs < 1000) { + return `${safeMs} ms` + } + + const totalSeconds = safeMs / 1000 + if (totalSeconds < 60) { + return `${totalSeconds.toFixed(1)} s` + } + + const wholeSeconds = Math.floor(totalSeconds) + const hours = Math.floor(wholeSeconds / 3600) + const minutes = Math.floor((wholeSeconds % 3600) / 60) + const seconds = wholeSeconds % 60 + + if (hours > 0) { + return `${hours}h ${minutes}m ${seconds}s` + } + + return `${minutes}m ${seconds}s` +} + export async function handleStatsCommand(ctx: StatsCommandContext): Promise { const { client, state, logger, sessionId, messages } = ctx // Session stats from in-memory state const sessionTokens = state.stats.totalPruneTokens + const sessionDurationMs = getActiveCompressionTargets(state.prune.messages).reduce( + (total, target) => total + target.durationMs, + 0, + ) const prunedToolIds = new Set(state.prune.tools.keys()) for (const block of state.prune.messages.blocksById.values()) { @@ -72,7 +102,13 @@ export async function handleStatsCommand(ctx: StatsCommandContext): Promise { + const eventTime = + typeof input.event?.time === "number" && Number.isFinite(input.event.time) + ? input.event.time + : typeof input.event?.properties?.time === "number" && + Number.isFinite(input.event.properties.time) + ? input.event.properties.time + : undefined + + if (input.event.type !== "message.part.updated") { + return + } + + const part = input.event.properties?.part + if (part?.type !== "tool" || part.tool !== "compress") { + return + } + + if (part.state.status === "pending") { + if (typeof part.callID !== "string" || typeof part.messageID !== "string") { + return + } + + if (state.compressionStarts.has(part.callID)) { + return + } + + const startedAt = eventTime ?? Date.now() + state.compressionStarts.set(part.callID, { + messageId: part.messageID, + startedAt, + }) + logger.debug("Recorded compression start", { + callID: part.callID, + messageID: part.messageID, + startedAt, + }) + return + } + + if (part.state.status === "running") { + if (typeof part.callID !== "string") { + return + } + + const start = state.compressionStarts.get(part.callID) + if (!start) { + return + } + + const runningAt = + typeof part.state.time?.start === "number" && Number.isFinite(part.state.time.start) + ? part.state.time.start + : eventTime + if (typeof runningAt !== "number") { + return + } + + state.compressionStarts.delete(part.callID) + const durationMs = Math.max(0, runningAt - start.startedAt) + recordCompressionDuration(state, part.callID, durationMs) + + logger.info("Recorded compression time", { + callID: part.callID, + messageID: start.messageId, + durationMs, + }) + return + } + + if (part.state.status === "completed") { + if (typeof part.callID !== "string" || typeof part.messageID !== "string") { + return + } + + if (!state.compressionDurations.has(part.callID)) { + const start = state.compressionStarts.get(part.callID) + const runningAt = + typeof part.state.time?.start === "number" && + Number.isFinite(part.state.time.start) + ? part.state.time.start + : eventTime + + if (start && typeof runningAt === "number") { + state.compressionStarts.delete(part.callID) + const durationMs = Math.max(0, runningAt - start.startedAt) + recordCompressionDuration(state, part.callID, durationMs) + } else { + const toolStart = part.state.time?.start + const toolEnd = part.state.time?.end + if ( + typeof toolStart === "number" && + Number.isFinite(toolStart) && + typeof toolEnd === "number" && + Number.isFinite(toolEnd) + ) { + const durationMs = Math.max(0, toolEnd - toolStart) + recordCompressionDuration(state, part.callID, durationMs) + } + } + } + + const updates = attachCompressionDuration(state, part.callID, part.messageID) + if (updates === 0) { + return + } + + logger.info("Attached compression time to blocks", { + callID: part.callID, + messageID: part.messageID, + blocks: updates, + }) + + saveSessionState(state, logger).catch((error) => { + logger.warn("Failed to persist compression time update", { + error: error instanceof Error ? error.message : String(error), + }) + }) + return + } + + if (typeof part.callID === "string") { + state.compressionStarts.delete(part.callID) + state.compressionDurations.delete(part.callID) + } + } +} + export function createChatMessageHandler( state: SessionState, logger: Logger, diff --git a/lib/state/state.ts b/lib/state/state.ts index c8a00ba1..18d0d4f1 100644 --- a/lib/state/state.ts +++ b/lib/state/state.ts @@ -1,4 +1,4 @@ -import type { SessionState, ToolParameterEntry, WithParts } from "./types" +import type { CompressionStart, SessionState, ToolParameterEntry, WithParts } from "./types" import type { Logger } from "../logger" import { loadSessionState, saveSessionState } from "./persistence" import { @@ -81,6 +81,8 @@ export function createSessionState(): SessionState { pruneTokenCounter: 0, totalPruneTokens: 0, }, + compressionStarts: new Map(), + compressionDurations: new Map(), toolParameters: new Map(), subAgentResultCache: new Map(), toolIdList: [], diff --git a/lib/state/types.ts b/lib/state/types.ts index 67f1e9e5..0580529a 100644 --- a/lib/state/types.ts +++ b/lib/state/types.ts @@ -21,6 +21,11 @@ export interface SessionStats { totalPruneTokens: number } +export interface CompressionStart { + messageId: string + startedAt: number +} + export interface PrunedMessageEntry { tokenCount: number allBlockIds: number[] @@ -36,6 +41,7 @@ export interface CompressionBlock { deactivatedByUser: boolean compressedTokens: number summaryTokens: number + durationMs: number mode?: CompressionMode topic: string batchTopic?: string @@ -43,6 +49,7 @@ export interface CompressionBlock { endId: string anchorMessageId: string compressMessageId: string + compressCallId?: string includedBlockIds: number[] consumedBlockIds: number[] parentBlockIds: number[] @@ -96,6 +103,8 @@ export interface SessionState { prune: Prune nudges: Nudges stats: SessionStats + compressionStarts: Map + compressionDurations: Map toolParameters: Map subAgentResultCache: Map toolIdList: string[] diff --git a/lib/state/utils.ts b/lib/state/utils.ts index fcaaa290..953e03fe 100644 --- a/lib/state/utils.ts +++ b/lib/state/utils.ts @@ -178,6 +178,10 @@ export function loadPruneMessagesState( : typeof block.summary === "string" ? countTokens(block.summary) : 0, + durationMs: + typeof block.durationMs === "number" && Number.isFinite(block.durationMs) + ? Math.max(0, block.durationMs) + : 0, mode: block.mode === "range" || block.mode === "message" ? block.mode : undefined, topic: typeof block.topic === "string" ? block.topic : "", batchTopic: @@ -192,6 +196,8 @@ export function loadPruneMessagesState( typeof block.anchorMessageId === "string" ? block.anchorMessageId : "", compressMessageId: typeof block.compressMessageId === "string" ? block.compressMessageId : "", + compressCallId: + typeof block.compressCallId === "string" ? block.compressCallId : undefined, includedBlockIds: toNumberArray(block.includedBlockIds), consumedBlockIds: toNumberArray(block.consumedBlockIds), parentBlockIds: toNumberArray(block.parentBlockIds), diff --git a/tests/compress-message.test.ts b/tests/compress-message.test.ts index 18c6e410..a590c3ec 100644 --- a/tests/compress-message.test.ts +++ b/tests/compress-message.test.ts @@ -4,6 +4,7 @@ import { join } from "node:path" import { tmpdir } from "node:os" import { mkdirSync } from "node:fs" import { createCompressMessageTool } from "../lib/compress/message" +import { createEventHandler } from "../lib/hooks" import { createSessionState, type WithParts } from "../lib/state" import type { PluginConfig } from "../lib/config" import { Logger } from "../lib/logger" @@ -226,6 +227,127 @@ test("compress message mode batches individual message summaries", async () => { assert.match(blocks[1]?.summary || "", /task output body/) }) +test("compress message mode stores call id for later duration attachment", async () => { + const sessionID = `ses_message_compress_duration_${Date.now()}` + const rawMessages = buildMessages(sessionID) + const state = createSessionState() + const logger = new Logger(false) + const handler = createEventHandler(state, logger) + const originalNow = Date.now + Date.now = () => 100 + + try { + await handler({ + event: { + type: "message.part.updated", + properties: { + part: { + type: "tool", + tool: "compress", + callID: "call-1", + messageID: "msg-compress-message", + sessionID, + state: { + status: "pending", + input: {}, + raw: "", + }, + }, + }, + }, + }) + + await handler({ + event: { + type: "message.part.updated", + properties: { + part: { + type: "tool", + tool: "compress", + callID: "call-1", + messageID: "msg-compress-message", + sessionID, + state: { + status: "running", + input: {}, + time: { start: 325 }, + }, + }, + }, + }, + }) + + const tool = createCompressMessageTool({ + client: { + session: { + messages: async () => ({ data: rawMessages }), + get: async () => ({ data: { parentID: null } }), + }, + }, + state, + logger, + config: buildConfig(), + prompts: { + reload() {}, + getRuntimePrompts() { + return { compressMessage: "", compressRange: "" } + }, + }, + } as any) + + await tool.execute( + { + topic: "Batch stale notes", + content: [ + { + messageId: "m0002", + topic: "Code path note", + summary: "Captured the assistant's code-path findings.", + }, + ], + }, + { + ask: async () => {}, + metadata: () => {}, + sessionID, + messageID: "msg-compress-message", + callID: "call-1", + }, + ) + + const block = Array.from(state.prune.messages.blocksById.values())[0] + assert.equal(block?.compressCallId, "call-1") + assert.equal(block?.durationMs, 0) + + await handler({ + event: { + type: "message.part.updated", + properties: { + part: { + type: "tool", + tool: "compress", + callID: "call-1", + messageID: "msg-compress-message", + sessionID, + state: { + status: "completed", + input: {}, + output: "done", + title: "", + metadata: {}, + time: { start: 325, end: 400 }, + }, + }, + }, + }, + }) + + assert.equal(block?.durationMs, 225) + } finally { + Date.now = originalNow + } +}) + test("compress message mode does not partially apply when preparation fails", async () => { const sessionID = `ses_message_compress_prepare_fail_${Date.now()}` const rawMessages = buildMessages(sessionID) diff --git a/tests/compression-targets.test.ts b/tests/compression-targets.test.ts new file mode 100644 index 00000000..77027af1 --- /dev/null +++ b/tests/compression-targets.test.ts @@ -0,0 +1,78 @@ +import assert from "node:assert/strict" +import test from "node:test" +import { getActiveCompressionTargets } from "../lib/commands/compression-targets" +import { createSessionState, type CompressionBlock } from "../lib/state" + +function buildBlock( + blockId: number, + runId: number, + mode: "range" | "message", + durationMs: number, +): CompressionBlock { + return { + blockId, + runId, + active: true, + deactivatedByUser: false, + compressedTokens: 10, + summaryTokens: 5, + durationMs, + mode, + topic: `topic-${blockId}`, + batchTopic: mode === "message" ? `batch-${runId}` : `topic-${blockId}`, + startId: `m${blockId}`, + endId: `m${blockId}`, + anchorMessageId: `msg-${blockId}`, + compressMessageId: `origin-${runId}`, + includedBlockIds: [], + consumedBlockIds: [], + parentBlockIds: [], + directMessageIds: [`msg-${blockId}`], + directToolIds: [], + effectiveMessageIds: [`msg-${blockId}`], + effectiveToolIds: [], + createdAt: blockId, + summary: `summary-${blockId}`, + } +} + +test("active compression targets count a grouped message run once", () => { + const state = createSessionState() + const first = buildBlock(1, 10, "message", 225) + const second = buildBlock(2, 10, "message", 225) + const third = buildBlock(3, 11, "range", 80) + + state.prune.messages.blocksById.set(1, first) + state.prune.messages.blocksById.set(2, second) + state.prune.messages.blocksById.set(3, third) + state.prune.messages.activeBlockIds.add(1) + state.prune.messages.activeBlockIds.add(2) + state.prune.messages.activeBlockIds.add(3) + + const targets = getActiveCompressionTargets(state.prune.messages) + const totalDurationMs = targets.reduce((total, target) => total + target.durationMs, 0) + + assert.equal(targets.length, 2) + assert.equal(totalDurationMs, 305) +}) + +test("inactive grouped message runs no longer contribute compression time", () => { + const state = createSessionState() + const first = buildBlock(1, 10, "message", 225) + const second = buildBlock(2, 10, "message", 225) + const third = buildBlock(3, 11, "range", 80) + + first.active = false + second.active = false + + state.prune.messages.blocksById.set(1, first) + state.prune.messages.blocksById.set(2, second) + state.prune.messages.blocksById.set(3, third) + state.prune.messages.activeBlockIds.add(3) + + const targets = getActiveCompressionTargets(state.prune.messages) + const totalDurationMs = targets.reduce((total, target) => total + target.durationMs, 0) + + assert.equal(targets.length, 1) + assert.equal(totalDurationMs, 80) +}) diff --git a/tests/hooks-permission.test.ts b/tests/hooks-permission.test.ts index 9a13b7ba..e855ca84 100644 --- a/tests/hooks-permission.test.ts +++ b/tests/hooks-permission.test.ts @@ -5,6 +5,7 @@ import { createChatMessageHandler, createChatMessageTransformHandler, createCommandExecuteHandler, + createEventHandler, createTextCompleteHandler, } from "../lib/hooks" import { Logger } from "../lib/logger" @@ -152,3 +153,359 @@ test("text complete strips hallucinated metadata tags", async () => { assert.equal(output.text, "alpha omega") }) + +test("event hook records compress input generation duration", async () => { + const state = createSessionState() + state.sessionId = "session-1" + const handler = createEventHandler(state, new Logger(false)) + const originalNow = Date.now + Date.now = () => 100 + + try { + await handler({ + event: { + type: "message.part.updated", + properties: { + part: { + type: "tool", + tool: "compress", + callID: "call-1", + messageID: "message-1", + sessionID: "session-1", + state: { + status: "pending", + input: {}, + raw: "", + }, + }, + }, + }, + }) + + await handler({ + event: { + type: "message.part.updated", + properties: { + part: { + type: "tool", + tool: "compress", + callID: "call-1", + messageID: "message-1", + sessionID: "session-1", + state: { + status: "running", + input: { topic: "x" }, + time: { start: 325 }, + }, + }, + }, + }, + }) + } finally { + Date.now = originalNow + } + + assert.equal(state.compressionDurations.get("call-1"), 225) + assert.equal(state.compressionStarts.has("call-1"), false) +}) + +test("event hook attaches durations to matching blocks by call id", async () => { + const state = createSessionState() + state.sessionId = "session-1" + const handler = createEventHandler(state, new Logger(false)) + const originalNow = Date.now + Date.now = () => 100 + + try { + await handler({ + event: { + type: "message.part.updated", + properties: { + part: { + type: "tool", + tool: "compress", + callID: "call-1", + messageID: "message-1", + sessionID: "session-1", + state: { + status: "pending", + input: {}, + raw: "", + }, + }, + }, + }, + }) + + await handler({ + event: { + type: "message.part.updated", + properties: { + part: { + type: "tool", + tool: "compress", + callID: "call-2", + messageID: "message-1", + sessionID: "session-1", + state: { + status: "pending", + input: {}, + raw: "", + }, + }, + }, + }, + }) + + await handler({ + event: { + type: "message.part.updated", + properties: { + part: { + type: "tool", + tool: "compress", + callID: "call-1", + messageID: "message-1", + sessionID: "session-1", + state: { + status: "running", + input: {}, + time: { start: 325 }, + }, + }, + }, + }, + }) + + await handler({ + event: { + type: "message.part.updated", + properties: { + part: { + type: "tool", + tool: "compress", + callID: "call-2", + messageID: "message-1", + sessionID: "session-1", + state: { + status: "running", + input: {}, + time: { start: 410 }, + }, + }, + }, + }, + }) + state.prune.messages.blocksById.set(1, { + blockId: 1, + runId: 1, + active: true, + deactivatedByUser: false, + compressedTokens: 0, + summaryTokens: 0, + durationMs: 0, + mode: "message", + topic: "one", + batchTopic: "one", + startId: "m0001", + endId: "m0001", + anchorMessageId: "msg-a", + compressMessageId: "message-1", + compressCallId: "call-1", + includedBlockIds: [], + consumedBlockIds: [], + parentBlockIds: [], + directMessageIds: [], + directToolIds: [], + effectiveMessageIds: ["msg-a"], + effectiveToolIds: [], + createdAt: 1, + summary: "a", + }) + state.prune.messages.blocksById.set(2, { + blockId: 2, + runId: 2, + active: true, + deactivatedByUser: false, + compressedTokens: 0, + summaryTokens: 0, + durationMs: 0, + mode: "message", + topic: "two", + batchTopic: "two", + startId: "m0002", + endId: "m0002", + anchorMessageId: "msg-b", + compressMessageId: "message-1", + compressCallId: "call-2", + includedBlockIds: [], + consumedBlockIds: [], + parentBlockIds: [], + directMessageIds: [], + directToolIds: [], + effectiveMessageIds: ["msg-b"], + effectiveToolIds: [], + createdAt: 2, + summary: "b", + }) + + await handler({ + event: { + type: "message.part.updated", + properties: { + part: { + type: "tool", + tool: "compress", + callID: "call-2", + messageID: "message-1", + sessionID: "session-1", + state: { + status: "completed", + input: {}, + output: "done", + title: "", + metadata: {}, + time: { start: 410, end: 500 }, + }, + }, + }, + }, + }) + + await handler({ + event: { + type: "message.part.updated", + properties: { + part: { + type: "tool", + tool: "compress", + callID: "call-1", + messageID: "message-1", + sessionID: "session-1", + state: { + status: "completed", + input: {}, + output: "done", + title: "", + metadata: {}, + time: { start: 325, end: 500 }, + }, + }, + }, + }, + }) + } finally { + Date.now = originalNow + } + + assert.equal(state.prune.messages.blocksById.get(1)?.durationMs, 225) + assert.equal(state.prune.messages.blocksById.get(2)?.durationMs, 310) + assert.equal(state.compressionDurations.size, 0) +}) + +test("event hook falls back to completed runtime when running duration missing", async () => { + const state = createSessionState() + state.sessionId = "session-1" + const handler = createEventHandler(state, new Logger(false)) + + state.prune.messages.blocksById.set(1, { + blockId: 1, + runId: 1, + active: true, + deactivatedByUser: false, + compressedTokens: 0, + summaryTokens: 0, + durationMs: 0, + mode: "message", + topic: "one", + batchTopic: "one", + startId: "m0001", + endId: "m0001", + anchorMessageId: "msg-a", + compressMessageId: "message-1", + compressCallId: undefined, + includedBlockIds: [], + consumedBlockIds: [], + parentBlockIds: [], + directMessageIds: [], + directToolIds: [], + effectiveMessageIds: ["msg-a"], + effectiveToolIds: [], + createdAt: 1, + summary: "a", + }) + + await handler({ + event: { + type: "message.part.updated", + properties: { + part: { + type: "tool", + tool: "compress", + callID: "call-3", + messageID: "message-1", + sessionID: "session-1", + state: { + status: "completed", + input: {}, + output: "done", + title: "", + metadata: {}, + time: { start: 500, end: 940 }, + }, + }, + }, + }, + }) + + assert.equal(state.prune.messages.blocksById.get(1)?.durationMs, 440) + assert.equal(state.compressionDurations.size, 0) +}) + +test("event hook ignores non-compress tool parts", async () => { + const state = createSessionState() + state.sessionId = "session-1" + const handler = createEventHandler(state, new Logger(false)) + + await handler({ + event: { + type: "message.part.updated", + properties: { + part: { + type: "tool", + tool: "bash", + callID: "call-2", + messageID: "message-2", + sessionID: "session-1", + state: { + status: "pending", + input: {}, + raw: "", + }, + }, + }, + }, + }) + + await handler({ + event: { + type: "message.part.updated", + properties: { + part: { + type: "tool", + tool: "bash", + callID: "call-2", + messageID: "message-2", + sessionID: "session-1", + state: { + status: "running", + input: {}, + time: { start: 220 }, + }, + }, + }, + }, + }) + + assert.equal(state.compressionDurations.size, 0) +}) From b55b57c9e2a8e15e3b96f0c9707fe4d467adcf9f Mon Sep 17 00:00:00 2001 From: Spoon <212802214+spoons-and-mirrors@users.noreply.github.com> Date: Tue, 31 Mar 2026 06:12:09 +0200 Subject: [PATCH 02/14] trim --- lib/compress/message.ts | 1 - lib/compress/range.ts | 1 - lib/compress/state.ts | 14 +----- lib/compress/types.ts | 1 - lib/hooks.ts | 84 +++++++++++++--------------------- lib/state/state.ts | 1 - lib/state/types.ts | 1 - tests/hooks-permission.test.ts | 12 ++--- 8 files changed, 40 insertions(+), 75 deletions(-) diff --git a/lib/compress/message.ts b/lib/compress/message.ts index 905aa7e0..5d81161e 100644 --- a/lib/compress/message.ts +++ b/lib/compress/message.ts @@ -113,7 +113,6 @@ export function createCompressMessageTool(ctx: ToolContext): ReturnType { @@ -389,9 +366,12 @@ export function createEventHandler(state: SessionState, logger: Logger) { return } + if (part.state.status === "running") { + return + } + if (typeof part.callID === "string") { state.compressionStarts.delete(part.callID) - state.compressionDurations.delete(part.callID) } } } diff --git a/lib/state/state.ts b/lib/state/state.ts index 18d0d4f1..97681d6a 100644 --- a/lib/state/state.ts +++ b/lib/state/state.ts @@ -82,7 +82,6 @@ export function createSessionState(): SessionState { totalPruneTokens: 0, }, compressionStarts: new Map(), - compressionDurations: new Map(), toolParameters: new Map(), subAgentResultCache: new Map(), toolIdList: [], diff --git a/lib/state/types.ts b/lib/state/types.ts index 0580529a..5ec73f78 100644 --- a/lib/state/types.ts +++ b/lib/state/types.ts @@ -104,7 +104,6 @@ export interface SessionState { nudges: Nudges stats: SessionStats compressionStarts: Map - compressionDurations: Map toolParameters: Map subAgentResultCache: Map toolIdList: string[] diff --git a/tests/hooks-permission.test.ts b/tests/hooks-permission.test.ts index e855ca84..21fb94b4 100644 --- a/tests/hooks-permission.test.ts +++ b/tests/hooks-permission.test.ts @@ -154,7 +154,7 @@ test("text complete strips hallucinated metadata tags", async () => { assert.equal(output.text, "alpha omega") }) -test("event hook records compress input generation duration", async () => { +test("event hook records compression start timing", async () => { const state = createSessionState() state.sessionId = "session-1" const handler = createEventHandler(state, new Logger(false)) @@ -205,8 +205,10 @@ test("event hook records compress input generation duration", async () => { Date.now = originalNow } - assert.equal(state.compressionDurations.get("call-1"), 225) - assert.equal(state.compressionStarts.has("call-1"), false) + assert.deepEqual(state.compressionStarts.get("call-1"), { + messageId: "message-1", + startedAt: 100, + }) }) test("event hook attaches durations to matching blocks by call id", async () => { @@ -400,7 +402,6 @@ test("event hook attaches durations to matching blocks by call id", async () => assert.equal(state.prune.messages.blocksById.get(1)?.durationMs, 225) assert.equal(state.prune.messages.blocksById.get(2)?.durationMs, 310) - assert.equal(state.compressionDurations.size, 0) }) test("event hook falls back to completed runtime when running duration missing", async () => { @@ -459,7 +460,6 @@ test("event hook falls back to completed runtime when running duration missing", }) assert.equal(state.prune.messages.blocksById.get(1)?.durationMs, 440) - assert.equal(state.compressionDurations.size, 0) }) test("event hook ignores non-compress tool parts", async () => { @@ -507,5 +507,5 @@ test("event hook ignores non-compress tool parts", async () => { }, }) - assert.equal(state.compressionDurations.size, 0) + assert.equal(state.compressionStarts.size, 0) }) From 5182362c04740cb6d1f89d85f65c35c19ccaa5b3 Mon Sep 17 00:00:00 2001 From: Spoon <212802214+spoons-and-mirrors@users.noreply.github.com> Date: Tue, 31 Mar 2026 06:16:39 +0200 Subject: [PATCH 03/14] feat: add compression summary tokens amount to `dcp stats` --- lib/commands/stats.ts | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/lib/commands/stats.ts b/lib/commands/stats.ts index 86d0ea50..b934cda4 100644 --- a/lib/commands/stats.ts +++ b/lib/commands/stats.ts @@ -23,6 +23,7 @@ function formatStatsMessage( sessionTokens: number, sessionTools: number, sessionMessages: number, + sessionSummaryTokens: number, sessionDurationMs: number, allTime: AggregatedStats, ): string { @@ -37,6 +38,7 @@ function formatStatsMessage( lines.push(` Tokens pruned: ~${formatTokenCount(sessionTokens)}`) lines.push(` Tools pruned: ${sessionTools}`) lines.push(` Messages pruned: ${sessionMessages}`) + lines.push(` Summary tokens: ~${formatTokenCount(sessionSummaryTokens)}`) lines.push(` Compression time: ${formatCompressionTime(sessionDurationMs)}`) lines.push("") lines.push("All-time:") @@ -77,6 +79,10 @@ export async function handleStatsCommand(ctx: StatsCommandContext): Promise (block.active ? total + block.summaryTokens : total), + 0, + ) const sessionDurationMs = getActiveCompressionTargets(state.prune.messages).reduce( (total, target) => total + target.durationMs, 0, @@ -106,6 +112,7 @@ export async function handleStatsCommand(ctx: StatsCommandContext): Promise Date: Tue, 31 Mar 2026 06:22:20 +0200 Subject: [PATCH 04/14] rm test noise --- tests/compress-message.test.ts | 145 ++++++++------------------------- tests/hooks-permission.test.ts | 105 ------------------------ 2 files changed, 36 insertions(+), 214 deletions(-) diff --git a/tests/compress-message.test.ts b/tests/compress-message.test.ts index a590c3ec..ad0ab394 100644 --- a/tests/compress-message.test.ts +++ b/tests/compress-message.test.ts @@ -4,7 +4,6 @@ import { join } from "node:path" import { tmpdir } from "node:os" import { mkdirSync } from "node:fs" import { createCompressMessageTool } from "../lib/compress/message" -import { createEventHandler } from "../lib/hooks" import { createSessionState, type WithParts } from "../lib/state" import type { PluginConfig } from "../lib/config" import { Logger } from "../lib/logger" @@ -232,120 +231,48 @@ test("compress message mode stores call id for later duration attachment", async const rawMessages = buildMessages(sessionID) const state = createSessionState() const logger = new Logger(false) - const handler = createEventHandler(state, logger) - const originalNow = Date.now - Date.now = () => 100 - - try { - await handler({ - event: { - type: "message.part.updated", - properties: { - part: { - type: "tool", - tool: "compress", - callID: "call-1", - messageID: "msg-compress-message", - sessionID, - state: { - status: "pending", - input: {}, - raw: "", - }, - }, - }, - }, - }) - - await handler({ - event: { - type: "message.part.updated", - properties: { - part: { - type: "tool", - tool: "compress", - callID: "call-1", - messageID: "msg-compress-message", - sessionID, - state: { - status: "running", - input: {}, - time: { start: 325 }, - }, - }, - }, - }, - }) - const tool = createCompressMessageTool({ - client: { - session: { - messages: async () => ({ data: rawMessages }), - get: async () => ({ data: { parentID: null } }), - }, + const tool = createCompressMessageTool({ + client: { + session: { + messages: async () => ({ data: rawMessages }), + get: async () => ({ data: { parentID: null } }), }, - state, - logger, - config: buildConfig(), - prompts: { - reload() {}, - getRuntimePrompts() { - return { compressMessage: "", compressRange: "" } - }, + }, + state, + logger, + config: buildConfig(), + prompts: { + reload() {}, + getRuntimePrompts() { + return { compressMessage: "", compressRange: "" } }, - } as any) + }, + } as any) - await tool.execute( - { - topic: "Batch stale notes", - content: [ - { - messageId: "m0002", - topic: "Code path note", - summary: "Captured the assistant's code-path findings.", - }, - ], - }, - { - ask: async () => {}, - metadata: () => {}, - sessionID, - messageID: "msg-compress-message", - callID: "call-1", - }, - ) - - const block = Array.from(state.prune.messages.blocksById.values())[0] - assert.equal(block?.compressCallId, "call-1") - assert.equal(block?.durationMs, 0) - - await handler({ - event: { - type: "message.part.updated", - properties: { - part: { - type: "tool", - tool: "compress", - callID: "call-1", - messageID: "msg-compress-message", - sessionID, - state: { - status: "completed", - input: {}, - output: "done", - title: "", - metadata: {}, - time: { start: 325, end: 400 }, - }, - }, + await tool.execute( + { + topic: "Batch stale notes", + content: [ + { + messageId: "m0002", + topic: "Code path note", + summary: "Captured the assistant's code-path findings.", }, - }, - }) + ], + }, + { + ask: async () => {}, + metadata: () => {}, + sessionID, + messageID: "msg-compress-message", + callID: "call-1", + }, + ) - assert.equal(block?.durationMs, 225) - } finally { - Date.now = originalNow - } + const block = Array.from(state.prune.messages.blocksById.values())[0] + assert.equal(block?.compressCallId, "call-1") + assert.equal(block?.durationMs, 0) }) test("compress message mode does not partially apply when preparation fails", async () => { diff --git a/tests/hooks-permission.test.ts b/tests/hooks-permission.test.ts index 21fb94b4..8860511c 100644 --- a/tests/hooks-permission.test.ts +++ b/tests/hooks-permission.test.ts @@ -154,63 +154,6 @@ test("text complete strips hallucinated metadata tags", async () => { assert.equal(output.text, "alpha omega") }) -test("event hook records compression start timing", async () => { - const state = createSessionState() - state.sessionId = "session-1" - const handler = createEventHandler(state, new Logger(false)) - const originalNow = Date.now - Date.now = () => 100 - - try { - await handler({ - event: { - type: "message.part.updated", - properties: { - part: { - type: "tool", - tool: "compress", - callID: "call-1", - messageID: "message-1", - sessionID: "session-1", - state: { - status: "pending", - input: {}, - raw: "", - }, - }, - }, - }, - }) - - await handler({ - event: { - type: "message.part.updated", - properties: { - part: { - type: "tool", - tool: "compress", - callID: "call-1", - messageID: "message-1", - sessionID: "session-1", - state: { - status: "running", - input: { topic: "x" }, - time: { start: 325 }, - }, - }, - }, - }, - }) - } finally { - Date.now = originalNow - } - - assert.deepEqual(state.compressionStarts.get("call-1"), { - messageId: "message-1", - startedAt: 100, - }) -}) - test("event hook attaches durations to matching blocks by call id", async () => { const state = createSessionState() state.sessionId = "session-1" @@ -461,51 +404,3 @@ test("event hook falls back to completed runtime when running duration missing", assert.equal(state.prune.messages.blocksById.get(1)?.durationMs, 440) }) - -test("event hook ignores non-compress tool parts", async () => { - const state = createSessionState() - state.sessionId = "session-1" - const handler = createEventHandler(state, new Logger(false)) - - await handler({ - event: { - type: "message.part.updated", - properties: { - part: { - type: "tool", - tool: "bash", - callID: "call-2", - messageID: "message-2", - sessionID: "session-1", - state: { - status: "pending", - input: {}, - raw: "", - }, - }, - }, - }, - }) - - await handler({ - event: { - type: "message.part.updated", - properties: { - part: { - type: "tool", - tool: "bash", - callID: "call-2", - messageID: "message-2", - sessionID: "session-1", - state: { - status: "running", - input: {}, - time: { start: 220 }, - }, - }, - }, - }, - }) - - assert.equal(state.compressionStarts.size, 0) -}) From 37ed1dd09cb983d23caeaee126add3ebaa1aad70 Mon Sep 17 00:00:00 2001 From: Spoon <212802214+spoons-and-mirrors@users.noreply.github.com> Date: Tue, 31 Mar 2026 07:02:13 +0200 Subject: [PATCH 05/14] stats changes --- lib/commands/stats.ts | 33 ++++++++++++++++++++++++--------- 1 file changed, 24 insertions(+), 9 deletions(-) diff --git a/lib/commands/stats.ts b/lib/commands/stats.ts index b934cda4..bea2a6dc 100644 --- a/lib/commands/stats.ts +++ b/lib/commands/stats.ts @@ -21,9 +21,9 @@ export interface StatsCommandContext { function formatStatsMessage( sessionTokens: number, + sessionSummaryTokens: number, sessionTools: number, sessionMessages: number, - sessionSummaryTokens: number, sessionDurationMs: number, allTime: AggregatedStats, ): string { @@ -33,13 +33,15 @@ function formatStatsMessage( lines.push("│ DCP Statistics │") lines.push("╰───────────────────────────────────────────────────────────╯") lines.push("") - lines.push("Session:") + lines.push("Compression:") lines.push("─".repeat(60)) - lines.push(` Tokens pruned: ~${formatTokenCount(sessionTokens)}`) - lines.push(` Tools pruned: ${sessionTools}`) - lines.push(` Messages pruned: ${sessionMessages}`) - lines.push(` Summary tokens: ~${formatTokenCount(sessionSummaryTokens)}`) - lines.push(` Compression time: ${formatCompressionTime(sessionDurationMs)}`) + lines.push( + ` Tokens in|out: ~${formatTokenCount(sessionTokens)} | ~${formatTokenCount(sessionSummaryTokens)}`, + ) + lines.push(` Ratio: ${formatCompressionRatio(sessionTokens, sessionSummaryTokens)}`) + lines.push(` Time: ${formatCompressionTime(sessionDurationMs)}`) + lines.push(` Messages: ${sessionMessages}`) + lines.push(` Tools: ${sessionTools}`) lines.push("") lines.push("All-time:") lines.push("─".repeat(60)) @@ -51,6 +53,19 @@ function formatStatsMessage( return lines.join("\n") } +function formatCompressionRatio(inputTokens: number, outputTokens: number): string { + if (inputTokens <= 0) { + return "0:1" + } + + if (outputTokens <= 0) { + return "∞:1" + } + + const ratio = Math.max(1, Math.round(inputTokens / outputTokens)) + return `${ratio}:1` +} + function formatCompressionTime(ms: number): string { const safeMs = Math.max(0, Math.round(ms)) if (safeMs < 1000) { @@ -110,9 +125,9 @@ export async function handleStatsCommand(ctx: StatsCommandContext): Promise Date: Tue, 31 Mar 2026 16:07:46 -0400 Subject: [PATCH 06/14] fix: track compression durations across sessions --- lib/compress/state.ts | 25 ------- lib/hooks.ts | 61 +++++++++++------ lib/state/persistence.ts | 44 ++++++------ lib/state/state.ts | 72 +++++++++++++++++++- lib/state/types.ts | 14 +++- lib/state/utils.ts | 55 +++++++++++++-- tests/hooks-permission.test.ts | 120 ++++++++++++++++++++++++++++++++- 7 files changed, 311 insertions(+), 80 deletions(-) diff --git a/lib/compress/state.ts b/lib/compress/state.ts index e8386d9f..15b59e39 100644 --- a/lib/compress/state.ts +++ b/lib/compress/state.ts @@ -26,31 +26,6 @@ export function allocateRunId(state: SessionState): number { return next } -export function attachCompressionDuration( - state: SessionState, - callId: string, - messageId: string, - durationMs: number, -): number { - if (typeof durationMs !== "number" || !Number.isFinite(durationMs)) { - return 0 - } - - let updates = 0 - for (const block of state.prune.messages.blocksById.values()) { - const matchesCall = block.compressCallId === callId - const matchesMessage = !block.compressCallId && block.compressMessageId === messageId - if (!matchesCall && !matchesMessage) { - continue - } - - block.durationMs = durationMs - updates++ - } - - return updates -} - export function wrapCompressedSummary(blockId: number, summary: string): string { const header = COMPRESSED_BLOCK_HEADER const footer = formatMessageIdTag(formatBlockRef(blockId)) diff --git a/lib/hooks.ts b/lib/hooks.ts index 8c590e5b..a954cdb3 100644 --- a/lib/hooks.ts +++ b/lib/hooks.ts @@ -16,7 +16,6 @@ import { } from "./messages" import { renderSystemPrompt, type PromptStore } from "./prompts" import { buildProtectedToolsExtension } from "./prompts/extensions/system" -import { attachCompressionDuration } from "./compress/state" import { applyPendingManualTrigger, handleContextCommand, @@ -30,7 +29,14 @@ import { } from "./commands" import { type HostPermissionSnapshot } from "./host-permissions" import { compressPermission, syncCompressPermissionState } from "./compress-permission" -import { checkSession, ensureSessionInitialized, saveSessionState, syncToolCache } from "./state" +import { + checkSession, + ensureSessionInitialized, + applyPendingCompressionDurations, + queueCompressionDuration, + saveSessionState, + syncToolCache, +} from "./state" import { cacheSystemPromptTokens } from "./ui/utils" const INTERNAL_AGENT_SIGNATURES = [ @@ -269,6 +275,12 @@ export function createTextCompleteHandler() { export function createEventHandler(state: SessionState, logger: Logger) { return async (input: { event: any }) => { + const eventSessionId = + typeof input.event?.properties?.sessionID === "string" + ? input.event.properties.sessionID + : typeof input.event?.properties?.part?.sessionID === "string" + ? input.event.properties.part.sessionID + : undefined const eventTime = typeof input.event?.time === "number" && Number.isFinite(input.event.time) ? input.event.time @@ -287,20 +299,26 @@ export function createEventHandler(state: SessionState, logger: Logger) { } if (part.state.status === "pending") { - if (typeof part.callID !== "string" || typeof part.messageID !== "string") { + if ( + typeof part.callID !== "string" || + typeof part.messageID !== "string" || + typeof eventSessionId !== "string" + ) { return } - if (state.compressionStarts.has(part.callID)) { + if (state.compressionTiming.startsByCallId.has(part.callID)) { return } const startedAt = eventTime ?? Date.now() - state.compressionStarts.set(part.callID, { + state.compressionTiming.startsByCallId.set(part.callID, { + sessionId: eventSessionId, messageId: part.messageID, startedAt, }) logger.debug("Recorded compression start", { + sessionID: eventSessionId, callID: part.callID, messageID: part.messageID, startedAt, @@ -309,12 +327,16 @@ export function createEventHandler(state: SessionState, logger: Logger) { } if (part.state.status === "completed") { - if (typeof part.callID !== "string" || typeof part.messageID !== "string") { + if ( + typeof part.callID !== "string" || + typeof part.messageID !== "string" || + typeof eventSessionId !== "string" + ) { return } - const start = state.compressionStarts.get(part.callID) - state.compressionStarts.delete(part.callID) + const start = state.compressionTiming.startsByCallId.get(part.callID) + state.compressionTiming.startsByCallId.delete(part.callID) const runningAt = typeof part.state.time?.start === "number" && Number.isFinite(part.state.time.start) @@ -341,28 +363,25 @@ export function createEventHandler(state: SessionState, logger: Logger) { return } - const updates = attachCompressionDuration( - state, - part.callID, - part.messageID, - durationMs, - ) + queueCompressionDuration(state, eventSessionId, part.callID, part.messageID, durationMs) + + const updates = + state.sessionId === eventSessionId + ? applyPendingCompressionDurations(state, eventSessionId) + : 0 if (updates === 0) { return } + await saveSessionState(state, logger) + logger.info("Attached compression time to blocks", { + sessionID: eventSessionId, callID: part.callID, messageID: part.messageID, blocks: updates, durationMs, }) - - saveSessionState(state, logger).catch((error) => { - logger.warn("Failed to persist compression time update", { - error: error instanceof Error ? error.message : String(error), - }) - }) return } @@ -371,7 +390,7 @@ export function createEventHandler(state: SessionState, logger: Logger) { } if (typeof part.callID === "string") { - state.compressionStarts.delete(part.callID) + state.compressionTiming.startsByCallId.delete(part.callID) } } } diff --git a/lib/state/persistence.ts b/lib/state/persistence.ts index 2431eee3..87b774f9 100644 --- a/lib/state/persistence.ts +++ b/lib/state/persistence.ts @@ -10,6 +10,7 @@ import { homedir } from "os" import { join } from "path" import type { CompressionBlock, PrunedMessageEntry, SessionState, SessionStats } from "./types" import type { Logger } from "../logger" +import { serializePruneMessagesState } from "./utils" /** Prune state as stored on disk */ export interface PersistedPruneMessagesState { @@ -58,6 +59,23 @@ function getSessionFilePath(sessionId: string): string { return join(STORAGE_DIR, `${sessionId}.json`) } +async function writePersistedSessionState( + sessionId: string, + state: PersistedSessionState, + logger: Logger, +): Promise { + await ensureStorageDir() + + const filePath = getSessionFilePath(sessionId) + const content = JSON.stringify(state, null, 2) + await fs.writeFile(filePath, content, "utf-8") + + logger.info("Saved session state to disk", { + sessionId, + totalTokensSaved: state.stats.totalPruneTokens, + }) +} + export async function saveSessionState( sessionState: SessionState, logger: Logger, @@ -68,26 +86,11 @@ export async function saveSessionState( return } - await ensureStorageDir() - const state: PersistedSessionState = { sessionName: sessionName, prune: { tools: Object.fromEntries(sessionState.prune.tools), - messages: { - byMessageId: Object.fromEntries(sessionState.prune.messages.byMessageId), - blocksById: Object.fromEntries( - Array.from(sessionState.prune.messages.blocksById.entries()).map( - ([blockId, block]) => [String(blockId), block], - ), - ), - activeBlockIds: Array.from(sessionState.prune.messages.activeBlockIds), - activeByAnchorMessageId: Object.fromEntries( - sessionState.prune.messages.activeByAnchorMessageId, - ), - nextBlockId: sessionState.prune.messages.nextBlockId, - nextRunId: sessionState.prune.messages.nextRunId, - }, + messages: serializePruneMessagesState(sessionState.prune.messages), }, nudges: { contextLimitAnchors: Array.from(sessionState.nudges.contextLimitAnchors), @@ -98,14 +101,7 @@ export async function saveSessionState( lastUpdated: new Date().toISOString(), } - const filePath = getSessionFilePath(sessionState.sessionId) - const content = JSON.stringify(state, null, 2) - await fs.writeFile(filePath, content, "utf-8") - - logger.info("Saved session state to disk", { - sessionId: sessionState.sessionId, - totalTokensSaved: state.stats.totalPruneTokens, - }) + await writePersistedSessionState(sessionState.sessionId, state, logger) } catch (error: any) { logger.error("Failed to save session state", { sessionId: sessionState.sessionId, diff --git a/lib/state/state.ts b/lib/state/state.ts index 97681d6a..c543bdec 100644 --- a/lib/state/state.ts +++ b/lib/state/state.ts @@ -1,7 +1,8 @@ -import type { CompressionStart, SessionState, ToolParameterEntry, WithParts } from "./types" +import type { CompressionTimingState, SessionState, ToolParameterEntry, WithParts } from "./types" import type { Logger } from "../logger" import { loadSessionState, saveSessionState } from "./persistence" import { + attachCompressionDuration, isSubAgentSession, findLastCompactionTimestamp, countTurns, @@ -13,6 +14,13 @@ import { } from "./utils" import { getLastUserMessage } from "../messages/query" +function createCompressionTimingState(): CompressionTimingState { + return { + startsByCallId: new Map(), + pendingBySessionId: new Map(), + } +} + export const checkSession = async ( client: any, state: SessionState, @@ -43,6 +51,17 @@ export const checkSession = async ( } } + if (state.sessionId === lastSessionId) { + const applied = applyPendingCompressionDurations(state, lastSessionId) + if (applied > 0) { + saveSessionState(state, logger).catch((error) => { + logger.warn("Failed to persist queued compression time updates", { + error: error instanceof Error ? error.message : String(error), + }) + }) + } + } + const lastCompactionTimestamp = findLastCompactionTimestamp(messages) if (lastCompactionTimestamp > state.lastCompaction) { state.lastCompaction = lastCompactionTimestamp @@ -81,7 +100,7 @@ export function createSessionState(): SessionState { pruneTokenCounter: 0, totalPruneTokens: 0, }, - compressionStarts: new Map(), + compressionTiming: createCompressionTimingState(), toolParameters: new Map(), subAgentResultCache: new Map(), toolIdList: [], @@ -178,4 +197,53 @@ export async function ensureSessionInitialized( pruneTokenCounter: persisted.stats?.pruneTokenCounter || 0, totalPruneTokens: persisted.stats?.totalPruneTokens || 0, } + + const applied = applyPendingCompressionDurations(state, sessionId) + if (applied > 0) { + await saveSessionState(state, logger) + } +} + +export function queueCompressionDuration( + state: SessionState, + sessionId: string, + callId: string, + messageId: string, + durationMs: number, +): void { + const queued = state.compressionTiming.pendingBySessionId.get(sessionId) || [] + const filtered = queued.filter((entry) => entry.callId !== callId) + filtered.push({ callId, messageId, durationMs }) + state.compressionTiming.pendingBySessionId.set(sessionId, filtered) +} + +export function applyPendingCompressionDurations(state: SessionState, sessionId: string): number { + const queued = state.compressionTiming.pendingBySessionId.get(sessionId) + if (!queued || queued.length === 0) { + return 0 + } + + let updates = 0 + const remaining = [] + for (const entry of queued) { + const applied = attachCompressionDuration( + state.prune.messages, + entry.callId, + entry.messageId, + entry.durationMs, + ) + if (applied > 0) { + updates += applied + continue + } + remaining.push(entry) + } + + if (remaining.length > 0) { + state.compressionTiming.pendingBySessionId.set(sessionId, remaining) + } else { + state.compressionTiming.pendingBySessionId.delete(sessionId) + } + + return updates } diff --git a/lib/state/types.ts b/lib/state/types.ts index 5ec73f78..ac77bf5b 100644 --- a/lib/state/types.ts +++ b/lib/state/types.ts @@ -22,10 +22,22 @@ export interface SessionStats { } export interface CompressionStart { + sessionId: string messageId: string startedAt: number } +export interface PendingCompressionDuration { + callId: string + messageId: string + durationMs: number +} + +export interface CompressionTimingState { + startsByCallId: Map + pendingBySessionId: Map +} + export interface PrunedMessageEntry { tokenCount: number allBlockIds: number[] @@ -103,7 +115,7 @@ export interface SessionState { prune: Prune nudges: Nudges stats: SessionStats - compressionStarts: Map + compressionTiming: CompressionTimingState toolParameters: Map subAgentResultCache: Map toolIdList: string[] diff --git a/lib/state/utils.ts b/lib/state/utils.ts index 953e03fe..5a0ad546 100644 --- a/lib/state/utils.ts +++ b/lib/state/utils.ts @@ -8,6 +8,31 @@ import type { import { isIgnoredUserMessage, messageHasCompress } from "../messages/query" import { countTokens } from "../token-utils" +export function attachCompressionDuration( + messagesState: PruneMessagesState, + callId: string, + messageId: string, + durationMs: number, +): number { + if (typeof durationMs !== "number" || !Number.isFinite(durationMs)) { + return 0 + } + + let updates = 0 + for (const block of messagesState.blocksById.values()) { + const matchesCall = block.compressCallId === callId + const matchesMessage = !block.compressCallId && block.compressMessageId === messageId + if (!matchesCall && !matchesMessage) { + continue + } + + block.durationMs = durationMs + updates++ + } + + return updates +} + export const isMessageCompacted = (state: SessionState, msg: WithParts): boolean => { if (msg.info.time.created < state.lastCompaction) { return true @@ -20,12 +45,30 @@ export const isMessageCompacted = (state: SessionState, msg: WithParts): boolean } interface PersistedPruneMessagesState { - byMessageId?: Record - blocksById?: Record - activeBlockIds?: number[] - activeByAnchorMessageId?: Record - nextBlockId?: number - nextRunId?: number + byMessageId: Record + blocksById: Record + activeBlockIds: number[] + activeByAnchorMessageId: Record + nextBlockId: number + nextRunId: number +} + +export function serializePruneMessagesState( + messagesState: PruneMessagesState, +): PersistedPruneMessagesState { + return { + byMessageId: Object.fromEntries(messagesState.byMessageId), + blocksById: Object.fromEntries( + Array.from(messagesState.blocksById.entries()).map(([blockId, block]) => [ + String(blockId), + block, + ]), + ), + activeBlockIds: Array.from(messagesState.activeBlockIds), + activeByAnchorMessageId: Object.fromEntries(messagesState.activeByAnchorMessageId), + nextBlockId: messagesState.nextBlockId, + nextRunId: messagesState.nextRunId, + } } export async function isSubAgentSession(client: any, sessionID: string): Promise { diff --git a/tests/hooks-permission.test.ts b/tests/hooks-permission.test.ts index 8860511c..42ad3ee1 100644 --- a/tests/hooks-permission.test.ts +++ b/tests/hooks-permission.test.ts @@ -9,7 +9,12 @@ import { createTextCompleteHandler, } from "../lib/hooks" import { Logger } from "../lib/logger" -import { createSessionState, type WithParts } from "../lib/state" +import { + createSessionState, + ensureSessionInitialized, + saveSessionState, + type WithParts, +} from "../lib/state" function buildConfig(permission: "allow" | "ask" | "deny" = "allow"): PluginConfig { return { @@ -404,3 +409,116 @@ test("event hook falls back to completed runtime when running duration missing", assert.equal(state.prune.messages.blocksById.get(1)?.durationMs, 440) }) + +test("event hook queues duration updates until the matching session is loaded", async () => { + const logger = new Logger(false) + const targetSessionId = `session-target-${process.pid}-${Date.now()}` + const otherSessionId = `session-other-${process.pid}-${Date.now()}` + const persistedState = createSessionState() + persistedState.sessionId = targetSessionId + persistedState.prune.messages.blocksById.set(1, { + blockId: 1, + runId: 1, + active: true, + deactivatedByUser: false, + compressedTokens: 0, + summaryTokens: 0, + durationMs: 0, + mode: "message", + topic: "one", + batchTopic: "one", + startId: "m0001", + endId: "m0001", + anchorMessageId: "msg-a", + compressMessageId: "message-1", + compressCallId: "call-remote", + includedBlockIds: [], + consumedBlockIds: [], + parentBlockIds: [], + directMessageIds: [], + directToolIds: [], + effectiveMessageIds: ["msg-a"], + effectiveToolIds: [], + createdAt: 1, + summary: "a", + }) + await saveSessionState(persistedState, logger) + + const liveState = createSessionState() + liveState.sessionId = otherSessionId + const handler = createEventHandler(liveState, logger) + + await handler({ + event: { + type: "message.part.updated", + properties: { + sessionID: targetSessionId, + part: { + type: "tool", + tool: "compress", + callID: "call-remote", + messageID: "message-1", + state: { + status: "pending", + input: {}, + raw: "", + }, + }, + }, + time: 100, + }, + }) + + await handler({ + event: { + type: "message.part.updated", + properties: { + sessionID: targetSessionId, + part: { + type: "tool", + tool: "compress", + callID: "call-remote", + messageID: "message-1", + state: { + status: "completed", + input: {}, + output: "done", + title: "", + metadata: {}, + time: { start: 350, end: 500 }, + }, + }, + }, + }, + }) + + assert.equal(liveState.compressionTiming.pendingBySessionId.get(targetSessionId)?.length, 1) + assert.equal(liveState.compressionTiming.startsByCallId.has("call-remote"), false) + + await ensureSessionInitialized( + { + session: { + get: async () => ({ data: { parentID: null } }), + }, + } as any, + liveState, + targetSessionId, + logger, + [ + { + info: { + id: "msg-user-1", + role: "user", + sessionID: targetSessionId, + agent: "assistant", + time: { created: 1 }, + } as WithParts["info"], + parts: [], + }, + ], + false, + ) + + assert.equal(liveState.prune.messages.blocksById.get(1)?.durationMs, 250) + assert.equal(liveState.compressionTiming.pendingBySessionId.has(targetSessionId), false) +}) From caf6009bed324bfcc01e881a11337cf85dd5eceb Mon Sep 17 00:00:00 2001 From: Daniel Smolsky Date: Tue, 31 Mar 2026 16:45:42 -0400 Subject: [PATCH 07/14] refactor: move compression duration updates into compress state --- lib/compress/state.ts | 27 ++++++++++++++++++++++++++- lib/state/state.ts | 2 +- lib/state/utils.ts | 25 ------------------------- 3 files changed, 27 insertions(+), 27 deletions(-) diff --git a/lib/compress/state.ts b/lib/compress/state.ts index 15b59e39..06905d65 100644 --- a/lib/compress/state.ts +++ b/lib/compress/state.ts @@ -1,4 +1,4 @@ -import type { CompressionBlock, SessionState } from "../state" +import type { CompressionBlock, PruneMessagesState, SessionState } from "../state" import { formatBlockRef, formatMessageIdTag } from "../message-ids" import type { AppliedCompressionResult, CompressionStateInput, SelectionResolution } from "./types" @@ -26,6 +26,31 @@ export function allocateRunId(state: SessionState): number { return next } +export function attachCompressionDuration( + messagesState: PruneMessagesState, + callId: string, + messageId: string, + durationMs: number, +): number { + if (typeof durationMs !== "number" || !Number.isFinite(durationMs)) { + return 0 + } + + let updates = 0 + for (const block of messagesState.blocksById.values()) { + const matchesCall = block.compressCallId === callId + const matchesMessage = !block.compressCallId && block.compressMessageId === messageId + if (!matchesCall && !matchesMessage) { + continue + } + + block.durationMs = durationMs + updates++ + } + + return updates +} + export function wrapCompressedSummary(blockId: number, summary: string): string { const header = COMPRESSED_BLOCK_HEADER const footer = formatMessageIdTag(formatBlockRef(blockId)) diff --git a/lib/state/state.ts b/lib/state/state.ts index c543bdec..fbb544bc 100644 --- a/lib/state/state.ts +++ b/lib/state/state.ts @@ -1,8 +1,8 @@ import type { CompressionTimingState, SessionState, ToolParameterEntry, WithParts } from "./types" import type { Logger } from "../logger" +import { attachCompressionDuration } from "../compress/state" import { loadSessionState, saveSessionState } from "./persistence" import { - attachCompressionDuration, isSubAgentSession, findLastCompactionTimestamp, countTurns, diff --git a/lib/state/utils.ts b/lib/state/utils.ts index 5a0ad546..a18552a2 100644 --- a/lib/state/utils.ts +++ b/lib/state/utils.ts @@ -8,31 +8,6 @@ import type { import { isIgnoredUserMessage, messageHasCompress } from "../messages/query" import { countTokens } from "../token-utils" -export function attachCompressionDuration( - messagesState: PruneMessagesState, - callId: string, - messageId: string, - durationMs: number, -): number { - if (typeof durationMs !== "number" || !Number.isFinite(durationMs)) { - return 0 - } - - let updates = 0 - for (const block of messagesState.blocksById.values()) { - const matchesCall = block.compressCallId === callId - const matchesMessage = !block.compressCallId && block.compressMessageId === messageId - if (!matchesCall && !matchesMessage) { - continue - } - - block.durationMs = durationMs - updates++ - } - - return updates -} - export const isMessageCompacted = (state: SessionState, msg: WithParts): boolean => { if (msg.info.time.created < state.lastCompaction) { return true From 4671b5b3249e47c9aed55fe941183249b50ef621 Mon Sep 17 00:00:00 2001 From: Daniel Smolsky Date: Tue, 31 Mar 2026 16:59:49 -0400 Subject: [PATCH 08/14] refactor: drop redundant timing flush in checkSession --- lib/state/state.ts | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/lib/state/state.ts b/lib/state/state.ts index fbb544bc..9a5c88f6 100644 --- a/lib/state/state.ts +++ b/lib/state/state.ts @@ -51,17 +51,6 @@ export const checkSession = async ( } } - if (state.sessionId === lastSessionId) { - const applied = applyPendingCompressionDurations(state, lastSessionId) - if (applied > 0) { - saveSessionState(state, logger).catch((error) => { - logger.warn("Failed to persist queued compression time updates", { - error: error instanceof Error ? error.message : String(error), - }) - }) - } - } - const lastCompactionTimestamp = findLastCompactionTimestamp(messages) if (lastCompactionTimestamp > state.lastCompaction) { state.lastCompaction = lastCompactionTimestamp From 8334dec640bbbebbb0d39286ec7ed7a4b65b77fb Mon Sep 17 00:00:00 2001 From: Daniel Smolsky Date: Tue, 31 Mar 2026 17:05:15 -0400 Subject: [PATCH 09/14] refactor: inline compression timing state --- lib/state/state.ts | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/lib/state/state.ts b/lib/state/state.ts index 9a5c88f6..91b4dfa5 100644 --- a/lib/state/state.ts +++ b/lib/state/state.ts @@ -1,4 +1,4 @@ -import type { CompressionTimingState, SessionState, ToolParameterEntry, WithParts } from "./types" +import type { SessionState, ToolParameterEntry, WithParts } from "./types" import type { Logger } from "../logger" import { attachCompressionDuration } from "../compress/state" import { loadSessionState, saveSessionState } from "./persistence" @@ -14,13 +14,6 @@ import { } from "./utils" import { getLastUserMessage } from "../messages/query" -function createCompressionTimingState(): CompressionTimingState { - return { - startsByCallId: new Map(), - pendingBySessionId: new Map(), - } -} - export const checkSession = async ( client: any, state: SessionState, @@ -89,7 +82,10 @@ export function createSessionState(): SessionState { pruneTokenCounter: 0, totalPruneTokens: 0, }, - compressionTiming: createCompressionTimingState(), + compressionTiming: { + startsByCallId: new Map(), + pendingBySessionId: new Map(), + }, toolParameters: new Map(), subAgentResultCache: new Map(), toolIdList: [], From d9a43c2585c34821c026a030187d6401785b23d0 Mon Sep 17 00:00:00 2001 From: Daniel Smolsky Date: Tue, 31 Mar 2026 17:16:20 -0400 Subject: [PATCH 10/14] refactor: centralize compression timing logic --- lib/compress/timing.ts | 129 +++++++++++++++++++++++++++++++++++++++++ lib/hooks.ts | 63 +++++++------------- lib/state/state.ts | 51 +--------------- lib/state/types.ts | 18 +----- 4 files changed, 154 insertions(+), 107 deletions(-) create mode 100644 lib/compress/timing.ts diff --git a/lib/compress/timing.ts b/lib/compress/timing.ts new file mode 100644 index 00000000..30492400 --- /dev/null +++ b/lib/compress/timing.ts @@ -0,0 +1,129 @@ +import type { SessionState } from "../state/types" +import { attachCompressionDuration } from "./state" + +export interface CompressionStart { + sessionId: string + messageId: string + startedAt: number +} + +export interface PendingCompressionDuration { + callId: string + messageId: string + durationMs: number +} + +export interface CompressionTimingState { + startsByCallId: Map + pendingBySessionId: Map +} + +export function createCompressionTimingState(): CompressionTimingState { + return { + startsByCallId: new Map(), + pendingBySessionId: new Map(), + } +} + +export function recordCompressionStart( + state: SessionState, + callId: string, + sessionId: string, + messageId: string, + startedAt: number, +): boolean { + if (state.compressionTiming.startsByCallId.has(callId)) { + return false + } + + state.compressionTiming.startsByCallId.set(callId, { + sessionId, + messageId, + startedAt, + }) + return true +} + +export function consumeCompressionStart( + state: SessionState, + callId: string, +): CompressionStart | undefined { + const start = state.compressionTiming.startsByCallId.get(callId) + state.compressionTiming.startsByCallId.delete(callId) + return start +} + +export function clearCompressionStart(state: SessionState, callId: string): void { + state.compressionTiming.startsByCallId.delete(callId) +} + +export function resolveCompressionDuration( + start: CompressionStart | undefined, + eventTime: number | undefined, + partTime: { start?: unknown; end?: unknown } | undefined, +): number | undefined { + const runningAt = + typeof partTime?.start === "number" && Number.isFinite(partTime.start) + ? partTime.start + : eventTime + const pendingToRunningMs = + start && typeof runningAt === "number" + ? Math.max(0, runningAt - start.startedAt) + : undefined + + const toolStart = partTime?.start + const toolEnd = partTime?.end + const runtimeMs = + typeof toolStart === "number" && + Number.isFinite(toolStart) && + typeof toolEnd === "number" && + Number.isFinite(toolEnd) + ? Math.max(0, toolEnd - toolStart) + : undefined + + return typeof pendingToRunningMs === "number" ? pendingToRunningMs : runtimeMs +} + +export function queueCompressionDuration( + state: SessionState, + sessionId: string, + callId: string, + messageId: string, + durationMs: number, +): void { + const queued = state.compressionTiming.pendingBySessionId.get(sessionId) || [] + const filtered = queued.filter((entry) => entry.callId !== callId) + filtered.push({ callId, messageId, durationMs }) + state.compressionTiming.pendingBySessionId.set(sessionId, filtered) +} + +export function applyPendingCompressionDurations(state: SessionState, sessionId: string): number { + const queued = state.compressionTiming.pendingBySessionId.get(sessionId) + if (!queued || queued.length === 0) { + return 0 + } + + let updates = 0 + const remaining = [] + for (const entry of queued) { + const applied = attachCompressionDuration( + state.prune.messages, + entry.callId, + entry.messageId, + entry.durationMs, + ) + if (applied > 0) { + updates += applied + continue + } + remaining.push(entry) + } + + if (remaining.length > 0) { + state.compressionTiming.pendingBySessionId.set(sessionId, remaining) + } else { + state.compressionTiming.pendingBySessionId.delete(sessionId) + } + + return updates +} diff --git a/lib/hooks.ts b/lib/hooks.ts index a954cdb3..e0c9853a 100644 --- a/lib/hooks.ts +++ b/lib/hooks.ts @@ -16,6 +16,14 @@ import { } from "./messages" import { renderSystemPrompt, type PromptStore } from "./prompts" import { buildProtectedToolsExtension } from "./prompts/extensions/system" +import { + applyPendingCompressionDurations, + clearCompressionStart, + consumeCompressionStart, + queueCompressionDuration, + recordCompressionStart, + resolveCompressionDuration, +} from "./compress/timing" import { applyPendingManualTrigger, handleContextCommand, @@ -29,14 +37,7 @@ import { } from "./commands" import { type HostPermissionSnapshot } from "./host-permissions" import { compressPermission, syncCompressPermissionState } from "./compress-permission" -import { - checkSession, - ensureSessionInitialized, - applyPendingCompressionDurations, - queueCompressionDuration, - saveSessionState, - syncToolCache, -} from "./state" +import { checkSession, ensureSessionInitialized, saveSessionState, syncToolCache } from "./state" import { cacheSystemPromptTokens } from "./ui/utils" const INTERNAL_AGENT_SIGNATURES = [ @@ -307,16 +308,18 @@ export function createEventHandler(state: SessionState, logger: Logger) { return } - if (state.compressionTiming.startsByCallId.has(part.callID)) { + const startedAt = eventTime ?? Date.now() + if ( + !recordCompressionStart( + state, + part.callID, + eventSessionId, + part.messageID, + startedAt, + ) + ) { return } - - const startedAt = eventTime ?? Date.now() - state.compressionTiming.startsByCallId.set(part.callID, { - sessionId: eventSessionId, - messageId: part.messageID, - startedAt, - }) logger.debug("Recorded compression start", { sessionID: eventSessionId, callID: part.callID, @@ -335,30 +338,8 @@ export function createEventHandler(state: SessionState, logger: Logger) { return } - const start = state.compressionTiming.startsByCallId.get(part.callID) - state.compressionTiming.startsByCallId.delete(part.callID) - - const runningAt = - typeof part.state.time?.start === "number" && Number.isFinite(part.state.time.start) - ? part.state.time.start - : eventTime - const pendingToRunningMs = - start && typeof runningAt === "number" - ? Math.max(0, runningAt - start.startedAt) - : undefined - - const toolStart = part.state.time?.start - const toolEnd = part.state.time?.end - const runtimeMs = - typeof toolStart === "number" && - Number.isFinite(toolStart) && - typeof toolEnd === "number" && - Number.isFinite(toolEnd) - ? Math.max(0, toolEnd - toolStart) - : undefined - - const durationMs = - typeof pendingToRunningMs === "number" ? pendingToRunningMs : runtimeMs + const start = consumeCompressionStart(state, part.callID) + const durationMs = resolveCompressionDuration(start, eventTime, part.state.time) if (typeof durationMs !== "number") { return } @@ -390,7 +371,7 @@ export function createEventHandler(state: SessionState, logger: Logger) { } if (typeof part.callID === "string") { - state.compressionTiming.startsByCallId.delete(part.callID) + clearCompressionStart(state, part.callID) } } } diff --git a/lib/state/state.ts b/lib/state/state.ts index 91b4dfa5..37f5edf9 100644 --- a/lib/state/state.ts +++ b/lib/state/state.ts @@ -1,6 +1,6 @@ import type { SessionState, ToolParameterEntry, WithParts } from "./types" import type { Logger } from "../logger" -import { attachCompressionDuration } from "../compress/state" +import { applyPendingCompressionDurations, createCompressionTimingState } from "../compress/timing" import { loadSessionState, saveSessionState } from "./persistence" import { isSubAgentSession, @@ -82,10 +82,7 @@ export function createSessionState(): SessionState { pruneTokenCounter: 0, totalPruneTokens: 0, }, - compressionTiming: { - startsByCallId: new Map(), - pendingBySessionId: new Map(), - }, + compressionTiming: createCompressionTimingState(), toolParameters: new Map(), subAgentResultCache: new Map(), toolIdList: [], @@ -188,47 +185,3 @@ export async function ensureSessionInitialized( await saveSessionState(state, logger) } } - -export function queueCompressionDuration( - state: SessionState, - sessionId: string, - callId: string, - messageId: string, - durationMs: number, -): void { - const queued = state.compressionTiming.pendingBySessionId.get(sessionId) || [] - const filtered = queued.filter((entry) => entry.callId !== callId) - filtered.push({ callId, messageId, durationMs }) - state.compressionTiming.pendingBySessionId.set(sessionId, filtered) -} - -export function applyPendingCompressionDurations(state: SessionState, sessionId: string): number { - const queued = state.compressionTiming.pendingBySessionId.get(sessionId) - if (!queued || queued.length === 0) { - return 0 - } - - let updates = 0 - const remaining = [] - for (const entry of queued) { - const applied = attachCompressionDuration( - state.prune.messages, - entry.callId, - entry.messageId, - entry.durationMs, - ) - if (applied > 0) { - updates += applied - continue - } - remaining.push(entry) - } - - if (remaining.length > 0) { - state.compressionTiming.pendingBySessionId.set(sessionId, remaining) - } else { - state.compressionTiming.pendingBySessionId.delete(sessionId) - } - - return updates -} diff --git a/lib/state/types.ts b/lib/state/types.ts index ac77bf5b..7b0b04da 100644 --- a/lib/state/types.ts +++ b/lib/state/types.ts @@ -1,3 +1,4 @@ +import type { CompressionTimingState } from "../compress/timing" import { Message, Part } from "@opencode-ai/sdk/v2" export interface WithParts { @@ -21,23 +22,6 @@ export interface SessionStats { totalPruneTokens: number } -export interface CompressionStart { - sessionId: string - messageId: string - startedAt: number -} - -export interface PendingCompressionDuration { - callId: string - messageId: string - durationMs: number -} - -export interface CompressionTimingState { - startsByCallId: Map - pendingBySessionId: Map -} - export interface PrunedMessageEntry { tokenCount: number allBlockIds: number[] From 97c3d0b5a3e81dc0210a8fc27090ba71d840ad1b Mon Sep 17 00:00:00 2001 From: Daniel Smolsky Date: Tue, 31 Mar 2026 17:48:09 -0400 Subject: [PATCH 11/14] refactor: simplify compression timing state --- lib/compress/pipeline.ts | 2 ++ lib/compress/state.ts | 5 +-- lib/compress/timing.ts | 57 ++++++++-------------------------- lib/hooks.ts | 39 +++-------------------- lib/state/state.ts | 2 +- tests/hooks-permission.test.ts | 6 ++-- 6 files changed, 25 insertions(+), 86 deletions(-) diff --git a/lib/compress/pipeline.ts b/lib/compress/pipeline.ts index acf92392..5f9875e6 100644 --- a/lib/compress/pipeline.ts +++ b/lib/compress/pipeline.ts @@ -9,6 +9,7 @@ import { sendCompressNotification } from "../ui/notification" import type { ToolContext } from "./types" import { buildSearchContext, fetchSessionMessages } from "./search" import type { SearchContext } from "./types" +import { applyPendingCompressionDurations } from "./timing" interface RunContext { ask(input: { @@ -83,6 +84,7 @@ export async function finalizeSession( batchTopic: string | undefined, ): Promise { ctx.state.manualMode = ctx.state.manualMode ? "active" : false + applyPendingCompressionDurations(ctx.state) await saveSessionState(ctx.state, ctx.logger) const params = getCurrentParams(ctx.state, rawMessages, ctx.logger) diff --git a/lib/compress/state.ts b/lib/compress/state.ts index 06905d65..294cc6ab 100644 --- a/lib/compress/state.ts +++ b/lib/compress/state.ts @@ -29,7 +29,6 @@ export function allocateRunId(state: SessionState): number { export function attachCompressionDuration( messagesState: PruneMessagesState, callId: string, - messageId: string, durationMs: number, ): number { if (typeof durationMs !== "number" || !Number.isFinite(durationMs)) { @@ -38,9 +37,7 @@ export function attachCompressionDuration( let updates = 0 for (const block of messagesState.blocksById.values()) { - const matchesCall = block.compressCallId === callId - const matchesMessage = !block.compressCallId && block.compressMessageId === messageId - if (!matchesCall && !matchesMessage) { + if (block.compressCallId !== callId) { continue } diff --git a/lib/compress/timing.ts b/lib/compress/timing.ts index 30492400..d6fe1e18 100644 --- a/lib/compress/timing.ts +++ b/lib/compress/timing.ts @@ -1,53 +1,37 @@ import type { SessionState } from "../state/types" import { attachCompressionDuration } from "./state" -export interface CompressionStart { - sessionId: string - messageId: string - startedAt: number -} - export interface PendingCompressionDuration { callId: string - messageId: string durationMs: number } export interface CompressionTimingState { - startsByCallId: Map - pendingBySessionId: Map + startsByCallId: Map + pendingByCallId: Map } export function createCompressionTimingState(): CompressionTimingState { return { startsByCallId: new Map(), - pendingBySessionId: new Map(), + pendingByCallId: new Map(), } } export function recordCompressionStart( state: SessionState, callId: string, - sessionId: string, - messageId: string, startedAt: number, ): boolean { if (state.compressionTiming.startsByCallId.has(callId)) { return false } - state.compressionTiming.startsByCallId.set(callId, { - sessionId, - messageId, - startedAt, - }) + state.compressionTiming.startsByCallId.set(callId, startedAt) return true } -export function consumeCompressionStart( - state: SessionState, - callId: string, -): CompressionStart | undefined { +export function consumeCompressionStart(state: SessionState, callId: string): number | undefined { const start = state.compressionTiming.startsByCallId.get(callId) state.compressionTiming.startsByCallId.delete(callId) return start @@ -58,7 +42,7 @@ export function clearCompressionStart(state: SessionState, callId: string): void } export function resolveCompressionDuration( - start: CompressionStart | undefined, + startedAt: number | undefined, eventTime: number | undefined, partTime: { start?: unknown; end?: unknown } | undefined, ): number | undefined { @@ -67,8 +51,8 @@ export function resolveCompressionDuration( ? partTime.start : eventTime const pendingToRunningMs = - start && typeof runningAt === "number" - ? Math.max(0, runningAt - start.startedAt) + typeof startedAt === "number" && typeof runningAt === "number" + ? Math.max(0, runningAt - startedAt) : undefined const toolStart = partTime?.start @@ -86,43 +70,28 @@ export function resolveCompressionDuration( export function queueCompressionDuration( state: SessionState, - sessionId: string, callId: string, - messageId: string, durationMs: number, ): void { - const queued = state.compressionTiming.pendingBySessionId.get(sessionId) || [] - const filtered = queued.filter((entry) => entry.callId !== callId) - filtered.push({ callId, messageId, durationMs }) - state.compressionTiming.pendingBySessionId.set(sessionId, filtered) + state.compressionTiming.pendingByCallId.set(callId, { callId, durationMs }) } -export function applyPendingCompressionDurations(state: SessionState, sessionId: string): number { - const queued = state.compressionTiming.pendingBySessionId.get(sessionId) - if (!queued || queued.length === 0) { +export function applyPendingCompressionDurations(state: SessionState): number { + if (state.compressionTiming.pendingByCallId.size === 0) { return 0 } let updates = 0 - const remaining = [] - for (const entry of queued) { + for (const [callId, entry] of state.compressionTiming.pendingByCallId) { const applied = attachCompressionDuration( state.prune.messages, entry.callId, - entry.messageId, entry.durationMs, ) if (applied > 0) { updates += applied - continue + state.compressionTiming.pendingByCallId.delete(callId) } - remaining.push(entry) - } - - if (remaining.length > 0) { - state.compressionTiming.pendingBySessionId.set(sessionId, remaining) - } else { - state.compressionTiming.pendingBySessionId.delete(sessionId) } return updates diff --git a/lib/hooks.ts b/lib/hooks.ts index e0c9853a..4e75588d 100644 --- a/lib/hooks.ts +++ b/lib/hooks.ts @@ -276,12 +276,6 @@ export function createTextCompleteHandler() { export function createEventHandler(state: SessionState, logger: Logger) { return async (input: { event: any }) => { - const eventSessionId = - typeof input.event?.properties?.sessionID === "string" - ? input.event.properties.sessionID - : typeof input.event?.properties?.part?.sessionID === "string" - ? input.event.properties.part.sessionID - : undefined const eventTime = typeof input.event?.time === "number" && Number.isFinite(input.event.time) ? input.event.time @@ -300,41 +294,23 @@ export function createEventHandler(state: SessionState, logger: Logger) { } if (part.state.status === "pending") { - if ( - typeof part.callID !== "string" || - typeof part.messageID !== "string" || - typeof eventSessionId !== "string" - ) { + if (typeof part.callID !== "string") { return } const startedAt = eventTime ?? Date.now() - if ( - !recordCompressionStart( - state, - part.callID, - eventSessionId, - part.messageID, - startedAt, - ) - ) { + if (!recordCompressionStart(state, part.callID, startedAt)) { return } logger.debug("Recorded compression start", { - sessionID: eventSessionId, callID: part.callID, - messageID: part.messageID, startedAt, }) return } if (part.state.status === "completed") { - if ( - typeof part.callID !== "string" || - typeof part.messageID !== "string" || - typeof eventSessionId !== "string" - ) { + if (typeof part.callID !== "string") { return } @@ -344,12 +320,9 @@ export function createEventHandler(state: SessionState, logger: Logger) { return } - queueCompressionDuration(state, eventSessionId, part.callID, part.messageID, durationMs) + queueCompressionDuration(state, part.callID, durationMs) - const updates = - state.sessionId === eventSessionId - ? applyPendingCompressionDurations(state, eventSessionId) - : 0 + const updates = applyPendingCompressionDurations(state) if (updates === 0) { return } @@ -357,9 +330,7 @@ export function createEventHandler(state: SessionState, logger: Logger) { await saveSessionState(state, logger) logger.info("Attached compression time to blocks", { - sessionID: eventSessionId, callID: part.callID, - messageID: part.messageID, blocks: updates, durationMs, }) diff --git a/lib/state/state.ts b/lib/state/state.ts index 37f5edf9..3df6f500 100644 --- a/lib/state/state.ts +++ b/lib/state/state.ts @@ -180,7 +180,7 @@ export async function ensureSessionInitialized( totalPruneTokens: persisted.stats?.totalPruneTokens || 0, } - const applied = applyPendingCompressionDurations(state, sessionId) + const applied = applyPendingCompressionDurations(state) if (applied > 0) { await saveSessionState(state, logger) } diff --git a/tests/hooks-permission.test.ts b/tests/hooks-permission.test.ts index 42ad3ee1..238b0412 100644 --- a/tests/hooks-permission.test.ts +++ b/tests/hooks-permission.test.ts @@ -372,7 +372,7 @@ test("event hook falls back to completed runtime when running duration missing", endId: "m0001", anchorMessageId: "msg-a", compressMessageId: "message-1", - compressCallId: undefined, + compressCallId: "call-3", includedBlockIds: [], consumedBlockIds: [], parentBlockIds: [], @@ -492,7 +492,7 @@ test("event hook queues duration updates until the matching session is loaded", }, }) - assert.equal(liveState.compressionTiming.pendingBySessionId.get(targetSessionId)?.length, 1) + assert.equal(liveState.compressionTiming.pendingByCallId.has("call-remote"), true) assert.equal(liveState.compressionTiming.startsByCallId.has("call-remote"), false) await ensureSessionInitialized( @@ -520,5 +520,5 @@ test("event hook queues duration updates until the matching session is loaded", ) assert.equal(liveState.prune.messages.blocksById.get(1)?.durationMs, 250) - assert.equal(liveState.compressionTiming.pendingBySessionId.has(targetSessionId), false) + assert.equal(liveState.compressionTiming.pendingByCallId.has("call-remote"), false) }) From ecde964e2788630bd4871a403f8b1d6da974f740 Mon Sep 17 00:00:00 2001 From: Daniel Smolsky Date: Tue, 31 Mar 2026 18:25:59 -0400 Subject: [PATCH 12/14] refactor: inline timing map writes --- lib/compress/timing.ts | 25 ------------------------- lib/hooks.ts | 13 +++++++------ 2 files changed, 7 insertions(+), 31 deletions(-) diff --git a/lib/compress/timing.ts b/lib/compress/timing.ts index d6fe1e18..a8521843 100644 --- a/lib/compress/timing.ts +++ b/lib/compress/timing.ts @@ -18,29 +18,12 @@ export function createCompressionTimingState(): CompressionTimingState { } } -export function recordCompressionStart( - state: SessionState, - callId: string, - startedAt: number, -): boolean { - if (state.compressionTiming.startsByCallId.has(callId)) { - return false - } - - state.compressionTiming.startsByCallId.set(callId, startedAt) - return true -} - export function consumeCompressionStart(state: SessionState, callId: string): number | undefined { const start = state.compressionTiming.startsByCallId.get(callId) state.compressionTiming.startsByCallId.delete(callId) return start } -export function clearCompressionStart(state: SessionState, callId: string): void { - state.compressionTiming.startsByCallId.delete(callId) -} - export function resolveCompressionDuration( startedAt: number | undefined, eventTime: number | undefined, @@ -68,14 +51,6 @@ export function resolveCompressionDuration( return typeof pendingToRunningMs === "number" ? pendingToRunningMs : runtimeMs } -export function queueCompressionDuration( - state: SessionState, - callId: string, - durationMs: number, -): void { - state.compressionTiming.pendingByCallId.set(callId, { callId, durationMs }) -} - export function applyPendingCompressionDurations(state: SessionState): number { if (state.compressionTiming.pendingByCallId.size === 0) { return 0 diff --git a/lib/hooks.ts b/lib/hooks.ts index 4e75588d..7f71265a 100644 --- a/lib/hooks.ts +++ b/lib/hooks.ts @@ -18,10 +18,7 @@ import { renderSystemPrompt, type PromptStore } from "./prompts" import { buildProtectedToolsExtension } from "./prompts/extensions/system" import { applyPendingCompressionDurations, - clearCompressionStart, consumeCompressionStart, - queueCompressionDuration, - recordCompressionStart, resolveCompressionDuration, } from "./compress/timing" import { @@ -299,9 +296,10 @@ export function createEventHandler(state: SessionState, logger: Logger) { } const startedAt = eventTime ?? Date.now() - if (!recordCompressionStart(state, part.callID, startedAt)) { + if (state.compressionTiming.startsByCallId.has(part.callID)) { return } + state.compressionTiming.startsByCallId.set(part.callID, startedAt) logger.debug("Recorded compression start", { callID: part.callID, startedAt, @@ -320,7 +318,10 @@ export function createEventHandler(state: SessionState, logger: Logger) { return } - queueCompressionDuration(state, part.callID, durationMs) + state.compressionTiming.pendingByCallId.set(part.callID, { + callId: part.callID, + durationMs, + }) const updates = applyPendingCompressionDurations(state) if (updates === 0) { @@ -342,7 +343,7 @@ export function createEventHandler(state: SessionState, logger: Logger) { } if (typeof part.callID === "string") { - clearCompressionStart(state, part.callID) + state.compressionTiming.startsByCallId.delete(part.callID) } } } From 57dc7d6629551e8e7b006c5e42162e638a2eacb4 Mon Sep 17 00:00:00 2001 From: Daniel Smolsky Date: Tue, 31 Mar 2026 18:33:51 -0400 Subject: [PATCH 13/14] refactor: inline timing state init --- lib/compress/timing.ts | 7 ------- lib/state/state.ts | 7 +++++-- 2 files changed, 5 insertions(+), 9 deletions(-) diff --git a/lib/compress/timing.ts b/lib/compress/timing.ts index a8521843..4a5efed6 100644 --- a/lib/compress/timing.ts +++ b/lib/compress/timing.ts @@ -11,13 +11,6 @@ export interface CompressionTimingState { pendingByCallId: Map } -export function createCompressionTimingState(): CompressionTimingState { - return { - startsByCallId: new Map(), - pendingByCallId: new Map(), - } -} - export function consumeCompressionStart(state: SessionState, callId: string): number | undefined { const start = state.compressionTiming.startsByCallId.get(callId) state.compressionTiming.startsByCallId.delete(callId) diff --git a/lib/state/state.ts b/lib/state/state.ts index 3df6f500..71cb3aac 100644 --- a/lib/state/state.ts +++ b/lib/state/state.ts @@ -1,6 +1,6 @@ import type { SessionState, ToolParameterEntry, WithParts } from "./types" import type { Logger } from "../logger" -import { applyPendingCompressionDurations, createCompressionTimingState } from "../compress/timing" +import { applyPendingCompressionDurations } from "../compress/timing" import { loadSessionState, saveSessionState } from "./persistence" import { isSubAgentSession, @@ -82,7 +82,10 @@ export function createSessionState(): SessionState { pruneTokenCounter: 0, totalPruneTokens: 0, }, - compressionTiming: createCompressionTimingState(), + compressionTiming: { + startsByCallId: new Map(), + pendingByCallId: new Map(), + }, toolParameters: new Map(), subAgentResultCache: new Map(), toolIdList: [], From 15c5d207e294ea10c9a2f83f01ae3c60325b38fd Mon Sep 17 00:00:00 2001 From: Daniel Smolsky Date: Tue, 31 Mar 2026 18:37:56 -0400 Subject: [PATCH 14/14] refactor: move event hook last --- index.ts | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/index.ts b/index.ts index 0e58d1db..e69357e6 100644 --- a/index.ts +++ b/index.ts @@ -58,7 +58,6 @@ const plugin: Plugin = (async (ctx) => { config, prompts, ), - "experimental.chat.messages.transform": createChatMessageTransformHandler( ctx.client, state, @@ -69,7 +68,6 @@ const plugin: Plugin = (async (ctx) => { ) as any, "chat.message": createChatMessageHandler(state, logger, config, hostPermissions), "experimental.text.complete": createTextCompleteHandler(), - event: createEventHandler(state, logger), "command.execute.before": createCommandExecuteHandler( ctx.client, state, @@ -78,6 +76,7 @@ const plugin: Plugin = (async (ctx) => { ctx.directory, hostPermissions, ), + event: createEventHandler(state, logger), tool: { ...(config.compress.permission !== "deny" && { compress: