diff --git a/CHANGELOG.md b/CHANGELOG.md index bb0cc06..0459978 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,23 @@ layout can change freely within a minor version. ## [Unreleased] +## [0.1.2] — 2026-05-26 + +### Performance +- **TUI and the in-process web UI (`w`) no longer stall on large histories.** + On a ~1GB history the web UI now opens in a couple of seconds instead of + minutes. Several costs that pegged the shared event loop are gone: + - Budget + anomaly rollups recompute on a ~2.5s tick instead of on every event + (previously keys like `q` stopped responding; the live timeline still updates + instantly). + - The budget rollup now aggregates in SQL (~20ms) instead of pulling up to 50k + rows into JS (~1s) on every tick. + - Adapter initial-scan backfill drains one file per macrotask, so HTTP/SSE stay + responsive during startup instead of blocking on a file-read storm. + - Stale (>48h) session files are skipped on startup only when the store already + holds history (is non-empty) — a fresh/empty/unavailable store still backfills fully, so + there is no first-run data loss. + ## [0.1.1] — 2026-05-26 ### Performance diff --git a/package.json b/package.json index ed15557..e223dee 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@misha_misha/agentwatch", - "version": "0.1.1", + "version": "0.1.2", "description": "Local-only observability + control plane for every AI coding agent on your machine. TUI live tail + browser dashboard on localhost. Unified timeline across Claude Code, Codex, Gemini CLI, Cursor, Hermes, OpenClaw — token + cost accounting, compaction + anomaly detection, SVG call graphs, diff attribution, agent-aware replay, MCP server mode, OpenTelemetry exporter.
No cloud, no telemetry, no sign-in.", "type": "module", "author": "Misha Nefedov", diff --git a/src/adapters/claude-code.ts b/src/adapters/claude-code.ts index 6c596cc..80c9763 100644 --- a/src/adapters/claude-code.ts +++ b/src/adapters/claude-code.ts @@ -10,6 +10,8 @@ import { registerSpawn } from "../util/spawn-tracker.js"; import { costOf, parseUsage } from "../util/cost.js"; import { markAgentWrite } from "../util/recent-writes.js"; import { readNewlineTerminatedLines } from "../util/jsonl-stream.js"; +import { backfillStartOffset } from "../util/backfill.js"; +import { createBackfillQueue } from "../util/backfill-queue.js"; import { createParseErrorTracker } from "../util/parse-errors.js"; type Emit = EventSink | ((e: AgentEvent) => void); @@ -73,7 +75,7 @@ export function startClaudeAdapter(sink: Emit): () => void { const size = safeSize(file); let cursor = cursors.get(file); if (!cursor) { - const start = isInitialAdd ? Math.max(0, size - BACKFILL_BYTES) : size; + const start = backfillStartOffset(file, size, isInitialAdd, BACKFILL_BYTES); cursor = { offset: start }; cursors.set(file, cursor); } @@ -155,8 +157,30 @@ export function startClaudeAdapter(sink: Emit): () => void { } }; - watcher.on("add", (f) => process(f, true)); + // Initial-scan files drain one per macrotask so the event loop stays + // free for HTTP/SSE during startup; live adds (post-scan) process inline.
+ let scanReady = false; + // Files queued for backfill that the drain has not reached yet. + const pendingBackfill = new Set<string>(); + const backfillQueue = createBackfillQueue((f) => { + pendingBackfill.delete(f); + process(f, true); + }); + watcher.on("add", (f) => { + if (scanReady) { + process(f, true); + } else { + pendingBackfill.add(f); + backfillQueue.enqueue(f); + } + }); - watcher.on("change", (f) => process(f, false)); + // A change can fire while its file is still queued. Handling it as a live + // append would create the cursor at EOF, dropping the backfill window and + // the bytes just written, so treat it as that file's initial add instead. + watcher.on("change", (f) => process(f, pendingBackfill.delete(f))); + watcher.on("ready", () => { + scanReady = true; + }); watcher.on("error", (err) => { if (typeof err === "object" && err !== null) { const code = (err as { code?: string }).code; diff --git a/src/adapters/codex.ts b/src/adapters/codex.ts index c4179fd..d355737 100644 --- a/src/adapters/codex.ts +++ b/src/adapters/codex.ts @@ -8,6 +8,8 @@ import { nextId } from "../util/ids.js"; import { costOf } from "../util/cost.js"; import { consumeSpawn } from "../util/spawn-tracker.js"; import { readNewlineTerminatedLines } from "../util/jsonl-stream.js"; +import { backfillStartOffset } from "../util/backfill.js"; +import { createBackfillQueue } from "../util/backfill-queue.js"; import { createParseErrorTracker } from "../util/parse-errors.js"; const BACKFILL_BYTES = 512 * 1024; @@ -57,7 +59,7 @@ export function startCodexAdapter(sink: EventSink): () => void { const size = safeSize(file); let cursor = cursors.get(file); if (!cursor) { - const start = isInitialAdd ? Math.max(0, size - BACKFILL_BYTES) : size; + const start = backfillStartOffset(file, size, isInitialAdd, BACKFILL_BYTES); cursor = { offset: start, project: "", @@ -209,8 +211,30 @@ export function startCodexAdapter(sink: EventSink): () => void { } }; - watcher.on("add", (f) => handle(f, true)); + // Initial-scan files drain one per macrotask so the event loop stays + // free for HTTP/SSE during startup; live adds (post-scan) process inline.
+ let scanReady = false; + // Files queued for backfill that the drain has not reached yet. + const pendingBackfill = new Set<string>(); + const backfillQueue = createBackfillQueue((f) => { + pendingBackfill.delete(f); + handle(f, true); + }); + watcher.on("add", (f) => { + if (scanReady) { + handle(f, true); + } else { + pendingBackfill.add(f); + backfillQueue.enqueue(f); + } + }); - watcher.on("change", (f) => handle(f, false)); + // A change can fire while its file is still queued. Handling it as a live + // append would create the cursor at EOF, dropping the backfill window and + // the bytes just written, so treat it as that file's initial add instead. + watcher.on("change", (f) => handle(f, pendingBackfill.delete(f))); + watcher.on("ready", () => { + scanReady = true; + }); watcher.on("error", (err) => { if (typeof err === "object" && err !== null) { const code = (err as { code?: string }).code; diff --git a/src/adapters/openclaw.ts b/src/adapters/openclaw.ts index 188b4ab..0a63078 100644 --- a/src/adapters/openclaw.ts +++ b/src/adapters/openclaw.ts @@ -6,6 +6,8 @@ import type { AgentEvent, EventType } from "../schema.js"; import { clampTs, riskOf } from "../schema.js"; import { nextId } from "../util/ids.js"; import { readNewlineTerminatedLines } from "../util/jsonl-stream.js"; +import { backfillStartOffset } from "../util/backfill.js"; +import { createBackfillQueue } from "../util/backfill-queue.js"; import { createParseErrorTracker } from "../util/parse-errors.js"; import { classifySessionKey, @@ -133,8 +135,30 @@ export function startOpenClawAdapter(sink: Emit): () => void { if (!sessionRe.test(f)) return; processSession(f, initial, cursors, normalized, parseErrors); }; - sessionsWatcher.on("add", (f) => handleSession(f, true)); + // Initial-scan files drain one per macrotask so the event loop stays + // free for HTTP/SSE during startup; live adds (post-scan) process inline.
+ let sessionsReady = false; + // Session files queued for backfill that the drain has not reached yet. + const pendingSessions = new Set<string>(); + const sessionBackfillQueue = createBackfillQueue((f) => { + pendingSessions.delete(f); + handleSession(f, true); + }); + sessionsWatcher.on("add", (f) => { + if (sessionsReady) { + handleSession(f, true); + } else { + pendingSessions.add(f); + sessionBackfillQueue.enqueue(f); + } + }); - sessionsWatcher.on("change", (f) => handleSession(f, false)); + // A change can fire while its file is still queued. Handling it as a live + // append would create the cursor at EOF, dropping the backfill window and + // the bytes just written, so treat it as that file's initial add instead. + sessionsWatcher.on("change", (f) => handleSession(f, pendingSessions.delete(f))); + sessionsWatcher.on("ready", () => { + sessionsReady = true; + }); sessionsWatcher.on("error", swallow); stoppers.push(() => { void sessionsWatcher.close(); @@ -324,8 +348,7 @@ function streamLines( const size = safeSize(file); let cursor = cursors.get(file); if (!cursor) { - const backfillStart = Math.max(0, size - BACKFILL_BYTES); - cursor = { offset: isInitialAdd ? backfillStart : size }; + cursor = { offset: backfillStartOffset(file, size, isInitialAdd, BACKFILL_BYTES) }; cursors.set(file, cursor); } if (size <= cursor.offset) return; diff --git a/src/index.tsx b/src/index.tsx index 536767d..ae07ec6 100644 --- a/src/index.tsx +++ b/src/index.tsx @@ -268,6 +268,10 @@ if (arg === "serve") { try { const seed = store.listRecentEvents({ limit: 5000, order: "desc" }); for (let i = seed.length - 1; i >= 0; i--) addEventToServer(server, seed[i]!); + // Only skip stale files' backfill once the store has history to seed + // from — otherwise a fresh/empty store would drop their events. + const { setStaleSkipEnabled } = await import("./util/backfill.js"); + setStaleSkipEnabled(seed.length > 0); } catch (err) { process.stderr.write(`[agentwatch] ring seed skipped: ${String(err)}\n`); } diff --git a/src/store/sqlite.ts b/src/store/sqlite.ts index 1c31811..941d106 100644 --- a/src/store/sqlite.ts +++ b/src/store/sqlite.ts @@ -115,6 +115,10 @@ export interface EventStore { * (budget rollups, anomaly histories) that need more than the live * in-memory ring but less than the full event table. */ listRecentEvents(opts?: ListRecentEventsOptions): AgentEvent[]; + /** Fast budget aggregation via SQL: today's total cost + the top session + * (last 30d) by total cost.
Avoids pulling tens of thousands of rows into + * JS on every rollup tick — the per-event 50k pull blocked the loop ~1s. */ + budgetRollup(): { dayCost: number; maxSession: { id: string; cost: number } }; listSessions(opts?: ListSessionsOptions): SessionSummary[]; listProjects(): ProjectSummary[]; searchFts(query: string, opts?: { limit?: number }): FtsHit[]; @@ -649,6 +653,25 @@ function buildStore(db: Database.Database): EventStore { const rows = sessionEventsStmt.all(sessionId) as RawEventRow[]; return rows.map(rowToEvent); }, + budgetRollup() { + const todayStart = new Date(); + todayStart.setUTCHours(0, 0, 0, 0); + const monthAgo = new Date(Date.now() - 30 * 86_400_000).toISOString(); + const day = db + .prepare("SELECT SUM(cost_usd) AS c FROM events WHERE ts >= ?") + .get(todayStart.toISOString()) as { c: number | null }; + const top = db + .prepare( + "SELECT session_id, cost_usd FROM sessions WHERE last_ts >= ? ORDER BY cost_usd DESC LIMIT 1", + ) + .get(monthAgo) as { session_id: string; cost_usd: number } | undefined; + return { + dayCost: day.c ?? 0, + maxSession: top + ? { id: top.session_id, cost: top.cost_usd } + : { id: "", cost: 0 }, + }; + }, listRecentEvents(opts = {}) { const limit = clamp(opts.limit ?? 1000, 1, 50_000); const order = opts.order === "asc" ?
"ASC" : "DESC"; diff --git a/src/ui/App.tsx b/src/ui/App.tsx index 1c96fd0..6349e7a 100644 --- a/src/ui/App.tsx +++ b/src/ui/App.tsx @@ -5,7 +5,8 @@ import { Timeline } from "./Timeline.js"; import { AgentPanel } from "./AgentPanel.js"; import { Header } from "./Header.js"; import { Breadcrumb } from "./Breadcrumb.js"; -import { computeBudgetStatus } from "../util/budgets.js"; +import { budgetStatusFromTotals, computeBudgetStatus } from "../util/budgets.js"; +import { setStaleSkipEnabled } from "../util/backfill.js"; import { emitEventSpan, initOtel, otelEnabled } from "../util/otel.js"; import { watchTriggers } from "../util/triggers.js"; import { @@ -130,6 +131,9 @@ export function App() { try { const seed = store.listRecentEvents({ limit: 500, order: "desc" }); if (seed.length > 0) dispatch({ type: "events-batch", events: seed }); + // Only skip stale files' backfill once the store has history to seed + // from — otherwise a fresh/empty store would drop their events. + setStaleSkipEnabled(seed.length > 0); } catch { // store may be unavailable; live tail will fill from adapters } @@ -224,22 +228,24 @@ export function App() { // any new event grows the ring and busts these memos. const eventsRef = state.events; + // Decouple the store-backed rollups (budget SQL aggregate, anomaly scan of up + // to 5k rows) from the per-event render. Recomputing them on every event + // pegs the shared event loop on large DBs — which freezes the TUI *and* + // starves the in-process web server (press `w`). Tick every 2.5s instead.
+ const [rollupTick, setRollupTick] = useState(0); + useEffect(() => { + if (!store) return; + const id = setInterval(() => setRollupTick((t) => t + 1), 2500); + return () => clearInterval(id); + }, [store]); + const budgetStatus = useMemo(() => { if (!store) return computeBudgetStatus(eventsRef); - const todayStart = new Date(); - todayStart.setUTCHours(0, 0, 0, 0); - const since = todayStart.toISOString(); - // Per-session caps are checked against per-session totals — capped at 30 days - // so a long-running session that started yesterday is still seen. Day rollup - // filters by ts inside computeBudgetStatus. - const monthAgo = new Date(Date.now() - 30 * 86_400_000).toISOString(); - const events = store.listRecentEvents({ - sinceTs: monthAgo < since ? monthAgo : since, - limit: 50_000, - order: "asc", - }); - return computeBudgetStatus(events); - }, [eventsRef, store]); + // Aggregate in SQL (~20ms) instead of pulling up to 50k rows into JS + // (~1s) on every tick — the latter intermittently froze the shared loop. + const { dayCost, maxSession } = store.budgetRollup(); + return budgetStatusFromTotals(dayCost, maxSession); + }, [store ? rollupTick : eventsRef, store]); const anomalies = useMemo(() => { const source = store @@ -286,7 +292,7 @@ export function App() { } } return out; - }, [eventsRef, store]); + }, [store ? rollupTick : eventsRef, store]); // Budget-breach notifications (once per distinct breach). const budgetBreachKey = [ diff --git a/src/util/backfill-queue.ts b/src/util/backfill-queue.ts new file mode 100644 index 0000000..3311d93 --- /dev/null +++ b/src/util/backfill-queue.ts @@ -0,0 +1,33 @@ +/** Drains initial-scan backfill files one per macrotask so the event loop + * can service HTTP/SSE between files. Without this, a startup file-read + * storm (hundreds of session files) blocks the loop for seconds — freezing + * the TUI and the in-process web server. 
Live `change`/`add` events after + * the initial scan bypass the queue: they're one small read at a time. */ +export function createBackfillQueue( + processFile: (file: string) => void, +): { enqueue: (file: string) => void } { + const queue: string[] = []; + let draining = false; + const drain = (): void => { + const file = queue.shift(); + if (file === undefined) { + draining = false; + return; + } + try { + processFile(file); + } catch { + // per-file isolation: one bad file never stalls the drain + } + setImmediate(drain); + }; + return { + enqueue(file: string): void { + queue.push(file); + if (!draining) { + draining = true; + setImmediate(drain); + } + }, + }; +} diff --git a/src/util/backfill.ts b/src/util/backfill.ts new file mode 100644 index 0000000..6f0bd70 --- /dev/null +++ b/src/util/backfill.ts @@ -0,0 +1,50 @@ +import { statSync } from "node:fs"; + +/** On the initial scan, only re-read files modified within this window. + * Older sessions' events are already in the SQLite store (the TUI and + * `serve` seed their timeline from it), so re-reading hundreds of stale + * files at boot just blocks the event loop for seconds. Stale files tail + * from EOF instead — their history is already in the store. */ +export const BACKFILL_MAX_AGE_MS = 48 * 60 * 60 * 1000; + +/** Last-modified time in epoch ms, or positive infinity when the file cannot + * be stat'ed. An unknown age must never count as stale, because stale files + * skip their backfill. */ +function mtimeMs(file: string): number { + try { + return statSync(file).mtimeMs; + } catch { + return Number.POSITIVE_INFINITY; + } +} + +// Skipping stale files' backfill is only safe once the SQLite store holds +// prior history to seed the timeline from. On a fresh install, a deleted / +// pruned DB, or a store-open failure, skipping would drop those files' +// events entirely (they were never ingested). The startup path enables this +// only after confirming the store is non-empty; default off = always backfill. +let staleSkipEnabled = false; + +/** Set once at startup, before adapters start. Pass `true` only when the + * store already has history (so stale files are already ingested).
*/ +export function setStaleSkipEnabled(enabled: boolean): void { + staleSkipEnabled = enabled; +} + +/** Byte offset to start reading a file from. Live appends start at EOF + * (`size`). On the initial scan, recently-modified files are backfilled + * `backfillBytes` behind EOF to catch turns written while agentwatch was + * off; stale files start at EOF only when stale-skip is enabled (the store + * already has their history). */ +export function backfillStartOffset( + file: string, + size: number, + isInitialAdd: boolean, + backfillBytes: number, +): number { + if (!isInitialAdd) return size; + if (staleSkipEnabled && mtimeMs(file) < Date.now() - BACKFILL_MAX_AGE_MS) { + return size; + } + return Math.max(0, size - backfillBytes); +} diff --git a/src/util/budgets.ts b/src/util/budgets.ts index f8c095c..0d3f135 100644 --- a/src/util/budgets.ts +++ b/src/util/budgets.ts @@ -79,18 +79,25 @@ export function computeBudgetStatus( if (t >= todayMs) dayCost += c; } + return budgetStatusFromTotals(dayCost, maxSession, budgets); +} + +/** Build a BudgetStatus from pre-aggregated totals. Shared by the in-memory + * path (computeBudgetStatus) and the SQL fast path (store.budgetRollup) so + * the breach semantics never diverge. */ +export function budgetStatusFromTotals( + dayCost: number, + maxSession: { id: string; cost: number }, + budgets: Budgets = loadBudgets(), +): BudgetStatus { const status: BudgetStatus = { sessionCost: maxSession.cost, dayCost, perSessionUsd: budgets.perSessionUsd, perDayUsd: budgets.perDayUsd, - dayBreach: - budgets.perDayUsd != null && dayCost > budgets.perDayUsd, + dayBreach: budgets.perDayUsd != null && dayCost > budgets.perDayUsd, }; - if ( - budgets.perSessionUsd != null && - maxSession.cost > budgets.perSessionUsd - ) { + if (budgets.perSessionUsd != null && maxSession.cost > budgets.perSessionUsd) { status.breachedSession = maxSession.id || "(unknown)"; } return status;