Skip to content

Commit 5c657e7

Browse files
fix(table): per-row Stop tombstones ahead-of-cursor rows during Run-all
Per-row Stop only cancelled sidecar rows already in flight. A row the dispatcher hadn't reached yet had no exec record, so Stop was a no-op there — the dispatcher would later walk to it, classify the group eligible, and re-fire workflows the user thought they stopped. cancelWorkflowGroupRuns now, for a per-row cancel, checks active dispatches whose scope covers the row and writes `cancelled` tombstones (cancelledAt = now) for the at-risk groups that don't already have a sidecar entry. The dispatcher's existing `cancelledAt > dispatch.requestedAt` filter then skips them when the cursor arrives. onConflictDoNothing guards against clobbering a concurrently-written entry; the active-dispatch check avoids stamping spurious cancels on idle rows.
1 parent ec0b73e commit 5c657e7

1 file changed

Lines changed: 69 additions & 1 deletion

File tree

apps/sim/lib/table/workflow-columns.ts

Lines changed: 69 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -321,7 +321,7 @@ export async function cancelWorkflowGroupRuns(
321321
): Promise<number> {
322322
const { getTableById, updateRow } = await import('@/lib/table/service')
323323
const { getJobQueue } = await import('@/lib/core/async-jobs/config')
324-
const { markActiveDispatchesCancelled } = await import('./dispatcher')
324+
const { listActiveDispatches, markActiveDispatchesCancelled } = await import('./dispatcher')
325325

326326
const table = await getTableById(tableId)
327327
if (!table) {
@@ -343,6 +343,35 @@ export async function cancelWorkflowGroupRuns(
343343
: new Set(allGroups.map((g) => g.id))
344344
if (groupIds.size === 0) return 0
345345

346+
// Per-row Stop on a row the dispatcher hasn't reached yet has no sidecar
347+
// entry to cancel — the dispatcher would later walk to that row, see no
348+
// exec, classify eligible, and re-fire. Pre-write `cancelled` tombstones
349+
// for active-dispatch in-scope groups so the existing `cancelledAt >
350+
// dispatch.requestedAt` filter in `dispatcherStep` catches them. Skip
351+
// when there's no active dispatch (nothing to outrun).
352+
let aheadOfCursorTombstones: Array<{ groupId: string; workflowId: string }> = []
353+
if (rowId) {
354+
const activeDispatches = await listActiveDispatches(tableId)
355+
const relevant = activeDispatches.filter((d) => {
356+
if (d.scope.rowIds && !d.scope.rowIds.includes(rowId)) return false
357+
return d.scope.groupIds.some((gid) => groupIds.has(gid))
358+
})
359+
if (relevant.length > 0) {
360+
// Intersection of targeted groups with active-dispatch scopes — only
361+
// these groups are at risk of being re-fired by an in-progress dispatch.
362+
const atRisk = new Set<string>()
363+
for (const d of relevant) {
364+
for (const gid of d.scope.groupIds) {
365+
if (groupIds.has(gid)) atRisk.add(gid)
366+
}
367+
}
368+
aheadOfCursorTombstones = Array.from(atRisk).map((gid) => ({
369+
groupId: gid,
370+
workflowId: allGroups.find((g) => g.id === gid)?.workflowId ?? '',
371+
}))
372+
}
373+
}
374+
346375
// Always filter by tableId — for the per-row case this prevents a
347376
// cross-table rowId from doing a wasted DB round-trip and silently
348377
// under-counting in the response. For the table-wide case it's the
@@ -440,6 +469,45 @@ export async function cancelWorkflowGroupRuns(
440469
)
441470
)
442471

472+
// Tombstones for ahead-of-cursor groups. The in-flight cancel writes above
473+
// already cover groups that have a sidecar entry; we only need fresh
474+
// tombstones for groups that don't (the dispatcher hasn't reached them
475+
// yet, so there's nothing to cancel — but without a tombstone the
476+
// dispatcher would still re-fire when its cursor walks to this row).
477+
if (rowId && aheadOfCursorTombstones.length > 0) {
478+
const alreadyHandled = new Set(mutations.flatMap((m) => Object.keys(m.executionsPatch)))
479+
const needsTombstone = aheadOfCursorTombstones.filter((t) => !alreadyHandled.has(t.groupId))
480+
if (needsTombstone.length > 0) {
481+
const now = new Date()
482+
await Promise.allSettled(
483+
needsTombstone.map((t) =>
484+
db
485+
.insert(tableRowExecutions)
486+
.values({
487+
tableId,
488+
rowId,
489+
groupId: t.groupId,
490+
status: 'cancelled',
491+
executionId: null,
492+
jobId: null,
493+
workflowId: t.workflowId,
494+
error: 'Cancelled',
495+
runningBlockIds: [],
496+
blockErrors: {},
497+
cancelledAt: now,
498+
updatedAt: now,
499+
})
500+
.onConflictDoNothing({
501+
target: [tableRowExecutions.rowId, tableRowExecutions.groupId],
502+
})
503+
.catch((err) => {
504+
logger.error(`Failed to write tombstone for ${tableId}/${rowId}/${t.groupId}:`, err)
505+
})
506+
)
507+
)
508+
}
509+
}
510+
443511
return mutations.reduce((sum, m) => sum + m.cancelledCount, 0)
444512
}
445513

0 commit comments

Comments
 (0)