Skip to content

Commit ea7a371

Browse files
feat(tables): workflow-column run fixes + bounded "run N rows"
- Pass group.autoRun as the add-group dispatch flag so an autoRun=false column no longer opens a no-op dispatch that flashes the run-count badge. - Scope the context-menu re-run to the right-clicked workflow cell's group (cascading to dependents) instead of every group on the row. - Add an extensible per-dispatch row cap (DispatchLimit { type:'rows', max }) surfaced as "Run 10 / 1,000 empty rows" in the group header; dispatcher stops after N eligible rows. New limit/processed_count columns on table_run_dispatches. - Fix stranded "Queued" cells: the cascade owner now treats a queued marker (orphan pre-stamp) as a manual run so autoRun=false requested groups are picked up, and drains late markers before releasing the row lock. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 1ca4dba commit ea7a371

16 files changed

Lines changed: 412 additions & 56 deletions

File tree

apps/sim/app/api/table/[tableId]/columns/run/route.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ export const POST = withRouteHandler(async (request: NextRequest, { params }: Ro
2525
const parsed = await parseRequest(runColumnContract, request, { params })
2626
if (!parsed.success) return parsed.response
2727
const { tableId } = parsed.data.params
28-
const { workspaceId, groupIds, runMode, rowIds } = parsed.data.body
28+
const { workspaceId, groupIds, runMode, rowIds, limit } = parsed.data.body
2929
const access = await checkAccess(tableId, auth.userId, 'write')
3030
if (!access.ok) return accessError(access, requestId, tableId)
3131

@@ -35,6 +35,7 @@ export const POST = withRouteHandler(async (request: NextRequest, { params }: Ro
3535
groupIds,
3636
mode: runMode,
3737
rowIds,
38+
limit,
3839
requestId,
3940
})
4041

apps/sim/app/api/table/[tableId]/dispatches/route.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ export const GET = withRouteHandler(async (request: NextRequest, { params }: Rou
4646
isManualRun: r.isManualRun,
4747
cursor: r.cursor,
4848
scope: r.scope,
49+
...(r.limit ? { limit: r.limit } : {}),
4950
}))
5051

5152
return NextResponse.json({

apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/context-menu/context-menu.tsx

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,10 @@ interface ContextMenuProps {
4242
runningInSelectionCount?: number
4343
/** Whether the table has any workflow columns; gates the run-workflows item. */
4444
hasWorkflowColumns?: boolean
45+
/** True when the menu was opened on a workflow-output cell, so Run / Re-run
46+
* act on that cell's group only (the cascade handles dependents). Switches
47+
* the labels from row-wide ("all cells") to cell-scoped ("cell"). */
48+
workflowCellScoped?: boolean
4549
disableEdit?: boolean
4650
disableInsert?: boolean
4751
disableDelete?: boolean
@@ -64,17 +68,26 @@ export function ContextMenu({
6468
onStopWorkflows,
6569
runningInSelectionCount = 0,
6670
hasWorkflowColumns = false,
71+
workflowCellScoped = false,
6772
disableEdit = false,
6873
disableInsert = false,
6974
disableDelete = false,
7075
}: ContextMenuProps) {
7176
const deleteLabel = selectedRowCount > 1 ? `Delete ${selectedRowCount} rows` : 'Delete row'
72-
const runLabel =
73-
selectedRowCount > 1
77+
const runLabel = workflowCellScoped
78+
? selectedRowCount > 1
79+
? `Run cell on ${selectedRowCount} rows`
80+
: 'Run cell'
81+
: selectedRowCount > 1
7482
? `Run empty or failed cells on ${selectedRowCount} rows`
7583
: 'Run empty or failed cells'
76-
const refreshLabel =
77-
selectedRowCount > 1 ? `Re-run all cells on ${selectedRowCount} rows` : 'Re-run all cells'
84+
const refreshLabel = workflowCellScoped
85+
? selectedRowCount > 1
86+
? `Re-run cell on ${selectedRowCount} rows`
87+
: 'Re-run cell'
88+
: selectedRowCount > 1
89+
? `Re-run all cells on ${selectedRowCount} rows`
90+
: 'Re-run all cells'
7891
const stopLabel =
7992
runningInSelectionCount === 1
8093
? 'Stop running workflow'

apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/table-grid/headers/workflow-group-meta-cell.tsx

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import {
2121
PlayOutline,
2222
Trash,
2323
} from '@/components/emcn/icons'
24-
import type { RunMode } from '@/lib/api/contracts/tables'
24+
import type { RunLimit, RunMode } from '@/lib/api/contracts/tables'
2525
import { cn } from '@/lib/core/utils/cn'
2626
import type { WorkflowMetadata } from '@/stores/workflows/registry/types'
2727
import { SELECTION_TINT_BG } from '../constants'
@@ -51,6 +51,9 @@ interface ColumnOptionsMenuProps {
5151
* exposes group-level run actions above the column actions. */
5252
onRunColumnAll?: () => void
5353
onRunColumnIncomplete?: () => void
54+
/** Runs only the first `max` empty/unrun rows. Surfaces fixed "Run N rows"
55+
* shortcuts so users can sample a large table without firing every row. */
56+
onRunColumnLimited?: (max: number) => void
5457
/** When set, surfaces a "Run N selected rows" item above Run all. */
5558
onRunColumnSelected?: () => void
5659
selectedRowCount?: number
@@ -79,6 +82,7 @@ export function ColumnOptionsMenu({
7982
onDeleteGroup,
8083
onRunColumnAll,
8184
onRunColumnIncomplete,
85+
onRunColumnLimited,
8286
onRunColumnSelected,
8387
selectedRowCount = 0,
8488
onViewWorkflow,
@@ -127,6 +131,16 @@ export function ColumnOptionsMenu({
127131
<DropdownMenuItem onSelect={() => onRunColumnIncomplete?.()}>
128132
Run empty rows
129133
</DropdownMenuItem>
134+
{onRunColumnLimited && (
135+
<>
136+
<DropdownMenuItem onSelect={() => onRunColumnLimited(10)}>
137+
Run 10 empty rows
138+
</DropdownMenuItem>
139+
<DropdownMenuItem onSelect={() => onRunColumnLimited(1000)}>
140+
Run 1,000 empty rows
141+
</DropdownMenuItem>
142+
</>
143+
)}
130144
</DropdownMenuSubContent>
131145
</DropdownMenuSub>
132146
<DropdownMenuSeparator />
@@ -175,7 +189,7 @@ interface WorkflowGroupMetaCellProps {
175189
isGroupSelected: boolean
176190
onSelectGroup: (startColIndex: number, size: number) => void
177191
onOpenConfig: (columnName: string) => void
178-
onRunColumn?: (groupId: string, mode?: RunMode, rowIds?: string[]) => void
192+
onRunColumn?: (groupId: string, mode?: RunMode, rowIds?: string[], limit?: RunLimit) => void
179193
onInsertLeft?: (columnName: string) => void
180194
onInsertRight?: (columnName: string) => void
181195
onDeleteColumn?: (columnName: string) => void
@@ -251,6 +265,13 @@ export function WorkflowGroupMetaCell({
251265
}
252266
}, [groupId, onRunColumn, selectedRowIds])
253267

268+
const handleRunLimited = useCallback(
269+
(max: number) => {
270+
if (groupId) onRunColumn?.(groupId, 'incomplete', undefined, { type: 'rows', max })
271+
},
272+
[groupId, onRunColumn]
273+
)
274+
254275
const handleContextMenu = useCallback(
255276
(e: React.MouseEvent) => {
256277
if (!column) return
@@ -406,6 +427,12 @@ export function WorkflowGroupMetaCell({
406427
)}
407428
<DropdownMenuItem onSelect={handleRunAll}>Run all rows</DropdownMenuItem>
408429
<DropdownMenuItem onSelect={handleRunIncomplete}>Run empty rows</DropdownMenuItem>
430+
<DropdownMenuItem onSelect={() => handleRunLimited(10)}>
431+
Run 10 empty rows
432+
</DropdownMenuItem>
433+
<DropdownMenuItem onSelect={() => handleRunLimited(1000)}>
434+
Run 1,000 empty rows
435+
</DropdownMenuItem>
409436
</DropdownMenuContent>
410437
</DropdownMenu>
411438
)}
@@ -423,6 +450,7 @@ export function WorkflowGroupMetaCell({
423450
onDeleteGroup={onDeleteGroup ? () => onDeleteGroup(groupId) : undefined}
424451
onRunColumnAll={onRunColumn ? handleRunAll : undefined}
425452
onRunColumnIncomplete={onRunColumn ? handleRunIncomplete : undefined}
453+
onRunColumnLimited={onRunColumn ? handleRunLimited : undefined}
426454
onRunColumnSelected={onRunColumn && selectedCount > 0 ? handleRunSelected : undefined}
427455
selectedRowCount={selectedCount}
428456
onViewWorkflow={onViewWorkflow ? () => onViewWorkflow(workflowId) : undefined}

apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/table-grid/table-grid.tsx

Lines changed: 33 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import { useParams } from 'next/navigation'
88
import { usePostHog } from 'posthog-js/react'
99
import { Skeleton, toast, useToast } from '@/components/emcn'
1010
import { TableX } from '@/components/emcn/icons'
11-
import type { RunMode } from '@/lib/api/contracts/tables'
11+
import type { RunLimit, RunMode } from '@/lib/api/contracts/tables'
1212
import { cn } from '@/lib/core/utils/cn'
1313
import { captureEvent } from '@/lib/posthog/client'
1414
import type { ColumnDefinition, TableRow as TableRowType } from '@/lib/table'
@@ -147,7 +147,7 @@ interface TableGridProps {
147147
/** Open the delete-columns confirmation modal for `names`. Wrapper renders the modal. */
148148
onRequestDeleteColumns: (names: string[]) => void
149149
/** Fire run for a single column (meta-cell Run menu). */
150-
onRunColumn: (groupId: string, runMode: RunMode, rowIds?: string[]) => void
150+
onRunColumn: (groupId: string, runMode: RunMode, rowIds?: string[], limit?: RunLimit) => void
151151
/** Fire every runnable column on a single row (per-row gutter Play). */
152152
onRunRow: (rowId: string) => void
153153
/** Fan out a run across every workflow group on `rowIds`. Used by context menu. */
@@ -417,8 +417,13 @@ export function TableGrid({
417417
const deleteWorkflowGroupMutation = useDeleteWorkflowGroup({ workspaceId, tableId })
418418
const updateWorkflowGroupMutation = useUpdateWorkflowGroup({ workspaceId, tableId })
419419

420-
function handleRunColumn(groupId: string, runMode: RunMode = 'all', rowIds?: string[]) {
421-
onRunColumn(groupId, runMode, rowIds)
420+
function handleRunColumn(
421+
groupId: string,
422+
runMode: RunMode = 'all',
423+
rowIds?: string[],
424+
limit?: RunLimit
425+
) {
426+
onRunColumn(groupId, runMode, rowIds, limit)
422427
}
423428

424429
const handleViewWorkflow = useCallback(
@@ -740,12 +745,17 @@ export function TableGrid({
740745
let contextMenuExecutionId: string | null = null
741746
let contextMenuIsWorkflowColumn = false
742747
let contextMenuHasStartedRun = false
748+
// The workflow group of the right-clicked cell, when it's a workflow-output
749+
// column. Scopes the run/re-run menu items to just that cell's group (the
750+
// cascade re-runs dependents on its own) instead of every group on the row.
751+
let contextMenuGroupId: string | null = null
743752
if (contextMenu.row && contextMenu.columnName) {
744753
const _col = columnsRef.current.find((c) => c.name === contextMenu.columnName)
745754
const _gid = _col?.workflowGroupId
746755
if (_col && _gid) {
747756
const _exec = contextMenu.row.executions?.[_gid]
748757
contextMenuIsWorkflowColumn = true
758+
contextMenuGroupId = _gid
749759
// Cells with a server-side execution log: `completed` / `error` /
750760
// `running`, plus HITL-paused runs (status `pending` with a `paused-`
751761
// jobId — has a real executionId + viewable trace). `queued` / plain
@@ -2820,13 +2830,18 @@ export function TableGrid({
28202830

28212831
// Context-menu wrappers: act on `contextMenuRowIds`, then close the menu.
28222832
// Mirror the action bar's Play / Refresh split: Play fills empty/failed,
2823-
// Refresh re-runs everything (including completed cells).
2833+
// Refresh re-runs everything (including completed cells). When the menu was
2834+
// opened on a workflow-output cell, scope to just that cell's group — the
2835+
// server cascade re-runs dependent groups whose deps it fills. Right-clicking
2836+
// a plain cell has no group, so fall back to every group on the row(s).
28242837
const handleRunWorkflowsOnSelection = () => {
2825-
onRunRows(contextMenuRowIds, 'incomplete')
2838+
if (contextMenuGroupId) onRunColumn(contextMenuGroupId, 'incomplete', contextMenuRowIds)
2839+
else onRunRows(contextMenuRowIds, 'incomplete')
28262840
closeContextMenu()
28272841
}
28282842
const handleRefreshWorkflowsOnSelection = () => {
2829-
onRunRows(contextMenuRowIds, 'all')
2843+
if (contextMenuGroupId) onRunColumn(contextMenuGroupId, 'all', contextMenuRowIds)
2844+
else onRunRows(contextMenuRowIds, 'all')
28302845
closeContextMenu()
28312846
}
28322847
const handleStopWorkflowsOnSelection = () => {
@@ -2916,10 +2931,17 @@ export function TableGrid({
29162931
)
29172932

29182933
// Drives Run vs Refresh visibility on the context menu — same classifier
2919-
// the action bar uses, so both surfaces stay in sync.
2934+
// the action bar uses, so both surfaces stay in sync. Scoped to the clicked
2935+
// cell's group when the menu opened on a workflow-output cell so visibility
2936+
// tracks that group's state, not the whole row's.
29202937
const contextMenuStats = useMemo(
2921-
() => classifyExecStatusMix(rows, new Set(contextMenuRowIds), tableWorkflowGroupIds),
2922-
[contextMenuRowIds, rows, tableWorkflowGroupIds]
2938+
() =>
2939+
classifyExecStatusMix(
2940+
rows,
2941+
new Set(contextMenuRowIds),
2942+
contextMenuGroupId ? [contextMenuGroupId] : tableWorkflowGroupIds
2943+
),
2944+
[contextMenuRowIds, rows, tableWorkflowGroupIds, contextMenuGroupId]
29232945
)
29242946

29252947
// Run scope is derived from one of two selection sources:
@@ -3373,6 +3395,7 @@ export function TableGrid({
33733395
}
33743396
runningInSelectionCount={runningInContextSelection}
33753397
hasWorkflowColumns={hasWorkflowColumns}
3398+
workflowCellScoped={Boolean(contextMenuGroupId)}
33763399
disableEdit={!userPermissions.canEdit}
33773400
disableInsert={!userPermissions.canEdit}
33783401
disableDelete={!userPermissions.canEdit}

apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/table-grid/utils.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,12 @@ export function resolveCellExec(
189189
if (areOutputsFilled(group, row)) return undefined
190190
if (!areGroupDepsSatisfied(group, row)) return undefined
191191
for (const d of activeDispatches) {
192+
// Capped dispatches run only the first N eligible rows ahead of the
193+
// cursor, and this per-row resolver can't tell which rows fall within the
194+
// budget — rendering every ahead-of-cursor row as Queued would massively
195+
// over-count. The dispatcher's real per-row pending stamps (arriving via
196+
// cell SSE) cover the actual rows instead.
197+
if (d.limit) continue
192198
if (!d.scope.groupIds.includes(group.id)) continue
193199
if (d.scope.rowIds && !d.scope.rowIds.includes(row.id)) continue
194200
if (row.position <= d.cursor) continue

apps/sim/app/workspace/[workspaceId]/tables/[tableId]/hooks/use-table-event-stream.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ export function useTableEventStream({
155155
}
156156

157157
const applyDispatch = (event: Extract<TableEvent, { kind: 'dispatch' }>): void => {
158-
const { dispatchId, status, scope, cursor, mode, isManualRun } = event
158+
const { dispatchId, status, scope, cursor, mode, isManualRun, limit } = event
159159
queryClient.setQueryData<TableRunState>(tableKeys.activeDispatches(tableId), (prev) => {
160160
// SSE may arrive before the initial fetch lands. Seed an empty
161161
// run-state so the dispatch isn't dropped; counters are reconciled
@@ -183,13 +183,15 @@ export function useTableEventStream({
183183
// the cached entry's value if this is a legacy emit without the
184184
// field, and finally to `false` if we have nothing.
185185
const resolvedManualRun = isManualRun ?? existing?.isManualRun ?? false
186+
const resolvedLimit = limit ?? existing?.limit
186187
const next: ActiveDispatch = {
187188
id: dispatchId,
188189
status,
189190
mode,
190191
isManualRun: resolvedManualRun,
191192
cursor,
192193
scope,
194+
...(resolvedLimit ? { limit: resolvedLimit } : {}),
193195
}
194196
if (idx === -1) return { ...base, dispatches: [...list, next] }
195197
const merged = list.slice()

apps/sim/app/workspace/[workspaceId]/tables/[tableId]/table.tsx

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import {
1414
toast,
1515
} from '@/components/emcn'
1616
import { Download, Pencil, Table as TableIcon, Trash, Upload } from '@/components/emcn/icons'
17-
import type { RunMode } from '@/lib/api/contracts/tables'
17+
import type { RunLimit, RunMode } from '@/lib/api/contracts/tables'
1818
import type { ColumnDefinition, Filter, TableRow as TableRowType } from '@/lib/table'
1919
import {
2020
type ColumnOption,
@@ -214,7 +214,7 @@ export function Table({
214214
// gutter, action-bar Play/Refresh, right-click context menu) reduces to a
215215
// (groupIds, rowIds?, runMode) triple. Empty groupIds = no-op.
216216
const runScope = useCallback(
217-
(args: { groupIds: string[]; rowIds?: string[]; runMode: RunMode }) => {
217+
(args: { groupIds: string[]; rowIds?: string[]; runMode: RunMode; limit?: RunLimit }) => {
218218
if (args.groupIds.length === 0) return
219219
if (args.rowIds && args.rowIds.length === 0) return
220220
runColumnMutate(args)
@@ -223,8 +223,8 @@ export function Table({
223223
)
224224

225225
const onRunColumn = useCallback(
226-
(groupId: string, runMode: RunMode, rowIds?: string[]) => {
227-
runScope({ groupIds: [groupId], rowIds, runMode })
226+
(groupId: string, runMode: RunMode, rowIds?: string[], limit?: RunLimit) => {
227+
runScope({ groupIds: [groupId], rowIds, runMode, limit })
228228
},
229229
[runScope]
230230
)

apps/sim/background/workflow-column-execution.ts

Lines changed: 38 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,19 +20,49 @@ const logger = createLogger('TriggerWorkflowGroupCell')
2020

2121
/** Cell-task entrypoint. Holds a per-row cascade lock so only one worker
2222
* advances a given row at a time; bails on contention. The held lock heart-
23-
* beats every 10s so a crashed pod releases within ~30s. */
23+
* beats every 10s so a crashed pod releases within ~30s.
24+
*
25+
* After the cascade finishes and the lock releases, re-checks for a runnable
26+
* queued marker that may have landed between the cascade's final
27+
* `pickNextEligibleGroupForRow` and the lock release (a window where a
28+
* contender bails on the still-held lock but we're already done). If one
29+
* appeared, re-acquire and drive it — this is the same task re-acquiring the
30+
* lock, NOT a queue re-enqueue or a timed poll, and it loops only while a
31+
* runnable group exists. */
2432
export async function executeWorkflowGroupCellJob(
2533
payload: WorkflowGroupCellPayload,
2634
signal?: AbortSignal
2735
) {
28-
const { tableId, rowId, executionId } = payload
29-
const outcome = await withCascadeLock(tableId, rowId, executionId, () =>
30-
runRowCascadeLoop(payload, signal)
31-
)
32-
if (outcome.status === 'contended') {
33-
logger.info(
34-
`Cascade lock held — bailing (table=${tableId} row=${rowId} executionId=${executionId})`
36+
const { tableId, rowId, workspaceId } = payload
37+
const { getTableById, getRowById } = await import('@/lib/table/service')
38+
const { pickNextEligibleGroupForRow } = await import('@/lib/table/workflow-columns')
39+
40+
let currentPayload = payload
41+
while (true) {
42+
if (signal?.aborted) break
43+
const outcome = await withCascadeLock(tableId, rowId, currentPayload.executionId, () =>
44+
runRowCascadeLoop(currentPayload, signal)
3545
)
46+
if (outcome.status === 'contended') {
47+
// Another worker owns the row's cascade; it drains the queued marker.
48+
logger.info(
49+
`Cascade lock held — bailing (table=${tableId} row=${rowId} executionId=${currentPayload.executionId})`
50+
)
51+
break
52+
}
53+
if (signal?.aborted) break
54+
const freshTable = await getTableById(tableId)
55+
if (!freshTable) break
56+
const freshRow = await getRowById(tableId, rowId, workspaceId)
57+
if (!freshRow) break
58+
const next = pickNextEligibleGroupForRow(freshTable, freshRow)
59+
if (!next) break
60+
currentPayload = {
61+
...currentPayload,
62+
groupId: next.id,
63+
workflowId: next.workflowId,
64+
executionId: generateId(),
65+
}
3666
}
3767
}
3868

0 commit comments

Comments
 (0)