diff --git a/.lore.md b/.lore.md
index 6ace5d3..0157f6a 100644
--- a/.lore.md
+++ b/.lore.md
@@ -19,7 +19,7 @@
* **Batch API integration: gateway enhancement, not mandatory architecture shift**: Implementing Anthropic Message Batches API as a gateway-only feature (50% cost savings on distillation/curation workers) does not require mandating gateway for all deployments. Direct plugin path continues working normally; batching is an optional gateway optimization that transparently accumulates non-urgent distill/distill-curation calls, flushes every N seconds, polls results in background. Keeps gateway experimental status while capturing savings on high-volume workers (\`distillSegment\`, \`metaDistill\`, \`consolidate\`, worker validation). Estimate: ~$1,100/month savings on Lore workers alone.
-* **Batch distillation consumption to reduce cache-bust frequency**: Batch distillation consumption at turn boundaries: Refresh \`loadDistillations()\` only at turn boundaries (new user message) or after idle gap > cache TTL (~5min). During autonomous tool chains (consecutive assistant→tool→assistant), freeze prefix—no DB hits. Context: prefix refresh costs \`context\_size × $3.75/MTok\` (~$1.88 per bust for 500K Sonnet). New distillations have marginal value mid-chain—model already has raw messages. Turn-boundary refresh reduces 189 arrivals → 8 refresh points in typical session, cutting bust cache writes from $639 → ~$15 (97% reduction). Combine with batching background distill workers: accumulate \`backgroundDistill()\` calls, flush at turn boundaries instead of firing on every \`message.updated\` event.
+* **Batch distillation consumption to reduce cache-bust frequency**: Refresh \`loadDistillations()\` only at turn boundaries (new user message) or after idle gap > cache TTL (~5min default). During autonomous tool chains (consecutive assistant→tool→assistant), freeze prefix—no DB hits. Meta-distillation rewrites row IDs; when cache is warm (lastTurnAt within TTL), skip meta to avoid prefix invalidation. Use \`getLastTurnAt(sessionID)\` to check cache warmth before distilling. This reduces 189 arrivals → 8 refresh points, cutting cache-write cost from $639 → ~$15 (97% reduction).

### Gotcha

@@ -32,6 +32,9 @@

* **Lore transform non-determinism breaks prompt cache between API calls**: Lore transform non-determinism breaks prompt cache. Root causes: (1) \`sanitizeToolParts()\` uses \`Date.now()\` on every call → different timestamps for same pending parts → different message bytes → cache bust. Fix: use deterministic timestamp (part.state.time.start or 0). (2) \`distilledPrefixCached()\` calls \`addRelativeTimeToObservations(newRows, new Date())\` per gen-0 row → relational time changes → cache bust. Fix: batch consumption at turn boundaries \[\[019dfa53-b925-70e2-8f84-cab808d8e115]]. Prevent regressions via unit tests covering transform determinism + runtime bust-rate tracking.
+
+* **Meta-distillation row ID rewrites invalidate distilled prefix cache**: Meta-distillation changes gen-0 row IDs when consolidating rows (e.g., 10 gen-0 rows → 1 gen-1 row), invalidating the distilled prefix cache on the next transform. If the upstream prompt cache is still warm (within 5min default), this wastes a cache-write burst. Fix: pass \`skipMeta: true\` to \`distillation.run()\` when \`Date.now() - getLastTurnAt(sessionID) < cacheTTLMs\`. Idle workers use this check; manual distillation should too.
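A minimal sketch of the warm-cache guard described in the bullet above, using `getLastTurnAt()` and `distillation.run()` from this change (the `distillWhenSafe` wrapper, the import paths, and the hard-coded 5-minute TTL are illustrative, not part of the patch):

```ts
import * as distillation from "@loreai/core/distillation"; // hypothetical import path
import { getLastTurnAt } from "@loreai/core/gradient";     // hypothetical import path
import type { LLMClient } from "@loreai/core";

// Skip meta-distillation while the upstream prompt cache is still warm, so
// gen-0 row IDs are not rewritten inside the cache window (which would
// invalidate the distilled prefix and force a full cache write next turn).
async function distillWhenSafe(llm: LLMClient, projectPath: string, sessionID: string) {
  const cacheTTLMs = 5 * 60_000; // Anthropic default-tier prompt cache TTL (~5min)
  const lastTurn = getLastTurnAt(sessionID);
  const cacheWarm = lastTurn > 0 && Date.now() - lastTurn < cacheTTLMs;
  await distillation.run({ llm, projectPath, sessionID, skipMeta: cacheWarm });
}
```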
+

* **Runtime cost monitoring is log-only, no session budget enforcement**: Lore has cache-bust detection (prefix hash comparison) and overflow recovery, but NO session cost accumulator, alerts, or abort mechanisms. Cost is only tracked post-hoc in eval harnesses. Cache busts log individually via \`log.info()\` but are never counted or rated. No config option for session spend limits or cost thresholds. Plugin can't abort—only host (OpenCode) can halt. Must implement runtime cost tracking with stderr alerts when session spend exceeds threshold, paired with unit tests for transform determinism to prevent regressions.

@@ -44,7 +47,7 @@

### Pattern

-* **Cache bust detection via prefix ID hash but no rate tracking**: Gradient tracks byte-identity of message prefix between turns using \`lastPrefixHash\` (first 5 message IDs concatenated with layer). When prefix changes, logs cache-bust event via \`log.info()\` at lines 1682-1696. Also tracks \`consecutiveHighLayer\` counter for compaction hints (logs at count=3, fires once). But no rolling bust-rate counter, no cumulative bust count per session, no alerting threshold. Need to add per-session \`bustCount\` and \`bustRate\` metrics that fire stderr alert when rate > 50% after 20+ API calls.
+* **Cache bust detection via prefix ID hash with rate tracking**: Gradient now tracks byte-identity of message prefix using \`lastPrefixHash\` (first 5 message IDs + layer) and logs cache-bust events with rate percentage (bustCount/transformCount). Runtime metrics added: \`bustCount\` (cumulative busts), \`transformCount\` (total transform calls). Alerts on stderr when rate > 50% after 20+ API calls. Helps identify regressions in transform determinism. Busts remain expensive: ~$3.75/MTok × context\_size per bust.

* **Distillation row arrivals trigger cache busts via prefix budget shifts**: Each new gen-0 distillation row (~189 total across session) changes the distilled prefix text length → shrinks raw window budget → \`tryFitStable()\` recalculates raw window cutoff → messages evicted/included from front → entire output array bytes change. Even with \`tryFitStable()\` pinning logic, prefix token growth forces re-evaluation. Result: alternating bust/warm pattern (bust when row arrives, warm on subsequent call with same row count). Meta-distillations compound this: 17 full re-renders with \`new Date()\` cause relational time annotations to potentially differ, plus row count collapse (e.g., 10 gen-0 → 1 gen-1 row) shrinks prefix drastically.

@@ -59,10 +62,10 @@

* **Gateway package: new fourth runtime adapter for proxy-based context management**: Gateway package: runtime-agnostic HTTP proxy accepting Anthropic \`/v1/messages\`, applying full Lore pipeline (gradient, LTM, distillation), forwarding upstream. Implements \`LLMClient\` in \`llm-adapter.ts\`. Supports optional interceptor for recording/replay. Plugin spawns gateway if not running (probes \`http://127.0.0.1:6969/health\`, waits 5s), then registers observer hooks in gateway mode to audit gateway decisions without mutating output — logs session ID verification, LTM entries selected, gradient layer/tokens chosen. Observer reads \`temporal\_messages\`, \`knowledge\` tables; runs local \`transform()\` and \`forSession()\` for comparison.

-* **Gradient layer transitions trigger cascade of cache busts in Lore**: Late-stage sessions show phase transition at ~step 668: bust rate jumps from 12% → 51%. Correlates with context window growth crossing layer-0 cap, escalating to layer-1+ (higher cost, different message restructuring). Each layer transition may alter how gradient injects context, changing message array bytes and invalidating prompt cache. Effect compounds: higher layer cost + more busts = quadratic explosion. Monitor gradient layer choice at step transitions; may need per-layer cache validation or deterministic layer boundary crossing.
+* **Gradient layer transitions trigger cascade of cache busts in Lore**: Late-stage sessions show phase transition at ~step 668: bust rate jumps 12% → 51%. Sticky layer guard now pins to \`lastLayer\` (not layer 0) when message count stable—prevents oscillation (0→1→0 or 1→2→1) that rewrites context bytes. Example: layer 2 strips tool outputs (different bytes), bouncing to layer 1 restores them → two busts. Guard only applies to calibrated sessions to isolate impact.

-* **Idle-resume cache refresh: clear caches when wall-clock gap exceeds prompt cache TTL**: Clear caches when wall-clock gap exceeds prompt cache TTL. If \`now - lastTurnAt > 60min\`, call \`onIdleResume(sessionID)\` in pre-LLM hook to clear \`prefixCache\`, \`rawWindowCache\`, delete \`ltmSessionCache\`, set \`cameOutOfIdle=true\`.
+* **Idle-resume cache refresh: clear caches when wall-clock gap exceeds prompt cache TTL**: If \`now - lastTurnAt > cacheTTLMs\` (default 5min for Anthropic default-tier, configurable via \`idleResumeMinutes\`), call \`onIdleResume(sessionID)\` in pre-LLM hook to clear \`prefixCache\`, \`rawWindowCache\`, delete \`ltmSessionCache\`, set \`cameOutOfIdle=true\`. Anthropic's default-tier prompt cache TTL is ~5 minutes (not 1 hour); beyond that window, byte-identity preservation wastes cache-write cost with no benefit.

* **Long-running autonomous sessions hit quadratic cache cost — session length budget needed**: Long-running sessions hit quadratic cache cost via non-deterministic transform. Session with 1,345 API calls: 314 calls (23%) read only 40,913 tokens (system prompt), rewriting 400–690K tokens each (busts). Two root causes: (1) Distillation row arrivals (~189 total) change \`distilledPrefix()\` length → shrink raw window budget → entire message array bytes change. (2) \`sanitizeToolParts()\` line 833 uses \`Date.now()\` to convert pending tool parts to error, producing different timestamps on every \`transform()\` call even with same input. OpenCode's cache fix (e148f00aa) preserves old pending parts in cached array—but Lore re-timestamps them. Fix distillation consumption at turn boundaries \[\[019dfa53-b925-70e2-8f84-cab808d8e115]] and use deterministic timestamp (0 or message.time.created) instead of \`Date.now()\` in sanitizeToolParts.

diff --git a/packages/core/src/distillation.ts b/packages/core/src/distillation.ts
index 5c27b77..ba9f59b 100644
--- a/packages/core/src/distillation.ts
+++ b/packages/core/src/distillation.ts
@@ -532,6 +532,11 @@ export async function run(input: {
 * and causes a cache bust on the next turn. Callers should set this to true
 * when `Date.now() - getLastTurnAt(sessionID) < cacheTTL`. */
 skipMeta?: boolean;
+ /** When true, all LLM calls in this run are marked urgent and bypass the
+ * batch queue (if one is active). Use for compaction and overflow recovery
+ * where the caller is blocking on the result.
Background/idle distillation + * should leave this false to benefit from batch API 50% cost savings. */ + urgent?: boolean; }): Promise<{ rounds: number; distilled: number }> { // Reset orphaned messages (marked distilled by a deleted/migrated distillation) const orphans = resetOrphans(input.projectPath, input.sessionID); @@ -565,6 +570,7 @@ export async function run(input: { sessionID: input.sessionID, messages: segment, model: input.model, + urgent: input.urgent, }); if (result) { distilled += segment.length; @@ -586,6 +592,7 @@ export async function run(input: { projectPath: input.projectPath, sessionID: input.sessionID, model: input.model, + urgent: input.urgent, }); rounds++; } @@ -603,6 +610,7 @@ async function distillSegment(input: { sessionID: string; messages: TemporalMessage[]; model?: { providerID: string; modelID: string }; + urgent?: boolean; }): Promise { const prior = latestObservations(input.projectPath, input.sessionID); const text = messagesToText(input.messages); @@ -625,7 +633,7 @@ async function distillSegment(input: { const responseText = await input.llm.prompt( DISTILLATION_SYSTEM, userContent, - { model, workerID: "lore-distill", thinking: false }, + { model, workerID: "lore-distill", thinking: false, urgent: input.urgent }, ); if (!responseText) return null; @@ -676,6 +684,7 @@ export async function metaDistill(input: { projectPath: string; sessionID: string; model?: { providerID: string; modelID: string }; + urgent?: boolean; }): Promise { const existing = loadGen0(input.projectPath, input.sessionID); @@ -703,7 +712,7 @@ export async function metaDistill(input: { const responseText = await input.llm.prompt( RECURSIVE_SYSTEM, userContent, - { model, workerID: "lore-distill", thinking: false }, + { model, workerID: "lore-distill", thinking: false, urgent: input.urgent }, ); if (!responseText) return null; diff --git a/packages/core/src/gradient.ts b/packages/core/src/gradient.ts index a06e0b5..852179e 100644 --- a/packages/core/src/gradient.ts +++ b/packages/core/src/gradient.ts @@ -119,6 +119,15 @@ type SessionState = { * the post-idle turn regardless of conversation size. */ cameOutOfIdle: boolean; + /** + * Set true by onIdleResume() alongside cameOutOfIdle; consumed (and cleared) + * by transformInner() to activate the post-idle compact layer. When true AND + * distillations exist, transform skips layer 0 (full-raw passthrough) and + * uses a tighter raw budget for layer 1. Rationale: on a cold cache the + * entire context is a cache WRITE — a smaller total means lower write cost, + * and aggressive idle distillation already captured the older history. + */ + postIdleCompact: boolean; /** Consecutive turns at layer >= 2. When >= 3, log a compaction hint. */ consecutiveHighLayer: number; /** Hash of the first message IDs in the last transform output — for cache-bust diagnostics. 
*/ @@ -156,6 +165,7 @@ function makeSessionState(): SessionState { rawWindowCache: null, lastTurnAt: 0, cameOutOfIdle: false, + postIdleCompact: false, consecutiveHighLayer: 0, lastPrefixHash: "", bustCount: 0, @@ -225,6 +235,7 @@ export function onIdleResume( state.rawWindowCache = null; state.distillationSnapshot = null; state.cameOutOfIdle = true; + state.postIdleCompact = true; return { triggered: true, idleMs }; } @@ -416,6 +427,7 @@ export function inspectSessionState(sessionID: string): { hasPrefixCache: boolean; hasRawWindowCache: boolean; cameOutOfIdle: boolean; + postIdleCompact: boolean; lastTurnAt: number; distillationSnapshot: DistillationSnapshot | null; } | null { @@ -425,6 +437,7 @@ export function inspectSessionState(sessionID: string): { hasPrefixCache: state.prefixCache !== null, hasRawWindowCache: state.rawWindowCache !== null, cameOutOfIdle: state.cameOutOfIdle, + postIdleCompact: state.postIdleCompact, lastTurnAt: state.lastTurnAt, distillationSnapshot: state.distillationSnapshot, }; @@ -1254,7 +1267,8 @@ function transformInner(input: { contextLimit - outputReserved - overhead - ltmTokens, ); const distilledBudget = Math.floor(usable * cfg.budget.distilled); - const rawBudget = Math.floor(usable * cfg.budget.raw); + // Base raw budget. May be overridden below for post-idle compact mode. + let rawBudget = Math.floor(usable * cfg.budget.raw); // --- Force escalation (reactive error recovery) --- // When the API previously rejected with "prompt is too long", skip layers @@ -1308,6 +1322,30 @@ function transformInner(input: { effectiveMinLayer = Math.max(effectiveMinLayer, sessState.lastLayer) as SafetyLayer; } + // --- Post-idle compact layer --- + // When the cache just went cold (onIdleResume fired), skip layer 0 full-raw + // passthrough and use a tighter raw budget. Rationale: the entire context is + // a cache WRITE regardless — a smaller total costs less to write, and + // aggressive idle distillation already captured older history in the prefix. + // The flag is one-shot: consumed here and reset so subsequent turns use + // normal budgets once the cache is warm. + const postIdleCompact = sessState.postIdleCompact; + if (postIdleCompact) { + sessState.postIdleCompact = false; + // Skip layer 0 — don't pass through all raw messages on a cold cache. + effectiveMinLayer = Math.max(effectiveMinLayer, 1) as SafetyLayer; + // Use a tighter raw budget: 20% of usable instead of the normal 40%. + // The distilled prefix covers the older history; the raw window only + // needs the current turn + minimal recent context. This reduces the + // total cold-cache write cost by up to 20% of usable (~29K tokens on + // a 200K context model). + rawBudget = Math.floor(usable * 0.20); + log.info( + `post-idle compact: session=${sid} rawBudget=${rawBudget}` + + ` (${Math.floor(usable * cfg.budget.raw)}→${rawBudget})`, + ); + } + let expectedInput: number; if (calibrated) { // Exact approach: prior API count + estimate of only genuinely new messages. 
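The arithmetic behind the post-idle compact override, as a quick sketch (the 145K "usable" figure is an assumption chosen to match the ~29K-token saving the comment cites for a 200K-context model; $3.75/MTok is the cache-write rate quoted in .lore.md):

```ts
// Rough cost model for a single post-idle cold-cache resume.
const usable = 145_000;                            // assumed: tokens left after output/overhead/LTM reserves
const normalRawBudget = Math.floor(usable * 0.4);  // default cfg.budget.raw (40%) per the comment above
const compactRawBudget = Math.floor(usable * 0.2); // post-idle override (20%)
const tokensSaved = normalRawBudget - compactRawBudget; // 29,000 tokens
const cacheWriteUsdPerMTok = 3.75;                 // cache-write pricing cited in .lore.md
const savedPerResume = (tokensSaved / 1e6) * cacheWriteUsdPerMTok; // ≈ $0.11
console.log(`post-idle compact avoids ~$${savedPerResume.toFixed(2)} of cache writes per cold resume`);
```

Small per resume, but it compounds with the idle force-distillation in `idle.ts` below: the distilled prefix already carries the older history, so the tighter raw window loses little context.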
diff --git a/packages/core/src/search.ts b/packages/core/src/search.ts
index a3a2e2d..16e49e9 100644
--- a/packages/core/src/search.ts
+++ b/packages/core/src/search.ts
@@ -299,7 +299,7 @@ export async function expandQuery(
 llm.prompt(
 QUERY_EXPANSION_SYSTEM,
 `Input: "${query}"`,
- { model, workerID: "lore-query-expand", thinking: false },
+ { model, workerID: "lore-query-expand", thinking: false, urgent: true },
 ),
 new Promise((resolve) => setTimeout(() => resolve(null), TIMEOUT_MS)),
 ]);
diff --git a/packages/core/src/types.ts b/packages/core/src/types.ts
index 586c687..1d9ee76 100644
--- a/packages/core/src/types.ts
+++ b/packages/core/src/types.ts
@@ -217,6 +217,21 @@ export interface LLMClient {
 * relies on Part A (non-reasoning model selection) instead */
 thinking?: boolean;
+ /**
+ * When true, the request must be processed immediately and the result
+ * returned before the next user turn. When false or absent, the request
+ * may be deferred to a batch queue for cost savings (50% discount via
+ * Anthropic's Message Batches API).
+ *
+ * Callers that `await` the result for a blocking operation (compaction,
+ * overflow recovery, query expansion) should set `urgent: true`.
+ * Fire-and-forget background work (incremental distillation, idle
+ * curation) should leave it unset or set `false`.
+ *
+ * Only the gateway's BatchLLMClient honors this flag; other adapters
+ * (OpenCode, Pi) ignore it and always process immediately.
+ */
+ urgent?: boolean;
 },
 ): Promise<string | null>;
}
diff --git a/packages/gateway/src/batch-queue.ts b/packages/gateway/src/batch-queue.ts
new file mode 100644
index 0000000..2a2643d
--- /dev/null
+++ b/packages/gateway/src/batch-queue.ts
@@ -0,0 +1,523 @@
+/**
+ * Batch queue for Anthropic Message Batches API.
+ *
+ * Wraps a synchronous LLMClient and intercepts non-urgent `prompt()` calls,
+ * accumulating them in a queue. A flush timer periodically sends the queue
+ * to Anthropic's `/v1/messages/batches` endpoint for 50% cost savings.
+ * A poll timer checks for results and resolves the pending promises.
+ *
+ * Urgent calls (compaction, overflow recovery, query expansion) bypass
+ * the queue entirely and delegate to the inner synchronous client.
+ *
+ * This is a gateway-only enhancement — the OpenCode and Pi adapters
+ * always process immediately regardless of the `urgent` flag.
+ */
+
+import type { LLMClient } from "@loreai/core";
+import { log } from "@loreai/core";
+
+// ---------------------------------------------------------------------------
+// Types
+// ---------------------------------------------------------------------------
+
+/** A single pending request waiting to be batched. */
+interface PendingRequest {
+ /** Unique ID for correlating batch results (alphanumeric + hyphens). */
+ customId: string;
+ /** Standard Messages API params. */
+ params: {
+ model: string;
+ max_tokens: number;
+ system:
+ | string
+ | Array<{ type: string; text: string; cache_control?: { type: string; ttl?: string } }>;
+ messages: Array<{ role: string; content: string }>;
+ };
+ /** Resolve the caller's promise with the text response. */
+ resolve: (value: string | null) => void;
+ /** Reject the caller's promise on error. */
+ reject: (error: Error) => void;
+ /** Timestamp when the request was enqueued. */
+ enqueuedAt: number;
+}
+
+/** A batch that has been submitted and is being polled for results. */
+interface InflightBatch {
+ /** Anthropic batch ID returned by the create endpoint. */
+ batchId: string;
+ /** Map from custom_id → pending request (for resolving on completion). */
+ requests: Map<string, PendingRequest>;
+ /** Timestamp when the batch was submitted. */
+ submittedAt: number;
+ /** Poll timer handle. */
+ pollTimer: ReturnType<typeof setInterval>;
+}
+
+export interface BatchQueueConfig {
+ /** How often to flush the queue (ms). Default: 30000 (30s). */
+ flushIntervalMs?: number;
+ /** Max items before auto-flush. Default: 50. */
+ maxQueueSize?: number;
+ /** How often to poll for batch results (ms). Default: 60000 (60s). */
+ pollIntervalMs?: number;
+ /** Max age of a batch before giving up and falling back (ms). Default: 3600000 (1h). */
+ maxBatchAgeMs?: number;
+}
+
+const DEFAULT_FLUSH_INTERVAL_MS = 30_000;
+const DEFAULT_MAX_QUEUE_SIZE = 50;
+const DEFAULT_POLL_INTERVAL_MS = 60_000;
+const DEFAULT_MAX_BATCH_AGE_MS = 3_600_000; // 1 hour
+
+// ---------------------------------------------------------------------------
+// ID generation
+// ---------------------------------------------------------------------------
+
+let idCounter = 0;
+
+/** Generate a batch-API-compatible custom_id (alphanumeric + hyphens, 1-64 chars). */
+function generateCustomId(): string {
+ const ts = Date.now().toString(36);
+ const seq = (idCounter++).toString(36);
+ const rand = Math.random().toString(36).slice(2, 8);
+ return `lore-${ts}-${seq}-${rand}`;
+}
+
+// ---------------------------------------------------------------------------
+// BatchLLMClient
+// ---------------------------------------------------------------------------
+
+/**
+ * Create a batch-aware LLMClient that wraps a synchronous inner client.
+ *
+ * - `urgent: true` calls → immediate delegation to `inner.prompt()`
+ * - `urgent: false/undefined` calls → queued for batch processing
+ * - On flush timer or queue full → POST /v1/messages/batches
+ * - On poll timer → GET /v1/messages/batches/{id}, resolve promises
+ * - On error → fallback to synchronous calls for the failed batch
+ *
+ * @param inner The synchronous LLMClient (gateway's direct adapter)
+ * @param upstreamUrl Base Anthropic API URL (e.g. "https://api.anthropic.com")
+ * @param getApiKey Callback to get the current API key
+ * @param defaultModel Default model for requests without explicit model
+ * @param batchConfig Optional tuning parameters
+ */
+export function createBatchLLMClient(
+ inner: LLMClient,
+ upstreamUrl: string,
+ getApiKey: () => string | null,
+ defaultModel: { providerID: string; modelID: string },
+ batchConfig?: BatchQueueConfig,
+): LLMClient & { shutdown: () => Promise<void>; stats: () => BatchStats } {
+ const flushIntervalMs = batchConfig?.flushIntervalMs ?? DEFAULT_FLUSH_INTERVAL_MS;
+ const maxQueueSize = batchConfig?.maxQueueSize ?? DEFAULT_MAX_QUEUE_SIZE;
+ const pollIntervalMs = batchConfig?.pollIntervalMs ?? DEFAULT_POLL_INTERVAL_MS;
+ const maxBatchAgeMs = batchConfig?.maxBatchAgeMs ?? DEFAULT_MAX_BATCH_AGE_MS;
+
+ // State
+ const queue: PendingRequest[] = [];
+ const inflight = new Map<string, InflightBatch>();
+ let flushTimer: ReturnType<typeof setInterval> | null = null;
+ let shuttingDown = false;
+
+ // Stats
+ let totalQueued = 0;
+ let totalBatched = 0;
+ let totalUrgent = 0;
+ let totalFallback = 0;
+ let totalResolved = 0;
+ let totalFailed = 0;
+
+ // -------------------------------------------------------------------------
+ // Flush: send queued items as a batch
+ // -------------------------------------------------------------------------
+
+ async function flush(): Promise<void> {
+ if (queue.length === 0) return;
+
+ const apiKey = getApiKey();
+ if (!apiKey) {
+ // No API key — fall back to synchronous for all queued items
+ log.warn("batch flush: no API key, falling back to synchronous");
+ await fallbackAll(queue.splice(0));
+ return;
+ }
+
+ // Take all items from the queue
+ const batch = queue.splice(0);
+ const requests = batch.map((item) => ({
+ custom_id: item.customId,
+ params: item.params,
+ }));
+
+ log.info(`batch flush: submitting ${batch.length} requests`);
+
+ try {
+ const url = `${upstreamUrl.replace(/\/$/, "")}/v1/messages/batches`;
+ const response = await fetch(url, {
+ method: "POST",
+ headers: {
+ "Content-Type": "application/json",
+ "anthropic-version": "2023-06-01",
+ "x-api-key": apiKey,
+ },
+ body: JSON.stringify({ requests }),
+ });
+
+ if (!response.ok) {
+ const text = await response.text().catch(() => "(no body)");
+ log.error(`batch create failed: ${response.status} ${response.statusText} — ${text}`);
+ // Fall back to synchronous for all items
+ await fallbackAll(batch);
+ return;
+ }
+
+ const data = (await response.json()) as {
+ id: string;
+ processing_status: string;
+ };
+
+ totalBatched += batch.length;
+
+ // Track inflight batch
+ const requestMap = new Map<string, PendingRequest>();
+ for (const item of batch) {
+ requestMap.set(item.customId, item);
+ }
+
+ const pollTimer = setInterval(
+ () => pollBatch(data.id).catch((e) => log.error("batch poll error:", e)),
+ pollIntervalMs,
+ );
+
+ inflight.set(data.id, {
+ batchId: data.id,
+ requests: requestMap,
+ submittedAt: Date.now(),
+ pollTimer,
+ });
+
+ log.info(`batch created: ${data.id} with ${batch.length} requests`);
+ } catch (e) {
+ log.error("batch create error:", e);
+ await fallbackAll(batch);
+ }
+ }
+
+ // -------------------------------------------------------------------------
+ // Poll: check batch status and resolve promises
+ // -------------------------------------------------------------------------
+
+ async function pollBatch(batchId: string): Promise<void> {
+ const batch = inflight.get(batchId);
+ if (!batch) return;
+
+ const apiKey = getApiKey();
+ if (!apiKey) {
+ log.warn(`batch poll: no API key for ${batchId}`);
+ return;
+ }
+
+ // Check max age — give up and fallback if too old
+ if (Date.now() - batch.submittedAt > maxBatchAgeMs) {
+ log.warn(`batch ${batchId} exceeded max age — falling back to synchronous`);
+ clearInterval(batch.pollTimer);
+ inflight.delete(batchId);
+ await fallbackAll([...batch.requests.values()]);
+ return;
+ }
+
+ try {
+ const url = `${upstreamUrl.replace(/\/$/, "")}/v1/messages/batches/${batchId}`;
+ const response = await fetch(url, {
+ headers: {
+ "anthropic-version": "2023-06-01",
+ "x-api-key": apiKey,
+ },
+ });
+
+ if (!response.ok) {
+ log.error(`batch poll failed for ${batchId}: ${response.status}`);
+ return; // Retry on next poll
+ }
+
+ const data = (await response.json()) as {
+ processing_status: string;
+ results_url: string | null;
+ };
+
+ if (data.processing_status !== "ended") return;
+
+ // Batch is done — stream results
+ log.info(`batch ${batchId} ended — retrieving results`);
+
+ if (data.results_url) {
+ await retrieveResults(batchId, data.results_url);
+ } else {
+ // No results URL — try the standard endpoint
+ await retrieveResults(
+ batchId,
+ `${upstreamUrl.replace(/\/$/, "")}/v1/messages/batches/${batchId}/results`,
+ );
+ }
+ } catch (e) {
+ log.error(`batch poll error for ${batchId}:`, e);
+ }
+ }
+
+ async function retrieveResults(batchId: string, resultsUrl: string): Promise<void> {
+ const batch = inflight.get(batchId);
+ if (!batch) return;
+
+ const apiKey = getApiKey();
+ if (!apiKey) return;
+
+ try {
+ const response = await fetch(resultsUrl, {
+ headers: {
+ "anthropic-version": "2023-06-01",
+ "x-api-key": apiKey,
+ },
+ });
+
+ if (!response.ok) {
+ log.error(`batch results fetch failed for ${batchId}: ${response.status}`);
+ return;
+ }
+
+ const text = await response.text();
+ // Results are JSONL — one JSON object per line
+ const lines = text.split("\n").filter((l) => l.trim());
+
+ for (const line of lines) {
+ try {
+ const result = JSON.parse(line) as {
+ custom_id: string;
+ result: {
+ type: "succeeded" | "errored" | "canceled" | "expired";
+ message?: {
+ content?: Array<{ type: string; text?: string }>;
+ };
+ error?: { type: string; message: string };
+ };
+ };
+
+ const pending = batch.requests.get(result.custom_id);
+ if (!pending) continue;
+
+ switch (result.result.type) {
+ case "succeeded": {
+ const textBlock = result.result.message?.content?.find(
+ (b) => b.type === "text" && typeof b.text === "string",
+ );
+ pending.resolve(textBlock?.text ?? null);
+ totalResolved++;
+ break;
+ }
+ case "errored":
+ pending.resolve(null); // Match inner client behavior (null on error)
+ totalFailed++;
+ log.error(
+ `batch item ${result.custom_id} errored: ${result.result.error?.type} — ${result.result.error?.message}`,
+ );
+ break;
+ case "canceled":
+ case "expired":
+ pending.resolve(null);
+ totalFailed++;
+ log.warn(`batch item ${result.custom_id} ${result.result.type}`);
+ break;
+ }
+
+ batch.requests.delete(result.custom_id);
+ } catch {
+ log.error(`failed to parse batch result line: ${line.slice(0, 200)}`);
+ }
+ }
+
+ // Resolve any remaining items that weren't in the results (shouldn't happen)
+ for (const [, pending] of batch.requests) {
+ pending.resolve(null);
+ totalFailed++;
+ }
+
+ // Clean up
+ clearInterval(batch.pollTimer);
+ inflight.delete(batchId);
+ log.info(
+ `batch ${batchId} fully resolved (${totalResolved} ok, ${totalFailed} failed total)`,
+ );
+ } catch (e) {
+ log.error(`batch results retrieval error for ${batchId}:`, e);
+ }
+ }
+
+ // -------------------------------------------------------------------------
+ // Fallback: process items synchronously via inner client
+ // -------------------------------------------------------------------------
+
+ async function fallbackAll(items: PendingRequest[]): Promise<void> {
+ totalFallback += items.length;
+ log.info(`batch fallback: processing ${items.length} items synchronously`);
+
+ // Process in parallel with concurrency limit of 5
+ const CONCURRENCY = 5;
+ for (let i = 0; i < items.length; i += CONCURRENCY) {
+ const chunk = items.slice(i, i + CONCURRENCY);
+ await Promise.all(
+ chunk.map(async (item) => {
+ try {
+ const system =
+ typeof item.params.system === "string"
+ ? item.params.system
+ : item.params.system
+ .map((b) => b.text)
+ .join("\n");
+ const user = item.params.messages[0]?.content ?? "";
""; + const result = await inner.prompt(system, user, { urgent: true }); + item.resolve(result); + } catch (e) { + log.error(`batch fallback error for ${item.customId}:`, e); + item.resolve(null); + } + }), + ); + } + } + + // ------------------------------------------------------------------------- + // Start flush timer + // ------------------------------------------------------------------------- + + flushTimer = setInterval(() => { + flush().catch((e) => log.error("batch flush timer error:", e)); + }, flushIntervalMs); + + // ------------------------------------------------------------------------- + // LLMClient implementation + // ------------------------------------------------------------------------- + + return { + async prompt(system, user, opts) { + // Urgent calls bypass the queue entirely + if (opts?.urgent || shuttingDown) { + totalUrgent++; + return inner.prompt(system, user, opts); + } + + totalQueued++; + + const model = opts?.model ?? defaultModel; + + // Build system payload with 1h cache (same as direct adapter) + const systemPayload = system + ? [ + { + type: "text" as const, + text: system, + cache_control: { type: "ephemeral" as const, ttl: "3600" }, + }, + ] + : system; + + const customId = generateCustomId(); + + const promise = new Promise((resolve, reject) => { + queue.push({ + customId, + params: { + model: model.modelID, + max_tokens: 8192, + system: systemPayload ?? system, + messages: [{ role: "user", content: user }], + }, + resolve, + reject, + enqueuedAt: Date.now(), + }); + }); + + // Auto-flush if queue is full + if (queue.length >= maxQueueSize) { + flush().catch((e) => log.error("batch auto-flush error:", e)); + } + + return promise; + }, + + /** + * Gracefully shut down the batch queue: + * 1. Stop the flush timer + * 2. Flush any remaining queued items (as a batch if possible, fallback sync) + * 3. Switch to synchronous mode for future calls + * 4. DON'T wait for inflight batches — they resolve eventually or expire + */ + async shutdown(): Promise { + shuttingDown = true; + if (flushTimer) { + clearInterval(flushTimer); + flushTimer = null; + } + + // Flush remaining items synchronously (batch API might not finish before process exits) + if (queue.length > 0) { + log.info(`batch shutdown: processing ${queue.length} remaining items synchronously`); + await fallbackAll(queue.splice(0)); + } + + // Clean up inflight poll timers (batches will expire naturally) + for (const [batchId, batch] of inflight) { + clearInterval(batch.pollTimer); + // Resolve all pending promises with null (callers handle null gracefully) + for (const [, pending] of batch.requests) { + pending.resolve(null); + } + log.warn(`batch shutdown: abandoned inflight batch ${batchId}`); + } + inflight.clear(); + }, + + /** Return current batch queue statistics. */ + stats(): BatchStats { + return { + queued: queue.length, + inflightBatches: inflight.size, + inflightRequests: [...inflight.values()].reduce( + (sum, b) => sum + b.requests.size, + 0, + ), + totalQueued, + totalBatched, + totalUrgent, + totalFallback, + totalResolved, + totalFailed, + }; + }, + }; +} + +// --------------------------------------------------------------------------- +// Stats type +// --------------------------------------------------------------------------- + +export interface BatchStats { + /** Items currently in the queue waiting for next flush. */ + queued: number; + /** Number of batches currently being polled. */ + inflightBatches: number; + /** Total requests across all inflight batches. 
*/ + inflightRequests: number; + /** Total requests that entered the queue. */ + totalQueued: number; + /** Total requests successfully submitted to the Batch API. */ + totalBatched: number; + /** Total requests that bypassed the queue (urgent). */ + totalUrgent: number; + /** Total requests that fell back to synchronous processing. */ + totalFallback: number; + /** Total batch results successfully resolved. */ + totalResolved: number; + /** Total batch results that failed/expired/canceled. */ + totalFailed: number; +} diff --git a/packages/gateway/src/idle.ts b/packages/gateway/src/idle.ts index 83aa8e7..aa71462 100644 --- a/packages/gateway/src/idle.ts +++ b/packages/gateway/src/idle.ts @@ -112,15 +112,14 @@ export function buildIdleWorkHandler( return async (sessionID: string, state: SessionState) => { const cfg = loreConfig(); - // 1. Distillation + // 1. Distillation — force-distill ALL pending messages on idle, even + // below minMessages. The cache is going cold; aggressive distillation + // now means a smaller context on the next turn via post-idle compact. + // Meta-distillation is always allowed on idle (cache is cold anyway). try { const pending = temporal.undistilledCount(projectPath, sessionID); - if (pending >= cfg.distillation.minMessages) { - // Skip meta-distillation when the prompt cache is likely still warm. - const cacheTTLMs = cfg.idleResumeMinutes * 60_000; - const lastTurn = getLastTurnAt(sessionID); - const cacheWarm = lastTurn > 0 && (Date.now() - lastTurn) < cacheTTLMs; - await distillation.run({ llm, projectPath, sessionID, skipMeta: cacheWarm }); + if (pending > 0) { + await distillation.run({ llm, projectPath, sessionID, force: true }); } } catch (e) { log.error("idle distillation error:", e); diff --git a/packages/gateway/src/index.ts b/packages/gateway/src/index.ts index 1fe1796..cafdb88 100644 --- a/packages/gateway/src/index.ts +++ b/packages/gateway/src/index.ts @@ -11,6 +11,7 @@ */ import { loadConfig } from "./config"; import { startServer } from "./server"; +import { resetPipelineState } from "./pipeline"; // --------------------------------------------------------------------------- // Boot @@ -28,11 +29,13 @@ console.error(`[lore] Plugin auto-detects gateway — just start OpenCode normal // Graceful shutdown // --------------------------------------------------------------------------- -function shutdown() { +async function shutdown() { console.error("[lore] Shutting down…"); server.stop(); + // Gracefully shut down the batch queue (flushes pending items synchronously) + await resetPipelineState(); process.exit(0); } -process.on("SIGINT", shutdown); -process.on("SIGTERM", shutdown); +process.on("SIGINT", () => shutdown()); +process.on("SIGTERM", () => shutdown()); diff --git a/packages/gateway/src/pipeline.ts b/packages/gateway/src/pipeline.ts index e5c2562..29fddbd 100644 --- a/packages/gateway/src/pipeline.ts +++ b/packages/gateway/src/pipeline.ts @@ -81,6 +81,7 @@ import { getLastSeenApiKey, createGatewayLLMClient, } from "./llm-adapter"; +import { createBatchLLMClient } from "./batch-queue"; import type { UpstreamInterceptor } from "./recorder"; // --------------------------------------------------------------------------- @@ -115,11 +116,15 @@ export function setUpstreamInterceptor( * session state, initialization flags, or cached project paths across test * suites. 
*/
-export function resetPipelineState(): void {
+export async function resetPipelineState(): Promise<void> {
 initialized = false;
 cachedProjectPath = null;
 sessions.clear();
 ltmSessionCache.clear();
+ // Shut down batch queue gracefully before clearing the client
+ if (llmClient && "shutdown" in llmClient) {
+ await (llmClient as LLMClient & { shutdown: () => Promise<void> }).shutdown();
+ }
 llmClient = null;
 activeInterceptor = undefined;
}
@@ -194,11 +199,25 @@ function getLLMClient(config: GatewayConfig): LLMClient {
 providerID: "anthropic",
 modelID: "claude-sonnet-4-20250514",
 };
- llmClient = createGatewayLLMClient(
+ const inner = createGatewayLLMClient(
 config.upstreamAnthropic,
 getLastSeenApiKey,
 defaultModel,
 );
+
+ // Wrap with batch queue for 50% cost savings on non-urgent worker calls.
+ // Enabled by default — disable via LORE_BATCH_DISABLED=1.
+ const batchDisabled = process.env.LORE_BATCH_DISABLED === "1";
+ if (batchDisabled) {
+ llmClient = inner;
+ } else {
+ llmClient = createBatchLLMClient(
+ inner,
+ config.upstreamAnthropic,
+ getLastSeenApiKey,
+ defaultModel,
+ );
+ }
 }
 return llmClient;
}
@@ -573,7 +592,9 @@ function scheduleBackgroundWork(
 const llm = getLLMClient(config);
 const cfg = loreConfig();
- // Check if urgent distillation is needed (gradient flagged it)
+ // Check if urgent distillation is needed (gradient flagged it).
+ // Mark urgent: true so these bypass the batch queue — the gradient is
+ // in overflow and needs the result before the next user turn.
 if (needsUrgentDistillation()) {
 distillation
 .run({
 llm,
 projectPath,
 sessionID,
 force: true,
+ urgent: true,
 })
 .catch((e) => log.error("background distillation failed:", e));
 }
@@ -631,12 +653,14 @@ async function handleCompaction(
 log.info(`compaction intercepted for session ${sessionID.slice(0, 16)}`);
- // 1. Force-distill all undistilled messages
+ // 1. Force-distill all undistilled messages.
+ // Mark urgent: true — client is blocking on the compaction response.
 await distillation.run({
 llm,
 projectPath,
 sessionID,
 force: true,
+ urgent: true,
 });
 // 2. Load distillation summaries
@@ -690,6 +714,7 @@
 const summaryText = await llm.prompt(compactPrompt, userContent, {
 model: cfg.model,
 workerID: "lore-compact",
+ urgent: true, // Client is blocking on this response
 });
 const summary = summaryText ?? "(Compaction failed — no summary generated.)";
diff --git a/packages/gateway/test/batch-queue.test.ts b/packages/gateway/test/batch-queue.test.ts
new file mode 100644
index 0000000..915000b
--- /dev/null
+++ b/packages/gateway/test/batch-queue.test.ts
@@ -0,0 +1,333 @@
+/**
+ * Tests for the batch queue (BatchLLMClient wrapper).
+ *
+ * Uses a fake inner LLMClient and fake fetch to verify:
+ * - Urgent calls bypass the queue
+ * - Non-urgent calls are queued and flushed
+ * - Batch results are polled and promises resolved
+ * - Fallback to synchronous on batch API errors
+ * - Shutdown drains the queue
+ */
+import { describe, test, expect, mock, beforeEach, afterEach } from "bun:test";
+import { createBatchLLMClient, type BatchStats } from "../src/batch-queue";
+import type { LLMClient } from "@loreai/core";
+
+// ---------------------------------------------------------------------------
+// Helpers
+// ---------------------------------------------------------------------------
+
+/** Create a mock LLMClient that records calls and returns canned responses. */
+function createMockLLMClient(): LLMClient & { calls: Array<{ system: string; user: string; opts: unknown }> } {
+ const calls: Array<{ system: string; user: string; opts: unknown }> = [];
+ return {
+ calls,
+ async prompt(system, user, opts) {
+ calls.push({ system, user, opts });
+ return `sync-response-for: ${user.slice(0, 30)}`;
+ },
+ };
+}
+
+/** Track fetch calls for assertions. */
+let fetchCalls: Array<{ url: string; method: string; body?: any }> = [];
+let fetchResponses: Array<{ ok: boolean; status: number; body: unknown }> = [];
+
+function pushFetchResponse(ok: boolean, status: number, body: unknown) {
+ fetchResponses.push({ ok, status, body });
+}
+
+const originalFetch = globalThis.fetch;
+
+beforeEach(() => {
+ fetchCalls = [];
+ fetchResponses = [];
+
+ // @ts-expect-error — mock global fetch
+ globalThis.fetch = async (url: string, init?: RequestInit) => {
+ const method = init?.method ?? "GET";
+ const body = init?.body ? JSON.parse(init.body as string) : undefined;
+ fetchCalls.push({ url: url.toString(), method, body });
+
+ const response = fetchResponses.shift();
+ if (!response) {
+ return { ok: false, status: 500, statusText: "Error", text: async () => "no mock response", json: async () => ({}) };
+ }
+ return {
+ ok: response.ok,
+ status: response.status,
+ statusText: response.ok ? "OK" : "Error",
+ text: async () => JSON.stringify(response.body),
+ json: async () => response.body,
+ };
+ };
+});
+
+afterEach(() => {
+ globalThis.fetch = originalFetch;
+});
+
+const DEFAULT_MODEL = { providerID: "anthropic", modelID: "claude-sonnet-4-20250514" };
+const UPSTREAM = "https://api.anthropic.com";
+
+// ---------------------------------------------------------------------------
+// Tests
+// ---------------------------------------------------------------------------
+
+describe("BatchLLMClient", () => {
+ test("urgent calls bypass the queue and delegate to inner client", async () => {
+ const inner = createMockLLMClient();
+ const client = createBatchLLMClient(inner, UPSTREAM, () => "test-key", DEFAULT_MODEL, {
+ flushIntervalMs: 60_000, // Long interval so flush doesn't auto-trigger
+ });
+
+ const result = await client.prompt("system", "urgent message", {
+ workerID: "lore-distill",
+ urgent: true,
+ });
+
+ expect(result).toBe("sync-response-for: urgent message");
+ expect(inner.calls).toHaveLength(1);
+ expect(inner.calls[0].user).toBe("urgent message");
+
+ const s = client.stats();
+ expect(s.totalUrgent).toBe(1);
+ expect(s.totalQueued).toBe(0);
+ expect(s.queued).toBe(0);
+
+ await client.shutdown();
+ });
+
+ test("non-urgent calls are queued (not immediately sent to inner)", async () => {
+ const inner = createMockLLMClient();
+ const client = createBatchLLMClient(inner, UPSTREAM, () => "test-key", DEFAULT_MODEL, {
+ flushIntervalMs: 60_000,
+ maxQueueSize: 100,
+ });
+
+ // Don't await — the promise is pending until batch resolves
+ const promise = client.prompt("system", "background work", {
+ workerID: "lore-distill",
+ });
+
+ // Inner client should NOT have been called yet
+ expect(inner.calls).toHaveLength(0);
+
+ const s = client.stats();
+ expect(s.queued).toBe(1);
+ expect(s.totalQueued).toBe(1);
+
+ // Shutdown will fallback to synchronous
+ await client.shutdown();
+
+ // Now the promise should resolve (via fallback)
+ const result = await promise;
+ expect(result).toBe("sync-response-for: background work");
+ expect(inner.calls).toHaveLength(1);
+ });
+
+ test("auto-flush when queue reaches maxQueueSize", async () => {
+ const inner = createMockLLMClient();
+ + // Set up batch create response + pushFetchResponse(true, 200, { + id: "msgbatch_test1", + processing_status: "in_progress", + }); + + const client = createBatchLLMClient(inner, UPSTREAM, () => "test-key", DEFAULT_MODEL, { + flushIntervalMs: 60_000, + maxQueueSize: 3, + pollIntervalMs: 60_000, + }); + + // Queue 3 items — should auto-flush on the 3rd + const p1 = client.prompt("sys", "msg1", { workerID: "lore-distill" }); + const p2 = client.prompt("sys", "msg2", { workerID: "lore-distill" }); + const p3 = client.prompt("sys", "msg3", { workerID: "lore-distill" }); + + // Wait a tick for the flush to complete + await new Promise((r) => setTimeout(r, 50)); + + // Should have called the batch API + expect(fetchCalls).toHaveLength(1); + expect(fetchCalls[0].url).toBe(`${UPSTREAM}/v1/messages/batches`); + expect(fetchCalls[0].method).toBe("POST"); + expect(fetchCalls[0].body.requests).toHaveLength(3); + + const s = client.stats(); + expect(s.totalBatched).toBe(3); + expect(s.inflightBatches).toBe(1); + + // Clean up — shutdown resolves remaining promises with null + await client.shutdown(); + }); + + test("fallback to synchronous when batch API returns error", async () => { + const inner = createMockLLMClient(); + + // Set up batch create to fail + pushFetchResponse(false, 500, { error: "internal server error" }); + + const client = createBatchLLMClient(inner, UPSTREAM, () => "test-key", DEFAULT_MODEL, { + flushIntervalMs: 60_000, + maxQueueSize: 2, + }); + + const p1 = client.prompt("sys", "msg1", { workerID: "lore-distill" }); + const p2 = client.prompt("sys", "msg2", { workerID: "lore-distill" }); + + // Wait for auto-flush + fallback + await new Promise((r) => setTimeout(r, 100)); + + const r1 = await p1; + const r2 = await p2; + + expect(r1).toBe("sync-response-for: msg1"); + expect(r2).toBe("sync-response-for: msg2"); + + // Inner should have been called for both (fallback) + expect(inner.calls).toHaveLength(2); + + const s = client.stats(); + expect(s.totalFallback).toBe(2); + + await client.shutdown(); + }); + + test("fallback to synchronous when no API key", async () => { + const inner = createMockLLMClient(); + + const client = createBatchLLMClient(inner, UPSTREAM, () => null, DEFAULT_MODEL, { + flushIntervalMs: 60_000, + maxQueueSize: 1, + }); + + const p1 = client.prompt("sys", "msg1", { workerID: "lore-distill" }); + + // Wait for flush + fallback + await new Promise((r) => setTimeout(r, 100)); + + const result = await p1; + expect(result).toBe("sync-response-for: msg1"); + expect(inner.calls).toHaveLength(1); + + await client.shutdown(); + }); + + test("shutdown drains queue synchronously via inner client", async () => { + const inner = createMockLLMClient(); + const client = createBatchLLMClient(inner, UPSTREAM, () => "test-key", DEFAULT_MODEL, { + flushIntervalMs: 60_000, + maxQueueSize: 100, + }); + + const p1 = client.prompt("sys", "msg1", { workerID: "lore-distill" }); + const p2 = client.prompt("sys", "msg2", { workerID: "lore-curator" }); + + // Shutdown should drain via fallback + await client.shutdown(); + + expect(await p1).toBe("sync-response-for: msg1"); + expect(await p2).toBe("sync-response-for: msg2"); + expect(inner.calls).toHaveLength(2); + + const s = client.stats(); + expect(s.totalFallback).toBe(2); + }); + + test("after shutdown, new calls go directly to inner client", async () => { + const inner = createMockLLMClient(); + const client = createBatchLLMClient(inner, UPSTREAM, () => "test-key", DEFAULT_MODEL, { + flushIntervalMs: 60_000, + }); + + await 
client.shutdown(); + + // New call after shutdown should go through inner immediately (treated as urgent) + const result = await client.prompt("sys", "post-shutdown", { + workerID: "lore-distill", + }); + + expect(result).toBe("sync-response-for: post-shutdown"); + expect(inner.calls).toHaveLength(1); + }); + + test("stats reflect accurate counts", async () => { + const inner = createMockLLMClient(); + const client = createBatchLLMClient(inner, UPSTREAM, () => "test-key", DEFAULT_MODEL, { + flushIntervalMs: 60_000, + maxQueueSize: 100, + }); + + // 2 urgent, 3 queued + await client.prompt("sys", "urgent1", { urgent: true }); + await client.prompt("sys", "urgent2", { urgent: true }); + const p1 = client.prompt("sys", "bg1", {}); + const p2 = client.prompt("sys", "bg2", {}); + const p3 = client.prompt("sys", "bg3", {}); + + let s = client.stats(); + expect(s.totalUrgent).toBe(2); + expect(s.totalQueued).toBe(3); + expect(s.queued).toBe(3); + expect(s.inflightBatches).toBe(0); + + await client.shutdown(); + + s = client.stats(); + expect(s.totalFallback).toBe(3); + expect(s.queued).toBe(0); + }); + + test("default model is used when no explicit model in opts", async () => { + const inner = createMockLLMClient(); + + // Set up batch create response + pushFetchResponse(true, 200, { + id: "msgbatch_model_test", + processing_status: "in_progress", + }); + + const client = createBatchLLMClient(inner, UPSTREAM, () => "test-key", DEFAULT_MODEL, { + flushIntervalMs: 60_000, + maxQueueSize: 1, + }); + + client.prompt("sys prompt", "user msg", { workerID: "lore-distill" }); + + // Wait for auto-flush + await new Promise((r) => setTimeout(r, 50)); + + // Verify the batch request uses the default model + expect(fetchCalls).toHaveLength(1); + expect(fetchCalls[0].body.requests[0].params.model).toBe("claude-sonnet-4-20250514"); + expect(fetchCalls[0].body.requests[0].params.system[0].text).toBe("sys prompt"); + expect(fetchCalls[0].body.requests[0].params.messages[0].content).toBe("user msg"); + + await client.shutdown(); + }); + + test("explicit model override is used in batch request", async () => { + const inner = createMockLLMClient(); + + pushFetchResponse(true, 200, { + id: "msgbatch_override", + processing_status: "in_progress", + }); + + const client = createBatchLLMClient(inner, UPSTREAM, () => "test-key", DEFAULT_MODEL, { + flushIntervalMs: 60_000, + maxQueueSize: 1, + }); + + client.prompt("sys", "msg", { + model: { providerID: "anthropic", modelID: "claude-haiku-3-5-20241022" }, + }); + + await new Promise((r) => setTimeout(r, 50)); + + expect(fetchCalls[0].body.requests[0].params.model).toBe("claude-haiku-3-5-20241022"); + + await client.shutdown(); + }); +}); diff --git a/packages/gateway/test/helpers/harness.ts b/packages/gateway/test/helpers/harness.ts index d234cf9..00a321c 100644 --- a/packages/gateway/test/helpers/harness.ts +++ b/packages/gateway/test/helpers/harness.ts @@ -78,7 +78,7 @@ export async function createHarness(opts: HarnessOptions): Promise { // Reset any leftover singleton state from a previous harness in this process. // Must close the DB BEFORE resetting pipeline state (which may use the DB). closeDB(); - resetPipelineState(); + await resetPipelineState(); // --- 4. Wire in replay interceptor --- setUpstreamInterceptor(getReplayInterceptor(opts.fixtures)); @@ -127,12 +127,12 @@ export async function createHarness(opts: HarnessOptions): Promise { } // --- 8. 
teardown() --- - function teardown(): void { + async function teardown(): Promise { server.stop(); // Close the core DB singleton first so the next harness can open a fresh // DB at its own LORE_DB_PATH. closeDB(); - resetPipelineState(); + await resetPipelineState(); setUpstreamInterceptor(undefined); // Delete DB files (main + WAL + SHM) diff --git a/packages/opencode/src/index.ts b/packages/opencode/src/index.ts index 8fa08ca..9da1c2c 100644 --- a/packages/opencode/src/index.ts +++ b/packages/opencode/src/index.ts @@ -999,8 +999,11 @@ export const LorePlugin: Plugin = async (ctx) => { return; } - // Run background distillation for any remaining undistilled messages - await backgroundDistill(sessionID); + // Force-distill ALL pending messages on idle — even below the normal + // minMessages threshold. The cache is about to go cold (or already is), + // so aggressive distillation now means a smaller, cheaper context on the + // next turn via the post-idle compact layer in gradient.ts. + await backgroundDistill(sessionID, true); // Run curator periodically (only when knowledge system is enabled). // onIdle gates whether idle events trigger curation at all; afterTurns