diff --git a/README.md b/README.md index 1da25f9..e0c0a62 100644 --- a/README.md +++ b/README.md @@ -90,9 +90,13 @@ Environment variables (BYOK): - `DUBSBOT_ANTHROPIC_MODEL` - `DUBSBOT_GOOGLE_MODEL` (defaults to `gemini-3.1-pro-preview`) - `DUBSBOT_OTEL_ENABLED=1` to enable telemetry export hooks +- `DUBSBOT_EMBEDDING_STRATEGY_V2=1` to enable explicit embedding strategy resolution/fallback +- `DUBSBOT_EMBEDDING_STRATEGY_CONFIG_JSON` to provide explicit strategy config +- `DUBSBOT_EMBEDDING_PROVENANCE_LOG=1` to emit embedding provenance log lines ## Notes - Anthropic embeddings currently fall back to deterministic local vectors. - This project intentionally uses Biome only (no ESLint/Prettier). - Retrieval proofing benchmark schema/workflow docs: `docs/retrieval-proofing-benchmark-schema.md` and `docs/retrieval-proofing.md`. +- Embedding strategy rollout guide: `docs/embedding-strategy-rollout.md`. diff --git a/docs/embedding-strategy-rollout.md b/docs/embedding-strategy-rollout.md new file mode 100644 index 0000000..e8f983a --- /dev/null +++ b/docs/embedding-strategy-rollout.md @@ -0,0 +1,31 @@ +# Embedding Strategy V2 Rollout + +This rollout gates the explicit embedding strategy engine behind: + +- `DUBSBOT_EMBEDDING_STRATEGY_V2=1` + +Optional config override: + +- `DUBSBOT_EMBEDDING_STRATEGY_CONFIG_JSON` (JSON string matching schema version `1.0`) + +Optional provenance logging: + +- `DUBSBOT_EMBEDDING_PROVENANCE_LOG=1` + +## Enable (staged) + +1. Set `DUBSBOT_EMBEDDING_STRATEGY_V2=1` in a non-production environment. +2. Start with default legacy-mapped config (no custom JSON). +3. Run indexing and retrieval checks. +4. If needed, provide explicit strategy JSON to control Anthropic fallback paths. +5. Verify fallback/provenance behavior with tests: + - `pnpm test -- embedding-strategy` + +## Rollback + +1. Unset or set `DUBSBOT_EMBEDDING_STRATEGY_V2=0`. +2. Restart CLI/daemon processes. +3. System returns to legacy embedding execution path. + +Rollback is safe because provenance fields are additive and read-compatible. + diff --git a/openspec/changes/embedding-parity-hardening/.openspec.yaml b/openspec/changes/archive/2026-03-04-embedding-parity-hardening/.openspec.yaml similarity index 100% rename from openspec/changes/embedding-parity-hardening/.openspec.yaml rename to openspec/changes/archive/2026-03-04-embedding-parity-hardening/.openspec.yaml diff --git a/openspec/changes/embedding-parity-hardening/design.md b/openspec/changes/archive/2026-03-04-embedding-parity-hardening/design.md similarity index 100% rename from openspec/changes/embedding-parity-hardening/design.md rename to openspec/changes/archive/2026-03-04-embedding-parity-hardening/design.md diff --git a/openspec/changes/embedding-parity-hardening/proposal.md b/openspec/changes/archive/2026-03-04-embedding-parity-hardening/proposal.md similarity index 100% rename from openspec/changes/embedding-parity-hardening/proposal.md rename to openspec/changes/archive/2026-03-04-embedding-parity-hardening/proposal.md diff --git a/openspec/changes/embedding-parity-hardening/specs/anthropic-embedding-fallback-and-provenance/spec.md b/openspec/changes/archive/2026-03-04-embedding-parity-hardening/specs/anthropic-embedding-fallback-and-provenance/spec.md similarity index 100% rename from openspec/changes/embedding-parity-hardening/specs/anthropic-embedding-fallback-and-provenance/spec.md rename to openspec/changes/archive/2026-03-04-embedding-parity-hardening/specs/anthropic-embedding-fallback-and-provenance/spec.md diff --git a/openspec/changes/embedding-parity-hardening/specs/embedding-strategy-configuration/spec.md b/openspec/changes/archive/2026-03-04-embedding-parity-hardening/specs/embedding-strategy-configuration/spec.md similarity index 100% rename from openspec/changes/embedding-parity-hardening/specs/embedding-strategy-configuration/spec.md rename to openspec/changes/archive/2026-03-04-embedding-parity-hardening/specs/embedding-strategy-configuration/spec.md diff --git a/openspec/changes/embedding-parity-hardening/tasks.md b/openspec/changes/archive/2026-03-04-embedding-parity-hardening/tasks.md similarity index 50% rename from openspec/changes/embedding-parity-hardening/tasks.md rename to openspec/changes/archive/2026-03-04-embedding-parity-hardening/tasks.md index 533c582..a3b7e6f 100644 --- a/openspec/changes/embedding-parity-hardening/tasks.md +++ b/openspec/changes/archive/2026-03-04-embedding-parity-hardening/tasks.md @@ -1,24 +1,24 @@ ## 1. Strategy Configuration Foundation -- [ ] 1.1 Introduce typed `embeddingStrategy` config schema with provider/model primary and ordered fallback entries -- [ ] 1.2 Add startup validation for unknown providers, missing models, and cyclic fallback paths with structured errors -- [ ] 1.3 Add backward-compatible default mapping from legacy embedding settings to explicit strategy definitions +- [x] 1.1 Introduce typed `embeddingStrategy` config schema with provider/model primary and ordered fallback entries +- [x] 1.2 Add startup validation for unknown providers, missing models, and cyclic fallback paths with structured errors +- [x] 1.3 Add backward-compatible default mapping from legacy embedding settings to explicit strategy definitions ## 2. Runtime Strategy Resolution and Anthropic Policy -- [ ] 2.1 Implement deterministic strategy resolver that requires a valid strategy id for each embedding request -- [ ] 2.2 Implement Anthropic native-first execution path with explicit fallback eligibility based on configured failure categories -- [ ] 2.3 Enforce configured fallback order and terminal failure behavior when fallback is disallowed or exhausted +- [x] 2.1 Implement deterministic strategy resolver that requires a valid strategy id for each embedding request +- [x] 2.2 Implement Anthropic native-first execution path with explicit fallback eligibility based on configured failure categories +- [x] 2.3 Enforce configured fallback order and terminal failure behavior when fallback is disallowed or exhausted ## 3. Provenance Envelope and Data Plumbing -- [ ] 3.1 Define a normalized embedding result envelope including strategy id, provider/model attempt path, fallback state, and failure category -- [ ] 3.2 Propagate provenance fields through indexing writes and retrieval/query responses -- [ ] 3.3 Update logging/metrics hooks to include provenance identifiers for debugging and parity analysis +- [x] 3.1 Define a normalized embedding result envelope including strategy id, provider/model attempt path, fallback state, and failure category +- [x] 3.2 Propagate provenance fields through indexing writes and retrieval/query responses +- [x] 3.3 Update logging/metrics hooks to include provenance identifiers for debugging and parity analysis ## 4. Verification and Rollout Safety -- [ ] 4.1 Add conformance tests for valid/invalid strategy config loading and runtime resolution behavior -- [ ] 4.2 Add Anthropic policy tests covering success, non-fallbackable failures, fallbackable failures, and no-fallback scenarios -- [ ] 4.3 Add provenance completeness tests for both successful and terminal-failure embedding outcomes -- [ ] 4.4 Gate rollout behind a feature flag and document enable/rollback procedure for staged deployment +- [x] 4.1 Add conformance tests for valid/invalid strategy config loading and runtime resolution behavior +- [x] 4.2 Add Anthropic policy tests covering success, non-fallbackable failures, fallbackable failures, and no-fallback scenarios +- [x] 4.3 Add provenance completeness tests for both successful and terminal-failure embedding outcomes +- [x] 4.4 Gate rollout behind a feature flag and document enable/rollback procedure for staged deployment diff --git a/openspec/specs/anthropic-embedding-fallback-and-provenance/spec.md b/openspec/specs/anthropic-embedding-fallback-and-provenance/spec.md new file mode 100644 index 0000000..1943817 --- /dev/null +++ b/openspec/specs/anthropic-embedding-fallback-and-provenance/spec.md @@ -0,0 +1,39 @@ +# anthropic-embedding-fallback-and-provenance Specification + +## Purpose +Define expected Anthropic-primary embedding behavior, including native-first execution, +failure-category-gated fallback sequencing, and required provenance metadata for both successful +embedding results and terminal failures. +## Requirements +### Requirement: Anthropic Native-First Execution Policy +For strategies configured with Anthropic as primary, the system SHALL attempt Anthropic native embedding first and SHALL only consider fallback providers explicitly listed in that strategy. + +#### Scenario: Anthropic primary succeeds +- **WHEN** a request resolves to a strategy with Anthropic as primary and Anthropic returns embeddings successfully +- **THEN** the system returns the Anthropic embedding result without invoking fallback providers + +#### Scenario: Anthropic primary fails with non-fallbackable error +- **WHEN** Anthropic returns an error outside configured fallbackable categories +- **THEN** the system returns a terminal embedding error and MUST NOT invoke fallback providers + +### Requirement: Controlled Anthropic Fallback Behavior +The system SHALL invoke fallback providers for Anthropic strategies only for configured fallbackable failure categories and in configured fallback order. + +#### Scenario: Fallback is invoked in configured order +- **WHEN** Anthropic primary fails with a fallbackable error category and fallback providers are configured +- **THEN** the system attempts fallback providers sequentially in strategy order until one succeeds or all fail + +#### Scenario: No fallback configured +- **WHEN** Anthropic primary fails with a fallbackable error category but no fallback providers are configured +- **THEN** the system returns a structured failure indicating no fallback path was available + +### Requirement: Embedding Provenance Metadata +The system SHALL attach provenance metadata to every embedding result and terminal failure outcome, including strategy id, attempt provider/model path, and fallback usage state. + +#### Scenario: Provenance is emitted on success +- **WHEN** any provider successfully returns embeddings +- **THEN** the result includes provenance fields for strategy id, resolved provider/model, attempt path, and whether fallback was used + +#### Scenario: Provenance is emitted on terminal failure +- **WHEN** all attempts fail or fallback is disallowed +- **THEN** the error payload includes provenance fields for attempted providers/models, failure category, and terminal resolution reason diff --git a/openspec/specs/embedding-strategy-configuration/spec.md b/openspec/specs/embedding-strategy-configuration/spec.md new file mode 100644 index 0000000..78a2c89 --- /dev/null +++ b/openspec/specs/embedding-strategy-configuration/spec.md @@ -0,0 +1,28 @@ +# embedding-strategy-configuration Specification + +## Purpose +Define how embedding strategies are configured and resolved across providers and models, +including named strategy IDs, primary provider/model selection, ordered fallback chains, and +deterministic runtime resolution with startup validation of invalid or inconsistent configurations. +## Requirements +### Requirement: Provider-Configurable Embedding Strategy +The system SHALL support explicit embedding strategy configuration per embedding use-case, including primary provider/model selection and an ordered fallback list. + +#### Scenario: Valid strategy is loaded +- **WHEN** the service starts with a strategy configuration where each strategy has a primary provider/model and valid fallback entries +- **THEN** the system initializes successfully and registers the strategy for runtime resolution + +#### Scenario: Invalid strategy is rejected +- **WHEN** the service starts with a strategy configuration that references an unknown provider, missing model, or cyclic fallback path +- **THEN** the system MUST fail validation and return a configuration error that identifies the invalid strategy entry + +### Requirement: Deterministic Runtime Strategy Resolution +The system SHALL resolve embedding strategies deterministically for each request using the configured strategy identifier and SHALL NOT use implicit provider defaults. + +#### Scenario: Strategy id resolves to configured primary +- **WHEN** an embedding request specifies a known strategy id +- **THEN** the system uses the configured primary provider/model for the first execution attempt + +#### Scenario: Unknown strategy id is rejected +- **WHEN** an embedding request specifies a strategy id not present in configuration +- **THEN** the system returns a structured error and MUST NOT attempt embedding generation diff --git a/src/cli/runtime.ts b/src/cli/runtime.ts index d1528a6..7b45964 100644 --- a/src/cli/runtime.ts +++ b/src/cli/runtime.ts @@ -1,5 +1,6 @@ import { AgentOrchestrator } from '../agent/orchestrator'; import { loadAgentsConfig } from '../config/agents-loader'; +import { loadEmbeddingStrategyConfig } from '../context/embedding/config'; import { createDb } from '../db/client'; import { runMigrations } from '../db/migrate'; import { OptionalOtelExporter } from '../observability/otel'; @@ -13,6 +14,7 @@ import { ToolRegistry } from '../tools/registry'; export async function createRuntime() { await runMigrations(); const db = await createDb(); + const embeddingStrategyConfig = loadEmbeddingStrategyConfig(); const agentsConfig = await loadAgentsConfig(process.cwd()); const provider = createProviderAdapter(detectProvider()); const policyEngine = new DefaultPolicyEngine(createDefaultApprovalPolicy()); @@ -24,6 +26,7 @@ export async function createRuntime() { return { db, + embeddingStrategyConfig, provider, policyEngine, orchestrator, diff --git a/src/context/embedding/config.ts b/src/context/embedding/config.ts new file mode 100644 index 0000000..4d211c6 --- /dev/null +++ b/src/context/embedding/config.ts @@ -0,0 +1,93 @@ +import { detectProvider, type ProviderName } from '../../providers'; +import { + type EmbeddingStrategyConfig, + EmbeddingStrategyConfigError, + parseEmbeddingStrategyConfig, +} from './strategy'; + +export function loadEmbeddingStrategyConfig(): EmbeddingStrategyConfig { + const rawFromEnv = process.env.DUBSBOT_EMBEDDING_STRATEGY_CONFIG_JSON; + const raw = rawFromEnv ? parseJsonConfigFromEnv(rawFromEnv) : buildLegacyDefaultConfig(); + const parsed = parseEmbeddingStrategyConfig(raw); + if (!parsed.config) { + throw new EmbeddingStrategyConfigError(parsed.issues); + } + return parsed.config; +} + +export function isEmbeddingStrategyV2Enabled(): boolean { + return process.env.DUBSBOT_EMBEDDING_STRATEGY_V2 === '1'; +} + +function buildLegacyDefaultConfig(): EmbeddingStrategyConfig { + const primaryProvider = detectProvider(); + const primary = toPrimaryStrategy(primaryProvider, 'default-primary'); + const strategies = [primary]; + + if (primaryProvider === 'anthropic') { + strategies.push(toPrimaryStrategy('openai', 'fallback-openai')); + strategies.push(toPrimaryStrategy('google', 'fallback-google')); + primary.fallback = [ + { + strategyId: 'fallback-openai', + onFailure: ['rate_limit', 'timeout', 'service_unavailable'], + }, + { + strategyId: 'fallback-google', + onFailure: ['rate_limit', 'timeout', 'service_unavailable'], + }, + ]; + } + + return { + version: '1.0', + defaults: { + indexing: 'default-primary', + query: 'default-primary', + }, + strategies, + }; +} + +function toPrimaryStrategy(provider: ProviderName, id: string) { + return { + id, + provider, + model: defaultEmbeddingModel(provider), + fallback: [] as Array<{ + strategyId: string; + onFailure: Array< + 'rate_limit' | 'timeout' | 'service_unavailable' | 'auth' | 'invalid_request' | 'unknown' + >; + }>, + }; +} + +function defaultEmbeddingModel(provider: ProviderName): string { + switch (provider) { + case 'openai': + return process.env.DUBSBOT_OPENAI_EMBEDDING_MODEL ?? 'text-embedding-3-small'; + case 'google': + return process.env.DUBSBOT_GOOGLE_EMBEDDING_MODEL ?? 'text-embedding-004'; + case 'anthropic': + return process.env.DUBSBOT_ANTHROPIC_EMBEDDING_MODEL ?? 'local-deterministic'; + default: + return 'local-deterministic'; + } +} + +function parseJsonConfigFromEnv(rawFromEnv: string): unknown { + try { + return JSON.parse(rawFromEnv); + } catch (error) { + if (error instanceof SyntaxError) { + throw new EmbeddingStrategyConfigError([ + { + code: 'schema_invalid', + detail: `DUBSBOT_EMBEDDING_STRATEGY_CONFIG_JSON is invalid JSON: ${error.message}`, + }, + ]); + } + throw error; + } +} diff --git a/src/context/embedding/engine.ts b/src/context/embedding/engine.ts new file mode 100644 index 0000000..ec65453 --- /dev/null +++ b/src/context/embedding/engine.ts @@ -0,0 +1,193 @@ +import type { ProviderAdapter } from '../../providers/types'; +import { + type EmbeddingStrategyConfig, + type FailureCategory, + resolveEmbeddingStrategy, +} from './strategy'; + +export type EmbeddingAttempt = { + strategyId: string; + provider: string; + model: string; + status: 'success' | 'failure'; + failureCategory?: FailureCategory; +}; + +export type EmbeddingProvenance = { + strategyId: string; + attemptPath: EmbeddingAttempt[]; + resolvedBy?: { strategyId: string; provider: string; model: string }; + fallbackUsed: boolean; + failureCategory?: FailureCategory; + terminalReason?: 'fallback_disallowed' | 'fallback_exhausted' | 'no_fallback'; +}; + +export type EmbeddingExecutionSuccess = { + ok: true; + embedding: number[]; + provider: string; + model: string; + provenance: EmbeddingProvenance; +}; + +export type EmbeddingExecutionFailure = { + ok: false; + message: string; + provenance: EmbeddingProvenance; +}; + +export type EmbeddingExecutionResult = EmbeddingExecutionSuccess | EmbeddingExecutionFailure; + +export class EmbeddingExecutionError extends Error { + constructor( + message: string, + public readonly provenance: EmbeddingProvenance + ) { + super(message); + this.name = 'EmbeddingExecutionError'; + } +} + +export async function executeEmbeddingWithStrategy(input: { + config: EmbeddingStrategyConfig; + strategyId: string; + value: string; + adapterForProvider: (provider: string) => ProviderAdapter; +}): Promise { + const attemptPath: EmbeddingAttempt[] = []; + const queue: string[] = [input.strategyId]; + const visited = new Set(); + let fallbackUsed = false; + + while (queue.length > 0) { + const currentId = queue.shift(); + if (!currentId || visited.has(currentId)) { + continue; + } + visited.add(currentId); + + const strategy = resolveEmbeddingStrategy(input.config, currentId); + const adapter = input.adapterForProvider(strategy.provider); + + try { + const vectors = await adapter.embed({ + model: strategy.model, + values: [input.value], + }); + const vector = vectors[0] ?? []; + attemptPath.push({ + strategyId: currentId, + provider: strategy.provider, + model: strategy.model, + status: 'success', + }); + return { + ok: true, + embedding: vector, + provider: strategy.provider, + model: strategy.model, + provenance: { + strategyId: input.strategyId, + attemptPath, + fallbackUsed, + resolvedBy: { + strategyId: currentId, + provider: strategy.provider, + model: strategy.model, + }, + }, + }; + } catch (error) { + const failureCategory = classifyEmbeddingFailure(error); + attemptPath.push({ + strategyId: currentId, + provider: strategy.provider, + model: strategy.model, + status: 'failure', + failureCategory, + }); + + const eligibleFallbacks = strategy.fallback.filter((entry) => + entry.onFailure.includes(failureCategory) + ); + if (eligibleFallbacks.length === 0) { + return { + ok: false, + message: `Embedding failed for strategy "${currentId}" with category "${failureCategory}" and no eligible fallback.`, + provenance: { + strategyId: input.strategyId, + attemptPath, + fallbackUsed, + failureCategory, + terminalReason: strategy.fallback.length > 0 ? 'fallback_disallowed' : 'no_fallback', + }, + }; + } + + const fallbackIds = eligibleFallbacks + .map((entry) => entry.strategyId) + .filter((strategyId) => !visited.has(strategyId) && !queue.includes(strategyId)); + + if (fallbackIds.length === 0) { + return { + ok: false, + message: `Embedding failed for strategy "${currentId}" with category "${failureCategory}" and exhausted fallback chain.`, + provenance: { + strategyId: input.strategyId, + attemptPath, + fallbackUsed: true, + failureCategory, + terminalReason: 'fallback_exhausted', + }, + }; + } + + fallbackUsed = true; + for (const fallbackId of fallbackIds) { + queue.push(fallbackId); + } + } + } + + return { + ok: false, + message: `Embedding failed for strategy "${input.strategyId}" after exhausting fallback chain.`, + provenance: { + strategyId: input.strategyId, + attemptPath, + fallbackUsed, + failureCategory: attemptPath.at(-1)?.failureCategory, + terminalReason: 'fallback_exhausted', + }, + }; +} + +export function classifyEmbeddingFailure(error: unknown): FailureCategory { + const message = + error instanceof Error ? error.message.toLowerCase() : String(error).toLowerCase(); + if (message.includes('rate limit') || message.includes('429')) { + return 'rate_limit'; + } + if (message.includes('timeout') || message.includes('timed out')) { + return 'timeout'; + } + if (message.includes('503') || message.includes('unavailable')) { + return 'service_unavailable'; + } + if (message.includes('401') || message.includes('403') || message.includes('auth')) { + return 'auth'; + } + if (message.includes('400') || message.includes('invalid')) { + return 'invalid_request'; + } + return 'unknown'; +} + +export function assertEmbeddingSuccess( + result: EmbeddingExecutionResult +): EmbeddingExecutionSuccess { + if (result.ok) { + return result; + } + throw new EmbeddingExecutionError(result.message, result.provenance); +} diff --git a/src/context/embedding/strategy.ts b/src/context/embedding/strategy.ts new file mode 100644 index 0000000..503fe15 --- /dev/null +++ b/src/context/embedding/strategy.ts @@ -0,0 +1,190 @@ +import { z } from 'zod'; +import type { ProviderName } from '../../providers'; + +export const FailureCategorySchema = z.enum([ + 'rate_limit', + 'timeout', + 'service_unavailable', + 'auth', + 'invalid_request', + 'unknown', +]); +export type FailureCategory = z.infer; + +export const EmbeddingStrategyRefSchema = z.object({ + strategyId: z.string().min(1), + onFailure: z.array(FailureCategorySchema).min(1), +}); +export type EmbeddingStrategyRef = z.infer; + +export const EmbeddingStrategySchema = z.object({ + id: z.string().min(1), + provider: z.enum(['openai', 'anthropic', 'google']), + model: z.string().min(1), + fallback: z.array(EmbeddingStrategyRefSchema).default([]), +}); +export type EmbeddingStrategy = z.infer & { + provider: ProviderName; +}; + +export const EmbeddingStrategyConfigSchema = z.object({ + version: z.literal('1.0'), + defaults: z.object({ + indexing: z.string().min(1), + query: z.string().min(1), + }), + strategies: z.array(EmbeddingStrategySchema).min(1), +}); +export type EmbeddingStrategyConfig = z.infer; + +export type EmbeddingStrategyValidationIssue = { + code: + | 'unknown_provider' + | 'missing_model' + | 'duplicate_strategy' + | 'unknown_fallback_strategy' + | 'cyclic_fallback_path' + | 'unknown_default_strategy' + | 'schema_invalid'; + strategyId?: string; + detail: string; +}; + +export class EmbeddingStrategyConfigError extends Error { + constructor(public readonly issues: EmbeddingStrategyValidationIssue[]) { + super( + `Invalid embedding strategy configuration: ${issues.map((issue) => issue.detail).join('; ')}` + ); + this.name = 'EmbeddingStrategyConfigError'; + } +} + +export class EmbeddingStrategyResolutionError extends Error { + constructor( + public readonly strategyId: string, + public readonly reason: 'unknown_strategy' + ) { + super(`Unable to resolve embedding strategy "${strategyId}": ${reason}`); + this.name = 'EmbeddingStrategyResolutionError'; + } +} + +export function parseEmbeddingStrategyConfig(raw: unknown): { + config?: EmbeddingStrategyConfig; + issues: EmbeddingStrategyValidationIssue[]; +} { + const parsed = EmbeddingStrategyConfigSchema.safeParse(raw); + if (!parsed.success) { + return { + issues: parsed.error.issues.map((issue) => ({ + code: issue.path.includes('provider') + ? 'unknown_provider' + : issue.path.includes('model') + ? 'missing_model' + : 'schema_invalid', + detail: `${issue.path.join('.') || ''}: ${issue.message}`, + })), + }; + } + + const config = parsed.data; + const issues: EmbeddingStrategyValidationIssue[] = []; + const byId = new Map(); + for (const strategy of config.strategies) { + if (byId.has(strategy.id)) { + issues.push({ + code: 'duplicate_strategy', + strategyId: strategy.id, + detail: `Duplicate strategy id "${strategy.id}"`, + }); + continue; + } + byId.set(strategy.id, strategy); + + if (!strategy.model.trim()) { + issues.push({ + code: 'missing_model', + strategyId: strategy.id, + detail: `Strategy "${strategy.id}" is missing model`, + }); + } + } + + for (const strategy of config.strategies) { + for (const fallback of strategy.fallback) { + if (!byId.has(fallback.strategyId)) { + issues.push({ + code: 'unknown_fallback_strategy', + strategyId: strategy.id, + detail: `Strategy "${strategy.id}" references unknown fallback strategy "${fallback.strategyId}"`, + }); + } + } + } + + if (!byId.has(config.defaults.indexing)) { + issues.push({ + code: 'unknown_default_strategy', + strategyId: config.defaults.indexing, + detail: `Default indexing strategy "${config.defaults.indexing}" does not exist`, + }); + } + if (!byId.has(config.defaults.query)) { + issues.push({ + code: 'unknown_default_strategy', + strategyId: config.defaults.query, + detail: `Default query strategy "${config.defaults.query}" does not exist`, + }); + } + + const visited = new Set(); + const inStack = new Set(); + const path: string[] = []; + + function walk(strategyId: string) { + if (inStack.has(strategyId)) { + const cycleStart = path.indexOf(strategyId); + const cycle = [...path.slice(cycleStart), strategyId].join(' -> '); + issues.push({ + code: 'cyclic_fallback_path', + strategyId, + detail: `Cyclic fallback path detected: ${cycle}`, + }); + return; + } + if (visited.has(strategyId)) { + return; + } + + visited.add(strategyId); + inStack.add(strategyId); + path.push(strategyId); + const strategy = byId.get(strategyId); + if (strategy) { + for (const fallback of strategy.fallback) { + if (byId.has(fallback.strategyId)) { + walk(fallback.strategyId); + } + } + } + path.pop(); + inStack.delete(strategyId); + } + + for (const strategy of config.strategies) { + walk(strategy.id); + } + + return issues.length > 0 ? { issues } : { config, issues: [] }; +} + +export function resolveEmbeddingStrategy( + config: EmbeddingStrategyConfig, + strategyId: string +): EmbeddingStrategy { + const strategy = config.strategies.find((entry) => entry.id === strategyId); + if (!strategy) { + throw new EmbeddingStrategyResolutionError(strategyId, 'unknown_strategy'); + } + return strategy as EmbeddingStrategy; +} diff --git a/src/context/indexer/full-index.ts b/src/context/indexer/full-index.ts index 1e1c76e..9bd2390 100644 --- a/src/context/indexer/full-index.ts +++ b/src/context/indexer/full-index.ts @@ -2,7 +2,14 @@ import { createHash, randomUUID } from 'node:crypto'; import { readFile } from 'node:fs/promises'; import fg from 'fast-glob'; import type { DubsbotDb } from '../../db/client'; +import { createProviderAdapter } from '../../providers'; import type { ProviderAdapter } from '../../providers/types'; +import { isEmbeddingStrategyV2Enabled, loadEmbeddingStrategyConfig } from '../embedding/config'; +import { + assertEmbeddingSuccess, + type EmbeddingProvenance, + executeEmbeddingWithStrategy, +} from '../embedding/engine'; import { deterministicEmbedding } from '../retrieval/rerank'; type Chunk = { @@ -57,6 +64,7 @@ export async function runFullIndex(input: { repoRoot: string; embedProvider?: ProviderAdapter; embeddingModel?: string; + embeddingStrategyId?: string; }): Promise<{ filesIndexed: number; chunksIndexed: number }> { const paths = await fg(['**/*', '!node_modules/**', '!.git/**', '!dist/**', '!coverage/**'], { cwd: input.repoRoot, @@ -68,6 +76,20 @@ export async function runFullIndex(input: { let filesIndexed = 0; let chunksIndexed = 0; + const isStrategyV2 = isEmbeddingStrategyV2Enabled(); + const strategyConfig = isStrategyV2 ? loadEmbeddingStrategyConfig() : null; + const adapterCache = new Map(); + + function getAdapter(provider: string): ProviderAdapter { + const cached = adapterCache.get(provider); + if (cached) { + return cached; + } + const adapter = createProviderAdapter(provider as 'openai' | 'anthropic' | 'google'); + adapterCache.set(provider, adapter); + return adapter; + } + for (const relativePath of paths) { const absolutePath = `${input.repoRoot}/${relativePath}`; const content = await readFile(absolutePath, 'utf8').catch(() => null); @@ -104,26 +126,58 @@ export async function runFullIndex(input: { [chunkId, persistedFileId, chunk.index, chunk.content, chunk.startLine, chunk.endLine] ); - const embedding = - input.embedProvider != null - ? ( - await input.embedProvider.embed({ - model: input.embeddingModel ?? 'text-embedding-3-small', - values: [chunk.content], - }) - )[0] - : deterministicEmbedding(chunk.content); + let embedding: number[]; + let provider = input.embedProvider ? 'remote' : 'local'; + let model = input.embeddingModel ?? 'deterministic-v1'; + let provenance: EmbeddingProvenance = { + strategyId: 'legacy-default', + attemptPath: [ + { + strategyId: 'legacy-default', + provider, + model, + status: 'success', + }, + ], + fallbackUsed: false, + resolvedBy: { + strategyId: 'legacy-default', + provider, + model, + }, + }; + + if (isStrategyV2 && strategyConfig) { + const strategyId = input.embeddingStrategyId ?? strategyConfig.defaults.indexing; + const result = await executeEmbeddingWithStrategy({ + config: strategyConfig, + strategyId, + value: chunk.content, + adapterForProvider: getAdapter, + }); + const success = assertEmbeddingSuccess(result); + embedding = success.embedding; + provider = success.provider; + model = success.model; + provenance = success.provenance; + emitEmbeddingTelemetry(success.provenance); + } else { + embedding = + input.embedProvider != null + ? ( + await input.embedProvider.embed({ + model: input.embeddingModel ?? 'text-embedding-3-small', + values: [chunk.content], + }) + )[0] + : deterministicEmbedding(chunk.content); + } await input.db.query( - `INSERT INTO chunk_embeddings (chunk_id, provider, model, embedding) - VALUES ($1, $2, $3, $4::jsonb) - ON CONFLICT (chunk_id) DO UPDATE SET provider = EXCLUDED.provider, model = EXCLUDED.model, embedding = EXCLUDED.embedding`, - [ - chunkId, - input.embedProvider ? 'remote' : 'local', - input.embeddingModel ?? 'deterministic-v1', - JSON.stringify(embedding), - ] + `INSERT INTO chunk_embeddings (chunk_id, provider, model, embedding, provenance) + VALUES ($1, $2, $3, $4::jsonb, $5::jsonb) + ON CONFLICT (chunk_id) DO UPDATE SET provider = EXCLUDED.provider, model = EXCLUDED.model, embedding = EXCLUDED.embedding, provenance = EXCLUDED.provenance`, + [chunkId, provider, model, JSON.stringify(embedding), JSON.stringify(provenance)] ); await input.db.query('INSERT INTO bm25_documents (id, chunk_id, body) VALUES ($1, $2, $3)', [ @@ -136,3 +190,17 @@ export async function runFullIndex(input: { return { filesIndexed, chunksIndexed }; } + +function emitEmbeddingTelemetry(provenance: EmbeddingProvenance): void { + if (process.env.DUBSBOT_EMBEDDING_PROVENANCE_LOG !== '1') { + return; + } + const resolved = provenance.resolvedBy + ? `${provenance.resolvedBy.provider}:${provenance.resolvedBy.model}` + : 'none'; + console.info( + `[embedding] strategy=${provenance.strategyId} resolved=${resolved} fallback=${provenance.fallbackUsed} attempts=${provenance.attemptPath + .map((attempt) => `${attempt.provider}:${attempt.model}:${attempt.status}`) + .join('>')}` + ); +} diff --git a/src/context/retrieval/hybrid.ts b/src/context/retrieval/hybrid.ts index f052d21..f1a2efc 100644 --- a/src/context/retrieval/hybrid.ts +++ b/src/context/retrieval/hybrid.ts @@ -9,6 +9,9 @@ type ChunkRow = { content: string; path: string; embedding: string | null; + provider: string | null; + model: string | null; + provenance: string | null; }; async function grepSearch( @@ -61,7 +64,7 @@ export async function runHybridRetrieval(input: { const queryVector = deterministicEmbedding(query.vectorQuery || query.lexicalQuery); const rows = await input.db.query( - `SELECT c.id, c.content, f.path, ce.embedding::text as embedding + `SELECT c.id, c.content, f.path, ce.embedding::text as embedding, ce.provider, ce.model, ce.provenance::text as provenance FROM chunks c JOIN files f ON f.id = c.file_id LEFT JOIN chunk_embeddings ce ON ce.chunk_id = c.id @@ -104,6 +107,9 @@ export async function runHybridRetrieval(input: { score: entry.totalScore, metadata: { path: entry.item.path, + provider: entry.item.provider ?? 'unknown', + model: entry.item.model ?? 'unknown', + embeddingProvenance: safeJsonParse(entry.item.provenance), lexicalScore: entry.lexicalScore, vectorScore: entry.vectorScore, rank: index + 1, @@ -139,3 +145,14 @@ export async function runHybridRetrieval(input: { return bundle; } + +function safeJsonParse(value: string | null): unknown { + if (!value) { + return null; + } + try { + return JSON.parse(value); + } catch { + return null; + } +} diff --git a/src/daemon/main.ts b/src/daemon/main.ts index 01c535b..4edf875 100644 --- a/src/daemon/main.ts +++ b/src/daemon/main.ts @@ -3,6 +3,10 @@ import { EventHookRunner } from '../automation/event-hooks'; import { AutomationRunner } from '../automation/runner'; import { AutomationScheduler } from '../automation/scheduler'; import { loadAgentsConfig } from '../config/agents-loader'; +import { + isEmbeddingStrategyV2Enabled, + loadEmbeddingStrategyConfig, +} from '../context/embedding/config'; import { RepoFsWatcher } from '../context/fs-watcher'; import { GitWatcher } from '../context/git-watcher'; import { runIncrementalIndex } from '../context/indexer/incremental'; @@ -15,6 +19,9 @@ import { createProviderAdapter, detectProvider } from '../providers'; async function main(): Promise { await runMigrations(); const db = await createDb(); + if (isEmbeddingStrategyV2Enabled()) { + loadEmbeddingStrategyConfig(); + } const provider = createProviderAdapter(detectProvider()); const policy = new DefaultPolicyEngine(createDefaultApprovalPolicy()); const orchestrator = new AgentOrchestrator({ provider, policyEngine: policy }); diff --git a/src/db/migrate.ts b/src/db/migrate.ts index 397be47..3260914 100644 --- a/src/db/migrate.ts +++ b/src/db/migrate.ts @@ -1,5 +1,6 @@ import { readFile } from 'node:fs/promises'; import { join } from 'node:path'; +import fg from 'fast-glob'; import { createDb } from './client'; export async function runMigrations(): Promise { @@ -8,17 +9,39 @@ export async function runMigrations(): Promise { 'CREATE TABLE IF NOT EXISTS schema_migrations (version TEXT PRIMARY KEY, applied_at TIMESTAMPTZ NOT NULL DEFAULT NOW());' ); - const migrationPath = join(process.cwd(), 'src', 'db', 'migrations', '0001_init.sql'); - const migrationSql = await readFile(migrationPath, 'utf8'); + const migrationFiles = await fg(['*.sql'], { + cwd: join(process.cwd(), 'src', 'db', 'migrations'), + onlyFiles: true, + absolute: false, + }); + migrationFiles.sort(); - const already = await db.query<{ exists: boolean }>( - "SELECT EXISTS (SELECT 1 FROM schema_migrations WHERE version = '0001_init') AS exists" + const existing = await db.query<{ version: string }>( + 'SELECT version FROM schema_migrations ORDER BY version ASC' ); + const applied = new Set(existing.rows.map((row) => row.version)); - if (already.rows[0]?.exists) { - return; - } + for (const file of migrationFiles) { + const version = file.replace(/\.sql$/, ''); + if (applied.has(version)) { + continue; + } + + const migrationPath = join(process.cwd(), 'src', 'db', 'migrations', file); + const migrationSql = await readFile(migrationPath, 'utf8'); - await db.exec(migrationSql); - await db.query("INSERT INTO schema_migrations (version) VALUES ('0001_init')"); + try { + await db.exec('BEGIN'); + await db.exec(migrationSql); + await db.query('INSERT INTO schema_migrations (version) VALUES ($1)', [version]); + await db.exec('COMMIT'); + } catch (error) { + try { + await db.exec('ROLLBACK'); + } catch { + // Ignore rollback failures to avoid masking original migration errors. + } + throw error; + } + } } diff --git a/src/db/migrations/0002_embedding_provenance.sql b/src/db/migrations/0002_embedding_provenance.sql new file mode 100644 index 0000000..a49cddd --- /dev/null +++ b/src/db/migrations/0002_embedding_provenance.sql @@ -0,0 +1,3 @@ +ALTER TABLE chunk_embeddings +ADD COLUMN IF NOT EXISTS provenance JSONB NOT NULL DEFAULT '{}'::jsonb; + diff --git a/tests/embedding-strategy.test.ts b/tests/embedding-strategy.test.ts new file mode 100644 index 0000000..852a3d2 --- /dev/null +++ b/tests/embedding-strategy.test.ts @@ -0,0 +1,259 @@ +import { beforeEach, describe, expect, it, vi } from 'vitest'; +import { loadEmbeddingStrategyConfig } from '../src/context/embedding/config'; +import { executeEmbeddingWithStrategy } from '../src/context/embedding/engine'; +import { + EmbeddingStrategyConfigError, + EmbeddingStrategyResolutionError, + parseEmbeddingStrategyConfig, + resolveEmbeddingStrategy, +} from '../src/context/embedding/strategy'; +import type { ProviderAdapter } from '../src/providers/types'; + +class FakeProvider implements ProviderAdapter { + constructor( + private readonly behavior: 'ok' | 'rate_limit' | 'auth' | 'timeout' | 'service_unavailable' + ) {} + + async generateStructured(): Promise { + throw new Error('not used'); + } + + async *streamStructured(): AsyncIterable {} + + async embed(): Promise { + if (this.behavior === 'ok') { + return [[0.1, 0.2, 0.3]]; + } + if (this.behavior === 'rate_limit') { + throw new Error('429 rate limit'); + } + if (this.behavior === 'auth') { + throw new Error('401 auth'); + } + if (this.behavior === 'timeout') { + throw new Error('timeout'); + } + throw new Error('503 unavailable'); + } + + async countTokens(): Promise { + return 1; + } + + supports(): boolean { + return true; + } +} + +const baseConfig = { + version: '1.0', + defaults: { indexing: 'indexing', query: 'query' }, + strategies: [ + { + id: 'indexing', + provider: 'anthropic', + model: 'claude-embed', + fallback: [{ strategyId: 'fallback-openai', onFailure: ['rate_limit', 'timeout'] }], + }, + { + id: 'query', + provider: 'anthropic', + model: 'claude-embed', + fallback: [{ strategyId: 'fallback-openai', onFailure: ['rate_limit'] }], + }, + { + id: 'fallback-openai', + provider: 'openai', + model: 'text-embedding-3-small', + fallback: [], + }, + ], +} as const; + +function requireConfig(raw: unknown) { + const parsed = parseEmbeddingStrategyConfig(raw); + if (!parsed.config) { + throw new Error( + `Expected valid config, received issues: ${parsed.issues.map((i) => i.detail).join('; ')}` + ); + } + return parsed.config; +} + +describe('embedding strategy configuration', () => { + beforeEach(() => { + delete process.env.DUBSBOT_EMBEDDING_STRATEGY_CONFIG_JSON; + }); + + it('loads valid strategy config and resolves known strategy ids', () => { + const config = requireConfig(baseConfig); + expect(resolveEmbeddingStrategy(config, 'indexing').provider).toBe('anthropic'); + }); + + it('rejects invalid config entries (unknown provider, missing fallback strategy, cycles)', () => { + const parsed = parseEmbeddingStrategyConfig({ + version: '1.0', + defaults: { indexing: 'a', query: 'b' }, + strategies: [ + { + id: 'a', + provider: 'openai', + model: 'x', + fallback: [{ strategyId: 'b', onFailure: ['rate_limit'] }], + }, + { + id: 'b', + provider: 'google', + model: 'x', + fallback: [{ strategyId: 'a', onFailure: ['rate_limit'] }], + }, + { + id: 'bad', + provider: 'openai', + model: 'x', + fallback: [{ strategyId: 'missing', onFailure: ['rate_limit'] }], + }, + ], + }); + + expect(parsed.config).toBeUndefined(); + expect(parsed.issues.some((issue) => issue.code === 'cyclic_fallback_path')).toBe(true); + expect(parsed.issues.some((issue) => issue.code === 'unknown_fallback_strategy')).toBe(true); + }); + + it('throws structured config error at startup when env config is invalid', () => { + process.env.DUBSBOT_EMBEDDING_STRATEGY_CONFIG_JSON = JSON.stringify({ + version: '1.0', + defaults: { indexing: 'missing', query: 'missing' }, + strategies: [{ id: 'only', provider: 'openai', model: 'x', fallback: [] }], + }); + + expect(() => loadEmbeddingStrategyConfig()).toThrow(EmbeddingStrategyConfigError); + }); + + it('throws structured runtime error when strategy id is unknown', () => { + const config = requireConfig(baseConfig); + expect(() => resolveEmbeddingStrategy(config, 'not-found')).toThrow( + EmbeddingStrategyResolutionError + ); + }); +}); + +describe('anthropic native-first fallback policy', () => { + it('returns anthropic result directly on success', async () => { + const config = requireConfig(baseConfig); + const result = await executeEmbeddingWithStrategy({ + config, + strategyId: 'indexing', + value: 'hello', + adapterForProvider: (provider) => + provider === 'anthropic' ? new FakeProvider('ok') : new FakeProvider('ok'), + }); + + expect(result.ok).toBe(true); + if (result.ok) { + expect(result.provenance.fallbackUsed).toBe(false); + expect(result.provenance.attemptPath).toHaveLength(1); + expect(result.provenance.resolvedBy?.provider).toBe('anthropic'); + } + }); + + it('does not fallback on non-fallbackable anthropic failure', async () => { + const config = requireConfig(baseConfig); + const adapterSpy = vi.fn((provider: string) => + provider === 'anthropic' ? new FakeProvider('auth') : new FakeProvider('ok') + ); + + const result = await executeEmbeddingWithStrategy({ + config, + strategyId: 'indexing', + value: 'hello', + adapterForProvider: adapterSpy, + }); + + expect(result.ok).toBe(false); + expect(adapterSpy).toHaveBeenCalledTimes(1); + if (!result.ok) { + expect(result.provenance.failureCategory).toBe('auth'); + expect(result.provenance.terminalReason).toBe('fallback_disallowed'); + } + }); + + it('falls back in configured order for fallbackable errors', async () => { + const config = requireConfig(baseConfig); + const result = await executeEmbeddingWithStrategy({ + config, + strategyId: 'indexing', + value: 'hello', + adapterForProvider: (provider) => + provider === 'anthropic' ? new FakeProvider('rate_limit') : new FakeProvider('ok'), + }); + + expect(result.ok).toBe(true); + if (result.ok) { + expect(result.provenance.fallbackUsed).toBe(true); + expect(result.provenance.attemptPath.map((entry) => entry.provider)).toEqual([ + 'anthropic', + 'openai', + ]); + } + }); + + it('returns terminal failure when no fallback is configured', async () => { + const config = requireConfig({ + version: '1.0', + defaults: { indexing: 'solo', query: 'solo' }, + strategies: [{ id: 'solo', provider: 'anthropic', model: 'claude-embed', fallback: [] }], + }); + const result = await executeEmbeddingWithStrategy({ + config, + strategyId: 'solo', + value: 'hello', + adapterForProvider: () => new FakeProvider('rate_limit'), + }); + + expect(result.ok).toBe(false); + if (!result.ok) { + expect(result.provenance.terminalReason).toBe('no_fallback'); + expect(result.provenance.attemptPath).toHaveLength(1); + } + }); +}); + +describe('embedding provenance completeness', () => { + it('includes complete provenance fields on success', async () => { + const config = requireConfig(baseConfig); + const result = await executeEmbeddingWithStrategy({ + config, + strategyId: 'indexing', + value: 'hello', + adapterForProvider: (provider) => + provider === 'anthropic' ? new FakeProvider('rate_limit') : new FakeProvider('ok'), + }); + + expect(result.ok).toBe(true); + if (result.ok) { + expect(result.provenance.strategyId).toBe('indexing'); + expect(result.provenance.resolvedBy).toBeDefined(); + expect(result.provenance.attemptPath.length).toBeGreaterThan(0); + } + }); + + it('includes complete provenance fields on terminal failure', async () => { + const config = requireConfig(baseConfig); + const result = await executeEmbeddingWithStrategy({ + config, + strategyId: 'indexing', + value: 'hello', + adapterForProvider: () => new FakeProvider('timeout'), + }); + + expect(result.ok).toBe(false); + if (!result.ok) { + expect(result.provenance.strategyId).toBe('indexing'); + expect(result.provenance.attemptPath.length).toBeGreaterThan(0); + expect(result.provenance.failureCategory).toBeDefined(); + expect(result.provenance.terminalReason).toBeDefined(); + } + }); +});