Skip to content

Commit 3518b99

Browse files
feat(tables): background import for large CSVs with live progress (#4861)
* feat(tables): background import for large CSVs with live progress * fix(tables): address review — import heartbeat, overlap guard, column/empty validation * fix(tables): guard sync import overlap, scope fileKey to workspace, delete-on-replace after download * fix(tables): stream large CSV imports from storage instead of buffering the whole file * test(tables): fix async-import route tests for workspace-scoped fileKey + name uniquification * fix(tables): append imports start after existing rows; reconcile missed import failures in the tray * fix(tables): delete the uploaded CSV from storage after the import finishes * fix(tables): validate replace before deleting rows; ignore stale replayed import events by importId * fix(tables): bind import worker to its importId (no stale-worker clobber/overlap) and destroy storage stream on failure * feat(tables): byte-based import progress, cancel support, and a start toast that opens the import view * fix(tables): don't emit ready after cancel; honor cancel during the upload phase * improvement(tables): use a stop (square) icon for canceling an active import * fix(tables): make markTableImporting an atomic claim to close the concurrent-import TOCTOU race * improvement(tables): preview CSV import from a slice, drop client row-count warning The import dialog parsed the entire file in the browser to show an exact row count and a row-limit warning. That holds the whole file in memory, blocks the main thread, and hits V8's ~512MB string ceiling — so the dialog capped the effective import size well below what the streaming importer handles. Parse only the first 512KB (headers + sample for the mapping); drop the exact count and the "would exceed the row limit by N" gate. The DB row-count trigger already enforces max_rows server-side, so an over-limit import fails fast during the run with a clear message instead of being blocked by an expensive parse. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> * fix(tables): gate import ownership every batch and stop canceled imports reappearing - Worker checked run ownership only at the progress cadence (~every 5k rows), so a canceled/superseded import could insert several more batches (incl. the final partial batch) before stopping. Move the updateImportProgress ownership gate to the top of every flush — a run that lost the table stops within one batch. - A list/dialog import canceled mid-upload left the server row `importing` until the in-flight server cancel landed; hydration re-seeded it from useTablesList, so the dismissed import flickered back. Flag the real table id canceled on the mid-upload cancel path, skip re-seeding flagged tables in hydration, and clear the flag once the server import is terminal. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> * refactor(tables): drive import tray by polling derived from server, not SSE Import progress no longer holds an SSE connection per importing table. The tray now derives its importing rows live from the table list (React Query), polled only while an import is in flight; the table detail page keeps its own cell-state SSE for grid refresh. - store holds only client-only state now: optimistic uploads, which terminal completions to surface this session, canceled ids, menu open — no copied importStatus/rowsProcessed. - useWorkspaceImports is the single source: polls via a data-predicate refetchInterval, derives rows, and fires completion toasts on the importing -> terminal transition. - kickoff handlers use startUpload/setUploadPercent/endUpload; the invalidated list refetch surfaces the server row and polling takes over. - removes use-hydrate-import-tray + use-import-progress-tracker (folded in). - trims over-verbose comments across the import paths. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> * fix(tables): ignore superseded-run import events in the detail SSE cache applyImport applied every replayed import payload to the detail cache. The SSE buffer can replay a prior import's terminal event for the same table, stomping a newer in-flight import's UI. Lock to the active run's importId (and ignore a replayed terminal before the id is known), matching the guard the header tracker used to have. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> * fix(tables): close sync-import TOCTOU by claiming the atomic import gate The sync import route checked importStatus from a checkAccess snapshot, then parsed/validated/wrote seconds later without taking the atomic claim. A concurrent async kickoff (markTableImporting) could slip into that window and both writers would run together — for replace mode, two delete+insert passes leave the table indeterminate. Claim the same atomic gate (markTableImporting) right before the write and release it in the finally (before the response returns, so a client refetch never sees the transient status). A row-level FOR UPDATE was avoided on purpose: it would invert lock order against the position advisory lock / row-count trigger and risk a deadlock — markTableImporting is the established gate. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> * fix(multipart): keep abort wired after resolve so a mid-upload disconnect tears down the stream readMultipart resolves on the file-part header and hands the caller an un-drained stream, but settle() ran cleanup() and detached the abort listener on that path too. A client disconnect mid-upload then destroyed nothing — busboy never saw EOF, the file stream stalled, and the route's `for await` held a request slot until maxDuration (300s). Re-arm an abort handler scoped to the file stream on resolve, detached when the stream closes. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent aed4402 commit 3518b99

50 files changed

Lines changed: 21014 additions & 393 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

apps/sim/app/api/cron/cleanup-stale-executions/route.ts

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { asyncJobs, db } from '@sim/db'
2-
import { workflowExecutionLogs } from '@sim/db/schema'
2+
import { userTableDefinitions, workflowExecutionLogs } from '@sim/db/schema'
33
import { createLogger } from '@sim/logger'
44
import { toError } from '@sim/utils/errors'
55
import { and, eq, inArray, lt, sql } from 'drizzle-orm'
@@ -110,6 +110,37 @@ export const GET = withRouteHandler(async (request: NextRequest) => {
110110
})
111111
}
112112

