From 3d929acf53535eca5d9625cb1d8419df99f20571 Mon Sep 17 00:00:00 2001 From: Michael Nefedov Date: Mon, 4 May 2026 17:35:41 +0200 Subject: [PATCH 1/2] =?UTF-8?q?feat(AUR-276):=20session-correlation=20tele?= =?UTF-8?q?metry=20=E2=80=94=20candidate-pair=20logging=20only?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Splits AUR-115 (which revives the cancelled AUR-183) into two tickets and ships the data-collection half. AUR-183 was cancelled 2026-04-15 because the rule-based stitching had ~10–15% FP without user-labeled pairs to tune against. AUR-115's PROGRESS.md plan was a stricter rule with the same gating gap. AUR-276 closes that gap by silently building the candidate-pair dataset that AUR-277 (the UI half, gated on a measured FP rate) will be validated against. Scope here is telemetry only — no API field, no React surface. - Schema V3: workspace_root + git_branch on sessions; new session_link_candidates table (candidates, not links — they're unverified). - New src/correlate/: RecentWritesIndex (sliding 30-min window, sweep, hard cap with eviction) + branch-cache (60s TTL around git rev-parse, reuses runGit + gitCommonDir). - New wrapSinkWithLinks layered after wrapSinkWithStore at all three sink composition sites (TUI, serve, daemon). Errors warn-once + swallow, mirroring the store wrapper. - Adapters: Claude + OpenClaw stamp details.cwd on file_write events. - agentwatch link-candidates [--session id] [--limit n]: JSON dump for manual classification toward the AUR-277 validation gate. - AGENTWATCH_DEBUG_LINKS=1: surfaces the candidate-pair count in the TUI Header. Off by default, no overhead in normal use. - Tests: 25 new unit + integration cases (branch-cache, RecentWritesIndex, V3 migration + new store methods, linker wrapper + regression test that wrapSinkWithStore behaviour stays unchanged). PROGRESS.md captures the why-split rationale and the carry-forward map from the original AUR-115 plan. --- PROGRESS.md | 68 ++++++++ src/adapters/claude-code.ts | 5 + src/adapters/openclaw.ts | 6 + src/correlate/branch-cache.test.ts | 80 +++++++++ src/correlate/branch-cache.ts | 66 +++++++ src/correlate/session-links.test.ts | 167 ++++++++++++++++++ src/correlate/session-links.ts | 152 ++++++++++++++++ src/daemon/run.ts | 8 +- src/git/correlate.ts | 15 ++ src/index.tsx | 46 ++++- src/schema.ts | 6 + src/store/index.ts | 5 +- src/store/sqlite.test.ts | 166 ++++++++++++++++++ src/store/sqlite.ts | 257 +++++++++++++++++++++++++++- src/store/wire.test.ts | 247 ++++++++++++++++++++++++++ src/store/wire.ts | 99 ++++++++++- src/ui/App.tsx | 37 +++- src/ui/Header.tsx | 10 ++ 18 files changed, 1428 insertions(+), 12 deletions(-) create mode 100644 PROGRESS.md create mode 100644 src/correlate/branch-cache.test.ts create mode 100644 src/correlate/branch-cache.ts create mode 100644 src/correlate/session-links.test.ts create mode 100644 src/correlate/session-links.ts create mode 100644 src/store/wire.test.ts diff --git a/PROGRESS.md b/PROGRESS.md new file mode 100644 index 0000000..309e932 --- /dev/null +++ b/PROGRESS.md @@ -0,0 +1,68 @@ +# PROGRESS — session-correlation work split (2026-05-04) + +**Original target:** AUR-115 — *Cross-agent session correlation (Claude session → Cursor session stitched)* +**Status:** plan superseded — **do not implement AUR-115 directly**. +**This branch:** `misha/aur-276-session-correlation-telemetry` — implements AUR-276 (telemetry half). + +The original 4-hour plan (full architecture + schema + correlator + API + React UI) drafted earlier today was paused after a GBrain query surfaced **AUR-183** — the same feature, cancelled 2026-04-15 for the same milestone (M7 v1.0). The cancellation rationale was that without user-labeled confirmation pairs, false-positive rate is ~10–15% and the UI can't be trusted. The new plan's stricter rule (exact path + 30-min window + same git branch + same workspace root) likely lowers FP — but still ships an unverified flagship feature with no measurement. + +## New plan — 2-ticket split + +| Ticket | Scope | Effort | Gate | +|---|---|---|---| +| **AUR-276** | Session-correlation telemetry. Correlator + V3 schema + `session_link_candidates` table + dev-only TUI badge. **No API field, no React UI.** | ~1–1.5 hr | None — ships into v1.0 | +| **AUR-277** | Stitched-sessions UI. `/api/sessions/:id` link field + React sidebar block. Optional hedge UI + thumbs-up/down feedback if FP is in the 5–15% band. | ~2–3 hr | Blocked: AUR-276 done **+** ≥10 candidate pairs accumulated in self-use **+** Michael manually classifies them **+** measured FP <15% (or redesign) | + +## Why split, not just shrink + +- The original plan's correlator + schema work is reusable as-is — it produces candidate pairs either way. +- The cancelled AUR-183 explicitly said "revisit in v1.1 once v1.0 is out." Shipping unobserved UI in v1.0 ignores that call. Shipping silent telemetry in v1.0 *enables* it. +- 1 hr of telemetry now buys honest measurement before any UI commitment. Asymmetric: small cost, big optionality. + +## What carries forward from the old PROGRESS.md plan + +The architecture sections of the original plan are still correct and reusable inside AUR-276: + +- §3.1 Data flow (sink wrapper composition). +- §3.2 Key types (`RecentWriteEntry`, but rename `SessionLink` → `SessionLinkCandidate`). +- §3.3 Schema migration `applyV3` — but rename table `session_links` → `session_link_candidates`. +- §3.4 Workspace + branch resolution (lazy 60-s branch cache; null-gate semantics). +- §3.5 Failure scenarios. +- §4 Code-quality decisions (especially: keep `recent-writes.ts` as-is, new `src/correlate/` dir, swallow-error parity with `wrapSinkWithStore`). +- §5 Test coverage diagram (drop the React + API rows; everything else applies). +- §10 Worktree parallelization (Lane A store + Lane B correlator unchanged). + +## What changes from the old plan + +- Drop API-route work (was step 4) → moves to AUR-277. +- Drop React Session-view sidebar (was step 5) → moves to AUR-277. +- Drop integration test that asserts the API returns links → AUR-277. +- Add: dev-only TUI candidate-count badge (env-var gated, e.g. `AGENTWATCH_DEBUG_LINKS=1`). +- Add: a one-shot CLI like `agentwatch link-candidates --session ` (or just `--all`) so Michael can dump candidates to manually classify them. JSON output, no formatting. + +## Pre-coding assumption to verify (carried forward) + +Spot-check from earlier today: top-level lines of recent Claude JSONL in this very repo's project dir reported `(no cwd)`. The plan assumed `obj.cwd` is reliably present. **Before coding AUR-276,** sample 5–10 recent JSONL files and confirm which line shapes carry `cwd`. If only `session_start`-shaped lines do, the correlator must capture cwd at that line and reuse it for downstream `file_write` events on that session — which is fine, but worth verifying first. + +## Linear cross-links + +- **AUR-115** — moved to Backlog, labels `ai-refinement, blocked`, related to AUR-183/276/277. Description rewritten to point here. +- **AUR-183** — cancelled 2026-04-15. The reason this split exists. +- **AUR-276** — telemetry, v1.0, ~1–1.5 hr. +- **AUR-277** — UI, v1.1, blocked on AUR-276 + validation gate. + +## Next step + +Nothing to code yet. When Michael decides to start AUR-276: + +1. `git checkout main && git pull` +2. `git checkout -b misha/aur-276-session-correlation-telemetry` +3. Update Linear AUR-276 → In Progress, kickoff comment linking this PROGRESS.md. +4. Execute Lane A (store + V3 migration) and Lane B (correlator + branch cache) in parallel worktrees per the original plan §10. +5. Wire up Step 3 (sink wrapper + `details.cwd` in adapters). +6. Skip the old Step 4 + Step 5 — those are AUR-277. +7. Add: dev-only TUI badge + `agentwatch link-candidates` CLI. +8. PR per repo convention (no Claude footer). +9. After ship: this PROGRESS.md is rotated to `~/IdeaProjects/knowledge-base/decisions/2026-05-04-agentwatch-session-correlation-split.md` (short ADR). + +The current branch `agent/aur-218-sandbox-docker` is unrelated to this work — it carries the AUR-218 commit (`c2ff389`). PROGRESS.md sits on this branch only because it's where today's planning happened; it should follow into the next branch via `git checkout -b ... && git add PROGRESS.md` or be committed here first depending on Michael's preference. diff --git a/src/adapters/claude-code.ts b/src/adapters/claude-code.ts index aaa8865..8150f85 100644 --- a/src/adapters/claude-code.ts +++ b/src/adapters/claude-code.ts @@ -336,6 +336,10 @@ export function translateClaudeLine( evType === "shell_exec" && toolUse.cmd ? detectAgentCall(toolUse.cmd) : null; + // AUR-276: file_write events carry cwd so the session-correlation + // linker can resolve the workspace root + branch without a + // round-trip back to the adapter. + const cwd = typeof o.cwd === "string" ? o.cwd : undefined; return { id: nextId(), ts, @@ -357,6 +361,7 @@ export function translateClaudeLine( cost, model, ...(agentCall ? { agentCall } : {}), + ...(evType === "file_write" && cwd ? { cwd } : {}), }, }; } diff --git a/src/adapters/openclaw.ts b/src/adapters/openclaw.ts index f7ab812..d0fa36d 100644 --- a/src/adapters/openclaw.ts +++ b/src/adapters/openclaw.ts @@ -474,6 +474,11 @@ export function translateSession( const toolUse = extractToolUse(content); if (toolUse) { const type = inferToolType(toolUse.name); + // AUR-276: file_write events carry cwd so the session-correlation + // linker can resolve the workspace root + branch. cwd was captured + // on the session_start line and is held in the sessionCwd map for + // every event in this session. + const cwd = type === "file_write" ? sessionCwd.get(sessionId) : undefined; return base(type, { tool: `openclaw:${subAgent}:${toolUse.name}`, path: toolUse.path, @@ -485,6 +490,7 @@ export function translateSession( ...(usage ? { usage } : {}), ...(precomputedCost != null ? { cost: precomputedCost } : {}), ...(model ? { model } : {}), + ...(cwd ? { cwd } : {}), }, }); } diff --git a/src/correlate/branch-cache.test.ts b/src/correlate/branch-cache.test.ts new file mode 100644 index 0000000..568f116 --- /dev/null +++ b/src/correlate/branch-cache.test.ts @@ -0,0 +1,80 @@ +import { beforeEach, describe, expect, it } from "vitest"; +import { _resetBranchCache, resolveWorkspace } from "./branch-cache.js"; + +beforeEach(() => { + _resetBranchCache(); +}); + +describe("resolveWorkspace — null cwd inputs", () => { + it("returns null/null for an undefined cwd without shelling out", () => { + let calls = 0; + const r = resolveWorkspace(undefined, { + branchOf: () => { + calls++; + return "main"; + }, + }); + expect(r).toEqual({ workspaceRoot: null, gitBranch: null }); + expect(calls).toBe(0); + }); + + it("returns null/null for an empty-string cwd without shelling out", () => { + let calls = 0; + const r = resolveWorkspace("", { + branchOf: () => { + calls++; + return "main"; + }, + }); + expect(r).toEqual({ workspaceRoot: null, gitBranch: null }); + expect(calls).toBe(0); + }); +}); + +describe("resolveWorkspace — caching behaviour", () => { + it("re-uses the cached branch within the TTL", () => { + let calls = 0; + const branchOf = (): string | null => { + calls++; + return "main"; + }; + const now = (): number => 1_000_000; + const a = resolveWorkspace("/repo/a", { branchOf, now }); + const b = resolveWorkspace("/repo/a", { branchOf, now }); + expect(calls).toBe(1); + expect(a.gitBranch).toBe("main"); + expect(b.gitBranch).toBe("main"); + // workspaceRoot may have been gitCommonDir-resolved or fall back to cwd + expect(a.workspaceRoot).toBe(b.workspaceRoot); + }); + + it("re-shells when the cache entry is older than the TTL", () => { + let calls = 0; + const branchOf = (): string | null => { + calls++; + return calls === 1 ? "main" : "feature"; + }; + let t = 1_000_000; + const now = (): number => t; + const first = resolveWorkspace("/repo/b", { branchOf, now }); + expect(first.gitBranch).toBe("main"); + t += 60_001; // just past the 60 s TTL + const second = resolveWorkspace("/repo/b", { branchOf, now }); + expect(second.gitBranch).toBe("feature"); + expect(calls).toBe(2); + }); + + it("caches a null branch the same way as a real branch", () => { + let calls = 0; + const branchOf = (): string | null => { + calls++; + return null; + }; + const now = (): number => 2_000_000; + const a = resolveWorkspace("/repo/c", { branchOf, now }); + const b = resolveWorkspace("/repo/c", { branchOf, now }); + expect(a.gitBranch).toBeNull(); + expect(b.gitBranch).toBeNull(); + expect(calls).toBe(1); + }); +}); diff --git a/src/correlate/branch-cache.ts b/src/correlate/branch-cache.ts new file mode 100644 index 0000000..0142724 --- /dev/null +++ b/src/correlate/branch-cache.ts @@ -0,0 +1,66 @@ +import { getCurrentBranch, gitCommonDir } from "../git/correlate.js"; + +/** AUR-276: cache git-branch lookups per workspace root for `TTL_MS`. + * `getCurrentBranch` shells out to `git`, which is cheap (~3–10 ms) but + * hot-path file_write events fire often enough that a tight cache pays. + * + * TTL is intentionally short (60 s): humans switch branches and then + * immediately run an agent on the new branch — we want stale entries + * to expire fast enough that the next file_write picks up the switch, + * but not so fast that we re-spawn git for every burst of writes. + * + * Cache misses on a non-git dir, missing-git-on-PATH, or detached HEAD + * all return `null` and cache that null result for the same TTL — no + * point re-shelling-out to fail again 5 ms later. + */ + +const TTL_MS = 60_000; + +interface CacheEntry { + branch: string | null; + refreshedMs: number; +} + +const cache = new Map(); + +interface BranchCacheDeps { + /** Override for tests. Defaults to the real git shell-out. */ + branchOf?: (root: string) => string | null; + /** Override for tests. Defaults to `Date.now`. */ + now?: () => number; +} + +export interface ResolvedWorkspace { + /** Canonicalized workspace root (gitCommonDir-resolved if possible). + * `null` when the input cwd was null/empty or git couldn't resolve it. */ + workspaceRoot: string | null; + /** Current branch at the workspace root, or `null` (see TTL note above). */ + gitBranch: string | null; +} + +/** Resolve `(workspaceRoot, gitBranch)` for the given cwd, with a + * 60-second cache around the git invocation. Pure-data return; the + * caller decides what to do with nulls (the AUR-276 linker uses null + * as a "do not match" gate). */ +export function resolveWorkspace( + cwd: string | null | undefined, + deps: BranchCacheDeps = {}, +): ResolvedWorkspace { + if (!cwd) return { workspaceRoot: null, gitBranch: null }; + const root = gitCommonDir(cwd) ?? cwd; + const branchOf = deps.branchOf ?? getCurrentBranch; + const now = deps.now ?? Date.now; + const cached = cache.get(root); + const t = now(); + if (cached && t - cached.refreshedMs < TTL_MS) { + return { workspaceRoot: root, gitBranch: cached.branch }; + } + const branch = branchOf(root); + cache.set(root, { branch, refreshedMs: t }); + return { workspaceRoot: root, gitBranch: branch }; +} + +/** Test-only: drop every cached entry. */ +export function _resetBranchCache(): void { + cache.clear(); +} diff --git a/src/correlate/session-links.test.ts b/src/correlate/session-links.test.ts new file mode 100644 index 0000000..2eec9c5 --- /dev/null +++ b/src/correlate/session-links.test.ts @@ -0,0 +1,167 @@ +import { describe, expect, it } from "vitest"; +import { RecentWritesIndex, WINDOW_MS } from "./session-links.js"; + +describe("RecentWritesIndex.recordAndQuery", () => { + it("returns no matches for the very first write of a path", () => { + const idx = new RecentWritesIndex(); + const matches = idx.recordAndQuery( + "/repo/foo.ts", + "claude-code", + "sess-A", + 1_000, + "main", + "/repo", + ); + expect(matches).toEqual([]); + expect(idx.entryCount()).toBe(1); + }); + + it("filters out same-session writes (same agent or not)", () => { + const idx = new RecentWritesIndex(); + idx.recordAndQuery("/repo/foo.ts", "claude-code", "sess-A", 1_000, "main", "/repo"); + const matches = idx.recordAndQuery( + "/repo/foo.ts", + "claude-code", + "sess-A", + 2_000, + "main", + "/repo", + ); + expect(matches).toEqual([]); + }); + + it("filters out same-agent / different-session writes", () => { + const idx = new RecentWritesIndex(); + idx.recordAndQuery("/repo/foo.ts", "claude-code", "sess-A", 1_000, "main", "/repo"); + const matches = idx.recordAndQuery( + "/repo/foo.ts", + "claude-code", + "sess-B", + 2_000, + "main", + "/repo", + ); + expect(matches).toEqual([]); + }); + + it("returns a match for cross-agent + same root + same branch within window", () => { + const idx = new RecentWritesIndex(); + idx.recordAndQuery("/repo/foo.ts", "claude-code", "sess-A", 1_000, "main", "/repo"); + const matches = idx.recordAndQuery( + "/repo/foo.ts", + "openclaw", + "sess-B", + 1_000 + 5 * 60_000, + "main", + "/repo", + ); + expect(matches).toHaveLength(1); + expect(matches[0]).toMatchObject({ + agent: "claude-code", + sessionId: "sess-A", + branch: "main", + root: "/repo", + }); + }); + + it("filters out cross-agent matches on a different workspace root", () => { + const idx = new RecentWritesIndex(); + idx.recordAndQuery("/repo/foo.ts", "claude-code", "sess-A", 1_000, "main", "/repo-1"); + const matches = idx.recordAndQuery( + "/repo/foo.ts", + "openclaw", + "sess-B", + 2_000, + "main", + "/repo-2", + ); + expect(matches).toEqual([]); + }); + + it("filters out cross-agent matches on a different branch", () => { + const idx = new RecentWritesIndex(); + idx.recordAndQuery("/repo/foo.ts", "claude-code", "sess-A", 1_000, "main", "/repo"); + const matches = idx.recordAndQuery( + "/repo/foo.ts", + "openclaw", + "sess-B", + 2_000, + "feature", + "/repo", + ); + expect(matches).toEqual([]); + }); + + it("never matches when either side has a null branch", () => { + const idx = new RecentWritesIndex(); + idx.recordAndQuery("/repo/foo.ts", "claude-code", "sess-A", 1_000, null, "/repo"); + const matches = idx.recordAndQuery( + "/repo/foo.ts", + "openclaw", + "sess-B", + 2_000, + "main", + "/repo", + ); + expect(matches).toEqual([]); + }); + + it("never matches when either side has a null workspace root", () => { + const idx = new RecentWritesIndex(); + idx.recordAndQuery("/repo/foo.ts", "claude-code", "sess-A", 1_000, "main", null); + const matches = idx.recordAndQuery( + "/repo/foo.ts", + "openclaw", + "sess-B", + 2_000, + "main", + "/repo", + ); + expect(matches).toEqual([]); + }); + + it("drops aged-out entries past the 30-min window", () => { + const idx = new RecentWritesIndex(); + idx.recordAndQuery("/repo/foo.ts", "claude-code", "sess-A", 1_000, "main", "/repo"); + expect(idx.entryCount()).toBe(1); + const matches = idx.recordAndQuery( + "/repo/foo.ts", + "openclaw", + "sess-B", + 1_000 + WINDOW_MS + 1, + "main", + "/repo", + ); + expect(matches).toEqual([]); + // The aged peer was swept on the second call; only the new entry remains. + expect(idx.entryCount()).toBe(1); + }); + + it("returns multiple peers when several candidates are in-window", () => { + const idx = new RecentWritesIndex(); + idx.recordAndQuery("/repo/foo.ts", "claude-code", "sess-A", 1_000, "main", "/repo"); + idx.recordAndQuery("/repo/foo.ts", "openclaw", "sess-B", 2_000, "main", "/repo"); + const matches = idx.recordAndQuery( + "/repo/foo.ts", + "gemini", + "sess-C", + 3_000, + "main", + "/repo", + ); + // Both A (claude-code) and B (openclaw) are different agents + different + // sessions from C — both should match. + expect(matches).toHaveLength(2); + const sessions = matches.map((m) => m.sessionId).sort(); + expect(sessions).toEqual(["sess-A", "sess-B"]); + }); + + it("reset() clears all entries", () => { + const idx = new RecentWritesIndex(); + idx.recordAndQuery("/repo/foo.ts", "claude-code", "sess-A", 1_000, "main", "/repo"); + idx.recordAndQuery("/repo/bar.ts", "openclaw", "sess-B", 1_000, "main", "/repo"); + expect(idx.entryCount()).toBe(2); + idx.reset(); + expect(idx.entryCount()).toBe(0); + }); +}); diff --git a/src/correlate/session-links.ts b/src/correlate/session-links.ts new file mode 100644 index 0000000..e35bcbb --- /dev/null +++ b/src/correlate/session-links.ts @@ -0,0 +1,152 @@ +import type { AgentName } from "../schema.js"; + +/** AUR-276: per-path index of recent attributed writes within a sliding + * 30-min window. Pure data structure — no I/O, no subprocess, no SQLite. + * Caller hands us each (path, agent, sessionId, ts, branch, root) and + * receives back any matching peer entries that satisfy: + * + * - different agent + * - different session + * - same workspace_root (both non-null) + * - same git_branch (both non-null) + * - within WINDOW_MS of `ts` + * + * ASCII model: + * + * RecentWritesIndex + * ┌────────────────────────────────────────────────────┐ + * │ path ─────► [ {agent, sess, ts, branch, root} ] │ + * │ (append-order; sweep drops aged-out) │ + * │ │ + * │ recordAndQuery(path, agent, sess, ts, br, root) │ + * │ 1. drop entries older than ts - WINDOW_MS │ + * │ 2. matches = entries where │ + * │ e.agent != agent │ + * │ AND e.sess != sess │ + * │ AND e.branch === branch && branch != null │ + * │ AND e.root === root && root != null │ + * │ 3. push new entry │ + * │ 4. return matches │ + * └────────────────────────────────────────────────────┘ + * + * Memory: capped at MAX_ENTRIES; oldest 10 % evicted on overflow. + * This is a backstop — under normal load the 30-min sweep keeps the + * index well under the cap. + */ + +export const WINDOW_MS = 30 * 60 * 1000; +const MAX_ENTRIES = 50_000; +const EVICT_FRACTION = 0.1; + +export interface RecentWriteEntry { + agent: AgentName; + sessionId: string; + ts: number; // ms since epoch + branch: string | null; + root: string | null; +} + +export class RecentWritesIndex { + private byPath = new Map(); + private size = 0; + + /** Record a write and return any peer entries that should be linked + * per the gate above. Returned entries are *not* removed from the + * index — the same peer may legitimately link to multiple later + * writes in the window. */ + recordAndQuery( + path: string, + agent: AgentName, + sessionId: string, + tsMs: number, + branch: string | null, + root: string | null, + ): RecentWriteEntry[] { + const cutoff = tsMs - WINDOW_MS; + const bucket = this.byPath.get(path); + const matches: RecentWriteEntry[] = []; + if (bucket) { + // Drop aged-out entries up-front; cheap, keeps the bucket small. + let kept = 0; + for (const entry of bucket) { + if (entry.ts < cutoff) { + this.size -= 1; + continue; + } + bucket[kept++] = entry; + if ( + entry.agent !== agent && + entry.sessionId !== sessionId && + entry.branch != null && + branch != null && + entry.branch === branch && + entry.root != null && + root != null && + entry.root === root + ) { + matches.push(entry); + } + } + bucket.length = kept; + if (kept === 0) this.byPath.delete(path); + } + // Append the new entry. + const newEntry: RecentWriteEntry = { + agent, + sessionId, + ts: tsMs, + branch, + root, + }; + const next = this.byPath.get(path); + if (next) { + next.push(newEntry); + } else { + this.byPath.set(path, [newEntry]); + } + this.size += 1; + if (this.size > MAX_ENTRIES) this.evictOldest(); + return matches; + } + + /** Test/diagnostic: total entries currently held. */ + entryCount(): number { + return this.size; + } + + /** Test-only: drop everything. */ + reset(): void { + this.byPath.clear(); + this.size = 0; + } + + /** Hard-cap eviction: collect every entry, sort by ts ascending, + * drop the oldest EVICT_FRACTION. Cheap relative to MAX_ENTRIES, + * rare in practice — sweep keeps us well under the cap normally. */ + private evictOldest(): void { + const toDrop = Math.max(1, Math.floor(MAX_ENTRIES * EVICT_FRACTION)); + const allTs: number[] = []; + for (const bucket of this.byPath.values()) { + for (const e of bucket) allTs.push(e.ts); + } + if (allTs.length <= toDrop) { + this.byPath.clear(); + this.size = 0; + return; + } + allTs.sort((a, b) => a - b); + const cutoff = allTs[toDrop - 1] ?? -Infinity; + for (const [path, bucket] of this.byPath) { + let kept = 0; + for (const e of bucket) { + if (e.ts <= cutoff) { + this.size -= 1; + continue; + } + bucket[kept++] = e; + } + bucket.length = kept; + if (kept === 0) this.byPath.delete(path); + } + } +} diff --git a/src/daemon/run.ts b/src/daemon/run.ts index 176317b..4fc3178 100644 --- a/src/daemon/run.ts +++ b/src/daemon/run.ts @@ -50,7 +50,9 @@ export async function runDaemon(): Promise { let stoppingHooks: Array<() => Promise | void> = []; try { - const { openStore, wrapSinkWithStore } = await import("../store/index.js"); + const { openStore, wrapSinkWithStore, wrapSinkWithLinks } = await import( + "../store/index.js" + ); const { startAllAdapters, stopAllAdapters } = await import( "../adapters/registry.js" ); @@ -70,7 +72,9 @@ export async function runDaemon(): Promise { // wrapper; nothing additional to do here. }, }; - const sink = wrapSinkWithStore(inner, store); + // AUR-276: linker layered after the store wrapper so it can read the + // already-persisted session row when upserting workspace + branch. + const sink = wrapSinkWithLinks(wrapSinkWithStore(inner, store), store); const adapters = startAllAdapters(sink, workspace); stoppingHooks.push(() => stopAllAdapters(adapters)); stoppingHooks.push(() => store?.close()); diff --git a/src/git/correlate.ts b/src/git/correlate.ts index 21a9cbb..5538466 100644 --- a/src/git/correlate.ts +++ b/src/git/correlate.ts @@ -129,6 +129,21 @@ export function gitCommonDir(repoPath: string): string | null { } } +/** Current branch name at `repoPath`. Returns `null` for non-git dirs, + * detached HEAD (where `--abbrev-ref HEAD` reports the literal string + * `HEAD`), or any git failure. AUR-276 uses this — wrap with the + * branch-cache helper before calling on a hot path. */ +export function getCurrentBranch(repoPath: string): string | null { + try { + const out = runGit(["rev-parse", "--abbrev-ref", "HEAD"], { cwd: repoPath }); + const branch = out.trim(); + if (!branch || branch === "HEAD") return null; + return branch; + } catch { + return null; + } +} + /** List commits in `[since, until]` (both ISO). Returns oldest-first. * Skips merge commits (--no-merges) so the cost-per-commit metric * isn't diluted by routine integration commits. */ diff --git a/src/index.tsx b/src/index.tsx index 303818f..ec85b0c 100644 --- a/src/index.tsx +++ b/src/index.tsx @@ -26,6 +26,8 @@ Usage: (subcommands: start | stop | status | logs) agentwatch hooks ... install / uninstall / status the Claude Code hooks adapter agentwatch prune drop events older than --older-than-days (default 90) + agentwatch link-candidates dump AUR-276 session-correlation candidate pairs as JSON + (--session to scope; --limit to cap) agentwatch --help show this help Flags: @@ -42,9 +44,10 @@ Hotkeys inside the TUI: w open web UI in browser Environment: - WORKSPACE_ROOT override the detected workspace root - AGENTWATCH_PORT override the web server port - AGENTWATCH_HOST override the web server bind address + WORKSPACE_ROOT override the detected workspace root + AGENTWATCH_PORT override the web server port + AGENTWATCH_HOST override the web server bind address + AGENTWATCH_DEBUG_LINKS show AUR-276 candidate-pair counts in the agent panel `); process.exit(0); } @@ -116,6 +119,34 @@ if (arg === "hooks") { process.exit(2); } +if (arg === "link-candidates") { + // AUR-276: dump session-correlation candidate pairs as JSON so Michael + // can manually classify them (true-positive / false-positive / unclear) + // toward the AUR-277 validation gate. No formatting, no colours — this + // is plumbing, not UX. + const { openStore } = await import("./store/index.js"); + const sessionId = parseFlag("--session"); + const limitFlag = parseFlag("--limit"); + const limit = limitFlag ? Number(limitFlag) : undefined; + if (limit != null && (!Number.isFinite(limit) || limit < 1)) { + process.stderr.write( + `[agentwatch] link-candidates: --limit must be a positive number, got ${limitFlag}\n`, + ); + process.exit(2); + } + const store = openStore(); + try { + const rows = store.listSessionLinkCandidates({ + ...(sessionId ? { sessionId } : {}), + ...(limit ? { limit } : {}), + }); + process.stdout.write(JSON.stringify(rows, null, 2) + "\n"); + } finally { + store.close(); + } + process.exit(0); +} + if (arg === "prune") { const { openStore } = await import("./store/index.js"); const days = Number(parseFlag("--older-than-days") ?? "90"); @@ -182,7 +213,9 @@ if (arg === "serve") { ); const { detectWorkspaceRoot } = await import("./util/workspace.js"); const { clampTs } = await import("./schema.js"); - const { openStore, wrapSinkWithStore } = await import("./store/index.js"); + const { openStore, wrapSinkWithStore, wrapSinkWithLinks } = await import( + "./store/index.js" + ); const workspace = detectWorkspaceRoot(); const host = parseFlag("--host") ?? process.env.AGENTWATCH_HOST ?? "127.0.0.1"; const port = Number(parseFlag("--port") ?? process.env.AGENTWATCH_PORT ?? 3456); @@ -220,7 +253,10 @@ if (arg === "serve") { const { withClassifier } = await import("./classify/index.js"); const { withClaudeHookDedup } = await import("./adapters/hooks-dedup.js"); const persistSink = store ? wrapSinkWithStore(innerSink, store) : innerSink; - const classifiedSink = withClassifier(persistSink); + // AUR-276: layered after the store wrapper so the linker sees the + // already-persisted session row when it upserts workspace + branch. + const linkedSink = store ? wrapSinkWithLinks(persistSink, store) : persistSink; + const classifiedSink = withClassifier(linkedSink); const sink = withClaudeHookDedup(classifiedSink); server.setHookSink(sink); const adapters = startAllAdapters(sink, workspace); diff --git a/src/schema.ts b/src/schema.ts index 1f95216..9584329 100644 --- a/src/schema.ts +++ b/src/schema.ts @@ -103,6 +103,12 @@ export interface EventDetails { parseErrorCount?: number; /** Truncated preview of the most recent unparseable line. */ parseErrorSample?: string; + /** Working directory of the originating session at emit time. Carried on + * file_write / file_change events so the AUR-276 session-correlation + * linker can resolve same-workspace + same-branch matches without + * re-querying the adapter. Adapter populates from Claude `obj.cwd` + * or OpenClaw `sessionCwd`; null when the adapter has no cwd context. */ + cwd?: string; /** Activity category — one of ACTIVITY_CATEGORIES. Heuristically assigned * on emit by the classify wrapper (AUR-264). Used by the per-session * and per-project activity views to answer "where is my spend going?". */ diff --git a/src/store/index.ts b/src/store/index.ts index e66ced2..76f8f59 100644 --- a/src/store/index.ts +++ b/src/store/index.ts @@ -9,5 +9,8 @@ export { type PruneResult, type StoreStats, type ActivityBucket, + type SessionWorkspace, + type SessionLinkCandidate, + type ListLinkCandidatesOptions, } from "./sqlite.js"; -export { wrapSinkWithStore } from "./wire.js"; +export { wrapSinkWithStore, wrapSinkWithLinks } from "./wire.js"; diff --git a/src/store/sqlite.test.ts b/src/store/sqlite.test.ts index 0110451..5901e31 100644 --- a/src/store/sqlite.test.ts +++ b/src/store/sqlite.test.ts @@ -449,6 +449,172 @@ describe("sqlite store — listRecentEvents", () => { }); }); +describe("sqlite store — AUR-276 schema V3 (workspace + link candidates)", () => { + it("schema_version is at least 3 on a fresh DB", () => { + expect(store.stats().schemaVersion).toBeGreaterThanOrEqual(3); + }); + + it("re-opening an existing DB is idempotent at V3", () => { + const first = store.stats().schemaVersion; + store.close(); + store = openStore({ dbPath: join(dir, "events.db") }); + expect(store.stats().schemaVersion).toBe(first); + }); + + it("upsertSessionWorkspace populates a session row first-write-wins", () => { + const e = makeEvent({ sessionId: "s1" }); + store.insert(e); + store.upsertSessionWorkspace("s1", { + workspaceRoot: "/repo", + gitBranch: "main", + }); + expect(store.getSessionWorkspace("s1")).toEqual({ + workspaceRoot: "/repo", + gitBranch: "main", + }); + // Second call with different values must NOT overwrite. + store.upsertSessionWorkspace("s1", { + workspaceRoot: "/elsewhere", + gitBranch: "feature", + }); + expect(store.getSessionWorkspace("s1")).toEqual({ + workspaceRoot: "/repo", + gitBranch: "main", + }); + }); + + it("upsertSessionWorkspace is a no-op when both values are null", () => { + const e = makeEvent({ sessionId: "s2" }); + store.insert(e); + store.upsertSessionWorkspace("s2", { workspaceRoot: null, gitBranch: null }); + expect(store.getSessionWorkspace("s2")).toEqual({ + workspaceRoot: null, + gitBranch: null, + }); + }); + + it("getSessionWorkspace returns null/null for an unknown session", () => { + expect(store.getSessionWorkspace("does-not-exist")).toEqual({ + workspaceRoot: null, + gitBranch: null, + }); + }); + + it("recordSessionLinkCandidate creates a row, then bumps on repeat", () => { + store.recordSessionLinkCandidate({ + aSession: "sess-claude", + bSession: "sess-openclaw", + aAgent: "claude-code", + bAgent: "openclaw", + samplePath: "/repo/foo.ts", + ts: "2026-05-04T10:00:00.000Z", + workspaceRoot: "/repo", + gitBranch: "main", + }); + const after1 = store.listSessionLinkCandidates(); + expect(after1).toHaveLength(1); + expect(after1[0]?.linkCount).toBe(1); + store.recordSessionLinkCandidate({ + aSession: "sess-claude", + bSession: "sess-openclaw", + aAgent: "claude-code", + bAgent: "openclaw", + samplePath: "/repo/bar.ts", // sample_path on the existing row stays + ts: "2026-05-04T10:05:00.000Z", + workspaceRoot: "/repo", + gitBranch: "main", + }); + const after2 = store.listSessionLinkCandidates(); + expect(after2).toHaveLength(1); + expect(after2[0]?.linkCount).toBe(2); + expect(after2[0]?.lastLinkTs).toBe("2026-05-04T10:05:00.000Z"); + expect(after2[0]?.firstLinkTs).toBe("2026-05-04T10:00:00.000Z"); + expect(after2[0]?.samplePath).toBe("/repo/foo.ts"); // unchanged + }); + + it("canonicalizes pair ids — recording (B,A) hits the same row as (A,B)", () => { + store.recordSessionLinkCandidate({ + aSession: "z-session", // > a-session lexicographically + bSession: "a-session", + aAgent: "openclaw", + bAgent: "claude-code", + samplePath: "/repo/foo.ts", + ts: "2026-05-04T10:00:00.000Z", + workspaceRoot: "/repo", + gitBranch: "main", + }); + store.recordSessionLinkCandidate({ + aSession: "a-session", + bSession: "z-session", + aAgent: "claude-code", + bAgent: "openclaw", + samplePath: "/repo/foo.ts", + ts: "2026-05-04T10:05:00.000Z", + workspaceRoot: "/repo", + gitBranch: "main", + }); + const rows = store.listSessionLinkCandidates(); + expect(rows).toHaveLength(1); + expect(rows[0]?.aSession).toBe("a-session"); + expect(rows[0]?.bSession).toBe("z-session"); + expect(rows[0]?.linkCount).toBe(2); + }); + + it("listSessionLinkCandidates filters by sessionId either side", () => { + store.recordSessionLinkCandidate({ + aSession: "sess-A", + bSession: "sess-B", + aAgent: "claude-code", + bAgent: "openclaw", + samplePath: "/repo/foo.ts", + ts: "2026-05-04T10:00:00.000Z", + workspaceRoot: "/repo", + gitBranch: "main", + }); + store.recordSessionLinkCandidate({ + aSession: "sess-A", + bSession: "sess-C", + aAgent: "claude-code", + bAgent: "gemini", + samplePath: "/repo/bar.ts", + ts: "2026-05-04T10:01:00.000Z", + workspaceRoot: "/repo", + gitBranch: "main", + }); + expect(store.listSessionLinkCandidates({ sessionId: "sess-A" })).toHaveLength(2); + expect(store.listSessionLinkCandidates({ sessionId: "sess-B" })).toHaveLength(1); + expect(store.listSessionLinkCandidates({ sessionId: "sess-D" })).toHaveLength(0); + expect(store.listSessionLinkCandidates()).toHaveLength(2); + }); + + it("countSessionLinkCandidates + countAllLinkCandidates", () => { + store.recordSessionLinkCandidate({ + aSession: "sess-A", + bSession: "sess-B", + aAgent: "claude-code", + bAgent: "openclaw", + samplePath: "/repo/foo.ts", + ts: "2026-05-04T10:00:00.000Z", + workspaceRoot: "/repo", + gitBranch: "main", + }); + store.recordSessionLinkCandidate({ + aSession: "sess-A", + bSession: "sess-C", + aAgent: "claude-code", + bAgent: "gemini", + samplePath: "/repo/bar.ts", + ts: "2026-05-04T10:01:00.000Z", + workspaceRoot: "/repo", + gitBranch: "main", + }); + expect(store.countSessionLinkCandidates("sess-A")).toBe(2); + expect(store.countSessionLinkCandidates("sess-C")).toBe(1); + expect(store.countSessionLinkCandidates("nope")).toBe(0); + expect(store.countAllLinkCandidates()).toBe(2); + }); +}); + describe("sqlite store — bench (10k events)", () => { it( "ingests 10k events in under 2s", diff --git a/src/store/sqlite.ts b/src/store/sqlite.ts index 08b6998..1c31811 100644 --- a/src/store/sqlite.ts +++ b/src/store/sqlite.ts @@ -70,6 +70,40 @@ export interface ActivityBucket { sessionsTouched?: number; } +export interface SessionWorkspace { + /** Canonical workspace root (gitCommonDir-resolved). Null when the + * originating adapter had no cwd to derive it from. */ + workspaceRoot: string | null; + /** Current branch at the time the session's first file_write fired. + * Null when not in a git repo, in detached-HEAD state, or git is + * unavailable. */ + gitBranch: string | null; +} + +/** AUR-276: an unverified pair of sessions that touched the same path + * inside the 30-min sliding window on the same workspace + branch. + * Stored to disk so we can measure the false-positive rate against + * real multi-agent telemetry before promoting any of this to a UI. */ +export interface SessionLinkCandidate { + aSession: string; + bSession: string; + aAgent: AgentName; + bAgent: AgentName; + firstLinkTs: string; + lastLinkTs: string; + linkCount: number; + samplePath: string; + workspaceRoot: string | null; + gitBranch: string | null; +} + +export interface ListLinkCandidatesOptions { + /** Restrict to candidates that involve this session (either side). */ + sessionId?: string; + /** Hard cap on rows returned. Defaults to 200, max 5000. */ + limit?: number; +} + export interface EventStore { insert(event: AgentEvent): void; insertMany(events: AgentEvent[]): void; @@ -88,12 +122,47 @@ export interface EventStore { activityBySession(sessionId: string): ActivityBucket[]; /** Per-category event count + cost across every session in a project. */ activityByProject(projectName: string): ActivityBucket[]; + /** AUR-276: first-write-wins write of (workspace_root, git_branch) + * for a session row. Re-calls with the same session id are no-ops + * whenever the existing column is already populated. */ + upsertSessionWorkspace( + sessionId: string, + workspace: SessionWorkspace, + ): void; + /** AUR-276: read the cached workspace + branch for a session. Returns + * `{workspaceRoot:null, gitBranch:null}` when the session row hasn't + * been resolved yet (or doesn't exist). */ + getSessionWorkspace(sessionId: string): SessionWorkspace; + /** AUR-276: upsert a candidate-pair row. The first call creates a row + * with `link_count=1`; subsequent calls for the same pair bump + * `link_count` and refresh `last_link_ts`. Pair ids are canonicalized + * (`a < b` lexicographically) so each pair stores once. */ + recordSessionLinkCandidate(input: { + aSession: string; + bSession: string; + aAgent: AgentName; + bAgent: AgentName; + samplePath: string; + ts: string; + workspaceRoot: string | null; + gitBranch: string | null; + }): void; + /** AUR-276: list candidate pairs (newest first by last_link_ts). */ + listSessionLinkCandidates( + opts?: ListLinkCandidatesOptions, + ): SessionLinkCandidate[]; + /** AUR-276: total candidate-pair rows touching a given session. Used + * by the dev-only TUI badge gated on AGENTWATCH_DEBUG_LINKS. */ + countSessionLinkCandidates(sessionId: string): number; + /** AUR-276: total candidate-pair rows in the table. Used by the dev-only + * TUI badge to surface global telemetry growth at a glance. */ + countAllLinkCandidates(): number; prune(opts: { olderThanDays: number }): PruneResult; stats(): StoreStats; close(): void; } -const SCHEMA_VERSION = 2; +const SCHEMA_VERSION = 3; export const DEFAULT_DB_PATH = join(homedir(), ".agentwatch", "events.db"); @@ -120,11 +189,44 @@ function applyMigrations(db: Database.Database): void { const current = row?.version ?? 0; if (current < 1) applyV1(db); if (current < 2) applyV2(db); + if (current < 3) applyV3(db); db.prepare( "INSERT OR REPLACE INTO schema_version (version) VALUES (?)", ).run(SCHEMA_VERSION); } +function applyV3(db: Database.Database): void { + // AUR-276: workspace + branch per session, plus candidate-pair table for + // session-correlation telemetry. ALTER TABLE adds nullable columns; the + // candidate table is brand new. All steps idempotent (duplicate-column + // and CREATE IF NOT EXISTS) so a re-applied migration is a no-op. + for (const col of ["workspace_root", "git_branch"]) { + try { + db.exec(`ALTER TABLE sessions ADD COLUMN ${col} TEXT`); + } catch (err) { + if (!String(err).includes("duplicate column name")) throw err; + } + } + db.exec(` + CREATE TABLE IF NOT EXISTS session_link_candidates ( + a_session TEXT NOT NULL, + b_session TEXT NOT NULL, + a_agent TEXT NOT NULL, + b_agent TEXT NOT NULL, + first_link_ts TEXT NOT NULL, + last_link_ts TEXT NOT NULL, + link_count INTEGER NOT NULL DEFAULT 1, + sample_path TEXT NOT NULL, + workspace_root TEXT, + git_branch TEXT, + PRIMARY KEY (a_session, b_session) + ); + CREATE INDEX IF NOT EXISTS idx_link_candidates_a ON session_link_candidates(a_session); + CREATE INDEX IF NOT EXISTS idx_link_candidates_b ON session_link_candidates(b_session); + CREATE INDEX IF NOT EXISTS idx_link_candidates_last_ts ON session_link_candidates(last_link_ts); + `); +} + function applyV2(db: Database.Database): void { // AUR-264: per-event activity category. ALTER TABLE adds the column; // FTS5 doesn't reference category so the existing triggers stay valid. @@ -379,10 +481,163 @@ function buildStore(db: Database.Database): EventStore { } } + // AUR-276: workspace + branch per session, plus candidate-pair upsert + // and reads. Prepared once for the lifetime of the store. + const upsertSessionWorkspaceStmt = db.prepare(` + UPDATE sessions + SET workspace_root = COALESCE(workspace_root, @workspace_root), + git_branch = COALESCE(git_branch, @git_branch), + updated_at = strftime('%s','now') + WHERE session_id = @session_id + `); + + const getSessionWorkspaceStmt = db.prepare(` + SELECT workspace_root, git_branch + FROM sessions + WHERE session_id = ? + `); + + // INSERT OR IGNORE the canonical pair, then UPDATE bumps link_count + + // last_link_ts. INSERT-then-UPDATE is two statements but the alternative + // (`ON CONFLICT(...) DO UPDATE SET link_count = link_count + 1`) makes + // the "first row wins for sample_path / workspace_root / git_branch" + // semantics harder to reason about. Two statements is clearer. + const insertLinkCandidateStmt = db.prepare(` + INSERT OR IGNORE INTO session_link_candidates ( + a_session, b_session, a_agent, b_agent, + first_link_ts, last_link_ts, link_count, + sample_path, workspace_root, git_branch + ) VALUES ( + @a_session, @b_session, @a_agent, @b_agent, + @ts, @ts, 1, + @sample_path, @workspace_root, @git_branch + ) + `); + const bumpLinkCandidateStmt = db.prepare(` + UPDATE session_link_candidates + SET link_count = link_count + 1, + last_link_ts = CASE WHEN @ts > last_link_ts THEN @ts ELSE last_link_ts END + WHERE a_session = @a_session AND b_session = @b_session + `); + + const listLinkCandidatesAllStmt = db.prepare(` + SELECT a_session, b_session, a_agent, b_agent, + first_link_ts, last_link_ts, link_count, + sample_path, workspace_root, git_branch + FROM session_link_candidates + ORDER BY last_link_ts DESC + LIMIT ? + `); + const listLinkCandidatesForSessionStmt = db.prepare(` + SELECT a_session, b_session, a_agent, b_agent, + first_link_ts, last_link_ts, link_count, + sample_path, workspace_root, git_branch + FROM session_link_candidates + WHERE a_session = ? OR b_session = ? + ORDER BY last_link_ts DESC + LIMIT ? + `); + const countLinkCandidatesForSessionStmt = db.prepare(` + SELECT COUNT(*) AS c + FROM session_link_candidates + WHERE a_session = ? OR b_session = ? + `); + const countAllLinkCandidatesStmt = db.prepare(` + SELECT COUNT(*) AS c FROM session_link_candidates + `); + return { insert: doInsert, insertMany: (events) => insertMany(events), enrich: doEnrich, + upsertSessionWorkspace(sessionId, workspace) { + // No-op if both fields are null — nothing to learn from this call. + if (workspace.workspaceRoot == null && workspace.gitBranch == null) return; + upsertSessionWorkspaceStmt.run({ + session_id: sessionId, + workspace_root: workspace.workspaceRoot, + git_branch: workspace.gitBranch, + }); + }, + getSessionWorkspace(sessionId) { + const row = getSessionWorkspaceStmt.get(sessionId) as + | { workspace_root: string | null; git_branch: string | null } + | undefined; + return { + workspaceRoot: row?.workspace_root ?? null, + gitBranch: row?.git_branch ?? null, + }; + }, + recordSessionLinkCandidate(input) { + // Canonicalize so each undirected pair stores once. + const [aSession, bSession, aAgent, bAgent] = + input.aSession < input.bSession + ? [input.aSession, input.bSession, input.aAgent, input.bAgent] + : [input.bSession, input.aSession, input.bAgent, input.aAgent]; + const params = { + a_session: aSession, + b_session: bSession, + a_agent: aAgent, + b_agent: bAgent, + ts: input.ts, + sample_path: input.samplePath, + workspace_root: input.workspaceRoot, + git_branch: input.gitBranch, + }; + const inserted = insertLinkCandidateStmt.run(params); + if (inserted.changes === 0) { + bumpLinkCandidateStmt.run({ + a_session: aSession, + b_session: bSession, + ts: input.ts, + }); + } + }, + listSessionLinkCandidates(opts = {}) { + const limit = clamp(opts.limit ?? 200, 1, 5000); + const rows = ( + opts.sessionId + ? listLinkCandidatesForSessionStmt.all( + opts.sessionId, + opts.sessionId, + limit, + ) + : listLinkCandidatesAllStmt.all(limit) + ) as Array<{ + a_session: string; + b_session: string; + a_agent: AgentName; + b_agent: AgentName; + first_link_ts: string; + last_link_ts: string; + link_count: number; + sample_path: string; + workspace_root: string | null; + git_branch: string | null; + }>; + return rows.map((r) => ({ + aSession: r.a_session, + bSession: r.b_session, + aAgent: r.a_agent, + bAgent: r.b_agent, + firstLinkTs: r.first_link_ts, + lastLinkTs: r.last_link_ts, + linkCount: r.link_count, + samplePath: r.sample_path, + workspaceRoot: r.workspace_root, + gitBranch: r.git_branch, + })); + }, + countSessionLinkCandidates(sessionId) { + const row = countLinkCandidatesForSessionStmt.get( + sessionId, + sessionId, + ) as { c: number }; + return row.c; + }, + countAllLinkCandidates() { + return (countAllLinkCandidatesStmt.get() as { c: number }).c; + }, hasEvent(eventId) { return Boolean(hasStmt.get(eventId)); }, diff --git a/src/store/wire.test.ts b/src/store/wire.test.ts new file mode 100644 index 0000000..11315f7 --- /dev/null +++ b/src/store/wire.test.ts @@ -0,0 +1,247 @@ +import { afterEach, beforeEach, describe, expect, it } from "vitest"; +import { mkdtempSync, rmSync } from "node:fs"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; +import type { AgentEvent, EventDetails, EventSink } from "../schema.js"; +import { openStore, type EventStore } from "./sqlite.js"; +import { wrapSinkWithLinks, wrapSinkWithStore } from "./wire.js"; + +let dir: string; +let store: EventStore; + +beforeEach(() => { + dir = mkdtempSync(join(tmpdir(), "agentwatch-wire-")); + store = openStore({ dbPath: join(dir, "events.db") }); +}); + +afterEach(() => { + store.close(); + rmSync(dir, { recursive: true, force: true }); +}); + +function recordingSink(): { + sink: EventSink; + emitted: AgentEvent[]; + enriched: Array<{ id: string; patch: Partial }>; +} { + const emitted: AgentEvent[] = []; + const enriched: Array<{ id: string; patch: Partial }> = []; + return { + sink: { + emit: (e) => emitted.push(e), + enrich: (id, patch) => enriched.push({ id, patch }), + }, + emitted, + enriched, + }; +} + +function fakeWrite(over: Partial = {}): AgentEvent { + return { + id: over.id ?? `evt-${Math.random().toString(36).slice(2, 10)}`, + ts: over.ts ?? new Date(2026, 0, 1, 12, 0, 0).toISOString(), + agent: over.agent ?? "claude-code", + type: over.type ?? "file_write", + path: over.path ?? "/repo/foo.ts", + sessionId: over.sessionId ?? "sess-A", + riskScore: over.riskScore ?? 4, + details: over.details, + }; +} + +const fakeResolve = (cwd: string | null | undefined) => { + if (!cwd) return { workspaceRoot: null, gitBranch: null }; + return { workspaceRoot: cwd, gitBranch: "main" }; +}; + +describe("wrapSinkWithLinks — pass-through behaviour", () => { + it("passes every emit + enrich straight through to the inner sink", () => { + const inner = recordingSink(); + const linked = wrapSinkWithLinks(inner.sink, store, { resolve: fakeResolve }); + const e = fakeWrite(); + // Insert the event via the store first so the sessions row exists for + // upsertSessionWorkspace to update. + store.insert(e); + linked.emit(e); + linked.enrich(e.id, { fullText: "hello" }); + expect(inner.emitted).toHaveLength(1); + expect(inner.emitted[0]?.id).toBe(e.id); + expect(inner.enriched).toEqual([{ id: e.id, patch: { fullText: "hello" } }]); + }); + + it("ignores non-write events (no DB write attempt)", () => { + const inner = recordingSink(); + const linked = wrapSinkWithLinks(inner.sink, store, { resolve: fakeResolve }); + const e = fakeWrite({ type: "tool_call", path: undefined }); + store.insert(e); + linked.emit(e); + expect(store.countAllLinkCandidates()).toBe(0); + expect(store.getSessionWorkspace(e.sessionId!).workspaceRoot).toBeNull(); + }); + + it("ignores file_write events with no cwd in details", () => { + const inner = recordingSink(); + const linked = wrapSinkWithLinks(inner.sink, store, { resolve: fakeResolve }); + const e = fakeWrite({ details: undefined }); + store.insert(e); + linked.emit(e); + expect(store.countAllLinkCandidates()).toBe(0); + expect(store.getSessionWorkspace(e.sessionId!).workspaceRoot).toBeNull(); + }); +}); + +describe("wrapSinkWithLinks — workspace upsert", () => { + it("populates workspace_root + git_branch on the session row from cwd", () => { + const inner = recordingSink(); + const linked = wrapSinkWithLinks(inner.sink, store, { resolve: fakeResolve }); + const e = fakeWrite({ + sessionId: "sess-A", + details: { cwd: "/repo" }, + }); + store.insert(e); + linked.emit(e); + const ws = store.getSessionWorkspace("sess-A"); + expect(ws).toEqual({ workspaceRoot: "/repo", gitBranch: "main" }); + }); + + it("first-write-wins — a later resolve doesn't overwrite a populated row", () => { + const inner = recordingSink(); + let nextRoot = "/repo"; + let nextBranch: string | null = "main"; + const linked = wrapSinkWithLinks(inner.sink, store, { + resolve: () => ({ workspaceRoot: nextRoot, gitBranch: nextBranch }), + }); + const e1 = fakeWrite({ id: "e1", details: { cwd: "/repo" } }); + store.insert(e1); + linked.emit(e1); + nextRoot = "/elsewhere"; + nextBranch = "feature"; + const e2 = fakeWrite({ id: "e2", details: { cwd: "/elsewhere" } }); + store.insert(e2); // same session, so the same sessions row + linked.emit(e2); + expect(store.getSessionWorkspace("sess-A")).toEqual({ + workspaceRoot: "/repo", + gitBranch: "main", + }); + }); +}); + +describe("wrapSinkWithLinks — candidate-pair recording", () => { + it("records a candidate pair when two agents touch the same file in-window", () => { + const inner = recordingSink(); + const linked = wrapSinkWithLinks(inner.sink, store, { resolve: fakeResolve }); + const t0 = new Date(2026, 0, 1, 12, 0, 0); + const t1 = new Date(2026, 0, 1, 12, 5, 0); // 5 min later, in window + const claude = fakeWrite({ + id: "evt-claude", + agent: "claude-code", + sessionId: "sess-claude", + ts: t0.toISOString(), + details: { cwd: "/repo" }, + }); + const openclaw = fakeWrite({ + id: "evt-openclaw", + agent: "openclaw", + sessionId: "sess-openclaw", + ts: t1.toISOString(), + details: { cwd: "/repo" }, + }); + store.insert(claude); + store.insert(openclaw); + linked.emit(claude); + linked.emit(openclaw); + const candidates = store.listSessionLinkCandidates(); + expect(candidates).toHaveLength(1); + expect(candidates[0]).toMatchObject({ + aSession: "sess-claude", // canonical sort: sess-claude < sess-openclaw + bSession: "sess-openclaw", + aAgent: "claude-code", + bAgent: "openclaw", + linkCount: 1, + samplePath: "/repo/foo.ts", + workspaceRoot: "/repo", + gitBranch: "main", + }); + }); + + it("bumps link_count + last_link_ts on a repeat hit", () => { + const inner = recordingSink(); + const linked = wrapSinkWithLinks(inner.sink, store, { resolve: fakeResolve }); + const claude = fakeWrite({ + id: "evt-claude", + agent: "claude-code", + sessionId: "sess-claude", + ts: new Date(2026, 0, 1, 12, 0, 0).toISOString(), + details: { cwd: "/repo" }, + }); + const openclaw1 = fakeWrite({ + id: "evt-openclaw-1", + agent: "openclaw", + sessionId: "sess-openclaw", + ts: new Date(2026, 0, 1, 12, 5, 0).toISOString(), + details: { cwd: "/repo" }, + }); + const openclaw2 = fakeWrite({ + id: "evt-openclaw-2", + agent: "openclaw", + sessionId: "sess-openclaw", + ts: new Date(2026, 0, 1, 12, 10, 0).toISOString(), + details: { cwd: "/repo" }, + }); + store.insert(claude); + store.insert(openclaw1); + store.insert(openclaw2); + linked.emit(claude); + linked.emit(openclaw1); + linked.emit(openclaw2); + const candidates = store.listSessionLinkCandidates(); + expect(candidates).toHaveLength(1); + expect(candidates[0]?.linkCount).toBe(2); + expect(candidates[0]?.lastLinkTs).toBe(openclaw2.ts); + expect(candidates[0]?.firstLinkTs).toBe(openclaw1.ts); + }); + + it("does not record when resolve returns a null branch", () => { + const inner = recordingSink(); + const linked = wrapSinkWithLinks(inner.sink, store, { + resolve: (cwd) => ({ workspaceRoot: cwd ?? null, gitBranch: null }), + }); + const claude = fakeWrite({ + sessionId: "sess-claude", + agent: "claude-code", + details: { cwd: "/repo" }, + }); + const openclaw = fakeWrite({ + sessionId: "sess-openclaw", + agent: "openclaw", + details: { cwd: "/repo" }, + }); + store.insert(claude); + store.insert(openclaw); + linked.emit(claude); + linked.emit(openclaw); + expect(store.countAllLinkCandidates()).toBe(0); + }); +}); + +describe("wrapSinkWithStore — regression: layered with linker stays parity", () => { + // CRITICAL: AUR-276 must not regress wrapSinkWithStore semantics. An + // event emitted through the layered chain must still land in the events + // table identically to one going through the store wrapper alone. + it("an event emitted through link→store lands in events identically", () => { + const noopInner: EventSink = { emit: () => undefined, enrich: () => undefined }; + const persistOnly = wrapSinkWithStore(noopInner, store); + const linked = wrapSinkWithLinks(persistOnly, store); + const e = fakeWrite({ + id: "evt-regression", + sessionId: "sess-regression", + details: { cwd: "/repo", fullText: "hi" }, + }); + linked.emit(e); + const back = store.getEvent(e.id); + expect(back).not.toBeNull(); + expect(back?.id).toBe(e.id); + expect(back?.path).toBe(e.path); + expect(back?.sessionId).toBe(e.sessionId); + }); +}); diff --git a/src/store/wire.ts b/src/store/wire.ts index 781bfb0..ca2badb 100644 --- a/src/store/wire.ts +++ b/src/store/wire.ts @@ -1,4 +1,6 @@ -import type { EventDetails, EventSink } from "../schema.js"; +import { resolveWorkspace } from "../correlate/branch-cache.js"; +import { RecentWritesIndex } from "../correlate/session-links.js"; +import type { AgentEvent, EventDetails, EventSink } from "../schema.js"; import type { EventStore } from "./sqlite.js"; /** Wraps an existing EventSink so every emit/enrich is mirrored into the @@ -43,3 +45,98 @@ export function wrapSinkWithStore( }, }; } + +interface LinkerDeps { + /** Override for tests. Defaults to the real resolveWorkspace. */ + resolve?: typeof resolveWorkspace; +} + +/** AUR-276 — the session-correlation telemetry layer. Sits *after* + * `wrapSinkWithStore` so it sees events that have already been persisted + * with their session row. On every file_write / file_change event: + * + * 1. Resolve (workspace_root, git_branch) for the session, cached. + * 2. Persist the resolution to the session row (first-write-wins). + * 3. Feed (path, agent, sessionId, ts, branch, root) into the + * in-process RecentWritesIndex. For every peer match returned, + * INSERT (or bump) a session_link_candidates row. + * + * No UI surface and no API field — this is data collection only. + * Promotion to a stitched-sessions UI is gated on AUR-277 + a manual + * validation gate against accumulated candidate pairs. + * + * Errors are warn-once + swallow, mirroring `wrapSinkWithStore` so the + * observability pipeline never crashes the agent runtime. */ +export function wrapSinkWithLinks( + inner: EventSink, + store: EventStore, + deps: LinkerDeps = {}, +): EventSink { + const index = new RecentWritesIndex(); + const resolve = deps.resolve ?? resolveWorkspace; + let warned = false; + return { + emit: (event) => { + try { + if (isLinkableWrite(event)) processWrite(event, store, index, resolve); + } catch (err) { + if (!warned) { + warned = true; + process.stderr.write( + `[agentwatch] session-link error (further occurrences suppressed): ${String(err)}\n`, + ); + } + } + inner.emit(event); + }, + enrich: (eventId, patch) => inner.enrich(eventId, patch), + }; +} + +function isLinkableWrite(event: AgentEvent): boolean { + if (event.type !== "file_write" && event.type !== "file_change") return false; + if (!event.path) return false; + if (!event.sessionId) return false; + // No cwd → can't resolve workspace → can't gate matches → don't bother. + if (!event.details?.cwd) return false; + return true; +} + +function processWrite( + event: AgentEvent, + store: EventStore, + index: RecentWritesIndex, + resolve: typeof resolveWorkspace, +): void { + const cwd = event.details?.cwd ?? null; + const resolved = resolve(cwd); + // Cache the resolution on the session row so downstream readers + // (CLI, future API) can attribute candidates to a workspace + branch. + store.upsertSessionWorkspace(event.sessionId!, { + workspaceRoot: resolved.workspaceRoot, + gitBranch: resolved.gitBranch, + }); + if (resolved.workspaceRoot == null || resolved.gitBranch == null) return; + const tsMs = Date.parse(event.ts); + if (!Number.isFinite(tsMs)) return; + const matches = index.recordAndQuery( + event.path!, + event.agent, + event.sessionId!, + tsMs, + resolved.gitBranch, + resolved.workspaceRoot, + ); + for (const peer of matches) { + store.recordSessionLinkCandidate({ + aSession: event.sessionId!, + bSession: peer.sessionId, + aAgent: event.agent, + bAgent: peer.agent, + samplePath: event.path!, + ts: event.ts, + workspaceRoot: resolved.workspaceRoot, + gitBranch: resolved.gitBranch, + }); + } +} diff --git a/src/ui/App.tsx b/src/ui/App.tsx index 64551d3..1c96fd0 100644 --- a/src/ui/App.tsx +++ b/src/ui/App.tsx @@ -25,7 +25,12 @@ import { initialState, matchesQuery, reducer } from "./state.js"; import { startServer, type ServerHandle, addEventToServer } from "../server/index.js"; import { openUrl } from "../util/open-url.js"; import { onShutdown } from "../util/shutdown.js"; -import { openStore, wrapSinkWithStore, type EventStore } from "../store/index.js"; +import { + openStore, + wrapSinkWithStore, + wrapSinkWithLinks, + type EventStore, +} from "../store/index.js"; import { withClaudeHookDedup } from "../adapters/hooks-dedup.js"; import { withClassifier } from "../classify/index.js"; @@ -50,6 +55,11 @@ export function App() { const [server, setServer] = useState(null); const [store, setStore] = useState(null); const noWeb = process.argv.includes("--no-web"); + // AUR-276: dev-only candidate-pair counter, gated on the env var. + // `undefined` means "don't render the badge"; a number renders it. + const [linkCandidateCount, setLinkCandidateCount] = useState( + undefined, + ); // Persistent SQLite store — opened once, drains on shutdown. Failure to // open (e.g. read-only home dir) leaves store=null; the rest of the TUI @@ -175,7 +185,10 @@ export function App() { }, }; const persistSink = store ? wrapSinkWithStore(sink, store) : sink; - const classifiedSink = withClassifier(persistSink); + // AUR-276: layered after the store wrapper so it can read the + // already-persisted session row when upserting workspace + branch. + const linkedSink = store ? wrapSinkWithLinks(persistSink, store) : persistSink; + const classifiedSink = withClassifier(linkedSink); const finalSink = withClaudeHookDedup(classifiedSink); if (server) { // Hooks route forwards Claude hook curls into the same pipeline @@ -296,6 +309,25 @@ export function App() { } }, [budgetBreachKey]); + // AUR-276: poll the candidate-pair count when the dev env var is set. + // Cheap (single COUNT(*) on an indexed table); 5 s cadence is enough + // for an eyeball-it-now-and-then debug surface. No-op when the var is + // unset so non-dev users see no overhead and no badge. + useEffect(() => { + if (!store) return; + if (process.env.AGENTWATCH_DEBUG_LINKS !== "1") return; + const refresh = (): void => { + try { + setLinkCandidateCount(store.countAllLinkCandidates()); + } catch { + // Store may be transiently locked; ignore — we'll catch the next tick. + } + }; + refresh(); + const handle = setInterval(refresh, 5_000); + return () => clearInterval(handle); + }, [store]); + const sessionSummaries = useMemo(() => summarizeBySession(anomalies), [anomalies]); const anomalyKey = sessionSummaries.map((s) => `${s.sessionId}:${s.headline}`).join("|"); const bannerSuppressed = state.anomalyDismissKey === anomalyKey; @@ -418,6 +450,7 @@ export function App() { anomalies={bannerSuppressed ? undefined : anomalies} sessionAnomalies={bannerSuppressed ? [] : sessionSummaries} webUrl={server?.url} + linkCandidateCount={linkCandidateCount} /> ; sessionAnomalies?: SessionAnomalySummary[]; webUrl?: string; + /** AUR-276: dev-only candidate-pair count surfaced when the operator + * sets `AGENTWATCH_DEBUG_LINKS=1`. Always undefined in normal use. */ + linkCandidateCount?: number; } export type { Props as HeaderProps }; @@ -27,6 +30,7 @@ export function Header({ anomalies, sessionAnomalies, webUrl, + linkCandidateCount, }: Props) { const breached = budget?.breachedSession || budget?.dayBreach; const anomalyMessages = summarizeAnomalies(anomalies); @@ -62,6 +66,12 @@ export function Header({ [w] )} + {linkCandidateCount != null && ( + <> + links: + {linkCandidateCount} + + )} {breached && budget && budget.sessionCost > 0 && ( From 91c9c04edcfeb254b1c5904bc082fdac9de5fdbd Mon Sep 17 00:00:00 2001 From: Michael Nefedov Date: Mon, 4 May 2026 17:53:59 +0200 Subject: [PATCH 2/2] =?UTF-8?q?fix(AUR-276):=20codex=20review=20=E2=80=94?= =?UTF-8?q?=20order,=20worktree=20key,=20hot-path=20subprocess?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three material issues from codex review of #30, all fixed: 1. [P1] wrapSinkWithLinks ran processWrite BEFORE inner.emit. Since inner is wrapSinkWithStore in production, store.insert (which fires the AFTER-INSERT trigger that creates the sessions row) ran AFTER upsertSessionWorkspace's UPDATE. Result: every session's first write silently failed to populate workspace_root + git_branch, and any single-write session stayed permanently null. The whole telemetry feature would have no-op'd on first writes — exactly the data AUR-276 needs to collect. Forward to inner FIRST, then process. The pre-fix tests pre-inserted via store.insert before calling linked.emit, masking the bug. Tests now use the production sink composition (linker over store wrapper) so a single emit() exercises the real ordering invariant. 2. [P2] branch-cache keyed by gitCommonDir(cwd), which collapses linked worktrees of the same repo. Two worktrees on different branches shared a cache entry, so a write from worktree-A on `main` poisoned the cache for a write from worktree-B on `feature` for 60s — wrong branch attribution, exactly the false-positive injection AUR-276 exists to measure. Cache now keyed by cwd (per-worktree). Branch query also moved from common-dir to cwd (HEAD lives per-worktree). workspaceRoot return value is still common-dir-resolved, so on-same- branch worktrees of the same repo still collapse for matching. Added regression test. 3. [P2] gitCommonDir was called BEFORE the cache check on every event, so the 60s cache only covered the branch lookup; the common-dir shell-out fired on every cache hit, defeating the cache's hot-path purpose. Cache lookup now precedes any subprocess; common-dir is resolved only on miss and cached alongside the branch. Added regression test asserting zero shell-outs on cache hit. Tests: 404 pass (1 new), typecheck clean. --- src/correlate/branch-cache.test.ts | 104 +++++++++++++++++++++-------- src/correlate/branch-cache.ts | 56 ++++++++++++---- src/store/wire.test.ts | 99 ++++++++++++++------------- src/store/wire.ts | 11 ++- 4 files changed, 183 insertions(+), 87 deletions(-) diff --git a/src/correlate/branch-cache.test.ts b/src/correlate/branch-cache.test.ts index 568f116..cc81941 100644 --- a/src/correlate/branch-cache.test.ts +++ b/src/correlate/branch-cache.test.ts @@ -7,74 +7,126 @@ beforeEach(() => { describe("resolveWorkspace — null cwd inputs", () => { it("returns null/null for an undefined cwd without shelling out", () => { - let calls = 0; + let branchCalls = 0; + let commonDirCalls = 0; const r = resolveWorkspace(undefined, { branchOf: () => { - calls++; + branchCalls++; return "main"; }, + commonDirOf: () => { + commonDirCalls++; + return null; + }, }); expect(r).toEqual({ workspaceRoot: null, gitBranch: null }); - expect(calls).toBe(0); + expect(branchCalls).toBe(0); + expect(commonDirCalls).toBe(0); }); it("returns null/null for an empty-string cwd without shelling out", () => { - let calls = 0; + let branchCalls = 0; + let commonDirCalls = 0; const r = resolveWorkspace("", { branchOf: () => { - calls++; + branchCalls++; return "main"; }, + commonDirOf: () => { + commonDirCalls++; + return null; + }, }); expect(r).toEqual({ workspaceRoot: null, gitBranch: null }); - expect(calls).toBe(0); + expect(branchCalls).toBe(0); + expect(commonDirCalls).toBe(0); }); }); describe("resolveWorkspace — caching behaviour", () => { - it("re-uses the cached branch within the TTL", () => { - let calls = 0; + it("re-uses both cached values within the TTL — no shell-outs on hit", () => { + let branchCalls = 0; + let commonDirCalls = 0; const branchOf = (): string | null => { - calls++; + branchCalls++; return "main"; }; + const commonDirOf = (): string | null => { + commonDirCalls++; + return "/repo/.git"; + }; const now = (): number => 1_000_000; - const a = resolveWorkspace("/repo/a", { branchOf, now }); - const b = resolveWorkspace("/repo/a", { branchOf, now }); - expect(calls).toBe(1); + const a = resolveWorkspace("/repo/a", { branchOf, commonDirOf, now }); + const b = resolveWorkspace("/repo/a", { branchOf, commonDirOf, now }); + expect(branchCalls).toBe(1); + expect(commonDirCalls).toBe(1); // regression guard: no shell-out on hit + expect(a).toEqual(b); expect(a.gitBranch).toBe("main"); - expect(b.gitBranch).toBe("main"); - // workspaceRoot may have been gitCommonDir-resolved or fall back to cwd - expect(a.workspaceRoot).toBe(b.workspaceRoot); + expect(a.workspaceRoot).toBe("/repo/.git"); }); it("re-shells when the cache entry is older than the TTL", () => { - let calls = 0; + let branchCalls = 0; const branchOf = (): string | null => { - calls++; - return calls === 1 ? "main" : "feature"; + branchCalls++; + return branchCalls === 1 ? "main" : "feature"; }; + const commonDirOf = (): string | null => "/repo/.git"; let t = 1_000_000; const now = (): number => t; - const first = resolveWorkspace("/repo/b", { branchOf, now }); + const first = resolveWorkspace("/repo/b", { branchOf, commonDirOf, now }); expect(first.gitBranch).toBe("main"); t += 60_001; // just past the 60 s TTL - const second = resolveWorkspace("/repo/b", { branchOf, now }); + const second = resolveWorkspace("/repo/b", { branchOf, commonDirOf, now }); expect(second.gitBranch).toBe("feature"); - expect(calls).toBe(2); + expect(branchCalls).toBe(2); }); it("caches a null branch the same way as a real branch", () => { - let calls = 0; + let branchCalls = 0; const branchOf = (): string | null => { - calls++; + branchCalls++; return null; }; const now = (): number => 2_000_000; - const a = resolveWorkspace("/repo/c", { branchOf, now }); - const b = resolveWorkspace("/repo/c", { branchOf, now }); + const a = resolveWorkspace("/repo/c", { + branchOf, + commonDirOf: () => null, + now, + }); + const b = resolveWorkspace("/repo/c", { + branchOf, + commonDirOf: () => null, + now, + }); expect(a.gitBranch).toBeNull(); expect(b.gitBranch).toBeNull(); - expect(calls).toBe(1); + expect(branchCalls).toBe(1); + }); + + it("does NOT collapse linked worktrees that share a common-dir but differ on branch", () => { + // Two worktrees of the same repo, different branches. The previous + // (codex-flagged) keying-by-common-dir would cross-poison them; this + // version keys by cwd so each worktree has its own cache entry. + const commonDirOf = (): string | null => "/repo/.git"; // shared + const branchOf = (cwd: string): string | null => + cwd === "/repo/main-worktree" ? "main" : "feature"; + const now = (): number => 5_000_000; + const a = resolveWorkspace("/repo/main-worktree", { + branchOf, + commonDirOf, + now, + }); + const b = resolveWorkspace("/repo/feature-worktree", { + branchOf, + commonDirOf, + now, + }); + expect(a.gitBranch).toBe("main"); + expect(b.gitBranch).toBe("feature"); + // Both still resolve to the SAME workspaceRoot — the matcher gates + // on (workspaceRoot, branch) together, and on-same-branch worktrees + // SHOULD collapse, while different-branch ones diverge on branch. + expect(a.workspaceRoot).toBe(b.workspaceRoot); }); }); diff --git a/src/correlate/branch-cache.ts b/src/correlate/branch-cache.ts index 0142724..1d55f23 100644 --- a/src/correlate/branch-cache.ts +++ b/src/correlate/branch-cache.ts @@ -1,8 +1,25 @@ import { getCurrentBranch, gitCommonDir } from "../git/correlate.js"; -/** AUR-276: cache git-branch lookups per workspace root for `TTL_MS`. - * `getCurrentBranch` shells out to `git`, which is cheap (~3–10 ms) but - * hot-path file_write events fire often enough that a tight cache pays. +/** AUR-276: cache git-branch + git-common-dir lookups per *worktree* + * (cwd) for `TTL_MS`. Both lookups shell out to `git`, which is cheap + * individually (~3–10 ms each) but hot-path file_write events fire + * often enough that a tight cache pays — the previous version called + * `gitCommonDir` *before* the cache check, which defeated the cache + * for the common-dir lookup on every event. + * + * Why the cache key is `cwd` (not `gitCommonDir(cwd)`): + * Linked worktrees of the same repo share a `.git` common-dir but + * point at different branches. Keying by common-dir would collapse + * them, so a write from worktree-A on `main` would poison the cache + * for a write from worktree-B on `feature` for the next 60 seconds — + * injecting wrong-branch attribution into exactly the telemetry data + * AUR-276 exists to measure. Branch is per-worktree; the cache must + * be per-worktree too. + * + * The returned `workspaceRoot` is still `gitCommonDir(cwd) ?? cwd`, + * so two worktrees of the same repo on the same branch DO collapse + * into one workspace from the matcher's point of view (which is what + * we want: same repo + same branch = same task). * * TTL is intentionally short (60 s): humans switch branches and then * immediately run an agent on the new branch — we want stale entries @@ -17,6 +34,7 @@ import { getCurrentBranch, gitCommonDir } from "../git/correlate.js"; const TTL_MS = 60_000; interface CacheEntry { + workspaceRoot: string | null; branch: string | null; refreshedMs: number; } @@ -24,22 +42,28 @@ interface CacheEntry { const cache = new Map(); interface BranchCacheDeps { - /** Override for tests. Defaults to the real git shell-out. */ - branchOf?: (root: string) => string | null; + /** Override for tests. Defaults to `getCurrentBranch` (shells out to git). */ + branchOf?: (cwd: string) => string | null; + /** Override for tests. Defaults to `gitCommonDir` (shells out to git). */ + commonDirOf?: (cwd: string) => string | null; /** Override for tests. Defaults to `Date.now`. */ now?: () => number; } export interface ResolvedWorkspace { /** Canonicalized workspace root (gitCommonDir-resolved if possible). - * `null` when the input cwd was null/empty or git couldn't resolve it. */ + * Two worktrees of the same repo collapse to the same value here — + * the matcher then gates on this + the per-worktree branch. `null` + * when the input cwd was null/empty or git couldn't resolve it. */ workspaceRoot: string | null; - /** Current branch at the workspace root, or `null` (see TTL note above). */ + /** Current branch at the *worktree* (cwd), not the common-dir, so + * sibling worktrees on different branches don't share a value. + * `null` for non-git, detached HEAD, or any git failure. */ gitBranch: string | null; } /** Resolve `(workspaceRoot, gitBranch)` for the given cwd, with a - * 60-second cache around the git invocation. Pure-data return; the + * 60-second cache around the git invocations. Pure-data return; the * caller decides what to do with nulls (the AUR-276 linker uses null * as a "do not match" gate). */ export function resolveWorkspace( @@ -47,17 +71,21 @@ export function resolveWorkspace( deps: BranchCacheDeps = {}, ): ResolvedWorkspace { if (!cwd) return { workspaceRoot: null, gitBranch: null }; - const root = gitCommonDir(cwd) ?? cwd; const branchOf = deps.branchOf ?? getCurrentBranch; + const commonDirOf = deps.commonDirOf ?? gitCommonDir; const now = deps.now ?? Date.now; - const cached = cache.get(root); const t = now(); + // Cache lookup BEFORE any subprocess. The previous version paid for + // gitCommonDir on every call — this version pays for nothing on hits. + const cached = cache.get(cwd); if (cached && t - cached.refreshedMs < TTL_MS) { - return { workspaceRoot: root, gitBranch: cached.branch }; + return { workspaceRoot: cached.workspaceRoot, gitBranch: cached.branch }; } - const branch = branchOf(root); - cache.set(root, { branch, refreshedMs: t }); - return { workspaceRoot: root, gitBranch: branch }; + // Miss: resolve both, cache once. + const workspaceRoot = commonDirOf(cwd) ?? cwd; + const branch = branchOf(cwd); + cache.set(cwd, { workspaceRoot, branch, refreshedMs: t }); + return { workspaceRoot, gitBranch: branch }; } /** Test-only: drop every cached entry. */ diff --git a/src/store/wire.test.ts b/src/store/wire.test.ts index 11315f7..b64f2aa 100644 --- a/src/store/wire.test.ts +++ b/src/store/wire.test.ts @@ -2,10 +2,13 @@ import { afterEach, beforeEach, describe, expect, it } from "vitest"; import { mkdtempSync, rmSync } from "node:fs"; import { tmpdir } from "node:os"; import { join } from "node:path"; +import type { ResolvedWorkspace } from "../correlate/branch-cache.js"; import type { AgentEvent, EventDetails, EventSink } from "../schema.js"; import { openStore, type EventStore } from "./sqlite.js"; import { wrapSinkWithLinks, wrapSinkWithStore } from "./wire.js"; +type FakeResolve = (cwd: string | null | undefined) => ResolvedWorkspace; + let dir: string; let store: EventStore; @@ -54,14 +57,26 @@ const fakeResolve = (cwd: string | null | undefined) => { return { workspaceRoot: cwd, gitBranch: "main" }; }; +/** Compose the production sink chain: linker over store wrapper over a + * recording inner. This is the only composition the tests should use — + * it exercises the real ordering invariant (codex review flagged a P1 + * where the previous tests pre-inserted via `store.insert` directly, + * masking a bug where `processWrite` ran before `inner.emit` and the + * workspace UPDATE hit a row that didn't exist yet). */ +function composeProductionSink( + resolve: FakeResolve = fakeResolve, +): { sink: ReturnType; emitted: AgentEvent[] } { + const recorded = recordingSink(); + const persistOnly = wrapSinkWithStore(recorded.sink, store); + const linked = wrapSinkWithLinks(persistOnly, store, { resolve }); + return { sink: linked, emitted: recorded.emitted }; +} + describe("wrapSinkWithLinks — pass-through behaviour", () => { - it("passes every emit + enrich straight through to the inner sink", () => { + it("passes every emit + enrich straight through to the inner chain", () => { const inner = recordingSink(); const linked = wrapSinkWithLinks(inner.sink, store, { resolve: fakeResolve }); const e = fakeWrite(); - // Insert the event via the store first so the sessions row exists for - // upsertSessionWorkspace to update. - store.insert(e); linked.emit(e); linked.enrich(e.id, { fullText: "hello" }); expect(inner.emitted).toHaveLength(1); @@ -70,54 +85,55 @@ describe("wrapSinkWithLinks — pass-through behaviour", () => { }); it("ignores non-write events (no DB write attempt)", () => { - const inner = recordingSink(); - const linked = wrapSinkWithLinks(inner.sink, store, { resolve: fakeResolve }); + const { sink } = composeProductionSink(); const e = fakeWrite({ type: "tool_call", path: undefined }); - store.insert(e); - linked.emit(e); + sink.emit(e); expect(store.countAllLinkCandidates()).toBe(0); expect(store.getSessionWorkspace(e.sessionId!).workspaceRoot).toBeNull(); }); it("ignores file_write events with no cwd in details", () => { - const inner = recordingSink(); - const linked = wrapSinkWithLinks(inner.sink, store, { resolve: fakeResolve }); + const { sink } = composeProductionSink(); const e = fakeWrite({ details: undefined }); - store.insert(e); - linked.emit(e); + sink.emit(e); expect(store.countAllLinkCandidates()).toBe(0); expect(store.getSessionWorkspace(e.sessionId!).workspaceRoot).toBeNull(); }); }); describe("wrapSinkWithLinks — workspace upsert", () => { - it("populates workspace_root + git_branch on the session row from cwd", () => { - const inner = recordingSink(); - const linked = wrapSinkWithLinks(inner.sink, store, { resolve: fakeResolve }); + it("populates workspace_root + git_branch on the FIRST file_write of a session", () => { + // CRITICAL regression: the previous test pre-inserted via store.insert + // before calling linked.emit, masking a bug where the linker ran the + // UPDATE before the events insert trigger had created the sessions + // row. Now the test uses the real production composition (linker over + // store wrapper) so a single emit() is the whole flow — and the bug + // would surface as workspaceRoot staying null. + const { sink } = composeProductionSink(); const e = fakeWrite({ sessionId: "sess-A", details: { cwd: "/repo" }, }); - store.insert(e); - linked.emit(e); - const ws = store.getSessionWorkspace("sess-A"); - expect(ws).toEqual({ workspaceRoot: "/repo", gitBranch: "main" }); + sink.emit(e); + expect(store.getSessionWorkspace("sess-A")).toEqual({ + workspaceRoot: "/repo", + gitBranch: "main", + }); }); it("first-write-wins — a later resolve doesn't overwrite a populated row", () => { - const inner = recordingSink(); - let nextRoot = "/repo"; + const recorded = recordingSink(); + const persistOnly = wrapSinkWithStore(recorded.sink, store); + let nextRoot: string | null = "/repo"; let nextBranch: string | null = "main"; - const linked = wrapSinkWithLinks(inner.sink, store, { + const linked = wrapSinkWithLinks(persistOnly, store, { resolve: () => ({ workspaceRoot: nextRoot, gitBranch: nextBranch }), }); const e1 = fakeWrite({ id: "e1", details: { cwd: "/repo" } }); - store.insert(e1); linked.emit(e1); nextRoot = "/elsewhere"; nextBranch = "feature"; const e2 = fakeWrite({ id: "e2", details: { cwd: "/elsewhere" } }); - store.insert(e2); // same session, so the same sessions row linked.emit(e2); expect(store.getSessionWorkspace("sess-A")).toEqual({ workspaceRoot: "/repo", @@ -128,8 +144,7 @@ describe("wrapSinkWithLinks — workspace upsert", () => { describe("wrapSinkWithLinks — candidate-pair recording", () => { it("records a candidate pair when two agents touch the same file in-window", () => { - const inner = recordingSink(); - const linked = wrapSinkWithLinks(inner.sink, store, { resolve: fakeResolve }); + const { sink } = composeProductionSink(); const t0 = new Date(2026, 0, 1, 12, 0, 0); const t1 = new Date(2026, 0, 1, 12, 5, 0); // 5 min later, in window const claude = fakeWrite({ @@ -146,10 +161,8 @@ describe("wrapSinkWithLinks — candidate-pair recording", () => { ts: t1.toISOString(), details: { cwd: "/repo" }, }); - store.insert(claude); - store.insert(openclaw); - linked.emit(claude); - linked.emit(openclaw); + sink.emit(claude); + sink.emit(openclaw); const candidates = store.listSessionLinkCandidates(); expect(candidates).toHaveLength(1); expect(candidates[0]).toMatchObject({ @@ -165,8 +178,7 @@ describe("wrapSinkWithLinks — candidate-pair recording", () => { }); it("bumps link_count + last_link_ts on a repeat hit", () => { - const inner = recordingSink(); - const linked = wrapSinkWithLinks(inner.sink, store, { resolve: fakeResolve }); + const { sink } = composeProductionSink(); const claude = fakeWrite({ id: "evt-claude", agent: "claude-code", @@ -188,12 +200,9 @@ describe("wrapSinkWithLinks — candidate-pair recording", () => { ts: new Date(2026, 0, 1, 12, 10, 0).toISOString(), details: { cwd: "/repo" }, }); - store.insert(claude); - store.insert(openclaw1); - store.insert(openclaw2); - linked.emit(claude); - linked.emit(openclaw1); - linked.emit(openclaw2); + sink.emit(claude); + sink.emit(openclaw1); + sink.emit(openclaw2); const candidates = store.listSessionLinkCandidates(); expect(candidates).toHaveLength(1); expect(candidates[0]?.linkCount).toBe(2); @@ -202,10 +211,10 @@ describe("wrapSinkWithLinks — candidate-pair recording", () => { }); it("does not record when resolve returns a null branch", () => { - const inner = recordingSink(); - const linked = wrapSinkWithLinks(inner.sink, store, { - resolve: (cwd) => ({ workspaceRoot: cwd ?? null, gitBranch: null }), - }); + const { sink } = composeProductionSink((cwd) => ({ + workspaceRoot: cwd ?? null, + gitBranch: null, + })); const claude = fakeWrite({ sessionId: "sess-claude", agent: "claude-code", @@ -216,10 +225,8 @@ describe("wrapSinkWithLinks — candidate-pair recording", () => { agent: "openclaw", details: { cwd: "/repo" }, }); - store.insert(claude); - store.insert(openclaw); - linked.emit(claude); - linked.emit(openclaw); + sink.emit(claude); + sink.emit(openclaw); expect(store.countAllLinkCandidates()).toBe(0); }); }); diff --git a/src/store/wire.ts b/src/store/wire.ts index ca2badb..b3d4448 100644 --- a/src/store/wire.ts +++ b/src/store/wire.ts @@ -77,6 +77,16 @@ export function wrapSinkWithLinks( let warned = false; return { emit: (event) => { + // CRITICAL: forward to inner FIRST. inner is wrapSinkWithStore in + // production, which runs store.insert and fires the AFTER-INSERT + // trigger that upserts the sessions row. processWrite then runs + // upsertSessionWorkspace against a row that exists. If we ran + // processWrite first, the very first file_write of every session + // would silently fail to populate workspace_root + git_branch + // (UPDATE on a missing row is a no-op) and short-lived sessions + // with only one write would be permanently null — exactly the + // telemetry data AUR-276 needs to collect. + inner.emit(event); try { if (isLinkableWrite(event)) processWrite(event, store, index, resolve); } catch (err) { @@ -87,7 +97,6 @@ export function wrapSinkWithLinks( ); } } - inner.emit(event); }, enrich: (eventId, patch) => inner.enrich(eventId, patch), };