26 changes: 23 additions & 3 deletions index.ts
@@ -1942,10 +1942,12 @@ const memoryLanceDBProPlugin = {
limit: number;
scopeFilter?: string[];
category?: string;
signal?: AbortSignal;
}) {
let results = await retriever.retrieve(params);
if (results.length === 0) {
await sleep(75);
if (params.signal?.aborted) return results;
results = await retriever.retrieve(params);
}
return results;
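> A note on the retry above: the signal is checked only after the fixed 75 ms sleep has fully elapsed. An abort-aware sleep would wake immediately on abort instead of always paying the full delay. A minimal sketch of such a helper (hypothetical, not part of this diff; the plugin's own `sleep` may differ):

```ts
// Hypothetical abort-aware sleep: resolves after `ms`, or as soon as the
// signal aborts, whichever comes first. Illustrative only.
function sleepAbortable(ms: number, signal?: AbortSignal): Promise<void> {
  return new Promise((resolve) => {
    if (signal?.aborted) return resolve();
    const onAbort = () => {
      clearTimeout(timer); // cancel the pending wake-up
      resolve();
    };
    const timer = setTimeout(() => {
      signal?.removeEventListener("abort", onAbort);
      resolve();
    }, ms);
    signal?.addEventListener("abort", onAbort, { once: true });
  });
}
```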
@@ -2407,7 +2409,13 @@ const memoryLanceDBProPlugin = {
// (embedding → rerank → lifecycle), which can silently drop messages on
// channels like Telegram when subsequent requests hit lock timeouts.
// See: https://github.com/CortexReach/memory-lancedb-pro/issues/253
const recallWork = async (): Promise<{ prependContext: string } | undefined> => {
//
// The timeout also aborts the in-flight embedding HTTP call via the
// abortController below, so the underlying work doesn't continue to hold
// resources (lancedb connection, per-agent memory-runtime mutex) past
// the timeout boundary.
const abortController = new AbortController();
const recallWork = async (signal: AbortSignal): Promise<{ prependContext: string } | undefined> => {
// Determine agent ID and accessible scopes
const agentId = resolveHookAgentId(ctx?.agentId, (event as any).sessionKey);
const accessibleScopes = resolveScopeFilter(scopeManager, agentId);
@@ -2450,6 +2458,7 @@ const memoryLanceDBProPlugin = {
limit: retrieveLimit,
scopeFilter: accessibleScopes,
source: "auto-recall",
signal,
}), config.workspaceBoundary);

if (results.length === 0) {
@@ -2690,9 +2699,12 @@ const memoryLanceDBProPlugin = {
let timeoutId: ReturnType<typeof setTimeout> | undefined;
try {
const result = await Promise.race([
recallWork().then((r) => { clearTimeout(timeoutId); return r; }),
recallWork(abortController.signal).then((r) => { clearTimeout(timeoutId); return r; }),
new Promise<undefined>((resolve) => {
timeoutId = setTimeout(() => {
// Cancel in-flight embedding/retrieval HTTP calls so they don't
// keep holding resources after we've given up on the result.
abortController.abort(new Error("auto-recall timeout"));
api.logger.warn(
`memory-lancedb-pro: auto-recall timed out after ${AUTO_RECALL_TIMEOUT_MS}ms; skipping memory injection to avoid stalling agent startup`,
);
@@ -2703,7 +2715,15 @@
return result;
} catch (err) {
clearTimeout(timeoutId);
api.logger.warn(`memory-lancedb-pro: recall failed: ${String(err)}`);
// Downgrade to debug only when OUR controller aborted (i.e. the
// timeout callback fired). Aborts originating elsewhere — e.g. the
// embedder's own internal timeout — keep warn visibility so real
// failures aren't silenced.
if (abortController.signal.aborted) {
api.logger.debug?.(`memory-lancedb-pro: recall aborted by timeout: ${String(err)}`);
} else {
api.logger.warn(`memory-lancedb-pro: recall failed: ${String(err)}`);
}
}
}, { priority: 10 });
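> Taken together, the pattern in this file is: race the recall work against a timer, have the timer abort a shared `AbortController` so in-flight HTTP calls are torn down, and afterwards use `controller.signal.aborted` to tell the expected timeout abort apart from a genuine failure. A self-contained sketch of the same pattern, using hypothetical names (`withTimeout`, `logger`) and only standard `AbortController` semantics:

```ts
// Sketch of the race-with-abort pattern used above. All names here are
// illustrative, not the plugin's actual API.
async function withTimeout<T>(
  work: (signal: AbortSignal) => Promise<T>,
  timeoutMs: number,
  logger: { warn(msg: string): void; debug?(msg: string): void },
): Promise<T | undefined> {
  const controller = new AbortController();
  let timer: ReturnType<typeof setTimeout> | undefined;
  try {
    return await Promise.race([
      work(controller.signal).then((r) => {
        clearTimeout(timer); // work won the race; cancel the timer
        return r;
      }),
      new Promise<undefined>((resolve) => {
        timer = setTimeout(() => {
          // Abort in-flight calls so they stop holding resources.
          controller.abort(new Error("timeout"));
          logger.warn(`timed out after ${timeoutMs}ms; skipping`);
          resolve(undefined);
        }, timeoutMs);
      }),
    ]);
  } catch (err) {
    clearTimeout(timer);
    if (controller.signal.aborted) {
      // Our own timeout fired; the rejection is expected noise.
      logger.debug?.(`aborted by timeout: ${String(err)}`);
    } else {
      logger.warn(`failed: ${String(err)}`);
    }
    return undefined;
  }
}
```

Note that `Promise.race` subscribes to every contender, so the losing `work` promise rejecting later does not surface as an unhandled rejection.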

15 changes: 12 additions & 3 deletions src/retriever.ts
@@ -105,6 +105,11 @@ export interface RetrievalContext {
category?: string;
/** Retrieval source: "manual" for user-triggered, "auto-recall" for system-initiated, "cli" for CLI commands. */
source?: "manual" | "auto-recall" | "cli";
/** Optional AbortSignal. When aborted, in-flight embedding calls are
 * cancelled and the method rejects (typically with an AbortError or the
 * signal's abort reason) instead of holding the caller's session lock
 * while the underlying HTTP request runs to completion. */
signal?: AbortSignal;
}
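> The "AbortError or the signal's abort reason" wording in the doc comment matches standard `AbortController` behaviour: in modern runtimes, `fetch` and most signal-aware APIs reject with `signal.reason`, which defaults to a `DOMException` named "AbortError" when `abort()` is called without an argument. A quick illustration, independent of this codebase:

```ts
const controller = new AbortController();
const pending = fetch("https://example.com", { signal: controller.signal });

// Abort with an explicit reason; fetch rejects with that exact value.
controller.abort(new Error("auto-recall timeout"));

pending.catch((err) => {
  // In modern runtimes err is controller.signal.reason; with a bare
  // abort() it would instead be a DOMException named "AbortError".
  console.log(err.message); // "auto-recall timeout"
});
```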

export interface RetrievalResult extends MemorySearchResult {
@@ -559,7 +564,7 @@ export class MemoryRetriever {
}

async retrieve(context: RetrievalContext): Promise<RetrievalResult[]> {
const { query, limit, scopeFilter, category, source } = context;
const { query, limit, scopeFilter, category, source, signal } = context;
const safeLimit = clampInt(limit, 1, 20);
this.lastDiagnostics = null;
const diagnostics: RetrievalDiagnostics = {
@@ -615,6 +620,7 @@
category,
trace,
diagnostics,
signal,
);
} else {
results = await this.hybridRetrieval(
@@ -625,6 +631,7 @@
trace,
source,
diagnostics,
signal,
);
}

@@ -717,11 +724,12 @@
category?: string,
trace?: TraceCollector,
diagnostics?: RetrievalDiagnostics,
signal?: AbortSignal,
): Promise<RetrievalResult[]> {
let failureStage: RetrievalDiagnostics["failureStage"] = "vector.embedQuery";
try {
const candidatePoolSize = Math.max(this.config.candidatePoolSize, limit * 2);
const queryVector = await this.embedder.embedQuery(query);
const queryVector = await this.embedder.embedQuery(query, signal);
failureStage = "vector.vectorSearch";
const results = await this.store.vectorSearch(
queryVector,
@@ -907,11 +915,12 @@
trace?: TraceCollector,
source?: RetrievalContext["source"],
diagnostics?: RetrievalDiagnostics,
signal?: AbortSignal,
): Promise<RetrievalResult[]> {
let failureStage: RetrievalDiagnostics["failureStage"] = "hybrid.embedQuery";
try {
const candidatePoolSize = Math.max(this.config.candidatePoolSize, limit * 2);
const queryVector = await this.embedder.embedQuery(query);
const queryVector = await this.embedder.embedQuery(query, signal);
const bm25Query = this.buildBM25Query(query, source);
if (diagnostics) {
diagnostics.bm25Query = bm25Query;
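> The two `embedQuery(query, signal)` call sites above assume the embedder accepts an optional `AbortSignal` and forwards it to its HTTP request; that embedder change is not shown in this diff. A minimal sketch of what a signal-aware embedder might look like, assuming an OpenAI-compatible `/embeddings` endpoint (the endpoint shape and field names are assumptions, not this plugin's actual embedder):

```ts
// Hypothetical signal-aware embedder; the real embedder's internals are
// not part of this diff.
class HttpEmbedder {
  constructor(private baseUrl: string, private model: string) {}

  async embedQuery(query: string, signal?: AbortSignal): Promise<number[]> {
    const res = await fetch(`${this.baseUrl}/embeddings`, {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify({ model: this.model, input: query }),
      signal, // aborting tears down the HTTP request immediately
    });
    if (!res.ok) throw new Error(`embedding request failed: ${res.status}`);
    const data = (await res.json()) as { data: Array<{ embedding: number[] }> };
    return data.data[0].embedding;
  }
}
```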