Skip to content

Commit 0ad209f

Browse files
improvement(db): add session statement/lock timeouts; simplify KB doc tx
1 parent 64d855a commit 0ad209f

4 files changed

Lines changed: 111 additions & 98 deletions

File tree

apps/sim/lib/knowledge/documents/service.ts

Lines changed: 95 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -766,99 +766,99 @@ export async function createDocumentRecords(
766766
knowledgeBaseId: string,
767767
requestId: string
768768
): Promise<DocumentData[]> {
769-
return await db.transaction(async (tx) => {
770-
await tx.execute(sql`SELECT 1 FROM knowledge_base WHERE id = ${knowledgeBaseId} FOR UPDATE`)
771-
772-
const kb = await tx
773-
.select({ id: knowledgeBase.id })
774-
.from(knowledgeBase)
775-
.where(and(eq(knowledgeBase.id, knowledgeBaseId), isNull(knowledgeBase.deletedAt)))
776-
.limit(1)
769+
// No tx wrapper: the bulk `db.insert(...).values([...])` is a single statement
770+
// and atomic by Postgres. The KB FK constraint fails loud if the KB is
771+
// concurrently deleted, so an explicit FOR UPDATE lock is unnecessary and
772+
// doubles per-call pool checkouts.
773+
const kb = await db
774+
.select({ id: knowledgeBase.id })
775+
.from(knowledgeBase)
776+
.where(and(eq(knowledgeBase.id, knowledgeBaseId), isNull(knowledgeBase.deletedAt)))
777+
.limit(1)
777778

778-
if (kb.length === 0) {
779-
throw new Error('Knowledge base not found')
780-
}
779+
if (kb.length === 0) {
780+
throw new Error('Knowledge base not found')
781+
}
781782

782-
const now = new Date()
783-
const documentRecords = []
784-
const returnData: DocumentData[] = []
783+
const now = new Date()
784+
const documentRecords = []
785+
const returnData: DocumentData[] = []
785786

786-
for (const docData of documents) {
787-
const documentId = generateId()
787+
for (const docData of documents) {
788+
const documentId = generateId()
788789

789-
let processedTags: Partial<ProcessedDocumentTags> = {}
790+
let processedTags: Partial<ProcessedDocumentTags> = {}
790791

791-
if (docData.documentTagsData) {
792-
try {
793-
const tagData = JSON.parse(docData.documentTagsData)
794-
if (Array.isArray(tagData)) {
795-
processedTags = await processDocumentTags(knowledgeBaseId, tagData, requestId)
796-
}
797-
} catch (error) {
798-
if (error instanceof SyntaxError) {
799-
logger.warn(`[${requestId}] Failed to parse documentTagsData for bulk document:`, error)
800-
} else {
801-
throw error
802-
}
792+
if (docData.documentTagsData) {
793+
try {
794+
const tagData = JSON.parse(docData.documentTagsData)
795+
if (Array.isArray(tagData)) {
796+
processedTags = await processDocumentTags(knowledgeBaseId, tagData, requestId)
797+
}
798+
} catch (error) {
799+
if (error instanceof SyntaxError) {
800+
logger.warn(`[${requestId}] Failed to parse documentTagsData for bulk document:`, error)
801+
} else {
802+
throw error
803803
}
804804
}
805+
}
805806

806-
const newDocument = {
807-
id: documentId,
808-
knowledgeBaseId,
809-
filename: docData.filename,
810-
fileUrl: docData.fileUrl,
811-
fileSize: docData.fileSize,
812-
mimeType: docData.mimeType,
813-
chunkCount: 0,
814-
tokenCount: 0,
815-
characterCount: 0,
816-
processingStatus: 'pending' as const,
817-
enabled: true,
818-
uploadedAt: now,
819-
tag1: processedTags.tag1 ?? docData.tag1 ?? null,
820-
tag2: processedTags.tag2 ?? docData.tag2 ?? null,
821-
tag3: processedTags.tag3 ?? docData.tag3 ?? null,
822-
tag4: processedTags.tag4 ?? docData.tag4 ?? null,
823-
tag5: processedTags.tag5 ?? docData.tag5 ?? null,
824-
tag6: processedTags.tag6 ?? docData.tag6 ?? null,
825-
tag7: processedTags.tag7 ?? docData.tag7 ?? null,
826-
number1: processedTags.number1 ?? null,
827-
number2: processedTags.number2 ?? null,
828-
number3: processedTags.number3 ?? null,
829-
number4: processedTags.number4 ?? null,
830-
number5: processedTags.number5 ?? null,
831-
date1: processedTags.date1 ?? null,
832-
date2: processedTags.date2 ?? null,
833-
boolean1: processedTags.boolean1 ?? null,
834-
boolean2: processedTags.boolean2 ?? null,
835-
boolean3: processedTags.boolean3 ?? null,
836-
}
837-
838-
documentRecords.push(newDocument)
839-
returnData.push({
840-
documentId,
841-
filename: docData.filename,
842-
fileUrl: docData.fileUrl,
843-
fileSize: docData.fileSize,
844-
mimeType: docData.mimeType,
845-
})
807+
const newDocument = {
808+
id: documentId,
809+
knowledgeBaseId,
810+
filename: docData.filename,
811+
fileUrl: docData.fileUrl,
812+
fileSize: docData.fileSize,
813+
mimeType: docData.mimeType,
814+
chunkCount: 0,
815+
tokenCount: 0,
816+
characterCount: 0,
817+
processingStatus: 'pending' as const,
818+
enabled: true,
819+
uploadedAt: now,
820+
tag1: processedTags.tag1 ?? docData.tag1 ?? null,
821+
tag2: processedTags.tag2 ?? docData.tag2 ?? null,
822+
tag3: processedTags.tag3 ?? docData.tag3 ?? null,
823+
tag4: processedTags.tag4 ?? docData.tag4 ?? null,
824+
tag5: processedTags.tag5 ?? docData.tag5 ?? null,
825+
tag6: processedTags.tag6 ?? docData.tag6 ?? null,
826+
tag7: processedTags.tag7 ?? docData.tag7 ?? null,
827+
number1: processedTags.number1 ?? null,
828+
number2: processedTags.number2 ?? null,
829+
number3: processedTags.number3 ?? null,
830+
number4: processedTags.number4 ?? null,
831+
number5: processedTags.number5 ?? null,
832+
date1: processedTags.date1 ?? null,
833+
date2: processedTags.date2 ?? null,
834+
boolean1: processedTags.boolean1 ?? null,
835+
boolean2: processedTags.boolean2 ?? null,
836+
boolean3: processedTags.boolean3 ?? null,
846837
}
847838

848-
if (documentRecords.length > 0) {
849-
await tx.insert(document).values(documentRecords)
850-
logger.info(
851-
`[${requestId}] Bulk created ${documentRecords.length} document records in knowledge base ${knowledgeBaseId}`
852-
)
839+
documentRecords.push(newDocument)
840+
returnData.push({
841+
documentId,
842+
filename: docData.filename,
843+
fileUrl: docData.fileUrl,
844+
fileSize: docData.fileSize,
845+
mimeType: docData.mimeType,
846+
})
847+
}
853848

854-
await tx
855-
.update(knowledgeBase)
856-
.set({ updatedAt: now })
857-
.where(eq(knowledgeBase.id, knowledgeBaseId))
858-
}
849+
if (documentRecords.length > 0) {
850+
await db.insert(document).values(documentRecords)
851+
logger.info(
852+
`[${requestId}] Bulk created ${documentRecords.length} document records in knowledge base ${knowledgeBaseId}`
853+
)
859854

860-
return returnData
861-
})
855+
await db
856+
.update(knowledgeBase)
857+
.set({ updatedAt: now })
858+
.where(eq(knowledgeBase.id, knowledgeBaseId))
859+
}
860+
861+
return returnData
862862
}
863863

864864
export interface TagFilterCondition {
@@ -1312,26 +1312,23 @@ export async function createSingleDocument(
13121312
...processedTags,
13131313
}
13141314

1315-
await db.transaction(async (tx) => {
1316-
await tx.execute(sql`SELECT 1 FROM knowledge_base WHERE id = ${knowledgeBaseId} FOR UPDATE`)
1317-
1318-
const kb = await tx
1319-
.select({ id: knowledgeBase.id })
1320-
.from(knowledgeBase)
1321-
.where(and(eq(knowledgeBase.id, knowledgeBaseId), isNull(knowledgeBase.deletedAt)))
1322-
.limit(1)
1315+
// No tx wrapper: single insert is atomic; KB FK fails loud on concurrent delete.
1316+
const kb = await db
1317+
.select({ id: knowledgeBase.id })
1318+
.from(knowledgeBase)
1319+
.where(and(eq(knowledgeBase.id, knowledgeBaseId), isNull(knowledgeBase.deletedAt)))
1320+
.limit(1)
13231321

1324-
if (kb.length === 0) {
1325-
throw new Error('Knowledge base not found')
1326-
}
1322+
if (kb.length === 0) {
1323+
throw new Error('Knowledge base not found')
1324+
}
13271325

1328-
await tx.insert(document).values(newDocument)
1326+
await db.insert(document).values(newDocument)
13291327

1330-
await tx
1331-
.update(knowledgeBase)
1332-
.set({ updatedAt: now })
1333-
.where(eq(knowledgeBase.id, knowledgeBaseId))
1334-
})
1328+
await db
1329+
.update(knowledgeBase)
1330+
.set({ updatedAt: now })
1331+
.where(eq(knowledgeBase.id, knowledgeBaseId))
13351332
logger.info(`[${requestId}] Document created: ${documentId} in knowledge base ${knowledgeBaseId}`)
13361333

13371334
return newDocument as {

apps/sim/lib/workspaces/lifecycle.test.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ describe('workspace lifecycle', () => {
5555
})
5656

5757
const tx = {
58+
execute: vi.fn().mockResolvedValue([]),
5859
select: vi.fn().mockReturnValue({
5960
from: vi.fn().mockReturnValue({
6061
where: vi.fn().mockResolvedValue([{ id: 'kb-1' }]),

apps/sim/lib/workspaces/lifecycle.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,13 @@ export async function archiveWorkspace(
4949
.where(eq(workflowMcpServer.workspaceId, workspaceId))
5050

5151
await db.transaction(async (tx) => {
52+
// Workspace archival is a rare admin/cleanup operation that touches every
53+
// child table; on large workspaces it can exceed the 30s session default.
54+
// Override per-tx with a generous ceiling — if it ever runs longer than
55+
// this something is genuinely wrong.
56+
await tx.execute(sql`SET LOCAL statement_timeout = '5min'`)
57+
await tx.execute(sql`SET LOCAL lock_timeout = '30s'`)
58+
5259
await tx
5360
.update(knowledgeBase)
5461
.set({

packages/db/db.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,14 @@ const postgresClient = postgres(connectionString, {
1313
connect_timeout: 30,
1414
max: 30,
1515
onnotice: () => {},
16+
// Server-side guards. lock_timeout cancels a query waiting on a row lock for
17+
// >5s (e.g. another tx holding `SELECT ... FOR UPDATE`). statement_timeout
18+
// cancels any query running >30s. Heavy paths that legitimately need longer
19+
// (table service bulk JSONB rewrites) override per-tx with `SET LOCAL`.
20+
connection: {
21+
lock_timeout: 5_000,
22+
statement_timeout: 30_000,
23+
},
1624
})
1725

1826
export const db = drizzle(postgresClient, { schema })

0 commit comments

Comments
 (0)