Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,23 @@ layout can change freely within a minor version.

## [Unreleased]

## [0.1.2] — 2026-05-26

### Performance
- **TUI and the in-process web UI (`w`) no longer stall on large histories.**
On a ~1GB history, opening the web UI went from minutes down to a couple of
seconds. Several costs that pegged the shared event loop are gone:
- Budget + anomaly rollups recompute on a ~2.5s tick instead of on every event
(keys like `q` stopped responding before; the live timeline still updates
instantly).
- The budget rollup now aggregates in SQL (~20ms) instead of pulling up to 50k
rows into JS (~1s) on every tick.
- Adapter initial-scan backfill drains one file per macrotask, so HTTP/SSE stay
responsive during startup instead of blocking on a file-read storm.
- Stale (>48h) session files are skipped on startup only when the store already
has their history — a fresh/empty/unavailable store still backfills fully, so
there is no first-run data loss.

## [0.1.1] — 2026-05-26

### Performance
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@misha_misha/agentwatch",
"version": "0.1.1",
"version": "0.1.2",
"description": "Local-only observability + control plane for every AI coding agent on your machine. TUI live tail + browser dashboard on localhost. Unified timeline across Claude Code, Codex, Gemini CLI, Cursor, Hermes, OpenClaw — token + cost accounting, compaction + anomaly detection, SVG call graphs, diff attribution, agent-aware replay, MCP server mode, OpenTelemetry exporter. No cloud, no telemetry, no sign-in.",
"type": "module",
"author": "Misha Nefedov",
Expand Down
16 changes: 14 additions & 2 deletions src/adapters/claude-code.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import { registerSpawn } from "../util/spawn-tracker.js";
import { costOf, parseUsage } from "../util/cost.js";
import { markAgentWrite } from "../util/recent-writes.js";
import { readNewlineTerminatedLines } from "../util/jsonl-stream.js";
import { backfillStartOffset } from "../util/backfill.js";
import { createBackfillQueue } from "../util/backfill-queue.js";
import { createParseErrorTracker } from "../util/parse-errors.js";

type Emit = EventSink | ((e: AgentEvent) => void);
Expand Down Expand Up @@ -73,7 +75,7 @@ export function startClaudeAdapter(sink: Emit): () => void {
const size = safeSize(file);
let cursor = cursors.get(file);
if (!cursor) {
const start = isInitialAdd ? Math.max(0, size - BACKFILL_BYTES) : size;
const start = backfillStartOffset(file, size, isInitialAdd, BACKFILL_BYTES);
cursor = { offset: start };
cursors.set(file, cursor);
}
Expand Down Expand Up @@ -155,8 +157,18 @@ export function startClaudeAdapter(sink: Emit): () => void {
}
};

