diff --git a/packages/app/src/main/acp.ts b/packages/app/src/main/acp.ts index f998b52..330da73 100644 --- a/packages/app/src/main/acp.ts +++ b/packages/app/src/main/acp.ts @@ -899,7 +899,7 @@ export class AcpManager { ' sqlite3 ~/.spool/spool.db "SELECT c.title, c.author, c.url, c.content_text, c.platform, c.captured_at FROM captures_fts f JOIN captures c ON c.id = f.rowid WHERE captures_fts MATCH \'search terms\' ORDER BY rank LIMIT 10"', '', ' # List captures for a specific connector', - ' sqlite3 ~/.spool/spool.db "SELECT c.title, c.author, c.url, c.content_text, c.captured_at, c.platform FROM captures c WHERE json_extract(c.metadata, \'$.connectorId\') = \'twitter-bookmarks\' ORDER BY c.captured_at DESC LIMIT 20"', + ' sqlite3 ~/.spool/spool.db "SELECT c.title, c.author, c.url, c.content_text, c.captured_at, c.platform FROM captures c JOIN capture_connectors cc ON cc.capture_id = c.id WHERE cc.connector_id = \'twitter-bookmarks\' ORDER BY c.captured_at DESC LIMIT 20"', '', ' # What connectors are enabled', ' sqlite3 ~/.spool/spool.db "SELECT connector_id, total_synced, last_forward_sync_at FROM connector_sync_state WHERE enabled = 1"', @@ -909,7 +909,7 @@ export class AcpManager { '- For questions about bookmarks, saved content, or web platforms → query captures/captures_fts.', '- For questions about coding sessions, projects, or what the user built → query messages/sessions.', '- Treat source boundaries as part of the query semantics, not as an implementation detail.', - '- For connector captures, use `json_extract(metadata, \'$.connectorId\')` to distinguish different connectors for the same platform.', + '- For connector captures, JOIN `capture_connectors cc ON cc.capture_id = c.id` and filter by `cc.connector_id` to distinguish different connectors for the same platform. One capture can belong to multiple connectors.', '- If the user names a source, only return results from that source unless they explicitly ask for cross-source search.', '- For cross-source questions, first identify the relevant sources, then query each source separately, confirm hits or no-hits per source, and only then merge them into one answer.', '- For temporal queries ("what did I do recently"), use explicit date filters and be conservative when comparing times across different sources.', diff --git a/packages/app/src/main/index.ts b/packages/app/src/main/index.ts index af78384..2da1197 100644 --- a/packages/app/src/main/index.ts +++ b/packages/app/src/main/index.ts @@ -926,9 +926,15 @@ ipcMain.handle('connector:uninstall', (_e, { id }: { id: string }) => { for (const sib of siblings) { tryRun(() => db.prepare('DELETE FROM connector_sync_state WHERE connector_id = ?').run(sib.connectorId), `sync state for ${sib.connectorId}`) tryRun( - () => db.prepare("DELETE FROM captures WHERE json_extract(metadata, '$.connectorId') = ?").run(sib.connectorId), + () => { + db.prepare('DELETE FROM capture_connectors WHERE connector_id = ?').run(sib.connectorId) + db.prepare(` + DELETE FROM captures + WHERE source_id = (SELECT id FROM sources WHERE name = 'connector') + AND NOT EXISTS (SELECT 1 FROM capture_connectors WHERE capture_id = captures.id) + `).run() + }, `captures for ${sib.connectorId}`, - () => db.prepare('DELETE FROM captures WHERE platform = ?').run(sib.platform), ) } @@ -982,10 +988,10 @@ ipcMain.handle('connector:update', async (_e, { id }: { id: string }) => { }) ipcMain.handle('connector:get-capture-count', (_e, { connectorId }: { connectorId: string }) => { - const connector = connectorRegistry.get(connectorId) + connectorRegistry.get(connectorId) const row = db.prepare( - "SELECT COUNT(*) as cnt FROM captures WHERE platform = ? AND json_extract(metadata, '$.connectorId') = ?", - ).get(connector.platform, connectorId) as { cnt: number } + 'SELECT COUNT(*) as cnt FROM capture_connectors WHERE connector_id = ?', + ).get(connectorId) as { cnt: number } return row.cnt }) diff --git a/packages/cli/src/commands/connector-sync.ts b/packages/cli/src/commands/connector-sync.ts index e029e90..5647a91 100644 --- a/packages/cli/src/commands/connector-sync.ts +++ b/packages/cli/src/commands/connector-sync.ts @@ -61,9 +61,12 @@ export const connectorSyncCommand = new Command('connector-sync') // Reset if requested if (opts.reset) { console.log(`Resetting ${connectorId}...`) - db.prepare( - `DELETE FROM captures WHERE json_extract(metadata, '$.connectorId') = ?`, - ).run(connectorId) + db.prepare('DELETE FROM capture_connectors WHERE connector_id = ?').run(connectorId) + db.prepare(` + DELETE FROM captures + WHERE source_id = (SELECT id FROM sources WHERE name = 'connector') + AND NOT EXISTS (SELECT 1 FROM capture_connectors WHERE capture_id = captures.id) + `).run() db.prepare('DELETE FROM connector_sync_state WHERE connector_id = ?').run(connectorId) console.log('Data cleared.') } @@ -101,8 +104,8 @@ export const connectorSyncCommand = new Command('connector-sync') // Final count from DB const row = db.prepare( - `SELECT COUNT(*) as cnt FROM captures WHERE platform = ? AND json_extract(metadata, '$.connectorId') = ?`, - ).get(connector.platform, connectorId) as { cnt: number } + 'SELECT COUNT(*) as cnt FROM capture_connectors WHERE connector_id = ?', + ).get(connectorId) as { cnt: number } console.log(`Done.`) console.log(` stop reason: ${result.stopReason}`) diff --git a/packages/core/src/connectors/sync-engine.ts b/packages/core/src/connectors/sync-engine.ts index 59f6938..71602c6 100644 --- a/packages/core/src/connectors/sync-engine.ts +++ b/packages/core/src/connectors/sync-engine.ts @@ -70,12 +70,6 @@ function interruptibleSleep(ms: number, cancel: CancelSignal): Effect.Effect)['connectorId'] = connectorId - } -} - const nowIso: Effect.Effect = Effect.map( Clock.currentTimeMillis, (ms) => new Date(ms).toISOString(), @@ -171,6 +165,7 @@ interface UpsertResult { function upsertItems( db: Database.Database, sourceId: number, + connectorId: string, items: CapturedItem[], ): UpsertResult { let newCount = 0 @@ -192,8 +187,13 @@ function upsertItems( metadata, captured_at, raw_json) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) `) + const linkStmt = db.prepare( + 'INSERT OR IGNORE INTO capture_connectors (capture_id, connector_id) VALUES (?, ?)', + ) for (const item of items) { + let captureId: number | bigint | null = null + if (item.platformId) { const existing = checkStmt.get(item.platform, item.platformId) as | { id: number } @@ -204,43 +204,52 @@ function upsertItems( JSON.stringify(item.metadata), item.capturedAt, item.rawJson, item.thumbnailUrl, existing.id, ) + captureId = existing.id updatedCount++ - continue } } - insertStmt.run( - sourceId, randomUUID(), item.url, item.title, item.contentText, - item.author, item.platform, item.platformId, item.contentType, - item.thumbnailUrl, JSON.stringify(item.metadata), item.capturedAt, - item.rawJson, - ) - newCount++ + if (captureId === null) { + const info = insertStmt.run( + sourceId, randomUUID(), item.url, item.title, item.contentText, + item.author, item.platform, item.platformId, item.contentType, + item.thumbnailUrl, JSON.stringify(item.metadata), item.capturedAt, + item.rawJson, + ) + captureId = info.lastInsertRowid + newCount++ + } + + linkStmt.run(captureId, connectorId) } return { newCount, updatedCount } } function deleteConnectorItems(db: Database.Database, connectorId: string): void { - // connectorId format is e.g. 'twitter-bookmarks', platform is 'twitter' - // We need the connector to know which platform+content_type to delete. - // For now, delete by matching connector_id pattern in metadata or by platform. - // Since we store connector_id in captures metadata, let's use a convention: - // captures from connectors have metadata.connectorId set. - db.prepare( - `DELETE FROM captures WHERE json_extract(metadata, '$.connectorId') = ?`, - ).run(connectorId) + // 1. Drop this connector's M:N claims. + db.prepare('DELETE FROM capture_connectors WHERE connector_id = ?').run(connectorId) + // 2. Delete captures that belonged to this connector and have no other + // connector attribution left. Scoped to source='connector' so we never + // touch session-world captures. + db.prepare(` + DELETE FROM captures + WHERE source_id = (SELECT id FROM sources WHERE name = 'connector') + AND NOT EXISTS ( + SELECT 1 FROM capture_connectors WHERE capture_id = captures.id + ) + `).run() } // ── Sync Engine ───────────────────────────────────────────────────────────── function getSourceId(db: Database.Database): number { - // All connector items share the 'claude' source_id for the FK constraint. - // The connector_id in metadata and connector_sync_state table distinguish them. - const row = db.prepare("SELECT id FROM sources WHERE name = 'claude'").get() as + // Connector captures all share a single generic 'connector' source row. + // Per-connector attribution lives in the capture_connectors M:N table. + const row = db.prepare("SELECT id FROM sources WHERE name = 'connector'").get() as | { id: number } | undefined - if (!row) throw new Error("Source 'claude' not found in DB") + if (!row) throw new Error("Source 'connector' not found in DB") return row.id } @@ -447,10 +456,8 @@ export class SyncEngine { } } - tagConnectorId(result.items, connector.id) - const { newCount } = yield* Effect.sync(() => - db.transaction(() => upsertItems(db, sourceId, result.items))(), + db.transaction(() => upsertItems(db, sourceId, connector.id, result.items))(), ).pipe( Effect.withSpan('sync.upsert', { attributes: { 'items.count': result.items.length }, @@ -738,10 +745,8 @@ export class SyncEngine { const result = outcome.right totalPages++ - tagConnectorId(result.items, connector.id) - const { newCount } = yield* Effect.sync(() => - db.transaction(() => upsertItems(db, sourceId, result.items))(), + db.transaction(() => upsertItems(db, sourceId, connector.id, result.items))(), ).pipe( Effect.withSpan('sync.upsert', { attributes: { 'items.count': result.items.length }, diff --git a/packages/core/src/connectors/test-helpers.ts b/packages/core/src/connectors/test-helpers.ts index d1e63f5..5c7a78c 100644 --- a/packages/core/src/connectors/test-helpers.ts +++ b/packages/core/src/connectors/test-helpers.ts @@ -15,7 +15,9 @@ export function createTestDB(): InstanceType { base_path TEXT NOT NULL, created_at TEXT NOT NULL DEFAULT (datetime('now')) ); - INSERT INTO sources (name, base_path) VALUES ('claude', '~/.claude/projects'); + INSERT INTO sources (name, base_path) VALUES + ('claude', '~/.claude/projects'), + ('connector', ''); CREATE TABLE captures ( id INTEGER PRIMARY KEY, @@ -35,6 +37,13 @@ export function createTestDB(): InstanceType { raw_json TEXT ); + CREATE TABLE capture_connectors ( + capture_id INTEGER NOT NULL REFERENCES captures(id) ON DELETE CASCADE, + connector_id TEXT NOT NULL, + PRIMARY KEY (capture_id, connector_id) + ); + CREATE INDEX idx_capture_connectors_connector ON capture_connectors(connector_id); + CREATE TABLE connector_sync_state ( connector_id TEXT PRIMARY KEY, head_cursor TEXT, diff --git a/packages/core/src/db/db.ts b/packages/core/src/db/db.ts index a8fddc3..dcff1de 100644 --- a/packages/core/src/db/db.ts +++ b/packages/core/src/db/db.ts @@ -38,9 +38,10 @@ function runMigrations(db: Database.Database): void { ); INSERT OR IGNORE INTO sources (name, base_path) VALUES - ('claude', '~/.claude/projects'), - ('codex', '~/.codex/sessions'), - ('gemini', '~/.gemini/tmp'); + ('claude', '~/.claude/projects'), + ('codex', '~/.codex/sessions'), + ('gemini', '~/.gemini/tmp'), + ('connector', ''); CREATE TABLE IF NOT EXISTS projects ( id INTEGER PRIMARY KEY, @@ -159,7 +160,6 @@ function runMigrations(db: Database.Database): void { raw_json TEXT ); - CREATE INDEX IF NOT EXISTS idx_captures_source ON captures(source_id); CREATE INDEX IF NOT EXISTS idx_captures_platform ON captures(platform); CREATE INDEX IF NOT EXISTS idx_captures_url ON captures(url); CREATE INDEX IF NOT EXISTS idx_captures_captured ON captures(captured_at DESC); @@ -182,6 +182,15 @@ function runMigrations(db: Database.Database): void { -- ── Connector sync state ──────────────────────────────────────────────── + CREATE TABLE IF NOT EXISTS capture_connectors ( + capture_id INTEGER NOT NULL REFERENCES captures(id) ON DELETE CASCADE, + connector_id TEXT NOT NULL, + PRIMARY KEY (capture_id, connector_id) + ); + + CREATE INDEX IF NOT EXISTS idx_capture_connectors_connector + ON capture_connectors(connector_id); + CREATE TABLE IF NOT EXISTS connector_sync_state ( connector_id TEXT PRIMARY KEY, head_cursor TEXT, @@ -298,6 +307,36 @@ function runMigrations(db: Database.Database): void { db.pragma('user_version = 2') } + if (version < 3) { + // v3: migrate connector provenance from metadata.connectorId (single-valued, + // clobbered on UPSERT) to the M:N capture_connectors table, and stop + // claiming connector captures came from source_id=claude. + db.transaction(() => { + // Backfill M:N from existing metadata.connectorId. + db.exec(` + INSERT OR IGNORE INTO capture_connectors (capture_id, connector_id) + SELECT id, json_extract(metadata, '$.connectorId') + FROM captures + WHERE json_extract(metadata, '$.connectorId') IS NOT NULL + `) + // Strip the now-redundant field from metadata. + db.exec(` + UPDATE captures + SET metadata = json_remove(metadata, '$.connectorId') + WHERE json_extract(metadata, '$.connectorId') IS NOT NULL + `) + // Point connector captures at the 'connector' source row instead of claude. + db.exec(` + UPDATE captures + SET source_id = (SELECT id FROM sources WHERE name = 'connector') + WHERE id IN (SELECT capture_id FROM capture_connectors) + `) + // idx_captures_source was never used by any query — drop it. + db.exec(`DROP INDEX IF EXISTS idx_captures_source`) + })() + db.pragma('user_version = 3') + } + rebuildFtsTableIfEmpty(db, 'messages', 'messages_fts_trigram') rebuildFtsTableIfEmpty(db, 'captures', 'captures_fts_trigram') rebuildFtsTableIfEmpty(db, 'session_search', 'session_search_fts')