Skip to content

Commit b783c90

Browse files
committed
refactor(knowledge): split dispatch helpers, drop dead trigger branch
- Use the canonical DocumentProcessingPayload from the task module instead of the duplicate DocumentJobData interface in service.ts - Pass typeof processDocumentTask as a generic to tasks.batchTrigger so the payload shape is type-checked against the task definition - Inline TRIGGER_BATCH_SIZE provenance (Trigger.dev SDK 4.3.1+ doc'd cap, we're on 4.4.3) - Split direct vs trigger dispatch into dispatchInProcess and dispatchViaBatchTrigger; collapse the all-failed throw into a single check on the combined dispatched counter - Remove dispatchDocumentProcessingJob — its trigger branch is no longer reachable now that batchTrigger handles the trigger path, and the direct branch is inlined
1 parent 67cc80c commit b783c90

1 file changed

Lines changed: 90 additions & 87 deletions

File tree

apps/sim/lib/knowledge/documents/service.ts

Lines changed: 90 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,10 @@ import { estimateTokenCount } from '@/lib/tokenization/estimators'
4848
import { deleteFile } from '@/lib/uploads/core/storage-service'
4949
import { extractStorageKey } from '@/lib/uploads/utils/file-utils'
5050
import { getWorkspaceBilledAccountUserId } from '@/lib/workspaces/utils'
51+
import type {
52+
DocumentProcessingPayload,
53+
processDocument as processDocumentTask,
54+
} from '@/background/knowledge-processing'
5155
import { calculateCost } from '@/providers/utils'
5256

5357
const logger = createLogger('DocumentService')
@@ -101,35 +105,6 @@ export interface ProcessingOptions {
101105
lang?: string
102106
}
103107

