Commit 3ccb3a3

fix(tables): reliable stop-all, accurate "X running", and rate/usage gating for cell runs (#4838)
* fix(tables): reliable stop-all, accurate "X running", and rate/usage gating for cell runs

Stop-all:
- Make the cancellation guard status-based (not executionId-scoped) so a `cancelled` tombstone stamped while a cell is still a dispatcher pre-stamp (null executionId) keeps the cell dead — fixes function-execute cells that resurrected after Stop all. Consolidated into shared isExecCancelled / isExecCancelledAfter predicates in deps.ts, reused by the in-memory guard, the SQL guard, the dispatcher tombstone filter, the worker, and resume.
- Add an explicit pre-execution cancellation read so a cell that dequeues after Stop all (e.g. from the trigger.dev queue) never runs.
- Resume worker aborts a cancelled paused/awaiting cell before resuming; cancelWorkflowGroupRuns marks paused executions cancelling.

"X running":
- Emit a dispatch SSE at dispatch start so auto-fired/capped runs surface immediately; show the control whenever a dispatch is active.

Checkbox dependency:
- Treat boolean `false` as an unmet dependency so unchecking never reruns dependents — only checking does. deriveExecClearsForDataPatch no longer re-arms a downstream group whose deps are unmet after the patch.

Rate / usage gating:
- Route table cell execution through preprocessExecution (billing actor = workspace billed account, usage limit, per-plan timeout), keeping draft.
- Rate limit: pace & retry per cell (async counter) so rows aren't skipped.
- Usage limit: halt the dispatch without marking cells and emit a usageLimitReached event; the client shows an Upgrade prompt that routes to subscription settings.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* fix(tables): dedupe usage-limit event + release rate-limited cells on cancel

Addresses PR review:
- Usage limit: only the cell that transitions the dispatch active→complete (via completeDispatchIfActive) emits usageLimitReached, so concurrent cells don't fire up to 20 identical "upgrade" toasts.
- Rate-limit retry: re-check the cancelled tombstone after each sleep so a Stop All mid-wait releases the concurrency slot promptly (signal never fires on the trigger.dev backend).

* fix(tables): jitter rate-limit retry backoff to avoid thundering herd

Passing the bucket's shared resetAt as retryAfterMs made backoffWithJitter return a fixed clamped value (no jitter, attempt ignored), so all concurrent cells retried in lockstep. Pass null to get jittered exponential backoff.

* fix(tables): unstick cells + resync counter on usage-limit halt

Addresses PR review:
- Clear each blocked cell's pre-stamp on a 402 so it reverts to un-run instead of being stuck "Queued" (no error/cancelled badge); covers auto-fire cells with no owning dispatch.
- Client re-syncs run-state counts and refetches rows on usageLimitReached so the stale "X running" / Stop-all control clears and queued cells drop.
- Make usageLimitReached.dispatchId optional; client only touches the dispatch overlay when present.

* fix(tables): don't emit stale dispatching event after a mid-window halt

If a cell halts the dispatch mid-window (usage limit), re-read the dispatch status after the batch and bail instead of emitting a per-window 'dispatching' event that would arrive after the client dropped the dispatch and re-add it (flickering 'X running' back).

---------

Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
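
The jitter fix and the post-sleep cancellation re-check described in the commit message can be sketched roughly as follows. This is an illustration under stated assumptions, not the repository's code: the real `backoffWithJitter` signature is not shown in this commit, so the version here is a hypothetical stand-in, and `sleep`, `tryAcquire`, `isCancelled`, and `acquireWithRetry` are names invented for the example.

```typescript
// Hypothetical stand-in for backoffWithJitter (signature assumed).
// A non-null retryAfterMs yields a fixed, clamped delay: no jitter, attempt ignored.
// Passing null yields "full jitter" exponential backoff instead.
function backoffWithJitter(
  attempt: number,
  retryAfterMs: number | null,
  baseMs = 250,
  maxMs = 10_000,
  random: () => number = Math.random
): number {
  if (retryAfterMs !== null) return Math.min(Math.max(retryAfterMs, 0), maxMs)
  const ceiling = Math.min(maxMs, baseMs * 2 ** attempt)
  return Math.floor(random() * ceiling)
}

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms))

// Retry until a rate-limit slot is acquired. Cancellation is re-checked after
// every sleep so a Stop all issued during the wait releases the caller promptly.
async function acquireWithRetry(
  tryAcquire: () => Promise<boolean>,
  isCancelled: () => Promise<boolean>,
  maxAttempts = 5
): Promise<'acquired' | 'cancelled' | 'exhausted'> {
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    if (await tryAcquire()) return 'acquired'
    await sleep(backoffWithJitter(attempt, null))
    if (await isCancelled()) return 'cancelled'
  }
  return 'exhausted'
}
```

With a shared `resetAt` passed as `retryAfterMs`, every concurrent caller computes the same delay, which is the lockstep behaviour the commit describes; with `null`, each caller draws its own delay below an exponentially growing ceiling.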
1 parent ed19b0b commit 3ccb3a3

14 files changed

Lines changed: 616 additions & 66 deletions

apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/table-grid/table-grid.tsx

Lines changed: 8 additions & 0 deletions
@@ -95,6 +95,10 @@ export interface SelectionSnapshot {
   /** Total running/queued workflow runs across ALL rows. Drives the page-header
    * RunStatusControl ("N running, Stop all"). */
   totalRunning: number
+  /** Whether any dispatch is active (pending/dispatching). Keeps the RunStatusControl
+   * + Stop-all visible during a run even when the per-row count momentarily reads 0
+   * (e.g. the first window of an auto-fired/capped dispatch before cells stamp). */
+  hasActiveDispatch: boolean
   /** Whether the table has any workflow-output columns (drives the Run/Stop visibility). */
   hasWorkflowColumns: boolean
   /** Cells the Play / Refresh / Stop buttons act on. Null when the selection
@@ -333,6 +337,7 @@ export function TableGrid({
   // rows still inside a dispatch's scope — e.g. a cascade where 3 of 4 columns
   // finished would read "4 running" instead of "1".
   const totalRunning = Object.values(runningByRowId).reduce((sum, n) => sum + n, 0)
+  const hasActiveDispatch = (activeDispatches?.length ?? 0) > 0

   const tableRowCountRef = useRef(tableData?.rowCount ?? 0)
   tableRowCountRef.current = tableData?.rowCount ?? 0
@@ -3194,6 +3199,7 @@ export function TableGrid({
         sameStats &&
         prev.runningInActionBarSelection === runningInActionBarSelection &&
         prev.totalRunning === totalRunning &&
+        prev.hasActiveDispatch === hasActiveDispatch &&
         prev.hasWorkflowColumns === hasWorkflowColumns &&
         prev.actionBarRowIds.length === actionBarRowIds.length &&
         prev.actionBarRowIds.every((id, i) => id === actionBarRowIds[i])
@@ -3204,6 +3210,7 @@ export function TableGrid({
       actionBarRowIds,
       runningInActionBarSelection,
       totalRunning,
+      hasActiveDispatch,
       hasWorkflowColumns,
       selectedRunScope,
       selectionStats,
@@ -3215,6 +3222,7 @@ export function TableGrid({
     actionBarRowIds,
     runningInActionBarSelection,
     totalRunning,
+    hasActiveDispatch,
     hasWorkflowColumns,
     selectedRunScope,
     selectionStats,
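
The two derived values in this file, and the visibility rule they feed in `table.tsx`, can be sketched as one pure function. This is only an illustration: `deriveRunStatus` and `RunStatusInputs` are hypothetical names that do not appear in the commit, and the dispatch element type is left as `unknown` because its shape is not shown here.

```typescript
// Hypothetical helper; the commit computes these values inline in TableGrid.
interface RunStatusInputs {
  runningByRowId: Record<string, number>
  activeDispatches?: ReadonlyArray<unknown>
}

function deriveRunStatus({ runningByRowId, activeDispatches }: RunStatusInputs) {
  // Sum of per-row running/queued counts, as in the diff.
  const totalRunning = Object.values(runningByRowId).reduce((sum, n) => sum + n, 0)
  // True while any dispatch is listed, even if no cell has been stamped yet.
  const hasActiveDispatch = (activeDispatches?.length ?? 0) > 0
  // The control stays visible if either signal is set.
  return { totalRunning, hasActiveDispatch, showControl: totalRunning > 0 || hasActiveDispatch }
}
```

The case the new flag covers is the first window of a dispatch, where `totalRunning` is still 0 but a dispatch is already active, so the control would otherwise be hidden.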

apps/sim/app/workspace/[workspaceId]/tables/[tableId]/hooks/use-table-event-stream.ts

Lines changed: 32 additions & 1 deletion
@@ -1,6 +1,6 @@
 'use client'

-import { useEffect } from 'react'
+import { useEffect, useRef } from 'react'
 import { createLogger } from '@sim/logger'
 import { useQueryClient } from '@tanstack/react-query'
 import type { ActiveDispatch } from '@/lib/api/contracts/tables'
@@ -44,6 +44,9 @@ interface UseTableEventStreamArgs {
   tableId: string | undefined
   workspaceId: string | undefined
   enabled?: boolean
+  /** Fired when the server halts a dispatch because the billed account is over
+   * its usage limit. The page surfaces an upgrade prompt + redirect. */
+  onUsageLimitReached?: (event: { dispatchId?: string; message: string }) => void
 }

 /**
@@ -59,9 +62,14 @@ export function useTableEventStream({
   tableId,
   workspaceId,
   enabled = true,
+  onUsageLimitReached,
 }: UseTableEventStreamArgs): void {
   const queryClient = useQueryClient()

+  // Ref so a changing callback identity doesn't tear down + reconnect the SSE.
+  const onUsageLimitReachedRef = useRef(onUsageLimitReached)
+  onUsageLimitReachedRef.current = onUsageLimitReached
+
   useEffect(() => {
     if (!enabled || !tableId || !workspaceId) return

@@ -205,6 +213,28 @@ export function useTableEventStream({
       scheduleDispatchInvalidate()
     }

+    const applyUsageLimit = (event: Extract<TableEvent, { kind: 'usageLimitReached' }>): void => {
+      // Drop the halted dispatch from the overlay so the "running" UI clears
+      // immediately (the dispatcher was marked complete server-side). Cascade /
+      // auto-fire events carry no dispatchId — nothing to remove.
+      if (event.dispatchId) {
+        queryClient.setQueryData<TableRunState>(tableKeys.activeDispatches(tableId), (prev) => {
+          if (!prev) return prev
+          const filtered = prev.dispatches.filter((d) => d.id !== event.dispatchId)
+          return filtered.length === prev.dispatches.length
+            ? prev
+            : { ...prev, dispatches: filtered }
+        })
+      }
+      // Blocked cells are left `queued` in the DB with no terminal cell event,
+      // so `runningByRowId` would otherwise stay non-zero (stale "X running").
+      // Re-sync the server counts, and refetch rows so cells whose pre-stamps
+      // the server cleared drop their "Queued" state.
+      scheduleDispatchInvalidate()
+      void queryClient.invalidateQueries({ queryKey: tableKeys.rowsRoot(tableId) })
+      onUsageLimitReachedRef.current?.({ dispatchId: event.dispatchId, message: event.message })
+    }
+
     const handlePrune = (payload: PrunedEvent): void => {
       logger.info('Table event buffer pruned — full refetch', { tableId, ...payload })
       void queryClient.invalidateQueries({ queryKey: tableKeys.rowsRoot(tableId) })
@@ -253,6 +283,7 @@ export function useTableEventStream({
           savePointer(tableId, lastEventId)
           if (entry.event?.kind === 'cell') applyCell(entry.event)
           else if (entry.event?.kind === 'dispatch') applyDispatch(entry.event)
+          else if (entry.event?.kind === 'usageLimitReached') applyUsageLimit(entry.event)
         } catch (err) {
           logger.warn('Failed to parse table event', { tableId, err })
         }
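
The updater passed to `setQueryData` in `applyUsageLimit` returns the previous object untouched when nothing was removed. A standalone sketch of that step follows; `removeDispatch`, `DispatchLike`, and `RunStateLike` are hypothetical names with simplified shapes, standing in for the real `TableRunState` type, which this diff does not show.

```typescript
// Simplified, assumed shapes for illustration only.
interface DispatchLike {
  id: string
}
interface RunStateLike {
  dispatches: DispatchLike[]
}

// Remove one dispatch by id. If no entry matched, return the SAME object so
// consumers that compare by reference see no change.
function removeDispatch(
  prev: RunStateLike | undefined,
  dispatchId: string
): RunStateLike | undefined {
  if (!prev) return prev
  const filtered = prev.dispatches.filter((d) => d.id !== dispatchId)
  return filtered.length === prev.dispatches.length ? prev : { ...prev, dispatches: filtered }
}
```

Returning `prev` on a miss is a small design choice: the cached value keeps its identity, so an event for a dispatch that is already gone does not look like a state change.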

apps/sim/app/workspace/[workspaceId]/tables/[tableId]/table.tsx

Lines changed: 13 additions & 3 deletions
@@ -36,6 +36,7 @@ import {
   useRunColumn,
 } from '@/hooks/queries/tables'
 import { useInlineRename } from '@/hooks/use-inline-rename'
+import { useSettingsNavigation } from '@/hooks/use-settings-navigation'
 import { useLogDetailsUIStore } from '@/stores/logs/store'
 import type { DeletedRowSnapshot } from '@/stores/table/types'
 import {
@@ -129,7 +130,15 @@ export function Table({
   const posthogRef = useRef(posthog)
   posthogRef.current = posthog

-  useTableEventStream({ tableId, workspaceId })
+  const { navigateToSettings } = useSettingsNavigation()
+  // Plain function: `useTableEventStream` keeps it in a ref (its effect doesn't
+  // depend on the identity), so a stable reference buys nothing here.
+  const onUsageLimitReached = ({ message }: { dispatchId?: string; message: string }) => {
+    toast.error(message, {
+      action: { label: 'Upgrade', onClick: () => navigateToSettings({ section: 'subscription' }) },
+    })
+  }
+  useTableEventStream({ tableId, workspaceId, onUsageLimitReached })

   const [slideout, dispatch] = useReducer(slideoutReducer, { kind: 'none' })
   const [showDeleteTableConfirm, setShowDeleteTableConfirm] = useState(false)
@@ -141,6 +150,7 @@ export function Table({
     actionBarRowIds: [],
     runningInActionBarSelection: 0,
     totalRunning: 0,
+    hasActiveDispatch: false,
     hasWorkflowColumns: false,
     selectedRunScope: null,
     selectionStats: { hasIncompleteOrFailed: false, hasCompleted: false, hasInFlight: false },
@@ -509,7 +519,7 @@ export function Table({
           createTrigger={createTrigger}
           actions={headerActions}
           leadingActions={
-            selection.totalRunning > 0 ? (
+            selection.totalRunning > 0 || selection.hasActiveDispatch ? (
               <RunStatusControl
                 running={selection.totalRunning}
                 onStopAll={onStopAll}
@@ -527,7 +537,7 @@ export function Table({
           onFilterToggle={() => setFilterOpen((prev) => !prev)}
           filterActive={filterOpen || !!queryOptions.filter}
           trailing={
-            embedded && selection.totalRunning > 0 ? (
+            embedded && (selection.totalRunning > 0 || selection.hasActiveDispatch) ? (
               <RunStatusControl
                 running={selection.totalRunning}
                 onStopAll={onStopAll}

apps/sim/background/resume-execution.ts

Lines changed: 31 additions & 0 deletions
@@ -3,6 +3,7 @@ import { toError } from '@sim/utils/errors'
 import { generateId } from '@sim/utils/id'
 import { task } from '@trigger.dev/sdk'
 import { withCascadeLock } from '@/lib/table/cascade-lock'
+import { isExecCancelled } from '@/lib/table/deps'
 import type { RowData, RowExecutionMetadata } from '@/lib/table/types'
 import { PauseResumeManager } from '@/lib/workflows/executor/human-in-the-loop-manager'

@@ -44,6 +45,36 @@ export async function executeResumeJob(payload: ResumeExecutionPayload) {
     const { findCellContextByExecutionId } = await import('@/lib/table/workflow-columns')
     const cellContext = await findCellContextByExecutionId(parentExecutionId)

+    // A paused/awaiting table cell that was cancelled by "Stop all" must not
+    // resume — the cancel write is authoritative (matches the cell-write guard
+    // philosophy). Aborting here also stops the wasted compute the guard alone
+    // can't prevent. Read the cell's current exec and bail if cancelled.
+    if (cellContext) {
+      const { getRowById } = await import('@/lib/table/service')
+      const cellRow = await getRowById(
+        cellContext.tableId,
+        cellContext.rowId,
+        cellContext.workspaceId
+      )
+      if (isExecCancelled(cellRow?.executions?.[cellContext.groupId])) {
+        logger.info('Skipping resume — table cell cancelled', {
+          tableId: cellContext.tableId,
+          rowId: cellContext.rowId,
+          groupId: cellContext.groupId,
+          parentExecutionId,
+        })
+        return {
+          success: false,
+          workflowId,
+          executionId: resumeExecutionId,
+          parentExecutionId,
+          status: 'cancelled' as const,
+          output: undefined,
+          executedAt: new Date().toISOString(),
+        }
+      }
+    }
+
     const writers = cellContext
       ? await buildResumeCellWriters(cellContext, parentExecutionId)
       : null
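
The body of `isExecCancelled` lives in `deps.ts` and is not part of the diff shown here. Based only on the commit message's description of a status-based (not executionId-scoped) guard, a minimal sketch might look like the following. The `ExecLike` shape, its status values, and both function names are assumptions for illustration, not the repository's definitions.

```typescript
// Assumed, simplified shape; the real RowExecutionMetadata is not shown in this diff.
interface ExecLike {
  status: 'queued' | 'running' | 'paused' | 'completed' | 'error' | 'cancelled'
  executionId: string | null
}

// Status-based guard (sketch): any cancelled tombstone counts, whatever its
// executionId, including the null id of a dispatcher pre-stamp.
function isExecCancelledSketch(exec: ExecLike | null | undefined): boolean {
  return exec?.status === 'cancelled'
}

// Id-scoped guard, shown for contrast: it misses a tombstone that was written
// before the cell received an executionId.
function isCancelledForExecution(
  exec: ExecLike | null | undefined,
  executionId: string
): boolean {
  return exec?.status === 'cancelled' && exec.executionId === executionId
}
```

For a tombstone `{ status: 'cancelled', executionId: null }`, the status-based check reports cancelled while the id-scoped check for any concrete execution id does not, which matches the resurrection bug the commit message describes.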
