diff --git a/index.ts b/index.ts index 25b2012f..b9b73298 100644 --- a/index.ts +++ b/index.ts @@ -1942,10 +1942,12 @@ const memoryLanceDBProPlugin = { limit: number; scopeFilter?: string[]; category?: string; + signal?: AbortSignal; }) { let results = await retriever.retrieve(params); if (results.length === 0) { await sleep(75); + if (params.signal?.aborted) return results; results = await retriever.retrieve(params); } return results; @@ -2407,7 +2409,13 @@ const memoryLanceDBProPlugin = { // (embedding → rerank → lifecycle), which can silently drop messages on // channels like Telegram when subsequent requests hit lock timeouts. // See: https://github.com/CortexReach/memory-lancedb-pro/issues/253 - const recallWork = async (): Promise<{ prependContext: string } | undefined> => { + // + // The timeout also aborts the in-flight embedding HTTP call via the + // abortController below, so the underlying work doesn't continue to hold + // resources (lancedb connection, per-agent memory-runtime mutex) past + // the timeout boundary. + const abortController = new AbortController(); + const recallWork = async (signal: AbortSignal): Promise<{ prependContext: string } | undefined> => { // Determine agent ID and accessible scopes const agentId = resolveHookAgentId(ctx?.agentId, (event as any).sessionKey); const accessibleScopes = resolveScopeFilter(scopeManager, agentId); @@ -2450,6 +2458,7 @@ const memoryLanceDBProPlugin = { limit: retrieveLimit, scopeFilter: accessibleScopes, source: "auto-recall", + signal, }), config.workspaceBoundary); if (results.length === 0) { @@ -2690,9 +2699,12 @@ const memoryLanceDBProPlugin = { let timeoutId: ReturnType | undefined; try { const result = await Promise.race([ - recallWork().then((r) => { clearTimeout(timeoutId); return r; }), + recallWork(abortController.signal).then((r) => { clearTimeout(timeoutId); return r; }), new Promise((resolve) => { timeoutId = setTimeout(() => { + // Cancel in-flight embedding/retrieval HTTP calls so they don't + // keep holding resources after we've given up on the result. + abortController.abort(new Error("auto-recall timeout")); api.logger.warn( `memory-lancedb-pro: auto-recall timed out after ${AUTO_RECALL_TIMEOUT_MS}ms; skipping memory injection to avoid stalling agent startup`, ); @@ -2703,7 +2715,15 @@ const memoryLanceDBProPlugin = { return result; } catch (err) { clearTimeout(timeoutId); - api.logger.warn(`memory-lancedb-pro: recall failed: ${String(err)}`); + // Downgrade to debug only when OUR controller aborted (i.e. the + // timeout callback fired). Aborts originating elsewhere — e.g. the + // embedder's own internal timeout — keep warn visibility so real + // failures aren't silenced. + if (abortController.signal.aborted) { + api.logger.debug?.(`memory-lancedb-pro: recall aborted by timeout: ${String(err)}`); + } else { + api.logger.warn(`memory-lancedb-pro: recall failed: ${String(err)}`); + } } }, { priority: 10 }); diff --git a/src/retriever.ts b/src/retriever.ts index 97837888..3c465070 100644 --- a/src/retriever.ts +++ b/src/retriever.ts @@ -105,6 +105,11 @@ export interface RetrievalContext { category?: string; /** Retrieval source: "manual" for user-triggered, "auto-recall" for system-initiated, "cli" for CLI commands. */ source?: "manual" | "auto-recall" | "cli"; + /** Optional AbortSignal. When aborted, in-flight embedding calls cancel and + * the method rejects due to abort (often with AbortError or the signal's + * abort reason) instead of holding the caller's session lock while the + * underlying HTTP request runs to completion. */ + signal?: AbortSignal; } export interface RetrievalResult extends MemorySearchResult { @@ -559,7 +564,7 @@ export class MemoryRetriever { } async retrieve(context: RetrievalContext): Promise { - const { query, limit, scopeFilter, category, source } = context; + const { query, limit, scopeFilter, category, source, signal } = context; const safeLimit = clampInt(limit, 1, 20); this.lastDiagnostics = null; const diagnostics: RetrievalDiagnostics = { @@ -615,6 +620,7 @@ export class MemoryRetriever { category, trace, diagnostics, + signal, ); } else { results = await this.hybridRetrieval( @@ -625,6 +631,7 @@ export class MemoryRetriever { trace, source, diagnostics, + signal, ); } @@ -717,11 +724,12 @@ export class MemoryRetriever { category?: string, trace?: TraceCollector, diagnostics?: RetrievalDiagnostics, + signal?: AbortSignal, ): Promise { let failureStage: RetrievalDiagnostics["failureStage"] = "vector.embedQuery"; try { const candidatePoolSize = Math.max(this.config.candidatePoolSize, limit * 2); - const queryVector = await this.embedder.embedQuery(query); + const queryVector = await this.embedder.embedQuery(query, signal); failureStage = "vector.vectorSearch"; const results = await this.store.vectorSearch( queryVector, @@ -907,11 +915,12 @@ export class MemoryRetriever { trace?: TraceCollector, source?: RetrievalContext["source"], diagnostics?: RetrievalDiagnostics, + signal?: AbortSignal, ): Promise { let failureStage: RetrievalDiagnostics["failureStage"] = "hybrid.embedQuery"; try { const candidatePoolSize = Math.max(this.config.candidatePoolSize, limit * 2); - const queryVector = await this.embedder.embedQuery(query); + const queryVector = await this.embedder.embedQuery(query, signal); const bm25Query = this.buildBM25Query(query, source); if (diagnostics) { diagnostics.bm25Query = bm25Query;