Skip to content

Commit 7cec012

Browse files
fix(tables): bind import worker to its importId (no stale-worker clobber/overlap) and destroy storage stream on failure
1 parent b19b9d8 commit 7cec012

2 files changed

Lines changed: 59 additions & 19 deletions

File tree

apps/sim/lib/table/import-runner.ts

Lines changed: 33 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { Transform } from 'node:stream'
1+
import { type Readable, Transform } from 'node:stream'
22
import { createLogger } from '@sim/logger'
33
import { getErrorMessage } from '@sim/utils/errors'
44
import { generateId } from '@sim/utils/id'
@@ -35,6 +35,13 @@ const logger = createLogger('TableImportRunner')
3535
/** Emit a progress event / DB update at most every this many rows. */
3636
const PROGRESS_INTERVAL_ROWS = 5000
3737

38+
/**
39+
* Thrown when this worker discovers it no longer owns the table's import (the stale-job janitor
40+
* marked its run failed and a newer import took over). The worker stops inserting rather than
41+
* writing into a table a second worker now owns.
42+
*/
43+
class ImportSupersededError extends Error {}
44+
3845
/** `create` infers a schema for a new table; `append`/`replace` map onto an existing one. */
3946
export type TableImportMode = 'create' | 'append' | 'replace'
4047

