# feat(indexer): implement true incremental indexing pipeline #8
**Open**: dubscode wants to merge 1 commit into `main` from `feat/incremental-indexing-pipeline`.
4 files renamed without changes.
`...hanges/true-incremental-indexing/tasks.md` → `...-03-04-true-incremental-indexing/tasks.md` (18 changes: 9 additions & 9 deletions)
```diff
@@ -1,17 +1,17 @@
 ## 1. Incremental Indexing Core

-- [ ] 1.1 Add shared file-index helpers (upsert file, replace chunks/embeddings/docs, delete file by path) for reuse by full and incremental indexers.
-- [ ] 1.2 Replace `runIncrementalIndex` broad fallback with a path-targeted pipeline that processes normalized changed paths only.
-- [ ] 1.3 Add incremental operation/result types that include inserted/updated/deleted file counts and chunk counters.
+- [x] 1.1 Add shared file-index helpers (upsert file, replace chunks/embeddings/docs, delete file by path) for reuse by full and incremental indexers.
+- [x] 1.2 Replace `runIncrementalIndex` broad fallback with a path-targeted pipeline that processes normalized changed paths only.
+- [x] 1.3 Add incremental operation/result types that include inserted/updated/deleted file counts and chunk counters.

 ## 2. Delete Handling and Fallback Rules

-- [ ] 2.1 Implement explicit delete operations for `unlink` events and missing-path detection during incremental runs.
-- [ ] 2.2 Update daemon watcher wiring to pass enough event metadata (fs event type and git-head change context) into incremental indexing.
-- [ ] 2.3 Implement narrow fallback policy so only unresolved unscoped git-head transitions can trigger full reindex, with reason metadata/logging.
+- [x] 2.1 Implement explicit delete operations for `unlink` events and missing-path detection during incremental runs.
+- [x] 2.2 Update daemon watcher wiring to pass enough event metadata (fs event type and git-head change context) into incremental indexing.
+- [x] 2.3 Implement narrow fallback policy so only unresolved unscoped git-head transitions can trigger full reindex, with reason metadata/logging.

 ## 3. Verification

-- [ ] 3.1 Add tests for add/change targeted updates to ensure unrelated files are not reindexed.
-- [ ] 3.2 Add tests for delete handling to verify stale `files`/`chunks`/`chunk_embeddings`/`bm25_documents` data is removed.
-- [ ] 3.3 Add tests for fallback gating to verify fs path events never trigger full fallback and unresolved git-head transitions do.
+- [x] 3.1 Add tests for add/change targeted updates to ensure unrelated files are not reindexed.
+- [x] 3.2 Add tests for delete handling to verify stale `files`/`chunks`/`chunk_embeddings`/`bm25_documents` data is removed.
+- [x] 3.3 Add tests for fallback gating to verify fs path events never trigger full fallback and unresolved git-head transitions do.
```
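Task 1.3 calls for result types carrying inserted/updated/deleted file counts and chunk counters. A minimal sketch of such a shape, with an aggregator for folding per-file outcomes into a run-level total; the names here are hypothetical and may not match the PR's actual types:

```typescript
// Hypothetical run-level result shape; the PR's real types may differ.
type IncrementalRunResult = {
  filesInserted: number;
  filesUpdated: number;
  filesDeleted: number;
  chunksInserted: number;
  chunksDeleted: number;
};

// Fold two partial results (e.g. per-file operation outcomes) into one.
function mergeResults(a: IncrementalRunResult, b: IncrementalRunResult): IncrementalRunResult {
  return {
    filesInserted: a.filesInserted + b.filesInserted,
    filesUpdated: a.filesUpdated + b.filesUpdated,
    filesDeleted: a.filesDeleted + b.filesDeleted,
    chunksInserted: a.chunksInserted + b.chunksInserted,
    chunksDeleted: a.chunksDeleted + b.chunksDeleted,
  };
}
```

An incremental run would start from an all-zero result and merge in one partial result per processed path.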
New spec file, 52 additions (`@@ -0,0 +1,52 @@`):

```markdown
# incremental-indexing Specification

## Purpose

TBD - created by archiving change true-incremental-indexing. Update Purpose after archive.

## Requirements

### Requirement: Incremental indexing MUST process only targeted changed paths

The indexing system MUST support an incremental mode that updates only the files identified in a change set, without scanning or reprocessing unrelated repository paths.

#### Scenario: Single-file change updates only that file

- **WHEN** incremental indexing is invoked with one changed path
- **THEN** only that path is read, re-chunked, and re-embedded
- **AND** unrelated indexed files remain untouched

#### Scenario: Duplicate changed paths are coalesced

- **WHEN** incremental indexing receives repeated entries for the same path in one run
- **THEN** the path is processed once
- **AND** result counters reflect a single file operation for that path

### Requirement: Incremental indexing MUST handle deletes correctly

The indexing system MUST remove indexed records for files that were deleted from the repository, including dependent chunks and retrieval documents.

#### Scenario: File unlink removes indexed content

- **WHEN** incremental indexing is invoked for a path marked as deleted
- **THEN** the indexed file row for that repo/path is removed
- **AND** all dependent chunks, embeddings, and bm25 documents are removed via cascade or equivalent guarantees

#### Scenario: Missing file during change processing is treated as delete

- **WHEN** incremental indexing receives a changed path that no longer exists on disk
- **THEN** the system treats that path as a delete operation
- **AND** stale indexed content for the path is removed

### Requirement: Full reindex fallback MUST be narrow and explicit

The indexing system MUST avoid broad full-reindex fallback for normal path-scoped file-system events and MAY fallback only for unscoped events where changed paths cannot be determined safely.

#### Scenario: File-system add/change/unlink does not trigger full fallback

- **WHEN** incremental indexing is triggered from file-system events with concrete paths
- **THEN** the run completes through targeted path operations only
- **AND** full repository indexing is not invoked

#### Scenario: Unscoped git-head change can fallback with reason

- **WHEN** a git-head transition occurs and changed paths cannot be resolved reliably
- **THEN** the system runs a single explicit full fallback reindex
- **AND** emits metadata or logs indicating fallback reason and trigger type

### Requirement: Incremental runs MUST expose operation counts

The indexing system MUST return operation counters that distinguish inserted/updated/deleted files and indexed chunks for each run.

#### Scenario: Result includes delete counts

- **WHEN** a run processes at least one delete operation
- **THEN** the result includes a non-zero deleted-file count
- **AND** chunk counters reflect removed and/or replaced chunk totals
```
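The "duplicate changed paths are coalesced" scenario implies normalizing and deduplicating the change set before processing. A hypothetical helper illustrating that step (the PR's real normalization logic is not shown in this diff and may differ):

```typescript
// Illustrative only: collapse duplicate entries for the same file so each
// path is processed exactly once per incremental run.
function coalesceChangedPaths(paths: string[]): string[] {
  const seen = new Set<string>();
  for (const p of paths) {
    // Normalize separators and strip a leading './' so variants of the
    // same path map to one key.
    const normalized = p.replace(/\\/g, '/').replace(/^\.\//, '');
    seen.add(normalized);
  }
  return [...seen];
}
```

With this in place, result counters naturally reflect a single file operation per unique path.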
New file, 261 additions (`@@ -0,0 +1,261 @@`):

```typescript
import { createHash, randomUUID } from 'node:crypto';
import { readFile } from 'node:fs/promises';
import type { DubsbotDb } from '../../db/client';
import { createProviderAdapter } from '../../providers';
import type { ProviderAdapter } from '../../providers/types';
import { isEmbeddingStrategyV2Enabled, loadEmbeddingStrategyConfig } from '../embedding/config';
import {
  assertEmbeddingSuccess,
  type EmbeddingProvenance,
  executeEmbeddingWithStrategy,
} from '../embedding/engine';
import { deterministicEmbedding } from '../retrieval/rerank';

type Chunk = {
  index: number;
  content: string;
  startLine: number;
  endLine: number;
};

export type FileIndexSharedInput = {
  db: DubsbotDb;
  repoRoot: string;
  embedProvider?: ProviderAdapter;
  embeddingModel?: string;
  embeddingStrategyId?: string;
};

export type UpsertFileResult = {
  status: 'indexed' | 'missing';
  fileStatus?: 'inserted' | 'updated';
  chunksInserted: number;
  chunksDeleted: number;
};

export type DeleteFileResult = {
  fileDeleted: boolean;
  chunksDeleted: number;
};

export function createFileIndexHelpers(input: FileIndexSharedInput): {
  upsertIndexedFileByPath: (relativePath: string) => Promise<UpsertFileResult>;
  deleteIndexedFileByPath: (relativePath: string) => Promise<DeleteFileResult>;
} {
  const isStrategyV2 = isEmbeddingStrategyV2Enabled();
  const strategyConfig = isStrategyV2 ? loadEmbeddingStrategyConfig() : null;
  const adapterCache = new Map<string, ProviderAdapter>();

  function getAdapter(provider: string): ProviderAdapter {
    const cached = adapterCache.get(provider);
    if (cached) {
      return cached;
    }
    const adapter = createProviderAdapter(provider as 'openai' | 'anthropic' | 'google');
    adapterCache.set(provider, adapter);
    return adapter;
  }

  async function embedContent(chunkContent: string): Promise<{
    embedding: number[];
    provider: string;
    model: string;
    provenance: EmbeddingProvenance;
  }> {
    if (isStrategyV2 && strategyConfig) {
      const strategyId = input.embeddingStrategyId ?? strategyConfig.defaults.indexing;
      const result = await executeEmbeddingWithStrategy({
        config: strategyConfig,
        strategyId,
        value: chunkContent,
        adapterForProvider: getAdapter,
      });
      const success = assertEmbeddingSuccess(result);
      emitEmbeddingTelemetry(success.provenance);
      return {
        embedding: success.embedding,
        provider: success.provider,
        model: success.model,
        provenance: success.provenance,
      };
    }

    const provider = input.embedProvider ? 'remote' : 'local';
    const model =
      input.embeddingModel ?? (input.embedProvider ? 'text-embedding-3-small' : 'deterministic-v1');
    const embedding =
      input.embedProvider != null
        ? (await input.embedProvider.embed({ model, values: [chunkContent] }))[0]
        : deterministicEmbedding(chunkContent);
    const provenance: EmbeddingProvenance = {
      strategyId: 'legacy-default',
      attemptPath: [
        {
          strategyId: 'legacy-default',
          provider,
          model,
          status: 'success',
        },
      ],
      fallbackUsed: false,
      resolvedBy: {
        strategyId: 'legacy-default',
        provider,
        model,
      },
    };

    return { embedding, provider, model, provenance };
  }

  async function upsertIndexedFileByPath(relativePath: string): Promise<UpsertFileResult> {
    const absolutePath = `${input.repoRoot}/${relativePath}`;
    const content = await readFile(absolutePath, 'utf8').catch(() => null);
    if (!content) {
      return { status: 'missing', chunksInserted: 0, chunksDeleted: 0 };
    }

    const existingRows = await input.db.query<{ id: string }>(
      'SELECT id FROM files WHERE repo_root = $1 AND path = $2',
      [input.repoRoot, relativePath]
    );
    const existingFileId = existingRows.rows[0]?.id;
    const fileStatus: 'inserted' | 'updated' = existingFileId ? 'updated' : 'inserted';

    const fileId = existingFileId ?? randomUUID();
    const persistedRows = await input.db.query<{ id: string }>(
      `INSERT INTO files (id, repo_root, path, hash, language)
       VALUES ($1, $2, $3, $4, $5)
       ON CONFLICT (repo_root, path) DO UPDATE SET hash = EXCLUDED.hash, language = EXCLUDED.language, updated_at = NOW()
       RETURNING id`,
      [fileId, input.repoRoot, relativePath, hashContent(content), detectLanguage(relativePath)]
    );
    const persistedFileId = persistedRows.rows[0].id;

    let chunksDeleted = 0;
    if (existingFileId) {
      const deletedRows = await input.db.query<{ count: number | string }>(
        'SELECT COUNT(*)::int AS count FROM chunks WHERE file_id = $1',
        [persistedFileId]
      );
      chunksDeleted = Number(deletedRows.rows[0]?.count ?? 0);
    }

    await input.db.query('DELETE FROM chunks WHERE file_id = $1', [persistedFileId]);

    const chunks = chunkFile(content);
    for (const chunk of chunks) {
      const chunkId = randomUUID();
      await input.db.query(
        `INSERT INTO chunks (id, file_id, chunk_index, content, start_line, end_line)
         VALUES ($1, $2, $3, $4, $5, $6)`,
        [chunkId, persistedFileId, chunk.index, chunk.content, chunk.startLine, chunk.endLine]
      );

      const embedded = await embedContent(chunk.content);
      await input.db.query(
        `INSERT INTO chunk_embeddings (chunk_id, provider, model, embedding, provenance)
         VALUES ($1, $2, $3, $4::jsonb, $5::jsonb)
         ON CONFLICT (chunk_id) DO UPDATE SET provider = EXCLUDED.provider, model = EXCLUDED.model, embedding = EXCLUDED.embedding, provenance = EXCLUDED.provenance`,
        [
          chunkId,
          embedded.provider,
          embedded.model,
          JSON.stringify(embedded.embedding),
          JSON.stringify(embedded.provenance),
        ]
      );

      await input.db.query('INSERT INTO bm25_documents (id, chunk_id, body) VALUES ($1, $2, $3)', [
        randomUUID(),
        chunkId,
        chunk.content,
      ]);
    }

    return {
      status: 'indexed',
      fileStatus,
      chunksInserted: chunks.length,
      chunksDeleted,
    };
  }

  async function deleteIndexedFileByPath(relativePath: string): Promise<DeleteFileResult> {
    const fileRows = await input.db.query<{ id: string }>(
      'SELECT id FROM files WHERE repo_root = $1 AND path = $2',
      [input.repoRoot, relativePath]
    );
    const fileId = fileRows.rows[0]?.id;
    if (!fileId) {
      return { fileDeleted: false, chunksDeleted: 0 };
    }

    const countRows = await input.db.query<{ count: number | string }>(
      'SELECT COUNT(*)::int AS count FROM chunks WHERE file_id = $1',
      [fileId]
    );
    const chunksDeleted = Number(countRows.rows[0]?.count ?? 0);
    await input.db.query('DELETE FROM files WHERE id = $1', [fileId]);
    return { fileDeleted: true, chunksDeleted };
  }

  return {
    upsertIndexedFileByPath,
    deleteIndexedFileByPath,
  };
}

function hashContent(content: string): string {
  return createHash('sha256').update(content).digest('hex');
}

function detectLanguage(path: string): string {
  const extension = path.split('.').at(-1)?.toLowerCase();
  switch (extension) {
    case 'ts':
    case 'tsx':
      return 'typescript';
    case 'js':
    case 'jsx':
      return 'javascript';
    case 'py':
      return 'python';
    case 'rs':
      return 'rust';
    case 'go':
      return 'go';
    default:
      return 'text';
  }
}

function chunkFile(content: string, linesPerChunk = 120): Chunk[] {
  const lines = content.split('\n');
  const chunks: Chunk[] = [];
  for (let i = 0; i < lines.length; i += linesPerChunk) {
    const startLine = i + 1;
    const endLine = Math.min(i + linesPerChunk, lines.length);
    chunks.push({
      index: chunks.length,
      content: lines.slice(i, endLine).join('\n'),
      startLine,
      endLine,
    });
  }
  return chunks;
}

function emitEmbeddingTelemetry(provenance: EmbeddingProvenance): void {
  if (process.env.DUBSBOT_EMBEDDING_PROVENANCE_LOG !== '1') {
    return;
  }
  const resolved = provenance.resolvedBy
    ? `${provenance.resolvedBy.provider}:${provenance.resolvedBy.model}`
    : 'none';
  console.info(
    `[embedding] strategy=${provenance.strategyId} resolved=${resolved} fallback=${provenance.fallbackUsed} attempts=${provenance.attemptPath
      .map((attempt) => `${attempt.provider}:${attempt.model}:${attempt.status}`)
      .join('>')}`
  );
}
```
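The `missing` status from `upsertIndexedFileByPath` lets a caller treat a vanished file as a delete, which is how tasks 2.1 and 2.2 describe the watcher wiring. A hypothetical dispatcher sketch of that routing (the daemon's actual wiring is not part of this diff; the `Helpers` shape below is a pared-down stand-in for the real return type):

```typescript
// Illustrative event routing: unlink deletes, add/change upserts, and an
// upsert that finds no file on disk falls through to a delete.
type FsEvent = { type: 'add' | 'change' | 'unlink'; path: string };

type Helpers = {
  upsertIndexedFileByPath: (p: string) => Promise<{ status: 'indexed' | 'missing' }>;
  deleteIndexedFileByPath: (p: string) => Promise<{ fileDeleted: boolean }>;
};

async function applyFsEvent(event: FsEvent, helpers: Helpers): Promise<'upserted' | 'deleted'> {
  if (event.type === 'unlink') {
    await helpers.deleteIndexedFileByPath(event.path);
    return 'deleted';
  }
  // add/change: upsert, but treat a file missing from disk as a delete.
  const result = await helpers.upsertIndexedFileByPath(event.path);
  if (result.status === 'missing') {
    await helpers.deleteIndexedFileByPath(event.path);
    return 'deleted';
  }
  return 'upserted';
}
```

Because every fs event carries a concrete path, none of these branches ever reaches a full reindex, which is the fallback-gating property task 3.3 tests for.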
**Review comment:** `readFile` returns an empty string for empty files, but the current `if (!content)` check treats that as missing and will trigger delete behavior upstream. Consider checking `content === null` (or catching only `ENOENT`) so empty files are indexed correctly.