113+
// Mark stale table imports as failed. Imports run detached on the web container and
114+
// are lost if the pod is killed mid-load. `updatedAt` is bumped by progress updates, so
115+
// an `importing` table with no recent update has stalled (not merely slow). Rows are
116+
// left in place (no rollback); the user re-imports.
117+
let staleImportsMarkedFailed = 0
118+
try {
119+
const staleImports = await db
120+
.update(userTableDefinitions)
121+
.set({
122+
importStatus: 'failed',
123+
importError: `Import terminated: no progress for more than ${STALE_THRESHOLD_MINUTES} minutes (worker timeout or crash)`,
124+
updatedAt: new Date(),
125+
})
126+
.where(
127+
and(
128+
eq(userTableDefinitions.importStatus, 'importing'),
129+
lt(userTableDefinitions.updatedAt, staleThreshold)
130+
)
131+
)
132+
.returning({ id: userTableDefinitions.id })
133+
134+
staleImportsMarkedFailed = staleImports.length
135+
if (staleImportsMarkedFailed > 0) {
136+
logger.info(`Marked ${staleImportsMarkedFailed} stale table imports as failed`)
137+
}
138+
} catch (error) {
139+
logger.error('Failed to clean up stale table imports:', {
140+
error: toError(error).message,
141+
})
142+
}
143+
113144
// Clean up stale pending jobs (never started, e.g., due to server crash before startJob())
114145
let stalePendingJobsMarkedFailed = 0
115146

