From 12362d1aada03863abf4ff4ccb5c4845fd1b62d9 Mon Sep 17 00:00:00 2001 From: Will Washburn Date: Sun, 26 Apr 2026 15:14:01 -0400 Subject: [PATCH 1/2] Migrate burn summary to archive query (#82) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replaces the per-invocation `queryAll` ledger walk with `queryAllFromArchive` that issues SQL against `archive.sqlite`. Filters lower to indexed `WHERE` clauses; tool calls are bulk-hydrated; subagent-tree and `--by-subagent-type` modes consume the same archive-derived turn slice. JSON + text output stays parity-identical against the legacy reader for the load-bearing blocks (`byModel`, `totalCost`, `fidelity`). Two escape hatches preserve the old behavior: `--no-archive` flag and `RELAYBURN_ARCHIVE=0` env var both revert to `queryAll`. If the archive build/read throws (corrupt sqlite, schema mismatch we couldn't recover from cleanly), the command transparently falls back to the streaming reader and surfaces the reason on stderr — the archive can never wedge `burn summary`. Tests cover parity (text + JSON), auto-build behavior, both fallback paths, and the new `archive-query` reader (filters, tool-call hydration, fidelity class round-trip, subagent block hydration). Closes #82, refs #40, #78. 
--- packages/cli/CHANGELOG.md | 4 + packages/cli/src/cli.ts | 2 +- packages/cli/src/commands/summary.test.ts | 203 +++++++++++ packages/cli/src/commands/summary.ts | 44 ++- packages/ledger/CHANGELOG.md | 1 + packages/ledger/src/archive-query.test.ts | 315 +++++++++++++++++ packages/ledger/src/archive-query.ts | 412 ++++++++++++++++++++++ packages/ledger/src/index.ts | 1 + 8 files changed, 979 insertions(+), 3 deletions(-) create mode 100644 packages/cli/src/commands/summary.test.ts create mode 100644 packages/ledger/src/archive-query.test.ts create mode 100644 packages/ledger/src/archive-query.ts diff --git a/packages/cli/CHANGELOG.md b/packages/cli/CHANGELOG.md index 20acabe..96aa63e 100644 --- a/packages/cli/CHANGELOG.md +++ b/packages/cli/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Changed + +- **`burn summary` now reads from `archive.sqlite` instead of streaming `ledger.jsonl`** ([#82](https://github.com/AgentWorkforce/burn/issues/82)). The default hot path calls `buildArchive()` (cheap incremental tail scan after the per-invocation `ingestAll`) and issues SQL with filters lowered to indexed `WHERE` clauses against `turns`, replacing the per-invocation full ledger walk + stamp fold. Subagent-tree (`--subagent-tree`) and `--by-subagent-type` modes consume the same archive-derived turn slice. Output (text + `--json`) is parity-preserved against the legacy reader for the `byModel`, `totalCost`, and `fidelity` blocks. Two escape hatches preserve the old behavior: a new `--no-archive` flag and the `RELAYBURN_ARCHIVE=0` env var both revert to `queryAll`. If the archive path throws (corrupt sqlite, schema mismatch we couldn't recover from cleanly), the command transparently falls back to the streaming reader and surfaces the reason on stderr — the archive can never wedge `burn summary`. 
+ ### Added - **Execution-graph passthrough for Codex ingest** ([#87](https://github.com/AgentWorkforce/burn/issues/87)). `burn ingest` (and `burn codex`) now persist Codex `SessionRelationshipRecord`s and `ToolResultEventRecord`s the reader emits, alongside the existing turns / content lines, mirroring the Claude path landed in the previous release. The Codex cursor (`~/.relayburn/cursors.json`) gains `rootSessionEmitted`, `nextEventIndex`, and `toolResultCounters` so dedup of execution-graph rows survives across `burn` invocations even when the writer-side index isn't warm. diff --git a/packages/cli/src/cli.ts b/packages/cli/src/cli.ts index 1c71187..fe0c996 100644 --- a/packages/cli/src/cli.ts +++ b/packages/cli/src/cli.ts @@ -22,7 +22,7 @@ const HELP = `burn — token usage & cost attribution for agent CLIs Usage: burn summary [--since 7d] [--project ] [--session ] [--workflow ] [--agent ] [--quality] - [--subagent-tree ] [--by-subagent-type] + [--subagent-tree ] [--by-subagent-type] [--no-archive] burn by-tool [--since 7d] [--project ] [--session ] burn waste [--since 7d] [--project ] [--session ] [--workflow ] [--all] [--json] [--patterns[=retries,failures,compaction,reverts]] diff --git a/packages/cli/src/commands/summary.test.ts b/packages/cli/src/commands/summary.test.ts new file mode 100644 index 0000000..bfe4a82 --- /dev/null +++ b/packages/cli/src/commands/summary.test.ts @@ -0,0 +1,203 @@ +import { strict as assert } from 'node:assert'; +import { mkdtemp, rm, stat } from 'node:fs/promises'; +import { tmpdir } from 'node:os'; +import * as path from 'node:path'; +import { after, beforeEach, describe, it } from 'node:test'; + +import { appendTurns, archivePath, stamp } from '@relayburn/ledger'; +import type { TurnRecord } from '@relayburn/reader'; + +import { runSummary } from './summary.js'; + +function fakeTurn(overrides: Partial = {}): TurnRecord { + return { + v: 1, + source: 'claude-code', + sessionId: 's-1', + messageId: 'msg-1', + turnIndex: 0, + ts: 
'2026-04-20T00:00:00.000Z', + model: 'claude-sonnet-4-6', + usage: { + input: 1000, + output: 500, + reasoning: 0, + cacheRead: 1000, + cacheCreate5m: 0, + cacheCreate1h: 0, + }, + toolCalls: [], + project: '/tmp/project', + ...overrides, + }; +} + +interface CapturedOutput { + stdout: string; + stderr: string; + code: number; +} + +async function captureSummary( + flags: Record = {}, +): Promise { + const origStdout = process.stdout.write.bind(process.stdout); + const origStderr = process.stderr.write.bind(process.stderr); + let stdout = ''; + let stderr = ''; + process.stdout.write = ((chunk: string | Uint8Array): boolean => { + stdout += typeof chunk === 'string' ? chunk : chunk.toString(); + return true; + }) as typeof process.stdout.write; + process.stderr.write = ((chunk: string | Uint8Array): boolean => { + stderr += typeof chunk === 'string' ? chunk : chunk.toString(); + return true; + }) as typeof process.stderr.write; + let code: number; + try { + code = await runSummary({ flags, tags: {}, positional: [], passthrough: [] }); + } finally { + process.stdout.write = origStdout; + process.stderr.write = origStderr; + } + return { stdout, stderr, code }; +} + +describe('burn summary archive integration (#82)', () => { + let tmpHome: string; + let tmpRelay: string; + const originalHome = process.env['HOME']; + const originalRelay = process.env['RELAYBURN_HOME']; + const originalArchive = process.env['RELAYBURN_ARCHIVE']; + + beforeEach(async () => { + tmpHome = await mkdtemp(path.join(tmpdir(), 'burn-summary-home-')); + tmpRelay = await mkdtemp(path.join(tmpdir(), 'burn-summary-relay-')); + process.env['HOME'] = tmpHome; + process.env['RELAYBURN_HOME'] = tmpRelay; + delete process.env['RELAYBURN_ARCHIVE']; + }); + + after(async () => { + if (originalHome !== undefined) process.env['HOME'] = originalHome; + else delete process.env['HOME']; + if (originalRelay !== undefined) process.env['RELAYBURN_HOME'] = originalRelay; + else delete 
process.env['RELAYBURN_HOME']; + if (originalArchive !== undefined) process.env['RELAYBURN_ARCHIVE'] = originalArchive; + else delete process.env['RELAYBURN_ARCHIVE']; + await rm(tmpHome, { recursive: true, force: true }); + await rm(tmpRelay, { recursive: true, force: true }); + }); + + it('--json output is identical between archive and ledger paths (parity)', async () => { + await appendTurns([ + fakeTurn({ sessionId: 's-A', messageId: 'pa-1' }), + fakeTurn({ + sessionId: 's-A', + messageId: 'pa-2', + turnIndex: 1, + ts: '2026-04-20T00:01:00.000Z', + }), + fakeTurn({ + sessionId: 's-B', + messageId: 'pa-3', + ts: '2026-04-20T00:02:00.000Z', + model: 'claude-haiku-4-5', + project: '/tmp/other', + }), + ]); + await stamp({ sessionId: 's-A' }, { workflowId: 'wf-parity' }); + + // Default path: builds the archive, then queries SQL. + const archiveOut = await captureSummary({ json: true }); + assert.equal(archiveOut.code, 0); + + // Fallback path: streams the ledger. + const ledgerOut = await captureSummary({ json: true, 'no-archive': true }); + assert.equal(ledgerOut.code, 0); + + interface SummaryPayload { + turns: number; + totalCost: { total: number }; + byModel: Array<{ model: string; turns: number; usage: Record; cost: { total: number } }>; + fidelity: unknown; + } + const archive = JSON.parse(archiveOut.stdout) as SummaryPayload; + const ledger = JSON.parse(ledgerOut.stdout) as SummaryPayload; + assert.equal(archive.turns, ledger.turns); + assert.equal(archive.turns, 3); + assert.deepEqual( + archive.byModel.map((r) => ({ model: r.model, turns: r.turns, usage: r.usage, cost: r.cost })), + ledger.byModel.map((r) => ({ model: r.model, turns: r.turns, usage: r.usage, cost: r.cost })), + ); + assert.deepEqual(archive.totalCost, ledger.totalCost); + assert.deepEqual(archive.fidelity, ledger.fidelity); + }); + + it('default path auto-builds archive.sqlite on first run', async () => { + await appendTurns([fakeTurn({ sessionId: 's-AB', messageId: 'ab-1' })]); + // 
Pre-condition: no archive on disk. + await assert.rejects(stat(archivePath()), /ENOENT/); + + const out = await captureSummary({ json: true }); + assert.equal(out.code, 0); + + // Post-condition: `loadTurns` ran `buildArchive()` and the file exists. + const st = await stat(archivePath()); + assert.equal(st.isFile(), true); + }); + + it('--no-archive flag does NOT build the archive (fallback path)', async () => { + await appendTurns([fakeTurn({ sessionId: 's-NA', messageId: 'na-1' })]); + await assert.rejects(stat(archivePath()), /ENOENT/); + + const out = await captureSummary({ json: true, 'no-archive': true }); + assert.equal(out.code, 0); + + // The archive should still be missing — we hit the legacy `queryAll` path. + await assert.rejects(stat(archivePath()), /ENOENT/); + }); + + it('RELAYBURN_ARCHIVE=0 env disables the archive path (fallback)', async () => { + await appendTurns([fakeTurn({ sessionId: 's-ENV', messageId: 'env-1' })]); + await assert.rejects(stat(archivePath()), /ENOENT/); + + process.env['RELAYBURN_ARCHIVE'] = '0'; + try { + const out = await captureSummary({ json: true }); + assert.equal(out.code, 0); + } finally { + delete process.env['RELAYBURN_ARCHIVE']; + } + // Same fallback behavior — no archive built. 
+ await assert.rejects(stat(archivePath()), /ENOENT/); + }); + + it('text output matches between archive and ledger paths (parity)', async () => { + await appendTurns([ + fakeTurn({ sessionId: 's-T', messageId: 'tx-1' }), + fakeTurn({ + sessionId: 's-T', + messageId: 'tx-2', + turnIndex: 1, + ts: '2026-04-20T00:01:00.000Z', + }), + ]); + + const archiveOut = await captureSummary({}); + assert.equal(archiveOut.code, 0); + const ledgerOut = await captureSummary({ 'no-archive': true }); + assert.equal(ledgerOut.code, 0); + + // The "ingested N new sessions (+M turns)" preamble depends on the live + // ingest pass which is a no-op here (no ~/.claude or ~/.codex sessions in + // the temp HOME), but stripping the preamble keeps the test resilient if + // that contract ever changes. Compare the body — model table + total + // cost. + const stripPreamble = (s: string): string => { + const idx = s.indexOf('turns analyzed:'); + return idx >= 0 ? s.slice(idx) : s; + }; + assert.equal(stripPreamble(archiveOut.stdout), stripPreamble(ledgerOut.stdout)); + }); +}); diff --git a/packages/cli/src/commands/summary.ts b/packages/cli/src/commands/summary.ts index 45ccce5..a52cf27 100644 --- a/packages/cli/src/commands/summary.ts +++ b/packages/cli/src/commands/summary.ts @@ -13,7 +13,13 @@ import type { QualityResult, SubagentTreeNode, } from '@relayburn/analyze'; -import { queryAll, readContent, type Query } from '@relayburn/ledger'; +import { + buildArchive, + queryAll, + queryAllFromArchive, + readContent, + type Query, +} from '@relayburn/ledger'; import type { EnrichedTurn } from '@relayburn/ledger'; import type { ContentRecord } from '@relayburn/reader'; @@ -34,7 +40,7 @@ export async function runSummary(args: ParsedArgs): Promise { const ingestReport = await ingestAll(); const pricing = await loadPricing(); - const turns = await queryAll(q); + const turns = await loadTurns(q, args); if (subagentTreeFlag !== undefined) { return renderSubagentTreeMode(args, turns, pricing, 
subagentTreeFlag, q); @@ -323,6 +329,40 @@ function renderFidelityNotice(f: FidelitySummary): string | undefined { return `fidelity: ${parts.join(' / ')} (use --json for per-field coverage)`; } +/** + * Load the turns slice that drives every summary mode. + * + * Default path: bring `archive.sqlite` current via `buildArchive()` (cheap + * incremental tail scan after `ingestAll`'s appends), then issue SQL with + * filters lowered as `WHERE` clauses against indexed columns. Replaces the + * full ledger walk (`queryAll`) on the hot path. + * + * Fallback path: `--no-archive` flag or `RELAYBURN_ARCHIVE=0` env reverts + * to the legacy `queryAll` ledger stream — kept as an escape hatch for + * parity validation and for environments where the archive is missing / + * corrupt. If a build/query against the archive throws, we transparently + * fall back to the same legacy path so a wedged archive can never break + * the command. + */ +async function loadTurns(q: Query, args: ParsedArgs): Promise { + const noArchiveFlag = args.flags['no-archive'] === true; + const envDisabled = process.env['RELAYBURN_ARCHIVE'] === '0'; + if (noArchiveFlag || envDisabled) { + return queryAll(q); + } + try { + await buildArchive(); + return await queryAllFromArchive(q); + } catch (err) { + // Don't let an archive-side failure (corrupt sqlite, schema mismatch we + // didn't recover from cleanly, etc.) take down `burn summary`. Surface + // the reason on stderr and fall back to the streaming reader. + const msg = err instanceof Error ? 
err.message : String(err); + process.stderr.write(`burn: archive read failed (${msg}); falling back to ledger walk\n`); + return queryAll(q); + } +} + function aggregateByModel(turns: EnrichedTurn[], pricing: Parameters[1]): ModelRow[] { const byModel = new Map(); for (const t of turns) { diff --git a/packages/ledger/CHANGELOG.md b/packages/ledger/CHANGELOG.md index fdd90b5..1c6b9b9 100644 --- a/packages/ledger/CHANGELOG.md +++ b/packages/ledger/CHANGELOG.md @@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- **`queryAllFromArchive(query)` + `archiveAvailable()`** ([#82](https://github.com/AgentWorkforce/burn/issues/82)). New read-side entry point in `@relayburn/ledger` that issues SQL against `archive.sqlite` and returns the same `EnrichedTurn[]` shape as `queryAll`, so consumers (starting with `burn summary`) can swap implementations without touching their aggregation code. Filters land as `WHERE` clauses against indexed columns (`ts`, `model`, `project_key`, `session_id`, `source`, materialized enrichment columns); arbitrary stamp keys not promoted to columns fall back to a `json_extract` over `enrichment_json` to match `queryAll` semantics. Tool calls are bulk-hydrated keyed on `(source, session_id, message_id)` so callers that read `turn.toolCalls` keep working without an extra round-trip. Fidelity is reconstructed from the persisted `attribution_fidelity` / `tokens_present` / `cost_present` columns plus class-implied coverage defaults — class equality (the load-bearing parity contract for `summarizeFidelity`) is preserved; the synthesized coverage shape may differ from the on-ledger blob for classes that don't pin every flag. - **`CodexCursor` carries execution-graph commit state** ([#87](https://github.com/AgentWorkforce/burn/issues/87)). 
Three optional fields — `rootSessionEmitted`, `nextEventIndex`, `toolResultCounters` — let `burn ingest` resume Codex sessions without re-emitting the root `SessionRelationshipRecord` or restarting `ToolResultEventRecord.eventIndex` at zero across `burn` invocations. Older cursor files are backward-compatible: missing fields default to "fresh" (root not emitted, indices start at zero), and the next ingest pass pre-loads them onto the writer's dedup index. ## [0.25.0] - 2026-04-26 diff --git a/packages/ledger/src/archive-query.test.ts b/packages/ledger/src/archive-query.test.ts new file mode 100644 index 0000000..c5e6a94 --- /dev/null +++ b/packages/ledger/src/archive-query.test.ts @@ -0,0 +1,315 @@ +import { strict as assert } from 'node:assert'; +import { mkdtemp, rm } from 'node:fs/promises'; +import { tmpdir } from 'node:os'; +import * as path from 'node:path'; +import { after, before, beforeEach, describe, it } from 'node:test'; + +import type { TurnRecord } from '@relayburn/reader'; + +import { __resetIndexCacheForTesting } from './index-sidecar.js'; +import { appendTurns, stamp } from './writer.js'; +import { queryAll, type EnrichedTurn } from './reader.js'; +import { buildArchive } from './archive.js'; +import { archiveAvailable, queryAllFromArchive } from './archive-query.js'; + +function fakeTurn(overrides: Partial = {}): TurnRecord { + return { + v: 1, + source: 'claude-code', + sessionId: 's-1', + messageId: 'msg-1', + turnIndex: 0, + ts: '2026-04-20T00:00:00.000Z', + model: 'claude-sonnet-4-6', + usage: { + input: 100, + output: 50, + reasoning: 0, + cacheRead: 1000, + cacheCreate5m: 0, + cacheCreate1h: 0, + }, + toolCalls: [], + project: '/tmp/project', + ...overrides, + }; +} + +// Strip the synthesized `fidelity.coverage` block before comparing — the +// archive doesn't persist the full coverage shape (#110), only the class / +// granularity / tokens-present / cost-present projection. 
Class equality is the +// load-bearing parity guarantee `summarizeFidelity` cares about; coverage may +// differ shape-for-shape between the streamed `queryAll` and the synthesized +// archive row when the source populated coverage flags asymmetrically. +function normalizeForCompare(t: EnrichedTurn): EnrichedTurn { + const out = { ...t } as EnrichedTurn; + if (out.fidelity) { + out.fidelity = { + class: out.fidelity.class, + granularity: out.fidelity.granularity, + coverage: out.fidelity.coverage, + }; + } + return out; +} + +describe('archive-query', () => { + let tmpDir: string; + const originalHome = process.env['RELAYBURN_HOME']; + + before(async () => { + tmpDir = await mkdtemp(path.join(tmpdir(), 'relayburn-archive-query-test-')); + }); + + beforeEach(async () => { + await rm(tmpDir, { recursive: true, force: true }); + tmpDir = await mkdtemp(path.join(tmpdir(), 'relayburn-archive-query-test-')); + process.env['RELAYBURN_HOME'] = tmpDir; + __resetIndexCacheForTesting(); + }); + + after(async () => { + if (originalHome !== undefined) { + process.env['RELAYBURN_HOME'] = originalHome; + } else { + delete process.env['RELAYBURN_HOME']; + } + await rm(tmpDir, { recursive: true, force: true }); + }); + + it('archiveAvailable() is false before the first build, true after', async () => { + assert.equal(await archiveAvailable(), false); + await appendTurns([fakeTurn({ sessionId: 's-a', messageId: 'm-a' })]); + await buildArchive(); + assert.equal(await archiveAvailable(), true); + }); + + it('returns the same turn count as queryAll for an unfiltered query', async () => { + await appendTurns([ + fakeTurn({ sessionId: 's-A', messageId: 'm-1' }), + fakeTurn({ + sessionId: 's-A', + messageId: 'm-2', + turnIndex: 1, + ts: '2026-04-20T00:01:00.000Z', + }), + fakeTurn({ + sessionId: 's-B', + messageId: 'm-3', + ts: '2026-04-20T00:02:00.000Z', + project: '/tmp/other', + }), + ]); + await buildArchive(); + + const fromLedger = await queryAll(); + const fromArchive = await 
queryAllFromArchive(); + assert.equal(fromArchive.length, fromLedger.length); + assert.equal(fromArchive.length, 3); + }); + + it('parity: per-turn shape matches queryAll for project + workflow filters', async () => { + await appendTurns([ + fakeTurn({ + sessionId: 's-P', + messageId: 'mp-1', + toolCalls: [ + { id: 'tu-1', name: 'Read', target: '/tmp/foo.ts', argsHash: 'a1' }, + { id: 'tu-2', name: 'Edit', target: '/tmp/foo.ts', argsHash: 'a2', isError: false }, + ], + }), + fakeTurn({ + sessionId: 's-P', + messageId: 'mp-2', + turnIndex: 1, + ts: '2026-04-20T00:01:00.000Z', + }), + fakeTurn({ + sessionId: 's-Q', + messageId: 'mq-1', + ts: '2026-04-20T00:02:00.000Z', + project: '/tmp/other', + }), + ]); + await stamp({ sessionId: 's-P' }, { workflowId: 'wf-42', persona: 'eng' }); + await buildArchive(); + + // Project filter — should match both s-P turns and exclude s-Q. + { + const ledger = (await queryAll({ project: '/tmp/project' })).map(normalizeForCompare); + const archive = (await queryAllFromArchive({ project: '/tmp/project' })).map( + normalizeForCompare, + ); + assert.equal(archive.length, ledger.length); + assert.equal(archive.length, 2); + // Same set of message ids, regardless of sort order. + const archiveIds = new Set(archive.map((t) => t.messageId)); + const ledgerIds = new Set(ledger.map((t) => t.messageId)); + assert.deepEqual([...archiveIds].sort(), [...ledgerIds].sort()); + + // Cross-check that the first row's enrichment + tool calls survived the + // archive round-trip. 
+ const a = archive.find((t) => t.messageId === 'mp-1')!; + const l = ledger.find((t) => t.messageId === 'mp-1')!; + assert.equal(a.enrichment['workflowId'], l.enrichment['workflowId']); + assert.equal(a.enrichment['workflowId'], 'wf-42'); + assert.equal(a.enrichment['persona'], 'eng'); + assert.equal(a.toolCalls.length, 2); + assert.equal(a.toolCalls.length, l.toolCalls.length); + assert.equal(a.toolCalls[0]!.name, 'Read'); + assert.equal(a.toolCalls[1]!.isError, false); + assert.equal(a.usage.input, l.usage.input); + assert.equal(a.usage.output, l.usage.output); + } + + // Workflow filter — only the stamped session should land. + { + const ledger = (await queryAll({ enrichment: { workflowId: 'wf-42' } })).map( + normalizeForCompare, + ); + const archive = ( + await queryAllFromArchive({ enrichment: { workflowId: 'wf-42' } }) + ).map(normalizeForCompare); + assert.equal(archive.length, 2); + assert.equal(archive.length, ledger.length); + for (const t of archive) { + assert.equal(t.sessionId, 's-P'); + } + } + }); + + it('honors since/until window filters the same way queryAll does', async () => { + await appendTurns([ + fakeTurn({ + sessionId: 's-W', + messageId: 'mw-1', + ts: '2026-04-20T00:00:00.000Z', + }), + fakeTurn({ + sessionId: 's-W', + messageId: 'mw-2', + turnIndex: 1, + ts: '2026-04-21T00:00:00.000Z', + }), + fakeTurn({ + sessionId: 's-W', + messageId: 'mw-3', + turnIndex: 2, + ts: '2026-04-22T00:00:00.000Z', + }), + ]); + await buildArchive(); + + const since = '2026-04-21T00:00:00.000Z'; + const ledger = await queryAll({ since }); + const archive = await queryAllFromArchive({ since }); + assert.equal(archive.length, ledger.length); + assert.equal(archive.length, 2); + for (const t of archive) { + assert.ok(t.ts >= since, `turn ts ${t.ts} should be >= since`); + } + }); + + it('hydrates subagent block when subagent fields are populated', async () => { + await appendTurns([ + fakeTurn({ + sessionId: 's-S', + messageId: 'ms-1', + subagent: { + 
isSidechain: true, + agentId: 'agent-A', + parentAgentId: 'agent-root', + parentToolUseId: 'tu-spawn', + subagentType: 'investigator', + }, + }), + // Plain turn — no subagent block expected on the way back out. + fakeTurn({ + sessionId: 's-S', + messageId: 'ms-2', + turnIndex: 1, + ts: '2026-04-20T00:01:00.000Z', + }), + ]); + await buildArchive(); + const archive = await queryAllFromArchive({ sessionId: 's-S' }); + assert.equal(archive.length, 2); + const sub = archive.find((t) => t.messageId === 'ms-1')!; + assert.ok(sub.subagent, 'subagent block should be present'); + assert.equal(sub.subagent.isSidechain, true); + assert.equal(sub.subagent.agentId, 'agent-A'); + assert.equal(sub.subagent.parentAgentId, 'agent-root'); + assert.equal(sub.subagent.parentToolUseId, 'tu-spawn'); + assert.equal(sub.subagent.subagentType, 'investigator'); + + const plain = archive.find((t) => t.messageId === 'ms-2')!; + assert.equal(plain.subagent, undefined); + }); + + it('preserves fidelity class so summarizeFidelity buckets match the streaming reader', async () => { + await appendTurns([ + fakeTurn({ + sessionId: 's-F', + messageId: 'mf-1', + fidelity: { + granularity: 'per-turn', + coverage: { + hasInputTokens: true, + hasOutputTokens: true, + hasReasoningTokens: false, + hasCacheReadTokens: true, + hasCacheCreateTokens: true, + hasToolCalls: true, + hasToolResultEvents: true, + hasSessionRelationships: true, + hasRawContent: true, + }, + class: 'full', + }, + }), + fakeTurn({ + sessionId: 's-F', + messageId: 'mf-2', + turnIndex: 1, + ts: '2026-04-20T00:01:00.000Z', + fidelity: { + granularity: 'cost-only', + coverage: { + hasInputTokens: false, + hasOutputTokens: false, + hasReasoningTokens: false, + hasCacheReadTokens: false, + hasCacheCreateTokens: false, + hasToolCalls: false, + hasToolResultEvents: false, + hasSessionRelationships: false, + hasRawContent: false, + }, + class: 'cost-only', + }, + }), + // Older row with no fidelity at all → archive row has all NULLs and the + 
// synthesizer must round-trip that back to `fidelity = undefined`. + fakeTurn({ + sessionId: 's-F', + messageId: 'mf-3', + turnIndex: 2, + ts: '2026-04-20T00:02:00.000Z', + }), + ]); + await buildArchive(); + const archive = await queryAllFromArchive({ sessionId: 's-F' }); + const byId = new Map(archive.map((t) => [t.messageId, t])); + assert.equal(byId.get('mf-1')!.fidelity?.class, 'full'); + assert.equal(byId.get('mf-2')!.fidelity?.class, 'cost-only'); + assert.equal(byId.get('mf-2')!.fidelity?.granularity, 'cost-only'); + assert.equal(byId.get('mf-3')!.fidelity, undefined); + }); + + it('returns an empty array when no rows match (no SQL parameter errors on empty filters)', async () => { + await appendTurns([fakeTurn({ sessionId: 's-E', messageId: 'me-1' })]); + await buildArchive(); + const out = await queryAllFromArchive({ sessionId: 'does-not-exist' }); + assert.deepEqual(out, []); + }); +}); diff --git a/packages/ledger/src/archive-query.ts b/packages/ledger/src/archive-query.ts new file mode 100644 index 0000000..c70ab99 --- /dev/null +++ b/packages/ledger/src/archive-query.ts @@ -0,0 +1,412 @@ +import { stat } from 'node:fs/promises'; +import type { SQLInputValue } from 'node:sqlite'; + +import type { + ActivityCategory, + Coverage, + Fidelity, + FidelityClass, + SourceKind, + ToolCall, + TurnRecord, + UsageGranularity, +} from '@relayburn/reader'; + +import { openArchive } from './archive.js'; +import { archivePath } from './paths.js'; +import type { Query, EnrichedTurn } from './reader.js'; +import type { Enrichment } from './schema.js'; + +/** + * Read-side counterpart to `queryAll(query)` that issues SQL against the + * derived archive (`~/.relayburn/archive.sqlite`) instead of streaming the + * canonical `ledger.jsonl`. Returns the same `EnrichedTurn[]` shape so + * downstream commands can swap implementations without touching their + * aggregation code. 
+ * + * Filters land as `WHERE` clauses so the b-tree indexes on `ts`, `model`, + * `project_key`, and `workflow_id` (see `archive.ts`) carry the work. + * Ordering matches `queryAll`: ledger insertion order, which is `(ts, + * turn_index)` for sources that emit turns chronologically. + * + * Tool calls are hydrated from `tool_calls` keyed by `(source, session_id, + * message_id)` so consumers that read `turn.toolCalls` (e.g. quality + * detectors that count trailing failure streaks) keep working without an + * extra round-trip. + * + * Fidelity is reconstructed from the persisted `attribution_fidelity` / + * `tokens_present` / `cost_present` columns plus class-implied defaults for + * the coverage shape — the archive intentionally does not store the full + * `Fidelity` blob (#110 deferred that to keep the schema additive). NULL + * `attribution_fidelity` maps back to `fidelity = undefined` so + * `summarizeFidelity` buckets such turns under `unknown`, matching the + * pre-migration JSON contract. + * + * Throws if the archive file is missing — the CLI is expected to call + * `buildArchive()` (or `rebuildArchive()`) before this. Callers that want + * the implicit fallback to `queryAll` should guard with + * `archiveAvailable()`. + */ +export async function queryAllFromArchive(q: Query = {}): Promise { + const db = await openArchive(); + try { + const { sql, params } = buildSelect(q); + const rows = db.prepare(sql).all(...params) as unknown as ArchiveTurnRow[]; + if (rows.length === 0) return []; + + // Bulk-hydrate tool calls keyed on (source, session_id, message_id). + // SQLite's parameter limit is 999 by default in `node:sqlite`'s default + // build; chunk to stay well below that. Each row contributes 3 params. 
+ const toolCallsByKey = await loadToolCallsForKeys( + db, + rows.map((r) => ({ source: r.source, sessionId: r.session_id, messageId: r.message_id })), + ); + + return rows.map((r) => rowToEnrichedTurn(r, toolCallsByKey)); + } finally { + db.close(); + } +} + +export async function archiveAvailable(): Promise { + try { + const s = await stat(archivePath()); + return s.isFile(); + } catch { + return false; + } +} + +interface ArchiveTurnRow { + source: string; + session_id: string; + message_id: string; + turn_index: number | bigint; + ts: string; + model: string; + project: string | null; + project_key: string | null; + activity: string | null; + stop_reason: string | null; + has_edits: number | bigint | null; + retries: number | bigint | null; + is_sidechain: number | bigint | null; + subagent_id: string | null; + parent_subagent_id: string | null; + parent_tool_use_id: string | null; + subagent_type: string | null; + input_tokens: number | bigint; + output_tokens: number | bigint; + reasoning_tokens: number | bigint; + cache_read_tokens: number | bigint; + cache_create_5m_tokens: number | bigint; + cache_create_1h_tokens: number | bigint; + workflow_id: string | null; + agent_id: string | null; + persona: string | null; + tier: string | null; + enrichment_json: string | null; + attribution_fidelity: string | null; + tokens_present: number | bigint | null; + cost_present: number | bigint | null; +} + +interface ToolCallRow { + source: string; + session_id: string; + message_id: string; + call_index: number | bigint; + tool_use_id: string | null; + tool_name: string; + target: string | null; + args_hash: string | null; + is_error: number | bigint | null; +} + +function buildSelect(q: Query): { sql: string; params: SQLInputValue[] } { + const wheres: string[] = []; + const params: SQLInputValue[] = []; + if (q.since) { + wheres.push('ts >= ?'); + params.push(q.since); + } + if (q.until) { + wheres.push('ts <= ?'); + params.push(q.until); + } + if (q.project) { + // 
Preserve `queryAll` semantics: a turn matches when either `project` + // or `projectKey` equals the filter value. Both columns are indexed + // (well, project_key is — `project` falls back to a scan, same as the + // ledger walk). + wheres.push('(project = ? OR project_key = ?)'); + params.push(q.project, q.project); + } + if (q.sessionId) { + wheres.push('session_id = ?'); + params.push(q.sessionId); + } + if (q.source) { + wheres.push('source = ?'); + params.push(q.source); + } + if (q.enrichment) { + for (const [k, v] of Object.entries(q.enrichment)) { + if (v === undefined) continue; + const col = ENRICHMENT_COLUMN[k]; + if (col) { + // Materialized enrichment column — index-friendly equality. + wheres.push(`${col} = ?`); + params.push(v); + } else { + // Fall back to the JSON blob for keys we didn't promote to a + // column. Slow path (full scan) but matches the queryAll contract + // for arbitrary stamp keys. + wheres.push(`json_extract(enrichment_json, '$.' || ?) = ?`); + params.push(k, v); + } + } + } + + // Order matches `queryAll`'s emission order from streaming the ledger + // (turns emit roughly in (ts, turn_index) order; preserving that keeps + // downstream `sort` / `[turns.length-1]` lookups stable across paths). + const sql = [ + 'SELECT * FROM turns', + wheres.length > 0 ? `WHERE ${wheres.join(' AND ')}` : '', + 'ORDER BY ts, turn_index', + ] + .filter(Boolean) + .join(' '); + return { sql, params }; +} + +const ENRICHMENT_COLUMN: Record = { + workflowId: 'workflow_id', + agentId: 'agent_id', + persona: 'persona', + tier: 'tier', +}; + +async function loadToolCallsForKeys( + db: Awaited>, + keys: Array<{ source: string; sessionId: string; messageId: string }>, +): Promise> { + const out = new Map(); + if (keys.length === 0) return out; + // Dedup keys — multiple turn rows can never share (source, session_id, + // message_id) (PK guarantees), so this is just defensive. 
+  const seen = new Set<string>();
+  const distinct: Array<{ source: string; sessionId: string; messageId: string }> = [];
+  for (const k of keys) {
+    const id = `${k.source}|${k.sessionId}|${k.messageId}`;
+    if (seen.has(id)) continue;
+    seen.add(id);
+    distinct.push(k);
+  }
+  // SQLite default SQLITE_MAX_VARIABLE_NUMBER is 32766 in modern builds, but
+  // node:sqlite ships with a more conservative default. 250 keys × 3 params
+  // = 750 placeholders per chunk leaves plenty of headroom.
+  const CHUNK = 250;
+  for (let i = 0; i < distinct.length; i += CHUNK) {
+    const chunk = distinct.slice(i, i + CHUNK);
+    const placeholders = chunk.map(() => '(?, ?, ?)').join(', ');
+    const params: SQLInputValue[] = [];
+    for (const k of chunk) {
+      params.push(k.source, k.sessionId, k.messageId);
+    }
+    const sql = `
+      SELECT source, session_id, message_id, call_index, tool_use_id,
+             tool_name, target, args_hash, is_error
+      FROM tool_calls
+      WHERE (source, session_id, message_id) IN (${placeholders})
+      ORDER BY source, session_id, message_id, call_index
+    `;
+    const rows = db.prepare(sql).all(...params) as unknown as ToolCallRow[];
+    for (const row of rows) {
+      const key = `${row.source}|${row.session_id}|${row.message_id}`;
+      let list = out.get(key);
+      if (!list) {
+        list = [];
+        out.set(key, list);
+      }
+      const tc: ToolCall = {
+        // Persisted `tool_use_id` is the only thing the writer sees on the
+        // ledger as `ToolCall.id`; older rows that lacked one were stored
+        // as NULL — surface as empty string so the type stays satisfied.
+        id: row.tool_use_id ?? '',
+        name: row.tool_name,
+        argsHash: row.args_hash ?? '',
+      };
+      if (row.target !== null) tc.target = row.target;
+      if (row.is_error !== null) tc.isError = Number(row.is_error) === 1;
+      list.push(tc);
+    }
+  }
+  return out;
+}
+
+function rowToEnrichedTurn(
+  r: ArchiveTurnRow,
+  toolCallsByKey: Map<string, ToolCall[]>,
+): EnrichedTurn {
+  const enrichment = parseEnrichment(r.enrichment_json);
+  const toolCalls = toolCallsByKey.get(`${r.source}|${r.session_id}|${r.message_id}`) ?? [];
+  const turn: TurnRecord = {
+    v: 1,
+    source: r.source as SourceKind,
+    sessionId: r.session_id,
+    messageId: r.message_id,
+    turnIndex: Number(r.turn_index),
+    ts: r.ts,
+    model: r.model,
+    usage: {
+      input: Number(r.input_tokens),
+      output: Number(r.output_tokens),
+      reasoning: Number(r.reasoning_tokens),
+      cacheRead: Number(r.cache_read_tokens),
+      cacheCreate5m: Number(r.cache_create_5m_tokens),
+      cacheCreate1h: Number(r.cache_create_1h_tokens),
+    },
+    toolCalls,
+  };
+  if (r.project !== null) turn.project = r.project;
+  if (r.project_key !== null) turn.projectKey = r.project_key;
+  if (r.activity !== null) turn.activity = r.activity as ActivityCategory;
+  if (r.stop_reason !== null) turn.stopReason = r.stop_reason;
+  if (r.has_edits !== null) turn.hasEdits = Number(r.has_edits) === 1;
+  if (r.retries !== null) turn.retries = Number(r.retries);
+  // Subagent block: only emit when at least one of its fields is populated,
+  // matching the on-ledger shape (a Codex turn with no sidechain doesn't
+  // carry an empty `subagent` object; `is_sidechain` lands NULL → skip).
+  if (
+    r.is_sidechain !== null ||
+    r.subagent_id !== null ||
+    r.parent_subagent_id !== null ||
+    r.parent_tool_use_id !== null ||
+    r.subagent_type !== null
+  ) {
+    const subagent: NonNullable<TurnRecord['subagent']> = {
+      isSidechain: Number(r.is_sidechain ?? 0) === 1,
+    };
+    if (r.subagent_id !== null) subagent.agentId = r.subagent_id;
+    if (r.parent_subagent_id !== null) subagent.parentAgentId = r.parent_subagent_id;
+    if (r.parent_tool_use_id !== null) subagent.parentToolUseId = r.parent_tool_use_id;
+    if (r.subagent_type !== null) subagent.subagentType = r.subagent_type;
+    turn.subagent = subagent;
+  }
+  const fidelity = synthesizeFidelity(r);
+  if (fidelity) turn.fidelity = fidelity;
+  return { ...turn, enrichment };
+}
+
+function parseEnrichment(json: string | null): Enrichment {
+  if (!json) return {};
+  try {
+    const parsed = JSON.parse(json);
+    if (parsed && typeof parsed === 'object' && !Array.isArray(parsed)) {
+      const out: Enrichment = {};
+      for (const [k, v] of Object.entries(parsed as Record<string, unknown>)) {
+        if (typeof v === 'string') out[k] = v;
+      }
+      return out;
+    }
+  } catch {
+    // Fall through to empty enrichment — corrupted JSON shouldn't crash
+    // the read path.
+  }
+  return {};
+}
+
+/**
+ * Reconstruct a `Fidelity` from the projected archive columns.
+ *
+ * We only persist `class`, `tokens_present`, and `cost_present` (#110); the
+ * full `coverage` and `granularity` shape isn't stored to keep the schema
+ * additive. Synthesize a coverage object that matches the class semantics so
+ * `summarizeFidelity` buckets the turn into the same `byClass` slot the
+ * pre-migration `queryAll` would have produced. `byGranularity` and
+ * `missingCoverage` will reflect the synthesized shape — close to but not
+ * always byte-identical with the original `Fidelity` if the source
+ * (Codex/OpenCode) populated coverage flags differently. The class bucket
+ * IS byte-identical, which is what the text-mode fidelity notice and the
+ * `unknown`/`full`/`partial`/etc. counts in JSON depend on.
+ */ +function synthesizeFidelity(r: ArchiveTurnRow): Fidelity | undefined { + if (r.attribution_fidelity === null) return undefined; + const cls = r.attribution_fidelity as FidelityClass; + const tokensPresent = r.tokens_present !== null && Number(r.tokens_present) === 1; + const costPresent = r.cost_present !== null && Number(r.cost_present) === 1; + const granularity: UsageGranularity = costPresent ? 'cost-only' : 'per-turn'; + const coverage = coverageForClass(cls, tokensPresent); + return { class: cls, granularity, coverage }; +} + +function coverageForClass(cls: FidelityClass, tokensPresent: boolean): Coverage { + // Sensible per-class defaults. `full` → everything true; `cost-only` → + // nothing true; `usage-only` → tokens true / structural false; `partial` + // / `aggregate-only` → split the difference using `tokensPresent`. + switch (cls) { + case 'full': + return { + hasInputTokens: true, + hasOutputTokens: true, + hasReasoningTokens: true, + hasCacheReadTokens: true, + hasCacheCreateTokens: true, + hasToolCalls: true, + hasToolResultEvents: true, + hasSessionRelationships: true, + hasRawContent: true, + }; + case 'usage-only': + return { + hasInputTokens: tokensPresent, + hasOutputTokens: tokensPresent, + hasReasoningTokens: false, + hasCacheReadTokens: tokensPresent, + hasCacheCreateTokens: tokensPresent, + hasToolCalls: false, + hasToolResultEvents: false, + hasSessionRelationships: false, + hasRawContent: false, + }; + case 'partial': + return { + hasInputTokens: tokensPresent, + hasOutputTokens: tokensPresent, + hasReasoningTokens: false, + hasCacheReadTokens: false, + hasCacheCreateTokens: false, + hasToolCalls: false, + hasToolResultEvents: false, + hasSessionRelationships: false, + hasRawContent: false, + }; + case 'aggregate-only': + return { + hasInputTokens: false, + hasOutputTokens: false, + hasReasoningTokens: false, + hasCacheReadTokens: false, + hasCacheCreateTokens: false, + hasToolCalls: false, + hasToolResultEvents: false, + 
hasSessionRelationships: false, + hasRawContent: false, + }; + case 'cost-only': + default: + return { + hasInputTokens: false, + hasOutputTokens: false, + hasReasoningTokens: false, + hasCacheReadTokens: false, + hasCacheCreateTokens: false, + hasToolCalls: false, + hasToolResultEvents: false, + hasSessionRelationships: false, + hasRawContent: false, + }; + } +} diff --git a/packages/ledger/src/index.ts b/packages/ledger/src/index.ts index 4d53d77..6696d2b 100644 --- a/packages/ledger/src/index.ts +++ b/packages/ledger/src/index.ts @@ -39,6 +39,7 @@ export { rebuildArchive, } from './archive.js'; export type { ArchiveStatus, BuildResult } from './archive.js'; +export { archiveAvailable, queryAllFromArchive } from './archive-query.js'; export { appendContent, listContentSessionIds, From 421e5b6a388f54321d52358e91a86d966aa05b49 Mon Sep 17 00:00:00 2001 From: Will Washburn Date: Sun, 26 Apr 2026 23:36:13 -0700 Subject: [PATCH 2/2] Move queryAllFromArchive entry from 0.27.0 to [Unreleased] After merging origin/main (which carried the 0.30.0 release), the queryAllFromArchive / archiveAvailable bullet for #82 ended up filed under the already-released 0.27.0 section. Move it back to [Unreleased] where unreleased work belongs. Co-Authored-By: Claude Opus 4.7 (1M context) --- packages/ledger/CHANGELOG.md | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/packages/ledger/CHANGELOG.md b/packages/ledger/CHANGELOG.md index 967518e..6f1b6a5 100644 --- a/packages/ledger/CHANGELOG.md +++ b/packages/ledger/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- **`queryAllFromArchive(query)` + `archiveAvailable()`** ([#82](https://github.com/AgentWorkforce/burn/issues/82)). 
New read-side entry point in `@relayburn/ledger` that issues SQL against `archive.sqlite` and returns the same `EnrichedTurn[]` shape as `queryAll`, so consumers (starting with `burn summary`) can swap implementations without touching their aggregation code. Filters land as `WHERE` clauses against indexed columns (`ts`, `model`, `project_key`, `session_id`, `source`, materialized enrichment columns); arbitrary stamp keys not promoted to columns fall back to a `json_extract` over `enrichment_json` to match `queryAll` semantics. Tool calls are bulk-hydrated keyed on `(source, session_id, message_id)` so callers that read `turn.toolCalls` keep working without an extra round-trip. Fidelity is reconstructed from the persisted `attribution_fidelity` / `tokens_present` / `cost_present` columns plus class-implied coverage defaults — class equality (the load-bearing parity contract for `summarizeFidelity`) is preserved; the synthesized coverage shape may differ from the on-ledger blob for classes that don't pin every flag. + ## [0.30.0] - 2026-04-27 ### Changed @@ -17,7 +21,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added -- **`queryAllFromArchive(query)` + `archiveAvailable()`** ([#82](https://github.com/AgentWorkforce/burn/issues/82)). New read-side entry point in `@relayburn/ledger` that issues SQL against `archive.sqlite` and returns the same `EnrichedTurn[]` shape as `queryAll`, so consumers (starting with `burn summary`) can swap implementations without touching their aggregation code. Filters land as `WHERE` clauses against indexed columns (`ts`, `model`, `project_key`, `session_id`, `source`, materialized enrichment columns); arbitrary stamp keys not promoted to columns fall back to a `json_extract` over `enrichment_json` to match `queryAll` semantics. Tool calls are bulk-hydrated keyed on `(source, session_id, message_id)` so callers that read `turn.toolCalls` keep working without an extra round-trip. 
Fidelity is reconstructed from the persisted `attribution_fidelity` / `tokens_present` / `cost_present` columns plus class-implied coverage defaults — class equality (the load-bearing parity contract for `summarizeFidelity`) is preserved; the synthesized coverage shape may differ from the on-ledger blob for classes that don't pin every flag. - **User-turn ledger lines** (#2). New append-only `user_turn` ledger records persist parser-emitted `UserTurnRecord`s alongside raw turns without rewriting or mutating them. The ledger now exposes `appendUserTurns()` / `queryUserTurns()`, `UserTurnLine` / `isUserTurnLine()`, and `userTurnIdHash()` keyed by `(source, sessionId, userUuid)` through the existing dedup index; `rebuildIndex()` rehydrates those ids after index loss. ## [0.26.0] - 2026-04-26