Skip to content

Commit ebbba86

Browse files
fix(tables): make markTableImporting an atomic claim to close the concurrent-import TOCTOU race
1 parent 7080f0b commit ebbba86

3 files changed

Lines changed: 37 additions & 14 deletions

File tree

apps/sim/app/api/table/[tableId]/import-async/route.test.ts

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ describe('POST /api/table/[tableId]/import-async', () => {
7777
authType: 'session',
7878
})
7979
mockCheckAccess.mockResolvedValue({ ok: true, table: buildTable() })
80-
mockMarkTableImporting.mockResolvedValue(undefined)
80+
mockMarkTableImporting.mockResolvedValue(true)
8181
mockRunTableImport.mockResolvedValue(undefined)
8282
})
8383

@@ -104,6 +104,13 @@ describe('POST /api/table/[tableId]/import-async', () => {
104104
)
105105
})
106106

107+
it('returns 409 when the table is already importing (claim lost)', async () => {
108+
mockMarkTableImporting.mockResolvedValue(false)
109+
const response = await makeRequest(validBody)
110+
expect(response.status).toBe(409)
111+
expect(mockRunTableImport).not.toHaveBeenCalled()
112+
})
113+
107114
it('returns 401 when unauthenticated', async () => {
108115
hybridAuthMockFns.mockCheckSessionOrInternalAuth.mockResolvedValue({ success: false })
109116
const response = await makeRequest(validBody)

apps/sim/app/api/table/[tableId]/import-async/route.ts

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -49,22 +49,23 @@ export const POST = withRouteHandler(async (request: NextRequest, { params }: Ro
4949
if (table.archivedAt) {
5050
return NextResponse.json({ error: 'Cannot import into an archived table' }, { status: 400 })
5151
}
52-
// Reject overlapping imports: a second worker would insert at colliding row positions.
53-
if (table.importStatus === 'importing') {
54-
return NextResponse.json(
55-
{ error: 'An import is already in progress for this table' },
56-
{ status: 409 }
57-
)
58-
}
5952

6053
const ext = fileName.split('.').pop()?.toLowerCase()
6154
if (ext !== 'csv' && ext !== 'tsv') {
6255
return NextResponse.json({ error: 'Only CSV and TSV files are supported' }, { status: 400 })
6356
}
6457
const delimiter = ext === 'tsv' ? '\t' : ','
6558

59+
// Atomically claim the table — the single concurrency gate. If another import already holds it,
60+
// this returns false (no overlapping workers writing colliding row positions).
6661
const importId = generateId()
67-
await markTableImporting(tableId, importId)
62+
const claimed = await markTableImporting(tableId, importId)
63+
if (!claimed) {
64+
return NextResponse.json(
65+
{ error: 'An import is already in progress for this table' },
66+
{ status: 409 }
67+
)
68+
}
6869

6970
runDetached('table-import', () =>
7071
runTableImport({

apps/sim/lib/table/service.ts

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ import {
1717
import { createLogger } from '@sim/logger'
1818
import { getPostgresErrorCode } from '@sim/utils/errors'
1919
import { generateId } from '@sim/utils/id'
20-
import { and, count, eq, gt, gte, inArray, isNull, type SQL, sql } from 'drizzle-orm'
20+
import { and, count, eq, gt, gte, inArray, isNull, ne, or, type SQL, sql } from 'drizzle-orm'
2121
import { MATERIALIZE_CONCURRENCY, mapWithConcurrency } from '@/lib/core/utils/concurrency'
2222
import { generateRestoreName } from '@/lib/core/utils/restore-name'
2323
import type { DbOrTx } from '@/lib/db/types'
@@ -1341,9 +1341,14 @@ export async function setTableSchemaForImport(tableId: string, schema: TableSche
13411341
.where(eq(userTableDefinitions.id, tableId))
13421342
}
13431343

1344-
/** Marks an existing table as undergoing an async import (rows hidden until ready). */
1345-
export async function markTableImporting(tableId: string, importId: string): Promise<void> {
1346-
await db
1344+
/**
1345+
* Atomically claims a table for an async import. The `import_status != 'importing'` guard makes
1346+
* this the single concurrency gate: of two racing kickoffs only one row-update matches, so only
1347+
* one wins (no TOCTOU between a separate status check and this write). Returns whether it claimed
1348+
* the table — the caller returns 409 when it didn't.
1349+
*/
1350+
export async function markTableImporting(tableId: string, importId: string): Promise<boolean> {
1351+
const updated = await db
13471352
.update(userTableDefinitions)
13481353
.set({
13491354
importStatus: 'importing',
@@ -1353,7 +1358,17 @@ export async function markTableImporting(tableId: string, importId: string): Pro
13531358
importStartedAt: new Date(),
13541359
updatedAt: new Date(),
13551360
})
1356-
.where(eq(userTableDefinitions.id, tableId))
1361+
.where(
1362+
and(
1363+
eq(userTableDefinitions.id, tableId),
1364+
or(
1365+
isNull(userTableDefinitions.importStatus),
1366+
ne(userTableDefinitions.importStatus, 'importing')
1367+
)
1368+
)
1369+
)
1370+
.returning({ id: userTableDefinitions.id })
1371+
return updated.length > 0
13571372
}
13581373

13591374
/**

0 commit comments

Comments
 (0)