Skip to content

Commit 457c3f8

Browse files
fix(tables): close sync-import TOCTOU by claiming the atomic import gate
The sync import route checked importStatus from a checkAccess snapshot, then parsed/validated/wrote seconds later without taking the atomic claim. A concurrent async kickoff (markTableImporting) could slip into that window and both writers would run together — for replace mode, two delete+insert passes leave the table indeterminate. Claim the same atomic gate (markTableImporting) right before the write and release it in the finally (before the response returns, so a client refetch never sees the transient status). A row-level FOR UPDATE was avoided on purpose: it would invert lock order against the position advisory lock / row-count trigger and risk a deadlock — markTableImporting is the established gate. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent 23e4498 commit 457c3f8

3 files changed

Lines changed: 60 additions & 0 deletions

File tree

apps/sim/app/api/table/[tableId]/import/route.test.ts

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,16 @@ const {
1212
mockReplaceTableRowsWithTx,
1313
mockAddTableColumnsWithTx,
1414
mockDispatchAfterBatchInsert,
15+
mockMarkTableImporting,
16+
mockReleaseImportClaim,
1517
} = vi.hoisted(() => ({
1618
mockCheckAccess: vi.fn(),
1719
mockBatchInsertRowsWithTx: vi.fn(),
1820
mockReplaceTableRowsWithTx: vi.fn(),
1921
mockAddTableColumnsWithTx: vi.fn(),
2022
mockDispatchAfterBatchInsert: vi.fn(),
23+
mockMarkTableImporting: vi.fn(),
24+
mockReleaseImportClaim: vi.fn(),
2125
}))
2226

2327
vi.mock('@sim/utils/id', () => ({
@@ -53,6 +57,8 @@ vi.mock('@/lib/table/service', () => ({
5357
replaceTableRowsWithTx: mockReplaceTableRowsWithTx,
5458
addTableColumnsWithTx: mockAddTableColumnsWithTx,
5559
dispatchAfterBatchInsert: mockDispatchAfterBatchInsert,
60+
markTableImporting: mockMarkTableImporting,
61+
releaseImportClaim: mockReleaseImportClaim,
5662
}))
5763

5864
import { POST } from '@/app/api/table/[tableId]/import/route'
@@ -142,6 +148,8 @@ describe('POST /api/table/[tableId]/import', () => {
142148
data.rows.map((_, i) => ({ id: `row_${i}` }))
143149
)
144150
mockReplaceTableRowsWithTx.mockResolvedValue({ deletedCount: 0, insertedCount: 0 })
151+
mockMarkTableImporting.mockResolvedValue(true)
152+
mockReleaseImportClaim.mockResolvedValue(undefined)
145153
mockAddTableColumnsWithTx.mockImplementation(
146154
async (
147155
_trx,
@@ -168,6 +176,22 @@ describe('POST /api/table/[tableId]/import', () => {
168176
expect(response.status).toBe(401)
169177
})
170178

179+
it('returns 409 when a background import already holds the table (claim lost)', async () => {
180+
mockMarkTableImporting.mockResolvedValueOnce(false)
181+
const response = await callPost(createFormData(createCsvFile('name,age\nAlice,30')))
182+
expect(response.status).toBe(409)
183+
expect(mockBatchInsertRowsWithTx).not.toHaveBeenCalled()
184+
expect(mockReplaceTableRowsWithTx).not.toHaveBeenCalled()
185+
expect(mockReleaseImportClaim).not.toHaveBeenCalled()
186+
})
187+
188+
it('releases the import claim after a successful write', async () => {
189+
const response = await callPost(createFormData(createCsvFile('name,age\nAlice,30')))
190+
expect(response.status).toBe(200)
191+
expect(mockMarkTableImporting).toHaveBeenCalledWith('tbl_1', 'deadbeefcafef00d')
192+
expect(mockReleaseImportClaim).toHaveBeenCalledWith('tbl_1', 'deadbeefcafef00d')
193+
})
194+
171195
it('returns 400 when the mode is invalid', async () => {
172196
const response = await callPost(
173197
createFormData(createCsvFile('name,age\nAlice,30'), { mode: 'bogus' })

apps/sim/app/api/table/[tableId]/import/route.ts

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ import {
2929
createCsvParser,
3030
dispatchAfterBatchInsert,
3131
inferColumnType,
32+
markTableImporting,
33+
releaseImportClaim,
3234
replaceTableRowsWithTx,
3335
sanitizeName,
3436
type TableDefinition,
@@ -57,6 +59,7 @@ export const POST = withRouteHandler(async (request: NextRequest, { params }: Ro
5759
const requestId = generateRequestId()
5860
const { tableId } = tableIdParamsSchema.parse(await params)
5961
let fileStream: Readable | undefined
62+
let claimedImportId: string | null = null
6063

6164
try {
6265
const authResult = await checkSessionOrInternalAuth(request, { requireWorkflowId: false })
@@ -247,6 +250,19 @@ export const POST = withRouteHandler(async (request: NextRequest, { params }: Ro
247250

248251
const coerced = coerceRowsForTable(rows, prospectiveTable.schema, validation.effectiveMap)
249252

253+
// Atomically claim the table before writing. The pre-check above reads a checkAccess snapshot
254+
// taken before the parse/validation; a background import could claim the table in that window.
255+
// markTableImporting is the single atomic gate (same one the async kickoff uses) — released in
256+
// the finally so a sync import can't write concurrently with a background one (corrupts replace).
257+
const syncImportId = generateId()
258+
if (!(await markTableImporting(tableId, syncImportId))) {
259+
return NextResponse.json(
260+
{ error: 'An import is already in progress for this table' },
261+
{ status: 409 }
262+
)
263+
}
264+
claimedImportId = syncImportId
265+
250266
if (mode === 'append') {
251267
if (prospectiveTable.rowCount + coerced.length > prospectiveTable.maxRows) {
252268
const deficit = prospectiveTable.rowCount + coerced.length - prospectiveTable.maxRows
@@ -407,5 +423,7 @@ export const POST = withRouteHandler(async (request: NextRequest, { params }: Ro
407423
)
408424
} finally {
409425
fileStream?.destroy()
426+
// Release before the response returns, so a client refetch never observes the transient claim.
427+
if (claimedImportId) await releaseImportClaim(tableId, claimedImportId).catch(() => {})
410428
}
411429
})

apps/sim/lib/table/service.ts

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1371,6 +1371,24 @@ export async function markTableImporting(tableId: string, importId: string): Pro
13711371
return updated.length > 0
13721372
}
13731373

1374+
/**
1375+
* Releases a claim taken by {@link markTableImporting} for a synchronous import — clears the
1376+
* import state back to idle. Scoped to `importId` so it only clears its own claim, never a newer
1377+
* run that may have taken over. A sync route claims, writes, then releases here in a `finally`.
1378+
*/
1379+
export async function releaseImportClaim(tableId: string, importId: string): Promise<void> {
1380+
await db
1381+
.update(userTableDefinitions)
1382+
.set({ importStatus: null, importId: null, importStartedAt: null, updatedAt: new Date() })
1383+
.where(
1384+
and(
1385+
eq(userTableDefinitions.id, tableId),
1386+
eq(userTableDefinitions.importId, importId),
1387+
eq(userTableDefinitions.importStatus, 'importing')
1388+
)
1389+
)
1390+
}
1391+
13741392
/**
13751393
* Records import progress (rows processed so far). Also bumps `updatedAt` so the
13761394
* stale-import janitor (`cleanup-stale-executions`) sees a live heartbeat and doesn't mark a

0 commit comments

Comments
 (0)