Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions packages/app/src/main/acp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -899,7 +899,7 @@ export class AcpManager {
' sqlite3 ~/.spool/spool.db "SELECT c.title, c.author, c.url, c.content_text, c.platform, c.captured_at FROM captures_fts f JOIN captures c ON c.id = f.rowid WHERE captures_fts MATCH \'search terms\' ORDER BY rank LIMIT 10"',
'',
' # List captures for a specific connector',
' sqlite3 ~/.spool/spool.db "SELECT c.title, c.author, c.url, c.content_text, c.captured_at, c.platform FROM captures c WHERE json_extract(c.metadata, \'$.connectorId\') = \'twitter-bookmarks\' ORDER BY c.captured_at DESC LIMIT 20"',
' sqlite3 ~/.spool/spool.db "SELECT c.title, c.author, c.url, c.content_text, c.captured_at, c.platform FROM captures c JOIN capture_connectors cc ON cc.capture_id = c.id WHERE cc.connector_id = \'twitter-bookmarks\' ORDER BY c.captured_at DESC LIMIT 20"',
'',
' # What connectors are enabled',
' sqlite3 ~/.spool/spool.db "SELECT connector_id, total_synced, last_forward_sync_at FROM connector_sync_state WHERE enabled = 1"',
Expand All @@ -909,7 +909,7 @@ export class AcpManager {
'- For questions about bookmarks, saved content, or web platforms → query captures/captures_fts.',
'- For questions about coding sessions, projects, or what the user built → query messages/sessions.',
'- Treat source boundaries as part of the query semantics, not as an implementation detail.',
'- For connector captures, use `json_extract(metadata, \'$.connectorId\')` to distinguish different connectors for the same platform.',
'- For connector captures, JOIN `capture_connectors cc ON cc.capture_id = c.id` and filter by `cc.connector_id` to distinguish different connectors for the same platform. One capture can belong to multiple connectors.',
'- If the user names a source, only return results from that source unless they explicitly ask for cross-source search.',
'- For cross-source questions, first identify the relevant sources, then query each source separately, confirm hits or no-hits per source, and only then merge them into one answer.',
'- For temporal queries ("what did I do recently"), use explicit date filters and be conservative when comparing times across different sources.',
Expand Down
16 changes: 11 additions & 5 deletions packages/app/src/main/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -926,9 +926,15 @@ ipcMain.handle('connector:uninstall', (_e, { id }: { id: string }) => {
for (const sib of siblings) {
tryRun(() => db.prepare('DELETE FROM connector_sync_state WHERE connector_id = ?').run(sib.connectorId), `sync state for ${sib.connectorId}`)
tryRun(
() => db.prepare("DELETE FROM captures WHERE json_extract(metadata, '$.connectorId') = ?").run(sib.connectorId),
() => {
db.prepare('DELETE FROM capture_connectors WHERE connector_id = ?').run(sib.connectorId)
db.prepare(`
DELETE FROM captures
WHERE source_id = (SELECT id FROM sources WHERE name = 'connector')
AND NOT EXISTS (SELECT 1 FROM capture_connectors WHERE capture_id = captures.id)
`).run()
},
`captures for ${sib.connectorId}`,
() => db.prepare('DELETE FROM captures WHERE platform = ?').run(sib.platform),
)
}

Expand Down Expand Up @@ -982,10 +988,10 @@ ipcMain.handle('connector:update', async (_e, { id }: { id: string }) => {
})

ipcMain.handle('connector:get-capture-count', (_e, { connectorId }: { connectorId: string }) => {
const connector = connectorRegistry.get(connectorId)
connectorRegistry.get(connectorId)
const row = db.prepare(
"SELECT COUNT(*) as cnt FROM captures WHERE platform = ? AND json_extract(metadata, '$.connectorId') = ?",
).get(connector.platform, connectorId) as { cnt: number }
'SELECT COUNT(*) as cnt FROM capture_connectors WHERE connector_id = ?',
).get(connectorId) as { cnt: number }
return row.cnt
})

Expand Down
13 changes: 8 additions & 5 deletions packages/cli/src/commands/connector-sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,12 @@ export const connectorSyncCommand = new Command('connector-sync')
// Reset if requested
if (opts.reset) {
console.log(`Resetting ${connectorId}...`)
db.prepare(
`DELETE FROM captures WHERE json_extract(metadata, '$.connectorId') = ?`,
).run(connectorId)
db.prepare('DELETE FROM capture_connectors WHERE connector_id = ?').run(connectorId)
db.prepare(`
DELETE FROM captures
WHERE source_id = (SELECT id FROM sources WHERE name = 'connector')
AND NOT EXISTS (SELECT 1 FROM capture_connectors WHERE capture_id = captures.id)
`).run()
db.prepare('DELETE FROM connector_sync_state WHERE connector_id = ?').run(connectorId)
console.log('Data cleared.')
}
Expand Down Expand Up @@ -101,8 +104,8 @@ export const connectorSyncCommand = new Command('connector-sync')

// Final count from DB
const row = db.prepare(
`SELECT COUNT(*) as cnt FROM captures WHERE platform = ? AND json_extract(metadata, '$.connectorId') = ?`,
).get(connector.platform, connectorId) as { cnt: number }
'SELECT COUNT(*) as cnt FROM capture_connectors WHERE connector_id = ?',
).get(connectorId) as { cnt: number }

console.log(`Done.`)
console.log(` stop reason: ${result.stopReason}`)
Expand Down
69 changes: 37 additions & 32 deletions packages/core/src/connectors/sync-engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,6 @@ function interruptibleSleep(ms: number, cancel: CancelSignal): Effect.Effect<voi
).pipe(Effect.asVoid)
}