104-
interface DocumentJobData {
105-
knowledgeBaseId: string
106-
documentId: string
107-
docData: {
108-
filename: string
109-
fileUrl: string
110-
fileSize: number
111-
mimeType: string
112-
}
113-
processingOptions: ProcessingOptions
114-
requestId: string
115-
}
116-
117-
async function dispatchDocumentProcessingJob(payload: DocumentJobData): Promise<void> {
118-
if (isTriggerAvailable()) {
119-
await tasks.trigger('knowledge-process-document', payload, {
120-
tags: [`knowledgeBaseId:${payload.knowledgeBaseId}`, `documentId:${payload.documentId}`],
121-
})
122-
return
123-
}
124-
125-
await processDocumentAsync(
126-
payload.knowledgeBaseId,
127-
payload.documentId,
128-
payload.docData,
129-
payload.processingOptions
130-
)
131-
}
132-
133108
interface DocumentTagData {
134109
tagName: string
135110
fieldType: string
@@ -313,17 +288,20 @@ async function processDocumentTags(
313288
return result
314289
}
315290

291+
/**
292+
* Trigger.dev's documented per-call cap for `tasks.batchTrigger` is 1,000
293+
* items on SDK 4.3.1+ (we're on 4.4.3). Payloads above this are chunked.
294+
* https://trigger.dev/docs/triggering
295+
*/
316296
const TRIGGER_BATCH_SIZE = 1000
317297

318-
export async function processDocumentsWithQueue(
319-
createdDocuments: DocumentData[],
298+
function buildJobPayload(
299+
doc: DocumentData,
320300
knowledgeBaseId: string,
321301
processingOptions: ProcessingOptions,
322302
requestId: string
323-
): Promise<void> {
324-
if (createdDocuments.length === 0) return
325-
326-
const jobPayloads = createdDocuments.map<DocumentJobData>((doc) => ({
303+
): DocumentProcessingPayload {
304+
return {
327305
knowledgeBaseId,
328306
documentId: doc.documentId,
329307
docData: {
@@ -334,72 +312,97 @@ export async function processDocumentsWithQueue(
334312
},
335313
processingOptions,
336314
requestId,
337-
}))
315+
}
316+
}
317+
318+
/**
319+
* Dispatches document processing jobs. On Trigger.dev, collapses N runs into
320+
* `ceil(N / TRIGGER_BATCH_SIZE)` HTTP calls via `tasks.batchTrigger`. Without
321+
* Trigger.dev, falls back to in-process execution via `processDocumentAsync`.
322+
*
323+
* Throws only when every dispatch fails — partial failures are logged. Stuck
324+
* docs left in 'pending' are reaped by the next sync's stuck-doc retry pass.
325+
*/
326+
export async function processDocumentsWithQueue(
327+
createdDocuments: DocumentData[],
328+
knowledgeBaseId: string,
329+
processingOptions: ProcessingOptions,
330+
requestId: string
331+
): Promise<void> {
332+
if (createdDocuments.length === 0) return
333+
334+
const jobPayloads = createdDocuments.map((doc) =>
335+
buildJobPayload(doc, knowledgeBaseId, processingOptions, requestId)
336+
)
338337

339338
const useTrigger = isTriggerAvailable()
340339
logger.info(
341340
`[${requestId}] Dispatching background processing for ${jobPayloads.length} documents`,
342341
{ backend: useTrigger ? 'trigger-dev' : 'direct' }
343342
)
344343

345-
if (useTrigger) {
346-
/**
347-
* Single batched dispatch per chunk of up to TRIGGER_BATCH_SIZE — collapses
348-
* N HTTP roundtrips to ceil(N / TRIGGER_BATCH_SIZE). Idempotency keys allow
349-
* safe re-dispatch on retry without duplicating runs.
350-
*/
351-
let dispatched = 0
352-
for (let i = 0; i < jobPayloads.length; i += TRIGGER_BATCH_SIZE) {
353-
const chunk = jobPayloads.slice(i, i + TRIGGER_BATCH_SIZE)
354-
try {
355-
await tasks.batchTrigger(
356-
'knowledge-process-document',
357-
chunk.map((payload) => ({
358-
payload,
359-
options: {
360-
idempotencyKey: `doc-process-${payload.documentId}-${requestId}`,
361-
tags: [
362-
`knowledgeBaseId:${payload.knowledgeBaseId}`,
363-
`documentId:${payload.documentId}`,
364-
],
365-
},
366-
}))
367-
)
368-
dispatched += chunk.length
369-
} catch (error) {
370-
logger.error(`[${requestId}] Failed to batchTrigger ${chunk.length} document jobs`, {
371-
error: getErrorMessage(error),
372-
})
373-
if (dispatched === 0 && i + TRIGGER_BATCH_SIZE >= jobPayloads.length) {
374-
throw new Error(`All ${jobPayloads.length} document processing dispatches failed`)
375-
}
376-
}
377-
}
378-
379-
logger.info(
380-
`[${requestId}] Document dispatch complete: ${dispatched}/${jobPayloads.length} succeeded`
381-
)
382-
return
383-
}
344+
const dispatched = useTrigger
345+
? await dispatchViaBatchTrigger(jobPayloads, requestId)
346+
: await dispatchInProcess(jobPayloads)
384347

385-
const results = await Promise.allSettled(
386-
jobPayloads.map((payload) => dispatchDocumentProcessingJob(payload))
348+
logger.info(
349+
`[${requestId}] Document dispatch complete: ${dispatched}/${jobPayloads.length} succeeded`
387350
)
388351

389-
const failures = results.filter((r): r is PromiseRejectedResult => r.status === 'rejected')
390-
if (failures.length > 0) {
391-
logger.error(`[${requestId}] ${failures.length}/${results.length} document dispatches failed`, {
392-
errors: failures.map((f) => getErrorMessage(f.reason)),
393-
})
352+
if (dispatched === 0) {
353+
throw new Error(`All ${jobPayloads.length} document processing dispatches failed`)
394354
}
355+
}
395356

396-
logger.info(
397-
`[${requestId}] Document dispatch complete: ${results.length - failures.length}/${results.length} succeeded`
398-
)
357+
async function dispatchViaBatchTrigger(
358+
jobPayloads: DocumentProcessingPayload[],
359+
requestId: string
360+
): Promise<number> {
361+
let dispatched = 0
362+
for (let i = 0; i < jobPayloads.length; i += TRIGGER_BATCH_SIZE) {
363+
const chunk = jobPayloads.slice(i, i + TRIGGER_BATCH_SIZE)
364+
try {
365+
await tasks.batchTrigger<typeof processDocumentTask>(
366+
'knowledge-process-document',
367+
chunk.map((payload) => ({
368+
payload,
369+
options: {
370+
/**
371+
* Scoped to (documentId, requestId) so HTTP-level retries inside a
372+
* single dispatch don't double-enqueue, while legitimate re-dispatch
373+
* (e.g. stuck-doc retry on a later sync) gets a fresh requestId and
374+
* is allowed through.
375+
*/
376+
idempotencyKey: `doc-process-${payload.documentId}-${requestId}`,
377+
tags: [
378+
`knowledgeBaseId:${payload.knowledgeBaseId}`,
379+
`documentId:${payload.documentId}`,
380+
],
381+
},
382+
}))
383+
)
384+
dispatched += chunk.length
385+
} catch (error) {
386+
logger.error(`[${requestId}] Failed to batchTrigger ${chunk.length} document jobs`, {
387+
error: getErrorMessage(error),
388+
})
389+
}
390+
}
391+
return dispatched
392+
}
399393

400-
if (failures.length === results.length) {
401-
throw new Error(`All ${failures.length} document processing dispatches failed`)
394+
async function dispatchInProcess(jobPayloads: DocumentProcessingPayload[]): Promise<number> {
395+
const results = await Promise.allSettled(
396+
jobPayloads.map((p) =>
397+
processDocumentAsync(p.knowledgeBaseId, p.documentId, p.docData, p.processingOptions)
398+
)
399+
)
400+
let dispatched = 0
401+
for (const r of results) {
402+
if (r.status === 'fulfilled') dispatched++
403+
else logger.error('Document dispatch failed', { error: getErrorMessage(r.reason) })
402404
}
405+
return dispatched
403406
}
404407

405408
export async function processDocumentAsync(

0 commit comments

Comments
 (0)