From 0bdf4127b7faa33273194505cbfbba4d3caa5201 Mon Sep 17 00:00:00 2001 From: Chen <99816898+donteatfriedrice@users.noreply.github.com> Date: Wed, 15 Apr 2026 13:50:57 +0800 Subject: [PATCH] fix: M:N capture_connectors table for connector provenance MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two long-standing bugs shared one root cause: the captures table treated connector ownership as a single FK + a single string in metadata, but connector→capture is fundamentally M:N (one Reddit post can be both saved and upvoted; HN hot/saved overlap; github-stars/notifications overlap on the same repo). Symptom A: every connector capture had source_id=1 (claude) due to a hardcoded workaround. The schema lied about origin. Symptom B: per-connector item counts oscillated across syncs. The single metadata.connectorId field was clobbered on every UPSERT, so whichever connector synced last "won" the shared item, and the loser's count dropped by one until it synced again. Fix: introduce capture_connectors(capture_id, connector_id) M:N table, add a generic 'connector' source row, drop dead idx_captures_source. Migration v3 backfills M:N from existing metadata.connectorId, strips the field, and repoints connector captures to the new source row. Six query sites (sync-engine upsert/delete, main uninstall + count, CLI reset + count, ACP prompt examples) updated to JOIN through M:N. --- packages/app/src/main/acp.ts | 4 +- packages/app/src/main/index.ts | 16 +++-- packages/cli/src/commands/connector-sync.ts | 13 ++-- packages/core/src/connectors/sync-engine.ts | 69 +++++++++++--------- packages/core/src/connectors/test-helpers.ts | 11 +++- packages/core/src/db/db.ts | 47 +++++++++++-- 6 files changed, 111 insertions(+), 49 deletions(-) diff --git a/packages/app/src/main/acp.ts b/packages/app/src/main/acp.ts index f998b52..330da73 100644 --- a/packages/app/src/main/acp.ts +++ b/packages/app/src/main/acp.ts @@ -899,7 +899,7 @@ export class AcpManager { ' sqlite3 ~/.spool/spool.db "SELECT c.title, c.author, c.url, c.content_text, c.platform, c.captured_at FROM captures_fts f JOIN captures c ON c.id = f.rowid WHERE captures_fts MATCH \'search terms\' ORDER BY rank LIMIT 10"', '', ' # List captures for a specific connector', - ' sqlite3 ~/.spool/spool.db "SELECT c.title, c.author, c.url, c.content_text, c.captured_at, c.platform FROM captures c WHERE json_extract(c.metadata, \'$.connectorId\') = \'twitter-bookmarks\' ORDER BY c.captured_at DESC LIMIT 20"', + ' sqlite3 ~/.spool/spool.db "SELECT c.title, c.author, c.url, c.content_text, c.captured_at, c.platform FROM captures c JOIN capture_connectors cc ON cc.capture_id = c.id WHERE cc.connector_id = \'twitter-bookmarks\' ORDER BY c.captured_at DESC LIMIT 20"', '', ' # What connectors are enabled', ' sqlite3 ~/.spool/spool.db "SELECT connector_id, total_synced, last_forward_sync_at FROM connector_sync_state WHERE enabled = 1"', @@ -909,7 +909,7 @@ export class AcpManager { '- For questions about bookmarks, saved content, or web platforms → query captures/captures_fts.', '- For questions about coding sessions, projects, or what the user built → query messages/sessions.', '- Treat source boundaries as part of the query semantics, not as an implementation detail.', - '- For connector captures, use `json_extract(metadata, \'$.connectorId\')` to distinguish different connectors for the same platform.', + '- For connector captures, JOIN `capture_connectors cc ON cc.capture_id = c.id` and filter by `cc.connector_id` to distinguish different connectors for the same platform. One capture can belong to multiple connectors.', '- If the user names a source, only return results from that source unless they explicitly ask for cross-source search.', '- For cross-source questions, first identify the relevant sources, then query each source separately, confirm hits or no-hits per source, and only then merge them into one answer.', '- For temporal queries ("what did I do recently"), use explicit date filters and be conservative when comparing times across different sources.', diff --git a/packages/app/src/main/index.ts b/packages/app/src/main/index.ts index af78384..2da1197 100644 --- a/packages/app/src/main/index.ts +++ b/packages/app/src/main/index.ts @@ -926,9 +926,15 @@ ipcMain.handle('connector:uninstall', (_e, { id }: { id: string }) => { for (const sib of siblings) { tryRun(() => db.prepare('DELETE FROM connector_sync_state WHERE connector_id = ?').run(sib.connectorId), `sync state for ${sib.connectorId}`) tryRun( - () => db.prepare("DELETE FROM captures WHERE json_extract(metadata, '$.connectorId') = ?").run(sib.connectorId), + () => { + db.prepare('DELETE FROM capture_connectors WHERE connector_id = ?').run(sib.connectorId) + db.prepare(` + DELETE FROM captures + WHERE source_id = (SELECT id FROM sources WHERE name = 'connector') + AND NOT EXISTS (SELECT 1 FROM capture_connectors WHERE capture_id = captures.id) + `).run() + }, `captures for ${sib.connectorId}`, - () => db.prepare('DELETE FROM captures WHERE platform = ?').run(sib.platform), ) } @@ -982,10 +988,10 @@ ipcMain.handle('connector:update', async (_e, { id }: { id: string }) => { }) ipcMain.handle('connector:get-capture-count', (_e, { connectorId }: { connectorId: string }) => { - const connector = connectorRegistry.get(connectorId) + connectorRegistry.get(connectorId) const row = db.prepare( - "SELECT COUNT(*) as cnt FROM captures WHERE platform = ? AND json_extract(metadata, '$.connectorId') = ?", - ).get(connector.platform, connectorId) as { cnt: number } + 'SELECT COUNT(*) as cnt FROM capture_connectors WHERE connector_id = ?', + ).get(connectorId) as { cnt: number } return row.cnt }) diff --git a/packages/cli/src/commands/connector-sync.ts b/packages/cli/src/commands/connector-sync.ts index e029e90..5647a91 100644 --- a/packages/cli/src/commands/connector-sync.ts +++ b/packages/cli/src/commands/connector-sync.ts @@ -61,9 +61,12 @@ export const connectorSyncCommand = new Command('connector-sync') // Reset if requested if (opts.reset) { console.log(`Resetting ${connectorId}...`) - db.prepare( - `DELETE FROM captures WHERE json_extract(metadata, '$.connectorId') = ?`, - ).run(connectorId) + db.prepare('DELETE FROM capture_connectors WHERE connector_id = ?').run(connectorId) + db.prepare(` + DELETE FROM captures + WHERE source_id = (SELECT id FROM sources WHERE name = 'connector') + AND NOT EXISTS (SELECT 1 FROM capture_connectors WHERE capture_id = captures.id) + `).run() db.prepare('DELETE FROM connector_sync_state WHERE connector_id = ?').run(connectorId) console.log('Data cleared.') } @@ -101,8 +104,8 @@ export const connectorSyncCommand = new Command('connector-sync') // Final count from DB const row = db.prepare( - `SELECT COUNT(*) as cnt FROM captures WHERE platform = ? AND json_extract(metadata, '$.connectorId') = ?`, - ).get(connector.platform, connectorId) as { cnt: number } + 'SELECT COUNT(*) as cnt FROM capture_connectors WHERE connector_id = ?', + ).get(connectorId) as { cnt: number } console.log(`Done.`) console.log(` stop reason: ${result.stopReason}`) diff --git a/packages/core/src/connectors/sync-engine.ts b/packages/core/src/connectors/sync-engine.ts index 59f6938..71602c6 100644 --- a/packages/core/src/connectors/sync-engine.ts +++ b/packages/core/src/connectors/sync-engine.ts @@ -70,12 +70,6 @@ function interruptibleSleep(ms: number, cancel: CancelSignal): Effect.Effect)['connectorId'] = connectorId - } -} - const nowIso: Effect.Effect = Effect.map( Clock.currentTimeMillis, (ms) => new Date(ms).toISOString(), @@ -171,6 +165,7 @@ interface UpsertResult { function upsertItems( db: Database.Database, sourceId: number, + connectorId: string, items: CapturedItem[], ): UpsertResult { let newCount = 0 @@ -192,8 +187,13 @@ function upsertItems( metadata, captured_at, raw_json) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) `) + const linkStmt = db.prepare( + 'INSERT OR IGNORE INTO capture_connectors (capture_id, connector_id) VALUES (?, ?)', + ) for (const item of items) { + let captureId: number | bigint | null = null + if (item.platformId) { const existing = checkStmt.get(item.platform, item.platformId) as | { id: number } @@ -204,43 +204,52 @@ function upsertItems( JSON.stringify(item.metadata), item.capturedAt, item.rawJson, item.thumbnailUrl, existing.id, ) + captureId = existing.id updatedCount++ - continue } } - insertStmt.run( - sourceId, randomUUID(), item.url, item.title, item.contentText, - item.author, item.platform, item.platformId, item.contentType, - item.thumbnailUrl, JSON.stringify(item.metadata), item.capturedAt, - item.rawJson, - ) - newCount++ + if (captureId === null) { + const info = insertStmt.run( + sourceId, randomUUID(), item.url, item.title, item.contentText, + item.author, item.platform, item.platformId, item.contentType, + item.thumbnailUrl, JSON.stringify(item.metadata), item.capturedAt, + item.rawJson, + ) + captureId = info.lastInsertRowid + newCount++ + } + + linkStmt.run(captureId, connectorId) } return { newCount, updatedCount } } function deleteConnectorItems(db: Database.Database, connectorId: string): void { - // connectorId format is e.g. 'twitter-bookmarks', platform is 'twitter' - // We need the connector to know which platform+content_type to delete. - // For now, delete by matching connector_id pattern in metadata or by platform. - // Since we store connector_id in captures metadata, let's use a convention: - // captures from connectors have metadata.connectorId set. - db.prepare( - `DELETE FROM captures WHERE json_extract(metadata, '$.connectorId') = ?`, - ).run(connectorId) + // 1. Drop this connector's M:N claims. + db.prepare('DELETE FROM capture_connectors WHERE connector_id = ?').run(connectorId) + // 2. Delete captures that belonged to this connector and have no other + // connector attribution left. Scoped to source='connector' so we never + // touch session-world captures. + db.prepare(` + DELETE FROM captures + WHERE source_id = (SELECT id FROM sources WHERE name = 'connector') + AND NOT EXISTS ( + SELECT 1 FROM capture_connectors WHERE capture_id = captures.id + ) + `).run() } // ── Sync Engine ───────────────────────────────────────────────────────────── function getSourceId(db: Database.Database): number { - // All connector items share the 'claude' source_id for the FK constraint. - // The connector_id in metadata and connector_sync_state table distinguish them. - const row = db.prepare("SELECT id FROM sources WHERE name = 'claude'").get() as + // Connector captures all share a single generic 'connector' source row. + // Per-connector attribution lives in the capture_connectors M:N table. + const row = db.prepare("SELECT id FROM sources WHERE name = 'connector'").get() as | { id: number } | undefined - if (!row) throw new Error("Source 'claude' not found in DB") + if (!row) throw new Error("Source 'connector' not found in DB") return row.id } @@ -447,10 +456,8 @@ export class SyncEngine { } } - tagConnectorId(result.items, connector.id) - const { newCount } = yield* Effect.sync(() => - db.transaction(() => upsertItems(db, sourceId, result.items))(), + db.transaction(() => upsertItems(db, sourceId, connector.id, result.items))(), ).pipe( Effect.withSpan('sync.upsert', { attributes: { 'items.count': result.items.length }, @@ -738,10 +745,8 @@ export class SyncEngine { const result = outcome.right totalPages++ - tagConnectorId(result.items, connector.id) - const { newCount } = yield* Effect.sync(() => - db.transaction(() => upsertItems(db, sourceId, result.items))(), + db.transaction(() => upsertItems(db, sourceId, connector.id, result.items))(), ).pipe( Effect.withSpan('sync.upsert', { attributes: { 'items.count': result.items.length }, diff --git a/packages/core/src/connectors/test-helpers.ts b/packages/core/src/connectors/test-helpers.ts index d1e63f5..5c7a78c 100644 --- a/packages/core/src/connectors/test-helpers.ts +++ b/packages/core/src/connectors/test-helpers.ts @@ -15,7 +15,9 @@ export function createTestDB(): InstanceType { base_path TEXT NOT NULL, created_at TEXT NOT NULL DEFAULT (datetime('now')) ); - INSERT INTO sources (name, base_path) VALUES ('claude', '~/.claude/projects'); + INSERT INTO sources (name, base_path) VALUES + ('claude', '~/.claude/projects'), + ('connector', ''); CREATE TABLE captures ( id INTEGER PRIMARY KEY, @@ -35,6 +37,13 @@ export function createTestDB(): InstanceType { raw_json TEXT ); + CREATE TABLE capture_connectors ( + capture_id INTEGER NOT NULL REFERENCES captures(id) ON DELETE CASCADE, + connector_id TEXT NOT NULL, + PRIMARY KEY (capture_id, connector_id) + ); + CREATE INDEX idx_capture_connectors_connector ON capture_connectors(connector_id); + CREATE TABLE connector_sync_state ( connector_id TEXT PRIMARY KEY, head_cursor TEXT, diff --git a/packages/core/src/db/db.ts b/packages/core/src/db/db.ts index a8fddc3..dcff1de 100644 --- a/packages/core/src/db/db.ts +++ b/packages/core/src/db/db.ts @@ -38,9 +38,10 @@ function runMigrations(db: Database.Database): void { ); INSERT OR IGNORE INTO sources (name, base_path) VALUES - ('claude', '~/.claude/projects'), - ('codex', '~/.codex/sessions'), - ('gemini', '~/.gemini/tmp'); + ('claude', '~/.claude/projects'), + ('codex', '~/.codex/sessions'), + ('gemini', '~/.gemini/tmp'), + ('connector', ''); CREATE TABLE IF NOT EXISTS projects ( id INTEGER PRIMARY KEY, @@ -159,7 +160,6 @@ function runMigrations(db: Database.Database): void { raw_json TEXT ); - CREATE INDEX IF NOT EXISTS idx_captures_source ON captures(source_id); CREATE INDEX IF NOT EXISTS idx_captures_platform ON captures(platform); CREATE INDEX IF NOT EXISTS idx_captures_url ON captures(url); CREATE INDEX IF NOT EXISTS idx_captures_captured ON captures(captured_at DESC); @@ -182,6 +182,15 @@ function runMigrations(db: Database.Database): void { -- ── Connector sync state ──────────────────────────────────────────────── + CREATE TABLE IF NOT EXISTS capture_connectors ( + capture_id INTEGER NOT NULL REFERENCES captures(id) ON DELETE CASCADE, + connector_id TEXT NOT NULL, + PRIMARY KEY (capture_id, connector_id) + ); + + CREATE INDEX IF NOT EXISTS idx_capture_connectors_connector + ON capture_connectors(connector_id); + CREATE TABLE IF NOT EXISTS connector_sync_state ( connector_id TEXT PRIMARY KEY, head_cursor TEXT, @@ -298,6 +307,36 @@ function runMigrations(db: Database.Database): void { db.pragma('user_version = 2') } + if (version < 3) { + // v3: migrate connector provenance from metadata.connectorId (single-valued, + // clobbered on UPSERT) to the M:N capture_connectors table, and stop + // claiming connector captures came from source_id=claude. + db.transaction(() => { + // Backfill M:N from existing metadata.connectorId. + db.exec(` + INSERT OR IGNORE INTO capture_connectors (capture_id, connector_id) + SELECT id, json_extract(metadata, '$.connectorId') + FROM captures + WHERE json_extract(metadata, '$.connectorId') IS NOT NULL + `) + // Strip the now-redundant field from metadata. + db.exec(` + UPDATE captures + SET metadata = json_remove(metadata, '$.connectorId') + WHERE json_extract(metadata, '$.connectorId') IS NOT NULL + `) + // Point connector captures at the 'connector' source row instead of claude. + db.exec(` + UPDATE captures + SET source_id = (SELECT id FROM sources WHERE name = 'connector') + WHERE id IN (SELECT capture_id FROM capture_connectors) + `) + // idx_captures_source was never used by any query — drop it. + db.exec(`DROP INDEX IF EXISTS idx_captures_source`) + })() + db.pragma('user_version = 3') + } + rebuildFtsTableIfEmpty(db, 'messages', 'messages_fts_trigram') rebuildFtsTableIfEmpty(db, 'captures', 'captures_fts_trigram') rebuildFtsTableIfEmpty(db, 'session_search', 'session_search_fts')