@@ -65,6 +72,9 @@ export interface TableImportPayload {
6572
export async function runTableImport(payload: TableImportPayload): Promise<void> {
6673
const { importId, tableId, workspaceId, userId, fileKey, fileName, delimiter, mode } = payload
6774
const requestId = generateId().slice(0, 8)
75+
// Hoisted so `finally` can destroy it on any failure — otherwise the storage HTTP body leaks
76+
// open until it times out.
77+
let source: Readable | undefined
6878

6979
try {
7080
const loaded = await getTableById(tableId, { includeArchived: true })
@@ -76,7 +86,7 @@ export async function runTableImport(payload: TableImportPayload): Promise<void>
7686
const totalBytes = (await headObject(fileKey, 'workspace'))?.size ?? 0
7787

7888
// Stream the file rather than buffering it — a ~1M-row import must never be held in memory.
79-
const source = await downloadFileStream({ key: fileKey, context: 'workspace' })
89+
source = await downloadFileStream({ key: fileKey, context: 'workspace' })
8090

8191
// Append must continue after the existing rows; create/replace start empty. Read once up
8292
// front (the import is the table's sole writer) and assign contiguous positions from it.
@@ -178,7 +188,9 @@ export async function runTableImport(payload: TableImportPayload): Promise<void>
178188
(lastReported === 0 && inserted > 0)
179189
) {
180190
lastReported = inserted
181-
await updateImportProgress(tableId, inserted)
191+
// Heartbeat + ownership check: if a newer import has taken over this table, stop.
192+
const owns = await updateImportProgress(tableId, inserted, importId)
193+
if (!owns) throw new ImportSupersededError()
182194
// Extrapolate the total from rows-per-byte observed so far; self-refines as it runs.
183195
// `Math.max(inserted, …)` keeps it monotonic; omit when the byte size is unknown.
184196
const estimatedTotal =
@@ -219,7 +231,7 @@ export async function runTableImport(payload: TableImportPayload): Promise<void>
219231
if (sample.length === 0) {
220232
// No data rows — fail rather than report a successful empty import (matches the sync route).
221233
const message = 'CSV file has no data rows'
222-
await markImportFailed(tableId, message)
234+
await markImportFailed(tableId, importId, message)
223235
void appendTableEvent({
224236
kind: 'import',
225237
tableId,
@@ -236,8 +248,8 @@ export async function runTableImport(payload: TableImportPayload): Promise<void>
236248
await flush(batch)
237249
}
238250

239-
await updateImportProgress(tableId, inserted)
240-
await markImportReady(tableId)
251+
await updateImportProgress(tableId, inserted, importId)
252+
await markImportReady(tableId, importId)
241253
void appendTableEvent({
242254
kind: 'import',
243255
tableId,
@@ -248,11 +260,22 @@ export async function runTableImport(payload: TableImportPayload): Promise<void>
248260
})
249261
logger.info(`[${requestId}] Import complete`, { tableId, fileName, mode, rows: inserted })
250262
} catch (err) {
251-
const message = getErrorMessage(err, 'Import failed')
252-
logger.error(`[${requestId}] Import failed for table ${tableId}:`, err)
253-
await markImportFailed(tableId, message).catch(() => {})
254-
void appendTableEvent({ kind: 'import', tableId, importId, status: 'failed', error: message })
263+
if (err instanceof ImportSupersededError) {
264+
// A newer import owns the table now — leave its status alone and just stop.
265+
logger.info(`[${requestId}] Import superseded by a newer run; stopping`, {
266+
tableId,
267+
importId,
268+
})
269+
} else {
270+
const message = getErrorMessage(err, 'Import failed')
271+
logger.error(`[${requestId}] Import failed for table ${tableId}:`, err)
272+
// Scoped to importId — a no-op if a newer import has taken over.
273+
await markImportFailed(tableId, importId, message).catch(() => {})
274+
void appendTableEvent({ kind: 'import', tableId, importId, status: 'failed', error: message })
275+
}
255276
} finally {
277+
// Release the storage stream so its HTTP connection doesn't leak on failure.
278+
source?.destroy()
256279
// The uploaded source file is single-use (a fresh upload per import) — delete it once the
257280
// import is terminal so the workspace bucket doesn't accumulate. Best-effort.
258281
await deleteFile({ key: fileKey, context: 'workspace' }).catch((err) => {

apps/sim/lib/table/service.ts

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1360,28 +1360,45 @@ export async function markTableImporting(tableId: string, importId: string): Pro
13601360
* Records import progress (rows processed so far). Also bumps `updatedAt` so the
13611361
* stale-import janitor (`cleanup-stale-executions`) sees a live heartbeat and doesn't mark a
13621362
* still-running import as failed.
1363+
*
1364+
* Scoped to `importId`: a stale/superseded worker (its run was marked failed and retried)
1365+
* no longer matches and its write is a no-op. Returns whether this worker still owns the
1366+
* import, so the caller can stop inserting when it's been superseded.
13631367
*/
1364-
export async function updateImportProgress(tableId: string, rowsProcessed: number): Promise<void> {
1365-
await db
1368+
export async function updateImportProgress(
1369+
tableId: string,
1370+
rowsProcessed: number,
1371+
importId: string
1372+
): Promise<boolean> {
1373+
const updated = await db
13661374
.update(userTableDefinitions)
13671375
.set({ importRowsProcessed: rowsProcessed, updatedAt: new Date() })
1368-
.where(eq(userTableDefinitions.id, tableId))
1376+
.where(and(eq(userTableDefinitions.id, tableId), eq(userTableDefinitions.importId, importId)))
1377+
.returning({ id: userTableDefinitions.id })
1378+
return updated.length > 0
13691379
}
13701380

1371-
/** Marks an import complete; rows become visible. */
1372-
export async function markImportReady(tableId: string): Promise<void> {
1381+
/** Marks an import complete; rows become visible. No-op if a newer import has taken over. */
1382+
export async function markImportReady(tableId: string, importId: string): Promise<void> {
13731383
await db
13741384
.update(userTableDefinitions)
13751385
.set({ importStatus: 'ready', importError: null, updatedAt: new Date() })
1376-
.where(eq(userTableDefinitions.id, tableId))
1386+
.where(and(eq(userTableDefinitions.id, tableId), eq(userTableDefinitions.importId, importId)))
13771387
}
13781388

1379-
/** Marks an import failed, leaving any already-committed rows in place. */
1380-
export async function markImportFailed(tableId: string, error: string): Promise<void> {
1389+
/**
1390+
* Marks an import failed, leaving any already-committed rows in place. No-op if a newer import
1391+
* has taken over (so a stale worker can't clobber the current run's status).
1392+
*/
1393+
export async function markImportFailed(
1394+
tableId: string,
1395+
importId: string,
1396+
error: string
1397+
): Promise<void> {
13811398
await db
13821399
.update(userTableDefinitions)
13831400
.set({ importStatus: 'failed', importError: error.slice(0, 2000), updatedAt: new Date() })
1384-
.where(eq(userTableDefinitions.id, tableId))
1401+
.where(and(eq(userTableDefinitions.id, tableId), eq(userTableDefinitions.importId, importId)))
13851402
}
13861403

13871404
/**

0 commit comments

Comments
 (0)