Skip to content

Commit 67cc80c

Browse files
committed
improvement(knowledge): batch trigger dispatch, prune redundant DB roundtrips
Connector sync was dispatching Trigger.dev document-processing jobs one HTTP roundtrip at a time. processDocumentsWithQueue now uses tasks.batchTrigger when Trigger.dev is available, collapsing N roundtrips to ceil(N/1000). Idempotency keys protect against duplicate runs on retry. Also trims DB roundtrips inside the sync loop: - Per-batch isConnectorDeleted + isKnowledgeBaseDeleted collapsed into a single checkSyncLiveness JOIN (one SELECT instead of two per batch). - Dropped redundant pre-upload isKnowledgeBaseDeleted checks from addDocument/updateDocument: the batch-boundary liveness check already catches pre-batch deletions and the in-tx FOR UPDATE is authoritative for races during the batch. - Removed dead processDocumentsWithTrigger helper (never called).
1 parent c381550 commit 67cc80c

2 files changed

Lines changed: 76 additions & 77 deletions

File tree

apps/sim/lib/knowledge/connectors/sync-engine.ts

Lines changed: 30 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -58,22 +58,33 @@ type DocOp =
5858
| { type: 'add'; extDoc: ExternalDocument }
5959
| { type: 'update'; existingId: string; extDoc: ExternalDocument }
6060

