Skip to content

Commit 457b67c

Browse files
committed
improvement(knowledge): fold processDocumentAsync prefetches into one JOIN
processDocumentAsync was issuing three separate SELECTs per processed document: knowledge_base (config), workspace (billing settings), and document (tag values). For a typical Trigger.dev fleet processing thousands of docs, that's thousands of redundant pool checkouts. Collapsed into a single JOIN at the top of processDocumentAsync that fetches kb config + billed account user + document tag values in one roundtrip. The post-embedding tag SELECT (which previously held tags through the full embedding-generation wait) is gone; tags from the initial prefetch are reused. Behavior: - Missing/archived/deleted document or KB → same 'failed' status outcome as before, single consolidated error message. - Missing billed account → preserves existing error. - All 208 KB tests pass (test mock extended for innerJoin/leftJoin).
1 parent 6c6c799 commit 457b67c

2 files changed

Lines changed: 82 additions & 53 deletions

File tree

apps/sim/app/api/knowledge/utils.test.ts

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,31 @@ vi.mock('@sim/db', async () => {
116116
},
117117
}
118118
},
119+
innerJoin() {
120+
// document × knowledge_base context JOIN — return the first kb and
121+
// doc row merged (covers processDocumentAsync's prefetch).
122+
return {
123+
leftJoin: () => ({
124+
where: () => ({
125+
limit: (n: number) =>
126+
Promise.resolve(
127+
kbRows.length > 0 && docRows.length > 0
128+
? [{ ...kbRows[0], ...docRows[0], billedAccountUserId: 'billing-user-1' }]
129+
.slice(0, n)
130+
: []
131+
),
132+
}),
133+
}),
134+
where: () => ({
135+
limit: (n: number) =>
136+
Promise.resolve(
137+
kbRows.length > 0 && docRows.length > 0
138+
? [{ ...kbRows[0], ...docRows[0] }].slice(0, n)
139+
: []
140+
),
141+
}),
142+
}
143+
},
119144
}
120145
},
121146
}

apps/sim/lib/knowledge/documents/service.ts

