From 6c6c79904e33c9894c8238c00d959d3ec764147c Mon Sep 17 00:00:00 2001 From: Waleed Latif Date: Wed, 20 May 2026 13:51:29 -0700 Subject: [PATCH 1/4] improvement(knowledge): eliminate N+1 on tag definitions in bulk upload MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit createDocumentRecords previously called processDocumentTags per-doc, each running a SELECT against knowledge_base_tag_definitions — N queries that all returned the same kbId-scoped rows. Worse, those reads used the global db pool while the tx held a FOR UPDATE lock on the KB row, risking pool contention on large bulk uploads. Split the helper into loadTagDefinitions (single query, accepts the tx as executor) and resolveDocumentTags (pure, takes the pre-loaded Map). The bulk path loads once inside the transaction; createSingleDocument loads once outside its tx. Same throw-on-validation-error semantics preserved. --- apps/sim/lib/knowledge/documents/service.ts | 40 ++++++++++++++------- 1 file changed, 27 insertions(+), 13 deletions(-) diff --git a/apps/sim/lib/knowledge/documents/service.ts b/apps/sim/lib/knowledge/documents/service.ts index be71c279717..76eb75aa77e 100644 --- a/apps/sim/lib/knowledge/documents/service.ts +++ b/apps/sim/lib/knowledge/documents/service.ts @@ -111,11 +111,26 @@ interface DocumentTagData { value: string } -async function processDocumentTags( +type TagDefinition = typeof knowledgeBaseTagDefinitions.$inferSelect +type TagDefinitionsByName = Map<string, TagDefinition> +type DbExecutor = Pick<typeof db, 'select'> + +async function loadTagDefinitions( knowledgeBaseId: string, + executor: DbExecutor = db +): Promise<TagDefinitionsByName> { + const defs = await executor + .select() + .from(knowledgeBaseTagDefinitions) + .where(eq(knowledgeBaseTagDefinitions.knowledgeBaseId, knowledgeBaseId)) + return new Map(defs.map((def) => [def.displayName, def])) +} + +function resolveDocumentTags( tagData: DocumentTagData[], + tagDefinitions: TagDefinitionsByName, requestId: string -): Promise<ProcessedDocumentTags> { +): 
ProcessedDocumentTags { const setTagValue = ( tags: ProcessedDocumentTags, slot: string, @@ -200,13 +215,6 @@ async function processDocumentTags( return result } - const existingDefinitions = await db - .select() - .from(knowledgeBaseTagDefinitions) - .where(eq(knowledgeBaseTagDefinitions.knowledgeBaseId, knowledgeBaseId)) - - const existingByName = new Map(existingDefinitions.map((def) => [def.displayName, def])) - const undefinedTags: string[] = [] const typeErrors: string[] = [] @@ -223,7 +231,7 @@ async function processDocumentTags( if (!hasValue) continue - const existingDef = existingByName.get(tagName) + const existingDef = tagDefinitions.get(tagName) if (!existingDef) { undefinedTags.push(tagName) continue @@ -264,7 +272,7 @@ async function processDocumentTags( if (!hasValue) continue - const existingDef = existingByName.get(tagName) + const existingDef = tagDefinitions.get(tagName) if (!existingDef) continue const targetSlot = existingDef.tagSlot @@ -770,6 +778,11 @@ export async function createDocumentRecords( throw new Error('Knowledge base not found') } + // Load tag definitions once for the whole batch (avoids N+1 across docs) + // and reuses the transaction's connection so we don't double-checkout + // while holding the KB FOR UPDATE lock. 
+ const tagDefinitions = await loadTagDefinitions(knowledgeBaseId, tx) + const now = new Date() const documentRecords = [] const returnData: DocumentData[] = [] @@ -783,7 +796,7 @@ export async function createDocumentRecords( try { const tagData = JSON.parse(docData.documentTagsData) if (Array.isArray(tagData)) { - processedTags = await processDocumentTags(knowledgeBaseId, tagData, requestId) + processedTags = resolveDocumentTags(tagData, tagDefinitions, requestId) } } catch (error) { if (error instanceof SyntaxError) { @@ -1277,7 +1290,8 @@ export async function createSingleDocument( try { const tagData = JSON.parse(documentData.documentTagsData) if (Array.isArray(tagData)) { - processedTags = await processDocumentTags(knowledgeBaseId, tagData, requestId) + const tagDefinitions = await loadTagDefinitions(knowledgeBaseId) + processedTags = resolveDocumentTags(tagData, tagDefinitions, requestId) } } catch (error) { if (error instanceof SyntaxError) { From 457b67c3ef25d4382e098dd6771cfe942470263d Mon Sep 17 00:00:00 2001 From: Waleed Latif Date: Wed, 20 May 2026 13:58:58 -0700 Subject: [PATCH 2/4] improvement(knowledge): fold processDocumentAsync prefetches into one JOIN MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit processDocumentAsync was issuing three separate SELECTs per processed document: knowledge_base (config), workspace (billing settings), and document (tag values). For a typical Trigger.dev fleet processing thousands of docs, that's thousands of redundant pool checkouts. Collapsed into a single JOIN at the top of processDocumentAsync that fetches kb config + billed account user + document tag values in one roundtrip. The post-embedding tag SELECT (which previously held tags through the full embedding-generation wait) is gone; tags from the initial prefetch are reused. Behavior: - Missing/archived/deleted document or KB → same 'failed' status outcome as before, single consolidated error message. 
- Missing billed account → preserves existing error. - All 208 KB tests pass (test mock extended for innerJoin/leftJoin). --- apps/sim/app/api/knowledge/utils.test.ts | 25 +++++ apps/sim/lib/knowledge/documents/service.ts | 110 ++++++++++---------- 2 files changed, 82 insertions(+), 53 deletions(-) diff --git a/apps/sim/app/api/knowledge/utils.test.ts b/apps/sim/app/api/knowledge/utils.test.ts index 1886c1659e1..9ae994ffc05 100644 --- a/apps/sim/app/api/knowledge/utils.test.ts +++ b/apps/sim/app/api/knowledge/utils.test.ts @@ -116,6 +116,31 @@ vi.mock('@sim/db', async () => { }, } }, + innerJoin() { + // document × knowledge_base context JOIN — return the first kb and + // doc row merged (covers processDocumentAsync's prefetch). + return { + leftJoin: () => ({ + where: () => ({ + limit: (n: number) => + Promise.resolve( + kbRows.length > 0 && docRows.length > 0 + ? [{ ...kbRows[0], ...docRows[0], billedAccountUserId: 'billing-user-1' }] + .slice(0, n) + : [] + ), + }), + }), + where: () => ({ + limit: (n: number) => + Promise.resolve( + kbRows.length > 0 && docRows.length > 0 + ? 
[{ ...kbRows[0], ...docRows[0] }].slice(0, n) + : [] + ), + }), + } + }, } }, } diff --git a/apps/sim/lib/knowledge/documents/service.ts b/apps/sim/lib/knowledge/documents/service.ts index 76eb75aa77e..50c9cd6270c 100644 --- a/apps/sim/lib/knowledge/documents/service.ts +++ b/apps/sim/lib/knowledge/documents/service.ts @@ -5,6 +5,7 @@ import { knowledgeBase, knowledgeBaseTagDefinitions, knowledgeConnector, + workspace as workspaceTable, } from '@sim/db/schema' import { createLogger } from '@sim/logger' import { sha256Hex } from '@sim/security/hash' @@ -47,7 +48,6 @@ import type { ProcessedDocumentTags } from '@/lib/knowledge/types' import { estimateTokenCount } from '@/lib/tokenization/estimators' import { deleteFile } from '@/lib/uploads/core/storage-service' import { extractStorageKey } from '@/lib/uploads/utils/file-utils' -import { getWorkspaceBilledAccountUserId } from '@/lib/workspaces/utils' import type { DocumentProcessingPayload, processDocument as processDocumentTask, @@ -426,34 +426,68 @@ export async function processDocumentAsync( try { logger.info(`[${documentId}] Starting document processing: ${docData.filename}`) - const kb = await db + // Single JOIN fetches everything we'll need: KB processing config, workspace + // billing context, and the document's tag values (folded onto the embedding + // rows later). Replaces three separate SELECTs. 
+ const contextRows = await db .select({ userId: knowledgeBase.userId, workspaceId: knowledgeBase.workspaceId, chunkingConfig: knowledgeBase.chunkingConfig, embeddingModel: knowledgeBase.embeddingModel, + billedAccountUserId: workspaceTable.billedAccountUserId, + tag1: document.tag1, + tag2: document.tag2, + tag3: document.tag3, + tag4: document.tag4, + tag5: document.tag5, + tag6: document.tag6, + tag7: document.tag7, + number1: document.number1, + number2: document.number2, + number3: document.number3, + number4: document.number4, + number5: document.number5, + date1: document.date1, + date2: document.date2, + boolean1: document.boolean1, + boolean2: document.boolean2, + boolean3: document.boolean3, }) - .from(knowledgeBase) - .where(and(eq(knowledgeBase.id, knowledgeBaseId), isNull(knowledgeBase.deletedAt))) + .from(document) + .innerJoin(knowledgeBase, eq(knowledgeBase.id, document.knowledgeBaseId)) + .leftJoin( + workspaceTable, + and(eq(workspaceTable.id, knowledgeBase.workspaceId), isNull(workspaceTable.archivedAt)) + ) + .where( + and( + eq(document.id, documentId), + eq(knowledgeBase.id, knowledgeBaseId), + isNull(document.archivedAt), + isNull(document.deletedAt), + isNull(knowledgeBase.deletedAt) + ) + ) .limit(1) - if (kb.length === 0) { + if (contextRows.length === 0) { logger.warn( - `[${documentId}] Skipping document processing: knowledge base ${knowledgeBaseId} is deleted` + `[${documentId}] Skipping document processing: document or knowledge base ${knowledgeBaseId} no longer exists` ) await db .update(document) .set({ processingStatus: 'failed', - processingError: 'Knowledge base deleted', + processingError: 'Document or knowledge base no longer exists', processingCompletedAt: new Date(), }) - .where( - and(eq(document.id, documentId), isNull(document.archivedAt), isNull(document.deletedAt)) - ) + .where(eq(document.id, documentId)) return } + const ctx = contextRows[0] + await db .update(document) .set({ @@ -468,7 +502,7 @@ export async function 
processDocumentAsync( logger.info(`[${documentId}] Status updated to 'processing', starting document processor`) - const rawConfig = kb[0].chunkingConfig as { + const rawConfig = ctx.chunkingConfig as { maxSize?: number minSize?: number overlap?: number @@ -481,13 +515,13 @@ export async function processDocumentAsync( overlap: rawConfig?.overlap ?? 200, } - const kbEmbeddingModel = kb[0].embeddingModel - if (!kb[0].workspaceId) { + const kbEmbeddingModel = ctx.embeddingModel + if (!ctx.workspaceId) { throw new Error(`Knowledge base ${knowledgeBaseId} is missing workspace billing context`) } - const billingUserId = await getWorkspaceBilledAccountUserId(kb[0].workspaceId) + const billingUserId = ctx.billedAccountUserId if (!billingUserId) { - throw new Error(`Workspace ${kb[0].workspaceId} is missing billed account`) + throw new Error(`Workspace ${ctx.workspaceId} is missing billed account`) } let totalEmbeddingTokens = 0 let embeddingIsBYOK = false @@ -503,8 +537,8 @@ export async function processDocumentAsync( kbConfig.maxSize, kbConfig.overlap, kbConfig.minSize, - kb[0].userId, - kb[0].workspaceId, + ctx.userId, + ctx.workspaceId, rawConfig?.strategy, rawConfig?.strategyOptions ) @@ -542,7 +576,7 @@ export async function processDocumentAsync( isBYOK, modelName, pricingId, - } = await generateEmbeddings(batch, kbEmbeddingModel, kb[0].workspaceId) + } = await generateEmbeddings(batch, kbEmbeddingModel, ctx.workspaceId) for (const emb of batchEmbeddings) { embeddings.push(emb) } @@ -555,41 +589,11 @@ export async function processDocumentAsync( } } - logger.info(`[${documentId}] Embeddings generated, fetching document tags`) - - const documentRecord = await db - .select({ - tag1: document.tag1, - tag2: document.tag2, - tag3: document.tag3, - tag4: document.tag4, - tag5: document.tag5, - tag6: document.tag6, - tag7: document.tag7, - number1: document.number1, - number2: document.number2, - number3: document.number3, - number4: document.number4, - number5: 
document.number5, - date1: document.date1, - date2: document.date2, - boolean1: document.boolean1, - boolean2: document.boolean2, - boolean3: document.boolean3, - }) - .from(document) - .where( - and( - eq(document.id, documentId), - isNull(document.archivedAt), - isNull(document.deletedAt) - ) - ) - .limit(1) - - const documentTags = documentRecord[0] || {} + // Document tag values are stable from upload time and were prefetched in + // the JOIN above — reuse them instead of issuing another SELECT. + const documentTags = ctx - logger.info(`[${documentId}] Creating embedding records with tags`) + logger.info(`[${documentId}] Embeddings generated, creating embedding records with tags`) const tokenizerProvider = getEmbeddingModelInfo(kbEmbeddingModel).tokenizerProvider @@ -694,7 +698,7 @@ export async function processDocumentAsync( if (cost > 0) { await recordUsage({ userId: billingUserId, - workspaceId: kb[0].workspaceId ?? undefined, + workspaceId: ctx.workspaceId ?? undefined, entries: [ { category: 'model', From 00f93b1bb32debf6406723b1e37efeec70835553 Mon Sep 17 00:00:00 2001 From: Waleed Latif Date: Wed, 20 May 2026 14:00:17 -0700 Subject: [PATCH 3/4] improvement(knowledge): skip tag-definitions load when no doc carries tags Trim verbose comments in the same pass. --- apps/sim/lib/knowledge/documents/service.ts | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/apps/sim/lib/knowledge/documents/service.ts b/apps/sim/lib/knowledge/documents/service.ts index 50c9cd6270c..687f3ef3310 100644 --- a/apps/sim/lib/knowledge/documents/service.ts +++ b/apps/sim/lib/knowledge/documents/service.ts @@ -426,9 +426,7 @@ export async function processDocumentAsync( try { logger.info(`[${documentId}] Starting document processing: ${docData.filename}`) - // Single JOIN fetches everything we'll need: KB processing config, workspace - // billing context, and the document's tag values (folded onto the embedding - // rows later). 
Replaces three separate SELECTs. + // KB config + workspace billing + doc tags in one JOIN (was 3 SELECTs). const contextRows = await db .select({ userId: knowledgeBase.userId, @@ -589,8 +587,7 @@ export async function processDocumentAsync( } } - // Document tag values are stable from upload time and were prefetched in - // the JOIN above — reuse them instead of issuing another SELECT. + // Tag values prefetched above; reuse for the embedding rows. const documentTags = ctx logger.info(`[${documentId}] Embeddings generated, creating embedding records with tags`) @@ -782,10 +779,11 @@ export async function createDocumentRecords( throw new Error('Knowledge base not found') } - // Load tag definitions once for the whole batch (avoids N+1 across docs) - // and reuses the transaction's connection so we don't double-checkout - // while holding the KB FOR UPDATE lock. - const tagDefinitions = await loadTagDefinitions(knowledgeBaseId, tx) + // One load per batch (was N+1); skip entirely if no doc carries tags. + const hasTaggedDocs = documents.some((d) => d.documentTagsData) + const tagDefinitions = hasTaggedDocs + ? await loadTagDefinitions(knowledgeBaseId, tx) + : (new Map() as TagDefinitionsByName) const now = new Date() const documentRecords = [] From 4ce7939e614ba2f8d3acdb8e83a0f9a014a2a92c Mon Sep 17 00:00:00 2001 From: Waleed Latif Date: Wed, 20 May 2026 14:04:56 -0700 Subject: [PATCH 4/4] lint --- apps/sim/app/api/knowledge/utils.test.ts | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/apps/sim/app/api/knowledge/utils.test.ts b/apps/sim/app/api/knowledge/utils.test.ts index 9ae994ffc05..326bbee660f 100644 --- a/apps/sim/app/api/knowledge/utils.test.ts +++ b/apps/sim/app/api/knowledge/utils.test.ts @@ -125,8 +125,9 @@ vi.mock('@sim/db', async () => { limit: (n: number) => Promise.resolve( kbRows.length > 0 && docRows.length > 0 - ? [{ ...kbRows[0], ...docRows[0], billedAccountUserId: 'billing-user-1' }] - .slice(0, n) + ? 
[ + { ...kbRows[0], ...docRows[0], billedAccountUserId: 'billing-user-1' }, + ].slice(0, n) : [] ), }),