61-
async function isConnectorDeleted(connectorId: string): Promise<boolean> {
61+
/**
62+
* Combined liveness check used between batches. One JOIN query checks both
63+
* connector and knowledge base state in a single roundtrip.
64+
*/
65+
async function checkSyncLiveness(
66+
connectorId: string,
67+
knowledgeBaseId: string
68+
): Promise<{ connectorDeleted: boolean; knowledgeBaseDeleted: boolean }> {
6269
const rows = await db
63-
.select({ archivedAt: knowledgeConnector.archivedAt, deletedAt: knowledgeConnector.deletedAt })
70+
.select({
71+
connectorArchivedAt: knowledgeConnector.archivedAt,
72+
connectorDeletedAt: knowledgeConnector.deletedAt,
73+
kbDeletedAt: knowledgeBase.deletedAt,
74+
})
6475
.from(knowledgeConnector)
65-
.where(eq(knowledgeConnector.id, connectorId))
76+
.innerJoin(knowledgeBase, eq(knowledgeBase.id, knowledgeConnector.knowledgeBaseId))
77+
.where(and(eq(knowledgeConnector.id, connectorId), eq(knowledgeBase.id, knowledgeBaseId)))
6678
.limit(1)
67-
return rows.length === 0 || rows[0].archivedAt !== null || rows[0].deletedAt !== null
68-
}
6979

70-
async function isKnowledgeBaseDeleted(knowledgeBaseId: string): Promise<boolean> {
71-
const rows = await db
72-
.select({ deletedAt: knowledgeBase.deletedAt })
73-
.from(knowledgeBase)
74-
.where(eq(knowledgeBase.id, knowledgeBaseId))
75-
.limit(1)
76-
return rows.length === 0 || rows[0].deletedAt !== null
80+
if (rows.length === 0) {
81+
return { connectorDeleted: true, knowledgeBaseDeleted: true }
82+
}
83+
const row = rows[0]
84+
return {
85+
connectorDeleted: row.connectorArchivedAt !== null || row.connectorDeletedAt !== null,
86+
knowledgeBaseDeleted: row.kbDeletedAt !== null,
87+
}
7788
}
7889

7990
async function isKnowledgeBaseActiveInTx(
@@ -502,10 +513,11 @@ export async function executeSync(
502513
}
503514

504515
for (let i = 0; i < pendingOps.length; i += SYNC_BATCH_SIZE) {
505-
if (await isConnectorDeleted(connectorId)) {
516+
const liveness = await checkSyncLiveness(connectorId, connector.knowledgeBaseId)
517+
if (liveness.connectorDeleted) {
506518
throw new ConnectorDeletedException(connectorId)
507519
}
508-
if (await isKnowledgeBaseDeleted(connector.knowledgeBaseId)) {
520+
if (liveness.knowledgeBaseDeleted) {
509521
throw new Error(`Knowledge base ${connector.knowledgeBaseId} was deleted during sync`)
510522
}
511523

@@ -642,11 +654,12 @@ export async function executeSync(
642654
}
643655
}
644656

645-
// Check if connector was deleted before retrying stuck documents
646-
if (await isConnectorDeleted(connectorId)) {
657+
// Check if connector/KB were deleted before retrying stuck documents
658+
const postBatchLiveness = await checkSyncLiveness(connectorId, connector.knowledgeBaseId)
659+
if (postBatchLiveness.connectorDeleted) {
647660
throw new ConnectorDeletedException(connectorId)
648661
}
649-
if (await isKnowledgeBaseDeleted(connector.knowledgeBaseId)) {
662+
if (postBatchLiveness.knowledgeBaseDeleted) {
650663
throw new Error(`Knowledge base ${connector.knowledgeBaseId} was deleted during sync`)
651664
}
652665

@@ -881,9 +894,6 @@ async function addDocument(
881894
extDoc: ExternalDocument,
882895
sourceConfig?: Record<string, unknown>
883896
): Promise<DocumentData> {
884-
if (await isKnowledgeBaseDeleted(knowledgeBaseId)) {
885-
throw new Error(`Knowledge base ${knowledgeBaseId} is deleted`)
886-
}
887897
const documentId = generateId()
888898
const contentBuffer = Buffer.from(extDoc.content, 'utf-8')
889899
const safeTitle = sanitizeStorageTitle(extDoc.title)
@@ -963,9 +973,6 @@ async function updateDocument(
963973
extDoc: ExternalDocument,
964974
sourceConfig?: Record<string, unknown>
965975
): Promise<DocumentData> {
966-
if (await isKnowledgeBaseDeleted(knowledgeBaseId)) {
967-
throw new Error(`Knowledge base ${knowledgeBaseId} is deleted`)
968-
}
969976
// Fetch old file URL before uploading replacement
970977
const existingRows = await db
971978
.select({ fileUrl: document.fileUrl })

apps/sim/lib/knowledge/documents/service.ts

Lines changed: 46 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@ import { estimateTokenCount } from '@/lib/tokenization/estimators'
4848
import { deleteFile } from '@/lib/uploads/core/storage-service'
4949
import { extractStorageKey } from '@/lib/uploads/utils/file-utils'
5050
import { getWorkspaceBilledAccountUserId } from '@/lib/workspaces/utils'
51-
import type { DocumentProcessingPayload } from '@/background/knowledge-processing'
5251
import { calculateCost } from '@/providers/utils'
5352

5453
const logger = createLogger('DocumentService')
@@ -314,12 +313,16 @@ async function processDocumentTags(
314313
return result
315314
}
316315

316+
const TRIGGER_BATCH_SIZE = 1000
317+
317318
export async function processDocumentsWithQueue(
318319
createdDocuments: DocumentData[],
319320
knowledgeBaseId: string,
320321
processingOptions: ProcessingOptions,
321322
requestId: string
322323
): Promise<void> {
324+
if (createdDocuments.length === 0) return
325+
323326
const jobPayloads = createdDocuments.map<DocumentJobData>((doc) => ({
324327
knowledgeBaseId,
325328
documentId: doc.documentId,
@@ -333,13 +336,52 @@ export async function processDocumentsWithQueue(
333336
requestId,
334337
}))
335338

339+
const useTrigger = isTriggerAvailable()
336340
logger.info(
337341
`[${requestId}] Dispatching background processing for ${jobPayloads.length} documents`,
338-
{
339-
backend: isTriggerAvailable() ? 'trigger-dev' : 'direct',
340-
}
342+
{ backend: useTrigger ? 'trigger-dev' : 'direct' }
341343
)
342344

345+
if (useTrigger) {
346+
/**
347+
* Single batched dispatch per chunk of up to TRIGGER_BATCH_SIZE — collapses
348+
* N HTTP roundtrips to ceil(N / TRIGGER_BATCH_SIZE). Idempotency keys allow
349+
* safe re-dispatch on retry without duplicating runs.
350+
*/
351+
let dispatched = 0
352+
for (let i = 0; i < jobPayloads.length; i += TRIGGER_BATCH_SIZE) {
353+
const chunk = jobPayloads.slice(i, i + TRIGGER_BATCH_SIZE)
354+
try {
355+
await tasks.batchTrigger(
356+
'knowledge-process-document',
357+
chunk.map((payload) => ({
358+
payload,
359+
options: {
360+
idempotencyKey: `doc-process-${payload.documentId}-${requestId}`,
361+
tags: [
362+
`knowledgeBaseId:${payload.knowledgeBaseId}`,
363+
`documentId:${payload.documentId}`,
364+
],
365+
},
366+
}))
367+
)
368+
dispatched += chunk.length
369+
} catch (error) {
370+
logger.error(`[${requestId}] Failed to batchTrigger ${chunk.length} document jobs`, {
371+
error: getErrorMessage(error),
372+
})
373+
if (dispatched === 0 && i + TRIGGER_BATCH_SIZE >= jobPayloads.length) {
374+
throw new Error(`All ${jobPayloads.length} document processing dispatches failed`)
375+
}
376+
}
377+
}
378+
379+
logger.info(
380+
`[${requestId}] Document dispatch complete: ${dispatched}/${jobPayloads.length} succeeded`
381+
)
382+
return
383+
}
384+
343385
const results = await Promise.allSettled(
344386
jobPayloads.map((payload) => dispatchDocumentProcessingJob(payload))
345387
)
@@ -358,8 +400,6 @@ export async function processDocumentsWithQueue(
358400
if (failures.length === results.length) {
359401
throw new Error(`All ${failures.length} document processing dispatches failed`)
360402
}
361-
362-
return
363403
}
364404

365405
export async function processDocumentAsync(
@@ -698,54 +738,6 @@ export function isTriggerAvailable(): boolean {
698738
return Boolean(env.TRIGGER_SECRET_KEY) && isTriggerDevEnabled
699739
}
700740

701-
async function processDocumentsWithTrigger(
702-
documents: DocumentProcessingPayload[],
703-
requestId: string
704-
): Promise<{ success: boolean; message: string; batchIds?: string[] }> {
705-
if (!isTriggerAvailable()) {
706-
throw new Error('Trigger.dev is not configured - TRIGGER_SECRET_KEY missing')
707-
}
708-
709-
try {
710-
logger.info(`[${requestId}] Triggering background processing for ${documents.length} documents`)
711-
712-
const MAX_BATCH_SIZE = 1000
713-
const batchIds: string[] = []
714-
715-
for (let i = 0; i < documents.length; i += MAX_BATCH_SIZE) {
716-
const chunk = documents.slice(i, i + MAX_BATCH_SIZE)
717-
const batchResult = await tasks.batchTrigger(
718-
'knowledge-process-document',
719-
chunk.map((doc) => ({
720-
payload: doc,
721-
options: {
722-
idempotencyKey: `doc-process-${doc.documentId}-${requestId}`,
723-
tags: [`knowledgeBaseId:${doc.knowledgeBaseId}`, `documentId:${doc.documentId}`],
724-
},
725-
}))
726-
)
727-
batchIds.push(batchResult.batchId)
728-
}
729-
730-
logger.info(
731-
`[${requestId}] Triggered ${documents.length} document processing jobs in ${batchIds.length} batch(es)`
732-
)
733-
734-
return {
735-
success: true,
736-
message: `${documents.length} document processing jobs triggered`,
737-
batchIds,
738-
}
739-
} catch (error) {
740-
logger.error(`[${requestId}] Failed to trigger document processing jobs:`, error)
741-
742-
return {
743-
success: false,
744-
message: getErrorMessage(error, 'Failed to trigger background jobs'),
745-
}
746-
}
747-
}
748-
749741
export async function createDocumentRecords(
750742
documents: Array<{
751743
filename: string

0 commit comments

Comments
 (0)