Lines changed: 57 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import {
55
knowledgeBase,
66
knowledgeBaseTagDefinitions,
77
knowledgeConnector,
8+
workspace as workspaceTable,
89
} from '@sim/db/schema'
910
import { createLogger } from '@sim/logger'
1011
import { sha256Hex } from '@sim/security/hash'
@@ -47,7 +48,6 @@ import type { ProcessedDocumentTags } from '@/lib/knowledge/types'
4748
import { estimateTokenCount } from '@/lib/tokenization/estimators'
4849
import { deleteFile } from '@/lib/uploads/core/storage-service'
4950
import { extractStorageKey } from '@/lib/uploads/utils/file-utils'
50-
import { getWorkspaceBilledAccountUserId } from '@/lib/workspaces/utils'
5151
import type {
5252
DocumentProcessingPayload,
5353
processDocument as processDocumentTask,
@@ -426,34 +426,68 @@ export async function processDocumentAsync(
426426
try {
427427
logger.info(`[${documentId}] Starting document processing: ${docData.filename}`)
428428

429-
const kb = await db
429+
// Single JOIN fetches everything we'll need: KB processing config, workspace
430+
// billing context, and the document's tag values (folded onto the embedding
431+
// rows later). Replaces three separate SELECTs.
432+
const contextRows = await db
430433
.select({
431434
userId: knowledgeBase.userId,
432435
workspaceId: knowledgeBase.workspaceId,
433436
chunkingConfig: knowledgeBase.chunkingConfig,
434437
embeddingModel: knowledgeBase.embeddingModel,
438+
billedAccountUserId: workspaceTable.billedAccountUserId,
439+
tag1: document.tag1,
440+
tag2: document.tag2,
441+
tag3: document.tag3,
442+
tag4: document.tag4,
443+
tag5: document.tag5,
444+
tag6: document.tag6,
445+
tag7: document.tag7,
446+
number1: document.number1,
447+
number2: document.number2,
448+
number3: document.number3,
449+
number4: document.number4,
450+
number5: document.number5,
451+
date1: document.date1,
452+
date2: document.date2,
453+
boolean1: document.boolean1,
454+
boolean2: document.boolean2,
455+
boolean3: document.boolean3,
435456
})
436-
.from(knowledgeBase)
437-
.where(and(eq(knowledgeBase.id, knowledgeBaseId), isNull(knowledgeBase.deletedAt)))
457+
.from(document)
458+
.innerJoin(knowledgeBase, eq(knowledgeBase.id, document.knowledgeBaseId))
459+
.leftJoin(
460+
workspaceTable,
461+
and(eq(workspaceTable.id, knowledgeBase.workspaceId), isNull(workspaceTable.archivedAt))
462+
)
463+
.where(
464+
and(
465+
eq(document.id, documentId),
466+
eq(knowledgeBase.id, knowledgeBaseId),
467+
isNull(document.archivedAt),
468+
isNull(document.deletedAt),
469+
isNull(knowledgeBase.deletedAt)
470+
)
471+
)
438472
.limit(1)
439473

440-
if (kb.length === 0) {
474+
if (contextRows.length === 0) {
441475
logger.warn(
442-
`[${documentId}] Skipping document processing: knowledge base ${knowledgeBaseId} is deleted`
476+
`[${documentId}] Skipping document processing: document or knowledge base ${knowledgeBaseId} no longer exists`
443477
)
444478
await db
445479
.update(document)
446480
.set({
447481
processingStatus: 'failed',
448-
processingError: 'Knowledge base deleted',
482+
processingError: 'Document or knowledge base no longer exists',
449483
processingCompletedAt: new Date(),
450484
})
451-
.where(
452-
and(eq(document.id, documentId), isNull(document.archivedAt), isNull(document.deletedAt))
453-
)
485+
.where(eq(document.id, documentId))
454486
return
455487
}
456488

489+
const ctx = contextRows[0]
490+
457491
await db
458492
.update(document)
459493
.set({
@@ -468,7 +502,7 @@ export async function processDocumentAsync(
468502

469503
logger.info(`[${documentId}] Status updated to 'processing', starting document processor`)
470504

471-
const rawConfig = kb[0].chunkingConfig as {
505+
const rawConfig = ctx.chunkingConfig as {
472506
maxSize?: number
473507
minSize?: number
474508
overlap?: number
@@ -481,13 +515,13 @@ export async function processDocumentAsync(
481515
overlap: rawConfig?.overlap ?? 200,
482516
}
483517

484-
const kbEmbeddingModel = kb[0].embeddingModel
485-
if (!kb[0].workspaceId) {
518+
const kbEmbeddingModel = ctx.embeddingModel
519+
if (!ctx.workspaceId) {
486520
throw new Error(`Knowledge base ${knowledgeBaseId} is missing workspace billing context`)
487521
}
488-
const billingUserId = await getWorkspaceBilledAccountUserId(kb[0].workspaceId)
522+
const billingUserId = ctx.billedAccountUserId
489523
if (!billingUserId) {
490-
throw new Error(`Workspace ${kb[0].workspaceId} is missing billed account`)
524+
throw new Error(`Workspace ${ctx.workspaceId} is missing billed account`)
491525
}
492526
let totalEmbeddingTokens = 0
493527
let embeddingIsBYOK = false
@@ -503,8 +537,8 @@ export async function processDocumentAsync(
503537
kbConfig.maxSize,
504538
kbConfig.overlap,
505539
kbConfig.minSize,
506-
kb[0].userId,
507-
kb[0].workspaceId,
540+
ctx.userId,
541+
ctx.workspaceId,
508542
rawConfig?.strategy,
509543
rawConfig?.strategyOptions
510544
)
@@ -542,7 +576,7 @@ export async function processDocumentAsync(
542576
isBYOK,
543577
modelName,
544578
pricingId,
545-
} = await generateEmbeddings(batch, kbEmbeddingModel, kb[0].workspaceId)
579+
} = await generateEmbeddings(batch, kbEmbeddingModel, ctx.workspaceId)
546580
for (const emb of batchEmbeddings) {
547581
embeddings.push(emb)
548582
}
@@ -555,41 +589,11 @@ export async function processDocumentAsync(
555589
}
556590
}
557591

558-
logger.info(`[${documentId}] Embeddings generated, fetching document tags`)
559-
560-
const documentRecord = await db
561-
.select({
562-
tag1: document.tag1,
563-
tag2: document.tag2,
564-
tag3: document.tag3,
565-
tag4: document.tag4,
566-
tag5: document.tag5,
567-
tag6: document.tag6,
568-
tag7: document.tag7,
569-
number1: document.number1,
570-
number2: document.number2,
571-
number3: document.number3,
572-
number4: document.number4,
573-
number5: document.number5,
574-
date1: document.date1,
575-
date2: document.date2,
576-
boolean1: document.boolean1,
577-
boolean2: document.boolean2,
578-
boolean3: document.boolean3,
579-
})
580-
.from(document)
581-
.where(
582-
and(
583-
eq(document.id, documentId),
584-
isNull(document.archivedAt),
585-
isNull(document.deletedAt)
586-
)
587-
)
588-
.limit(1)
589-
590-
const documentTags = documentRecord[0] || {}
592+
// Document tag values are stable from upload time and were prefetched in
593+
// the JOIN above — reuse them instead of issuing another SELECT.
594+
const documentTags = ctx
591595

592-
logger.info(`[${documentId}] Creating embedding records with tags`)
596+
logger.info(`[${documentId}] Embeddings generated, creating embedding records with tags`)
593597

594598
const tokenizerProvider = getEmbeddingModelInfo(kbEmbeddingModel).tokenizerProvider
595599

@@ -694,7 +698,7 @@ export async function processDocumentAsync(
694698
if (cost > 0) {
695699
await recordUsage({
696700
userId: billingUserId,
697-
workspaceId: kb[0].workspaceId ?? undefined,
701+
workspaceId: ctx.workspaceId ?? undefined,
698702
entries: [
699703
{
700704
category: 'model',

0 commit comments

Comments
 (0)