function tagConnectorId(items: CapturedItem[], connectorId: string): void {
for (const item of items) {
(item.metadata as Record<string, unknown>)['connectorId'] = connectorId
}
}

const nowIso: Effect.Effect<string> = Effect.map(
Clock.currentTimeMillis,
(ms) => new Date(ms).toISOString(),
Expand Down Expand Up @@ -171,6 +165,7 @@ interface UpsertResult {
function upsertItems(
db: Database.Database,
sourceId: number,
connectorId: string,
items: CapturedItem[],
): UpsertResult {
let newCount = 0
Expand All @@ -192,8 +187,13 @@ function upsertItems(
metadata, captured_at, raw_json)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`)
const linkStmt = db.prepare(
'INSERT OR IGNORE INTO capture_connectors (capture_id, connector_id) VALUES (?, ?)',
)

for (const item of items) {
let captureId: number | bigint | null = null

if (item.platformId) {
const existing = checkStmt.get(item.platform, item.platformId) as
| { id: number }
Expand All @@ -204,43 +204,52 @@ function upsertItems(
JSON.stringify(item.metadata), item.capturedAt, item.rawJson,
item.thumbnailUrl, existing.id,
)
captureId = existing.id
updatedCount++
continue
}
}

insertStmt.run(
sourceId, randomUUID(), item.url, item.title, item.contentText,
item.author, item.platform, item.platformId, item.contentType,
item.thumbnailUrl, JSON.stringify(item.metadata), item.capturedAt,
item.rawJson,
)
newCount++
if (captureId === null) {
const info = insertStmt.run(
sourceId, randomUUID(), item.url, item.title, item.contentText,
item.author, item.platform, item.platformId, item.contentType,
item.thumbnailUrl, JSON.stringify(item.metadata), item.capturedAt,
item.rawJson,
)
captureId = info.lastInsertRowid
newCount++
}

linkStmt.run(captureId, connectorId)
}

return { newCount, updatedCount }
}

function deleteConnectorItems(db: Database.Database, connectorId: string): void {
// connectorId format is e.g. 'twitter-bookmarks', platform is 'twitter'
// We need the connector to know which platform+content_type to delete.
// For now, delete by matching connector_id pattern in metadata or by platform.
// Since we store connector_id in captures metadata, let's use a convention:
// captures from connectors have metadata.connectorId set.
db.prepare(
`DELETE FROM captures WHERE json_extract(metadata, '$.connectorId') = ?`,
).run(connectorId)
// 1. Drop this connector's M:N claims.
db.prepare('DELETE FROM capture_connectors WHERE connector_id = ?').run(connectorId)
// 2. Delete captures that belonged to this connector and have no other
// connector attribution left. Scoped to source='connector' so we never
// touch session-world captures.
db.prepare(`
DELETE FROM captures
WHERE source_id = (SELECT id FROM sources WHERE name = 'connector')
AND NOT EXISTS (
SELECT 1 FROM capture_connectors WHERE capture_id = captures.id
)
`).run()
}

// ── Sync Engine ─────────────────────────────────────────────────────────────

function getSourceId(db: Database.Database): number {
// All connector items share the 'claude' source_id for the FK constraint.
// The connector_id in metadata and connector_sync_state table distinguish them.
const row = db.prepare("SELECT id FROM sources WHERE name = 'claude'").get() as
// Connector captures all share a single generic 'connector' source row.
// Per-connector attribution lives in the capture_connectors M:N table.
const row = db.prepare("SELECT id FROM sources WHERE name = 'connector'").get() as
| { id: number }
| undefined
if (!row) throw new Error("Source 'claude' not found in DB")
if (!row) throw new Error("Source 'connector' not found in DB")
return row.id
}

Expand Down Expand Up @@ -447,10 +456,8 @@ export class SyncEngine {
}
}

tagConnectorId(result.items, connector.id)

const { newCount } = yield* Effect.sync(() =>
db.transaction(() => upsertItems(db, sourceId, result.items))(),
db.transaction(() => upsertItems(db, sourceId, connector.id, result.items))(),
).pipe(
Effect.withSpan('sync.upsert', {
attributes: { 'items.count': result.items.length },
Expand Down Expand Up @@ -738,10 +745,8 @@ export class SyncEngine {
const result = outcome.right
totalPages++

tagConnectorId(result.items, connector.id)

const { newCount } = yield* Effect.sync(() =>
db.transaction(() => upsertItems(db, sourceId, result.items))(),
db.transaction(() => upsertItems(db, sourceId, connector.id, result.items))(),
).pipe(
Effect.withSpan('sync.upsert', {
attributes: { 'items.count': result.items.length },
Expand Down
11 changes: 10 additions & 1 deletion packages/core/src/connectors/test-helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ export function createTestDB(): InstanceType<typeof Database> {
base_path TEXT NOT NULL,
created_at TEXT NOT NULL DEFAULT (datetime('now'))
);
INSERT INTO sources (name, base_path) VALUES ('claude', '~/.claude/projects');
INSERT INTO sources (name, base_path) VALUES
('claude', '~/.claude/projects'),
('connector', '<plugin>');

CREATE TABLE captures (
id INTEGER PRIMARY KEY,
Expand All @@ -35,6 +37,13 @@ export function createTestDB(): InstanceType<typeof Database> {
raw_json TEXT
);

CREATE TABLE capture_connectors (
capture_id INTEGER NOT NULL REFERENCES captures(id) ON DELETE CASCADE,
connector_id TEXT NOT NULL,
PRIMARY KEY (capture_id, connector_id)
);
CREATE INDEX idx_capture_connectors_connector ON capture_connectors(connector_id);

CREATE TABLE connector_sync_state (
connector_id TEXT PRIMARY KEY,
head_cursor TEXT,
Expand Down
47 changes: 43 additions & 4 deletions packages/core/src/db/db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,10 @@ function runMigrations(db: Database.Database): void {
);

INSERT OR IGNORE INTO sources (name, base_path) VALUES
('claude', '~/.claude/projects'),
('codex', '~/.codex/sessions'),
('gemini', '~/.gemini/tmp');
('claude', '~/.claude/projects'),
('codex', '~/.codex/sessions'),
('gemini', '~/.gemini/tmp'),
('connector', '<plugin>');

CREATE TABLE IF NOT EXISTS projects (
id INTEGER PRIMARY KEY,
Expand Down Expand Up @@ -159,7 +160,6 @@ function runMigrations(db: Database.Database): void {
raw_json TEXT
);

CREATE INDEX IF NOT EXISTS idx_captures_source ON captures(source_id);
CREATE INDEX IF NOT EXISTS idx_captures_platform ON captures(platform);
CREATE INDEX IF NOT EXISTS idx_captures_url ON captures(url);
CREATE INDEX IF NOT EXISTS idx_captures_captured ON captures(captured_at DESC);
Expand All @@ -182,6 +182,15 @@ function runMigrations(db: Database.Database): void {

-- ── Connector sync state ────────────────────────────────────────────────

CREATE TABLE IF NOT EXISTS capture_connectors (
capture_id INTEGER NOT NULL REFERENCES captures(id) ON DELETE CASCADE,
connector_id TEXT NOT NULL,
PRIMARY KEY (capture_id, connector_id)
);

CREATE INDEX IF NOT EXISTS idx_capture_connectors_connector
ON capture_connectors(connector_id);

CREATE TABLE IF NOT EXISTS connector_sync_state (
connector_id TEXT PRIMARY KEY,
head_cursor TEXT,
Expand Down Expand Up @@ -298,6 +307,36 @@ function runMigrations(db: Database.Database): void {
db.pragma('user_version = 2')
}

if (version < 3) {
// v3: migrate connector provenance from metadata.connectorId (single-valued,
// clobbered on UPSERT) to the M:N capture_connectors table, and stop
// claiming connector captures came from source_id=claude.
db.transaction(() => {
// Backfill M:N from existing metadata.connectorId.
db.exec(`
INSERT OR IGNORE INTO capture_connectors (capture_id, connector_id)
SELECT id, json_extract(metadata, '$.connectorId')
FROM captures
WHERE json_extract(metadata, '$.connectorId') IS NOT NULL
`)
// Strip the now-redundant field from metadata.
db.exec(`
UPDATE captures
SET metadata = json_remove(metadata, '$.connectorId')
WHERE json_extract(metadata, '$.connectorId') IS NOT NULL
`)
// Point connector captures at the 'connector' source row instead of claude.
db.exec(`
UPDATE captures
SET source_id = (SELECT id FROM sources WHERE name = 'connector')
WHERE id IN (SELECT capture_id FROM capture_connectors)
`)
// idx_captures_source was never used by any query — drop it.
db.exec(`DROP INDEX IF EXISTS idx_captures_source`)
})()
db.pragma('user_version = 3')
}

rebuildFtsTableIfEmpty(db, 'messages', 'messages_fts_trigram')
rebuildFtsTableIfEmpty(db, 'captures', 'captures_fts_trigram')
rebuildFtsTableIfEmpty(db, 'session_search', 'session_search_fts')
Expand Down