watcher.on("add", (f) => process(f, true));
// Initial-scan files drain one per macrotask so the event loop stays
// free for HTTP/SSE during startup; live adds (post-scan) process inline.
let scanReady = false;
const backfillQueue = createBackfillQueue((f) => process(f, true));
watcher.on("add", (f) => {
if (scanReady) process(f, true);
else backfillQueue.enqueue(f);
});
watcher.on("change", (f) => process(f, false));
watcher.on("ready", () => {
scanReady = true;
});
watcher.on("error", (err) => {
if (typeof err === "object" && err !== null) {
const code = (err as { code?: string }).code;
Expand Down
16 changes: 14 additions & 2 deletions src/adapters/codex.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import { nextId } from "../util/ids.js";
import { costOf } from "../util/cost.js";
import { consumeSpawn } from "../util/spawn-tracker.js";
import { readNewlineTerminatedLines } from "../util/jsonl-stream.js";
import { backfillStartOffset } from "../util/backfill.js";
import { createBackfillQueue } from "../util/backfill-queue.js";
import { createParseErrorTracker } from "../util/parse-errors.js";

const BACKFILL_BYTES = 512 * 1024;
Expand Down Expand Up @@ -57,7 +59,7 @@ export function startCodexAdapter(sink: EventSink): () => void {
const size = safeSize(file);
let cursor = cursors.get(file);
if (!cursor) {
const start = isInitialAdd ? Math.max(0, size - BACKFILL_BYTES) : size;
const start = backfillStartOffset(file, size, isInitialAdd, BACKFILL_BYTES);
cursor = {
offset: start,
project: "",
Expand Down Expand Up @@ -209,8 +211,18 @@ export function startCodexAdapter(sink: EventSink): () => void {
}
};

watcher.on("add", (f) => handle(f, true));
// Initial-scan files drain one per macrotask so the event loop stays
// free for HTTP/SSE during startup; live adds (post-scan) process inline.
let scanReady = false;
const backfillQueue = createBackfillQueue((f) => handle(f, true));
watcher.on("add", (f) => {
if (scanReady) handle(f, true);
else backfillQueue.enqueue(f);
});
watcher.on("change", (f) => handle(f, false));
watcher.on("ready", () => {
scanReady = true;
});
watcher.on("error", (err) => {
if (typeof err === "object" && err !== null) {
const code = (err as { code?: string }).code;
Expand Down
17 changes: 14 additions & 3 deletions src/adapters/openclaw.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import type { AgentEvent, EventType } from "../schema.js";
import { clampTs, riskOf } from "../schema.js";
import { nextId } from "../util/ids.js";
import { readNewlineTerminatedLines } from "../util/jsonl-stream.js";
import { backfillStartOffset } from "../util/backfill.js";
import { createBackfillQueue } from "../util/backfill-queue.js";
import { createParseErrorTracker } from "../util/parse-errors.js";
import {
classifySessionKey,
Expand Down Expand Up @@ -133,8 +135,18 @@ export function startOpenClawAdapter(sink: Emit): () => void {
if (!sessionRe.test(f)) return;
processSession(f, initial, cursors, normalized, parseErrors);
};
sessionsWatcher.on("add", (f) => handleSession(f, true));
// Initial-scan files drain one per macrotask so the event loop stays
// free for HTTP/SSE during startup; live adds (post-scan) process inline.
let sessionsReady = false;
const sessionBackfillQueue = createBackfillQueue((f) => handleSession(f, true));
sessionsWatcher.on("add", (f) => {
if (sessionsReady) handleSession(f, true);
else sessionBackfillQueue.enqueue(f);
});
sessionsWatcher.on("change", (f) => handleSession(f, false));
sessionsWatcher.on("ready", () => {
sessionsReady = true;
});
sessionsWatcher.on("error", swallow);
stoppers.push(() => {
void sessionsWatcher.close();
Expand Down Expand Up @@ -324,8 +336,7 @@ function streamLines(
const size = safeSize(file);
let cursor = cursors.get(file);
if (!cursor) {
const backfillStart = Math.max(0, size - BACKFILL_BYTES);
cursor = { offset: isInitialAdd ? backfillStart : size };
cursor = { offset: backfillStartOffset(file, size, isInitialAdd, BACKFILL_BYTES) };
cursors.set(file, cursor);
}
if (size <= cursor.offset) return;
Expand Down
4 changes: 4 additions & 0 deletions src/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,10 @@ if (arg === "serve") {
try {
const seed = store.listRecentEvents({ limit: 5000, order: "desc" });
for (let i = seed.length - 1; i >= 0; i--) addEventToServer(server, seed[i]!);
// Only skip stale files' backfill once the store has history to seed
// from — otherwise a fresh/empty store would drop their events.
const { setStaleSkipEnabled } = await import("./util/backfill.js");
setStaleSkipEnabled(seed.length > 0);
} catch (err) {
process.stderr.write(`[agentwatch] ring seed skipped: ${String(err)}\n`);
}
Expand Down
23 changes: 23 additions & 0 deletions src/store/sqlite.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,10 @@ export interface EventStore {
* (budget rollups, anomaly histories) that need more than the live
* in-memory ring but less than the full event table. */
listRecentEvents(opts?: ListRecentEventsOptions): AgentEvent[];
/** Fast budget aggregation via SQL: today's total cost + the top session
* (last 30d) by total cost. Avoids pulling tens of thousands of rows into
* JS on every rollup tick — the per-event 50k pull blocked the loop ~1s. */
budgetRollup(): { dayCost: number; maxSession: { id: string; cost: number } };
listSessions(opts?: ListSessionsOptions): SessionSummary[];
listProjects(): ProjectSummary[];
searchFts(query: string, opts?: { limit?: number }): FtsHit[];
Expand Down Expand Up @@ -649,6 +653,25 @@ function buildStore(db: Database.Database): EventStore {
const rows = sessionEventsStmt.all(sessionId) as RawEventRow[];
return rows.map(rowToEvent);
},
budgetRollup() {
const todayStart = new Date();
todayStart.setUTCHours(0, 0, 0, 0);
const monthAgo = new Date(Date.now() - 30 * 86_400_000).toISOString();
const day = db
.prepare("SELECT SUM(cost_usd) AS c FROM events WHERE ts >= ?")
.get(todayStart.toISOString()) as { c: number | null };
const top = db
.prepare(
"SELECT session_id, cost_usd FROM sessions WHERE last_ts >= ? ORDER BY cost_usd DESC LIMIT 1",
)
.get(monthAgo) as { session_id: string; cost_usd: number } | undefined;
return {
dayCost: day.c ?? 0,
maxSession: top
? { id: top.session_id, cost: top.cost_usd }
: { id: "", cost: 0 },
};
},
listRecentEvents(opts = {}) {
const limit = clamp(opts.limit ?? 1000, 1, 50_000);
const order = opts.order === "asc" ? "ASC" : "DESC";
Expand Down
38 changes: 22 additions & 16 deletions src/ui/App.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ import { Timeline } from "./Timeline.js";
import { AgentPanel } from "./AgentPanel.js";
import { Header } from "./Header.js";
import { Breadcrumb } from "./Breadcrumb.js";
import { computeBudgetStatus } from "../util/budgets.js";
import { budgetStatusFromTotals, computeBudgetStatus } from "../util/budgets.js";
import { setStaleSkipEnabled } from "../util/backfill.js";
import { emitEventSpan, initOtel, otelEnabled } from "../util/otel.js";
import { watchTriggers } from "../util/triggers.js";
import {
Expand Down Expand Up @@ -130,6 +131,9 @@ export function App() {
try {
const seed = store.listRecentEvents({ limit: 500, order: "desc" });
if (seed.length > 0) dispatch({ type: "events-batch", events: seed });
// Only skip stale files' backfill once the store has history to seed
// from — otherwise a fresh/empty store would drop their events.
setStaleSkipEnabled(seed.length > 0);
} catch {
// store may be unavailable; live tail will fill from adapters
}
Expand Down Expand Up @@ -224,22 +228,24 @@ export function App() {
// any new event grows the ring and busts these memos.
const eventsRef = state.events;

// Decouple the store-backed rollups (budget SQL aggregation, anomalies over up
// to 5k rows) from the per-event render. Recomputing them on every event
// pegs the shared event loop on large DBs — which freezes the TUI *and*
// starves the in-process web server (press `w`). Tick every 2.5s instead.
const [rollupTick, setRollupTick] = useState(0);
useEffect(() => {
if (!store) return;
const id = setInterval(() => setRollupTick((t) => t + 1), 2500);
return () => clearInterval(id);
}, [store]);

const budgetStatus = useMemo(() => {
if (!store) return computeBudgetStatus(eventsRef);
const todayStart = new Date();
todayStart.setUTCHours(0, 0, 0, 0);
const since = todayStart.toISOString();
// Per-session caps are checked against per-session totals — capped at 30 days
// so a long-running session that started yesterday is still seen. Day rollup
// filters by ts inside computeBudgetStatus.
const monthAgo = new Date(Date.now() - 30 * 86_400_000).toISOString();
const events = store.listRecentEvents({
sinceTs: monthAgo < since ? monthAgo : since,
limit: 50_000,
order: "asc",
});
return computeBudgetStatus(events);
}, [eventsRef, store]);
// Aggregate in SQL (~20ms) instead of pulling up to 50k rows into JS
// (~1s) on every tick — the latter intermittently froze the shared loop.
const { dayCost, maxSession } = store.budgetRollup();
return budgetStatusFromTotals(dayCost, maxSession);
}, [store ? rollupTick : eventsRef, store]);

const anomalies = useMemo(() => {
const source = store
Expand Down Expand Up @@ -286,7 +292,7 @@ export function App() {
}
}
return out;
}, [eventsRef, store]);
}, [store ? rollupTick : eventsRef, store]);

// Budget-breach notifications (once per distinct breach).
const budgetBreachKey = [
Expand Down
33 changes: 33 additions & 0 deletions src/util/backfill-queue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/** Drains initial-scan backfill files one per macrotask so the event loop
* can service HTTP/SSE between files. Without this, a startup file-read
* storm (hundreds of session files) blocks the loop for seconds — freezing
* the TUI and the in-process web server. Live `change`/`add` events after
* the initial scan bypass the queue: they're one small read at a time. */
export function createBackfillQueue(
processFile: (file: string) => void,
): { enqueue: (file: string) => void } {
const queue: string[] = [];
let draining = false;
const drain = (): void => {
const file = queue.shift();
if (file === undefined) {
draining = false;
return;
}
try {
processFile(file);
} catch {
// per-file isolation: one bad file never stalls the drain
}
setImmediate(drain);
};
return {
enqueue(file: string): void {
queue.push(file);
if (!draining) {
draining = true;
setImmediate(drain);
}
},
};
}
47 changes: 47 additions & 0 deletions src/util/backfill.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import { statSync } from "node:fs";

/** On the initial scan, only re-read files modified within this window.
* Older sessions' events are already in the SQLite store (the TUI and
* `serve` seed their timeline from it), so re-reading hundreds of stale
* files at boot just blocks the event loop for seconds. Stale files tail
* from EOF instead — their history is already in the store. */
export const BACKFILL_MAX_AGE_MS = 48 * 60 * 60 * 1000;

function mtimeMs(file: string): number {
try {
return statSync(file).mtimeMs;
} catch {
return 0;
}
}

// Skipping stale files' backfill is only safe once the SQLite store holds
// prior history to seed the timeline from. On a fresh install, a deleted /
// pruned DB, or a store-open failure, skipping would drop those files'
// events entirely (they were never ingested). The startup path enables this
// only after confirming the store is non-empty; default off = always backfill.
let staleSkipEnabled = false;

/** Set once at startup, before adapters start. Pass `true` only when the
* store already has history (so stale files are already ingested). */
export function setStaleSkipEnabled(enabled: boolean): void {
staleSkipEnabled = enabled;
}

/** Byte offset to start reading a file from. Live appends start at EOF
* (`size`). On the initial scan, recently-modified files are backfilled
* `backfillBytes` behind EOF to catch turns written while agentwatch was
* off; stale files start at EOF only when stale-skip is enabled (the store
* already has their history). */
export function backfillStartOffset(
file: string,
size: number,
isInitialAdd: boolean,
backfillBytes: number,
): number {
if (!isInitialAdd) return size;
if (staleSkipEnabled && mtimeMs(file) < Date.now() - BACKFILL_MAX_AGE_MS) {
return size;
}
return Math.max(0, size - backfillBytes);
}
19 changes: 13 additions & 6 deletions src/util/budgets.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,18 +79,25 @@ export function computeBudgetStatus(
if (t >= todayMs) dayCost += c;
}

return budgetStatusFromTotals(dayCost, maxSession, budgets);
}

/** Build a BudgetStatus from pre-aggregated totals. Shared by the in-memory
* path (computeBudgetStatus) and the SQL fast path (store.budgetRollup) so
* the breach semantics never diverge. */
export function budgetStatusFromTotals(
dayCost: number,
maxSession: { id: string; cost: number },
budgets: Budgets = loadBudgets(),
): BudgetStatus {
const status: BudgetStatus = {
sessionCost: maxSession.cost,
dayCost,
perSessionUsd: budgets.perSessionUsd,
perDayUsd: budgets.perDayUsd,
dayBreach:
budgets.perDayUsd != null && dayCost > budgets.perDayUsd,
dayBreach: budgets.perDayUsd != null && dayCost > budgets.perDayUsd,
};
if (
budgets.perSessionUsd != null &&
maxSession.cost > budgets.perSessionUsd
) {
if (budgets.perSessionUsd != null && maxSession.cost > budgets.perSessionUsd) {
status.breachedSession = maxSession.id || "(unknown)";
}
return status;
Expand Down
Loading