@@ -179,6 +210,9 @@ export const GET = withRouteHandler(async (request: NextRequest) => {
179210
staleThresholdMinutes: STALE_THRESHOLD_MINUTES,
180211
retentionHours: JOB_RETENTION_HOURS,
181212
},
213+
tableImports: {
214+
staleMarkedFailed: staleImportsMarkedFailed,
215+
},
182216
})
183217
} catch (error) {
184218
logger.error('Error in stale execution cleanup job:', error)
Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
/**
2+
* @vitest-environment node
3+
*/
4+
import { hybridAuthMockFns } from '@sim/testing'
5+
import { NextRequest } from 'next/server'
6+
import { beforeEach, describe, expect, it, vi } from 'vitest'
7+
import type { TableDefinition } from '@/lib/table'
8+
9+
const { mockCheckAccess, mockMarkTableImporting, mockRunTableImport } = vi.hoisted(() => ({
10+
mockCheckAccess: vi.fn(),
11+
mockMarkTableImporting: vi.fn(),
12+
mockRunTableImport: vi.fn(),
13+
}))
14+
15+
vi.mock('@sim/utils/id', () => ({
16+
generateId: vi.fn().mockReturnValue('import-id-xyz'),
17+
generateShortId: vi.fn().mockReturnValue('short-id'),
18+
}))
19+
vi.mock('@/lib/table/service', () => ({ markTableImporting: mockMarkTableImporting }))
20+
vi.mock('@/lib/table/import-runner', () => ({ runTableImport: mockRunTableImport }))
21+
vi.mock('@/lib/core/utils/background', () => ({
22+
runDetached: (_label: string, work: () => Promise<unknown>) => {
23+
void work()
24+
},
25+
}))
26+
vi.mock('@/app/api/table/utils', async () => {
27+
const { NextResponse } = await import('next/server')
28+
return {
29+
checkAccess: mockCheckAccess,
30+
accessError: (result: { status: number }) =>
31+
NextResponse.json({ error: 'denied' }, { status: result.status }),
32+
}
33+
})
34+
35+
import { POST } from '@/app/api/table/[tableId]/import-async/route'
36+
37+
function buildTable(overrides: Partial<TableDefinition> = {}): TableDefinition {
38+
return {
39+
id: 'tbl_1',
40+
name: 'People',
41+
description: null,
42+
schema: { columns: [{ name: 'name', type: 'string' }] },
43+
metadata: null,
44+
rowCount: 0,
45+
maxRows: 1_000_000,
46+
workspaceId: 'workspace-1',
47+
createdBy: 'user-1',
48+
archivedAt: null,
49+
createdAt: new Date(),
50+
updatedAt: new Date(),
51+
...overrides,
52+
}
53+
}
54+
55+
function makeRequest(body: unknown, tableId = 'tbl_1') {
56+
const req = new NextRequest(`http://localhost:3000/api/table/${tableId}/import-async`, {
57+
method: 'POST',
58+
headers: { 'content-type': 'application/json' },
59+
body: JSON.stringify(body),
60+
})
61+
return POST(req, { params: Promise.resolve({ tableId }) })
62+
}
63+
64+
const validBody = {
65+
workspaceId: 'workspace-1',
66+
fileKey: 'workspace/workspace-1/123-data.csv',
67+
fileName: 'data.csv',
68+
mode: 'append',
69+
}
70+
71+
describe('POST /api/table/[tableId]/import-async', () => {
72+
beforeEach(() => {
73+
vi.clearAllMocks()
74+
hybridAuthMockFns.mockCheckSessionOrInternalAuth.mockResolvedValue({
75+
success: true,
76+
userId: 'user-1',
77+
authType: 'session',
78+
})
79+
mockCheckAccess.mockResolvedValue({ ok: true, table: buildTable() })
80+
mockMarkTableImporting.mockResolvedValue(true)
81+
mockRunTableImport.mockResolvedValue(undefined)
82+
})
83+
84+
it('marks the table importing and kicks off the worker with mode + mapping', async () => {
85+
const response = await makeRequest({
86+
...validBody,
87+
mode: 'replace',
88+
mapping: { Name: 'name' },
89+
createColumns: ['Extra'],
90+
})
91+
const data = await response.json()
92+
93+
expect(response.status).toBe(200)
94+
expect(data.data).toEqual({ tableId: 'tbl_1', importId: 'import-id-xyz' })
95+
expect(mockMarkTableImporting).toHaveBeenCalledWith('tbl_1', 'import-id-xyz')
96+
expect(mockRunTableImport).toHaveBeenCalledWith(
97+
expect.objectContaining({
98+
tableId: 'tbl_1',
99+
mode: 'replace',
100+
delimiter: ',',
101+
mapping: { Name: 'name' },
102+
createColumns: ['Extra'],
103+
})
104+
)
105+
})
106+
107+
it('returns 409 when the table is already importing (claim lost)', async () => {
108+
mockMarkTableImporting.mockResolvedValue(false)
109+
const response = await makeRequest(validBody)
110+
expect(response.status).toBe(409)
111+
expect(mockRunTableImport).not.toHaveBeenCalled()
112+
})
113+
114+
it('returns 401 when unauthenticated', async () => {
115+
hybridAuthMockFns.mockCheckSessionOrInternalAuth.mockResolvedValue({ success: false })
116+
const response = await makeRequest(validBody)
117+
expect(response.status).toBe(401)
118+
expect(mockMarkTableImporting).not.toHaveBeenCalled()
119+
})
120+
121+
it('returns the access error status when access is denied', async () => {
122+
mockCheckAccess.mockResolvedValue({ ok: false, status: 403 })
123+
const response = await makeRequest(validBody)
124+
expect(response.status).toBe(403)
125+
expect(mockRunTableImport).not.toHaveBeenCalled()
126+
})
127+
128+
it('returns 400 when the target table is archived', async () => {
129+
mockCheckAccess.mockResolvedValue({ ok: true, table: buildTable({ archivedAt: new Date() }) })
130+
const response = await makeRequest(validBody)
131+
expect(response.status).toBe(400)
132+
expect(mockRunTableImport).not.toHaveBeenCalled()
133+
})
134+
135+
it('returns 400 on workspace mismatch', async () => {
136+
const response = await makeRequest({ ...validBody, workspaceId: 'other-ws' })
137+
expect(response.status).toBe(400)
138+
})
139+
140+
it('returns 400 for an invalid mode', async () => {
141+
const response = await makeRequest({ ...validBody, mode: 'bogus' })
142+
expect(response.status).toBe(400)
143+
})
144+
})
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
import { createLogger } from '@sim/logger'
2+
import { generateId } from '@sim/utils/id'
3+
import { type NextRequest, NextResponse } from 'next/server'
4+
import { importIntoTableAsyncContract } from '@/lib/api/contracts/tables'
5+
import { parseRequest } from '@/lib/api/server'
6+
import { checkSessionOrInternalAuth } from '@/lib/auth/hybrid'
7+
import { runDetached } from '@/lib/core/utils/background'
8+
import { generateRequestId } from '@/lib/core/utils/request'
9+
import { withRouteHandler } from '@/lib/core/utils/with-route-handler'
10+
import { runTableImport } from '@/lib/table/import-runner'
11+
import { markTableImporting } from '@/lib/table/service'
12+
import { accessError, checkAccess } from '@/app/api/table/utils'
13+
14+
const logger = createLogger('TableImportIntoAsync')
15+
16+
export const runtime = 'nodejs'
17+
export const dynamic = 'force-dynamic'
18+
19+
interface RouteParams {
20+
params: Promise<{ tableId: string }>
21+
}
22+
23+
export const POST = withRouteHandler(async (request: NextRequest, { params }: RouteParams) => {
24+
const requestId = generateRequestId()
25+
26+
const authResult = await checkSessionOrInternalAuth(request, { requireWorkflowId: false })
27+
if (!authResult.success || !authResult.userId) {
28+
return NextResponse.json({ error: 'Authentication required' }, { status: 401 })
29+
}
30+
const userId = authResult.userId
31+
32+
const parsed = await parseRequest(importIntoTableAsyncContract, request, { params })
33+
if (!parsed.success) return parsed.response
34+
const { tableId } = parsed.data.params
35+
const { workspaceId, fileKey, fileName, mode, mapping, createColumns } = parsed.data.body
36+
37+
const access = await checkAccess(tableId, userId, 'write')
38+
if (!access.ok) return accessError(access, requestId, tableId)
39+
const { table } = access
40+
41+
if (table.workspaceId !== workspaceId) {
42+
return NextResponse.json({ error: 'Invalid workspace ID' }, { status: 400 })
43+
}
44+
// The fileKey is client-supplied — ensure it points at this workspace's storage prefix so a
45+
// caller can't import another workspace's uploaded object.
46+
if (!fileKey.startsWith(`workspace/${workspaceId}/`)) {
47+
return NextResponse.json({ error: 'Invalid file key for workspace' }, { status: 400 })
48+
}
49+
if (table.archivedAt) {
50+
return NextResponse.json({ error: 'Cannot import into an archived table' }, { status: 400 })
51+
}
52+
53+
const ext = fileName.split('.').pop()?.toLowerCase()
54+
if (ext !== 'csv' && ext !== 'tsv') {
55+
return NextResponse.json({ error: 'Only CSV and TSV files are supported' }, { status: 400 })
56+
}
57+
const delimiter = ext === 'tsv' ? '\t' : ','
58+
59+
// Atomically claim the table — the single concurrency gate. If another import already holds it,
60+
// this returns false (no overlapping workers writing colliding row positions).
61+
const importId = generateId()
62+
const claimed = await markTableImporting(tableId, importId)
63+
if (!claimed) {
64+
return NextResponse.json(
65+
{ error: 'An import is already in progress for this table' },
66+
{ status: 409 }
67+
)
68+
}
69+
70+
runDetached('table-import', () =>
71+
runTableImport({
72+
importId,
73+
tableId,
74+
workspaceId,
75+
userId,
76+
fileKey,
77+
fileName,
78+
delimiter,
79+
mode,
80+
mapping,
81+
createColumns,
82+
})
83+
)
84+
85+
logger.info(`[${requestId}] Async CSV import into existing table started`, {
86+
tableId,
87+
importId,
88+
mode,
89+
fileName,
90+
})
91+
return NextResponse.json({ success: true, data: { tableId, importId } })
92+
})

0 commit comments

Comments
 (0)