Skip to content

Commit 5fa7391

Browse files
fix(tables): gate import ownership every batch and stop canceled imports reappearing
- Worker checked run ownership only at the progress cadence (~every 5k rows), so a canceled/superseded import could insert several more batches (incl. the final partial batch) before stopping. Move the updateImportProgress ownership gate to the top of every flush — a run that lost the table stops within one batch. - A list/dialog import canceled mid-upload left the server row `importing` until the in-flight server cancel landed; hydration re-seeded it from useTablesList, so the dismissed import flickered back. Flag the real table id canceled on the mid-upload cancel path, skip re-seeding flagged tables in hydration, and clear the flag once the server import is terminal. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent a69d15b commit 5fa7391

3 files changed

Lines changed: 22 additions & 5 deletions

File tree

apps/sim/app/workspace/[workspaceId]/tables/components/import-progress-menu/use-hydrate-import-tray.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,9 @@ export function useHydrateImportTray(workspaceId: string | undefined): void {
3333
for (const table of tables) {
3434
if (table.importStatus === 'importing') {
3535
if (tray.entries[table.id]) continue
36+
// Canceled mid-upload: the server row stays `importing` until the in-flight server cancel
37+
// lands. Don't re-seed an entry the user already dismissed — it would flicker back.
38+
if (tray.isCanceled(table.id)) continue
3639
tray.upsert({
3740
tableId: table.id,
3841
workspaceId,
@@ -57,6 +60,12 @@ export function useHydrateImportTray(workspaceId: string | undefined): void {
5760
})
5861
}
5962
}
63+
64+
// Once the server import reaches a terminal state, drop any lingering cancel flag so it can
65+
// never suppress a future re-import of the same table.
66+
if (table.importStatus !== 'importing' && tray.isCanceled(table.id)) {
67+
tray.consumeCanceled(table.id)
68+
}
6069
}
6170
}, [workspaceId, tables])
6271
}

apps/sim/app/workspace/[workspaceId]/tables/tables.tsx

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -445,7 +445,10 @@ export function Tables() {
445445
})
446446
useImportTrayStore.getState().dismiss(pendingId)
447447
if (result?.tableId && useImportTrayStore.getState().consumeCanceled(pendingId)) {
448-
// Canceled mid-upload — the worker just started; cancel it server-side.
448+
// Canceled mid-upload — the worker just started. Flag the real table id so
449+
// hydration won't re-seed it from the still-`importing` server row while the
450+
// server cancel is in flight, then cancel it server-side.
451+
useImportTrayStore.getState().cancel(result.tableId)
449452
void cancelTableImport(workspaceId, result.tableId, result.importId).catch(
450453
() => {}
451454
)

apps/sim/lib/table/import-runner.ts

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -176,21 +176,26 @@ export async function runTableImport(payload: TableImportPayload): Promise<void>
176176

177177
const flush = async (rows: Record<string, unknown>[]) => {
178178
if (rows.length === 0 || !schema || !headerToColumn) return
179+
// Ownership gate on *every* batch, before inserting: once this run loses the table (cancel,
180+
// supersede, or the stale-import janitor), `updateImportProgress` returns false and we stop
181+
// immediately — a superseded worker must never write another batch into a table a newer
182+
// import now owns. (Throttling this to the progress cadence let up to PROGRESS_INTERVAL_ROWS
183+
// extra rows land after cancel.) The write also keeps the persisted row count fresh.
184+
const owns = await updateImportProgress(tableId, inserted, importId)
185+
if (!owns) throw new ImportSupersededError()
179186
const coerced = coerceRowsForTable(rows, schema, headerToColumn)
180187
inserted += await bulkInsertImportBatch(
181188
{ tableId, workspaceId, userId, rows: coerced, startPosition: basePosition + inserted },
182189
{ ...table, schema },
183190
requestId
184191
)
185-
// Emit after the first batch lands, then every interval, so the bar appears early.
192+
// Emit after the first batch lands, then every interval, so the bar appears early without
193+
// flooding the SSE stream (the ownership/progress write above is what runs every batch).
186194
if (
187195
inserted - lastReported >= PROGRESS_INTERVAL_ROWS ||
188196
(lastReported === 0 && inserted > 0)
189197
) {
190198
lastReported = inserted
191-
// Heartbeat + ownership check: if a newer import has taken over this table, stop.
192-
const owns = await updateImportProgress(tableId, inserted, importId)
193-
if (!owns) throw new ImportSupersededError()
194199
// Exact, monotonic completion from bytes consumed — no wobbly row estimate.
195200
const percent =
196201
totalBytes > 0 ? Math.min(99, Math.round((bytesRead / totalBytes) * 100)) : undefined

0 commit comments

Comments
 (0)