Skip to content

Commit 0b14dbb

Browse files
fix(table): cancel prior runs, scope batch insert dispatch, recover orphan pre-stamps
Addresses cursor + greptile review feedback on table dispatcher edge cases: - Manual table-wide Run-all / Run-column now cancels prior active dispatches AND in-flight cell workers before bulk-clearing. Without this, mode:'all' deleted running sidecar rows out from under their workers (which kept writing into the wiped state) and a second Run-all could enqueue overlapping cells racing on the same rows. Row-scoped manual calls (dep-edit cascade) are excluded — those already cancel their own scope. - batchInsertRowsWithTx now scopes its auto-dispatch to the newly-inserted row ids. Without this, after the sidecar migration the NOT EXISTS filter matches every existing row (zero sidecar entries), so a CSV import would walk the entire table dispatching workflow runs on every pre-existing row. - classifyEligibility carve-out: pending + executionId=null is an orphan pre-stamp (cascade-lock contention, batchEnqueueAndWait failure, etc.), treated as claimable so future dispatchers can re-stamp instead of skipping it as 'in-flight' forever. Matches pickNextEligibleGroupForRow's logic. - On batchEnqueueAndWait failure, dispatcherStep now sweeps the orphan pre-stamps it wrote for the failed batch so the cells don't render Queued forever; the next user action picks them up cleanly.
1 parent bfb847b commit 0b14dbb

3 files changed

Lines changed: 53 additions & 6 deletions

File tree

apps/sim/lib/table/dispatcher.ts

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -392,8 +392,26 @@ export async function dispatcherStep(dispatchId: string): Promise<DispatcherStep
392392
logger.error(`[${dispatchId}] batch dispatch failed`, {
393393
error: toError(err).message,
394394
})
395-
// Don't bail the dispatch — terminal states are already in the DB
396-
// (workers wrote them) or will be reconciled on the next user click.
395+
// Reset the orphan pre-stamps so the cells don't render "Queued" forever.
396+
// Without this sweep, `pending + null executionId` rows stay until a user
397+
// re-triggers the row. The classifyEligibility carve-out lets future
398+
// dispatchers re-claim, but the current dispatch has already advanced
399+
// its cursor past these rows. Deleting the pre-stamps lets the next
400+
// explicit Run pick them up cleanly.
401+
await Promise.allSettled(
402+
pendingRuns.map((p) =>
403+
db
404+
.delete(tableRowExecutions)
405+
.where(
406+
and(
407+
eq(tableRowExecutions.rowId, p.rowId),
408+
eq(tableRowExecutions.groupId, p.groupId),
409+
eq(tableRowExecutions.status, 'pending'),
410+
sql`${tableRowExecutions.executionId} IS NULL`
411+
)
412+
)
413+
)
414+
)
397415
}
398416
}
399417

apps/sim/lib/table/service.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1134,9 +1134,14 @@ export async function batchInsertRowsWithTx(
11341134
}))
11351135

11361136
void fireTableTrigger(data.tableId, table.name, 'insert', result, null, table.schema, requestId)
1137+
// Scope to the newly-inserted row ids so the dispatcher doesn't walk every
1138+
// row in the table. After the sidecar migration, all existing rows have
1139+
// zero entries → `mode:'new'`'s `NOT EXISTS` filter would otherwise include
1140+
// them, dispatching workflows on every row in a populated table.
11371141
void runWorkflowColumn({
11381142
tableId: table.id,
11391143
workspaceId: table.workspaceId,
1144+
rowIds: result.map((r) => r.id),
11401145
mode: 'new',
11411146
isManualRun: false,
11421147
requestId,

apps/sim/lib/table/workflow-columns.ts

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -72,14 +72,22 @@ export function classifyEligibility(
7272
if (group.autoRun === false && !isManualRun) return 'autoRun-off'
7373

7474
const exec = row.executions?.[group.id]
75-
if (isExecInFlight(exec)) return 'in-flight'
75+
// Dispatcher pre-stamp orphans (`pending` + `executionId: null`) are
76+
// placeholders left behind when a previous dispatcher loop wrote the stamp
77+
// but no cell-task picked up (cascade-lock contention, trigger.dev queue
78+
// failure, etc.). Treat them as claimable so a new dispatcher can re-enqueue
79+
// — without this carve-out the row would render "Queued" forever. Matches
80+
// the `pickNextEligibleGroupForRow` cascade-loop carve-out.
81+
const isOrphanPreStamp = exec?.status === 'pending' && exec.executionId == null
82+
if (!isOrphanPreStamp && isExecInFlight(exec)) return 'in-flight'
7683
const status = exec?.status
7784

7885
// `mode: 'new'` is the auto-fire scope: only rows that have never been
7986
// attempted on this group run. Any pre-existing exec entry — completed,
8087
// cancelled, or error — keeps the cell sticky until the user manually
8188
// re-runs via "Run column" / "Run all rows" / "Run this row".
82-
if (mode === 'new' && exec) return 'has-prior-attempt'
89+
// Exception: orphan pre-stamps are claimable (handled above).
90+
if (mode === 'new' && exec && !isOrphanPreStamp) return 'has-prior-attempt'
8391

8492
const completedAndFilled = status === 'completed' && areOutputsFilled(group, row)
8593
if (!isManualRun && completedAndFilled) return 'completed-on-auto'
@@ -476,11 +484,27 @@ export async function runWorkflowColumn(opts: {
476484
if (targetGroups.length === 0) return { dispatchId: null }
477485
const targetGroupIds = targetGroups.map((g) => g.id)
478486

479-
// Wipe targeted output cols + executions[gid] before any cells fire so the
480-
// user sees the column flip to empty/Pending instantly.
481487
const { bulkClearWorkflowGroupCells, insertDispatch, runDispatcherToCompletion } = await import(
482488
'./dispatcher'
483489
)
490+
491+
// For table-wide manual runs (Run all rows / Run column), cancel any prior
492+
// active dispatches AND their in-flight cells before clearing. Without this:
493+
// - Two dispatcher loops would walk overlapping rows and burn duplicate work.
494+
// - mode:'all' bulk-clear deletes in-flight sidecar rows without aborting
495+
// workers — those would keep writing into the wiped state.
496+
// Row-scoped callers (dep-edit cascade in `updateRow`) already cancel their
497+
// specific scope before calling here, so we skip to avoid cancelling
498+
// unrelated dispatches on other rows. Auto-fire (`mode:'new'`) is harmless
499+
// overlap-wise since the NOT EXISTS filter excludes already-attempted rows.
500+
const cancelPriorRuns =
501+
isManualRun && (!rowIds || rowIds.length === 0) && (mode === 'all' || mode === 'incomplete')
502+
if (cancelPriorRuns) {
503+
await cancelWorkflowGroupRuns(tableId, undefined, { groupIds: targetGroupIds })
504+
}
505+
506+
// Wipe targeted output cols + executions[gid] before any cells fire so the
507+
// user sees the column flip to empty/Pending instantly.
484508
await bulkClearWorkflowGroupCells({
485509
tableId,
486510
groups: targetGroups.map((g) => ({ id: g.id, outputs: g.outputs })),

0 commit comments

Comments
 (0)