Skip to content

Commit ec0b73e

Browse files
fix(table): row-scoped Refresh cancels in-flight; counter includes queued/pending
- runWorkflowColumn now cancels prior in-flight cells for row-scoped manual runs too (context-menu Refresh on a row subset, action-bar Refresh on selected rows). Previously only the table-wide path cancelled, so a row-scoped Refresh would bulk-clear running sidecar rows without aborting workers. Per-row cancel skips markActiveDispatchesCancelled so unrelated dispatches keep running. - countRunningCells now counts all in-flight statuses (queued / running / pending) instead of just running. The row gutter Run/Stop button reads this map — with the old behavior, clicking Play during the queued window would re-enqueue an already-queued cell. SSE applyCell handler updated to use isExecInFlight so client deltas track the same semantics.
1 parent 0b14dbb commit ec0b73e

3 files changed

Lines changed: 50 additions & 22 deletions

File tree

apps/sim/app/workspace/[workspaceId]/tables/[tableId]/hooks/use-table-event-stream.ts

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import { createLogger } from '@sim/logger'
55
import { useQueryClient } from '@tanstack/react-query'
66
import type { ActiveDispatch } from '@/lib/api/contracts/tables'
77
import type { RowData, RowExecutionMetadata, RowExecutions } from '@/lib/table'
8+
import { isExecInFlight } from '@/lib/table/deps'
89
import type { TableEvent, TableEventEntry } from '@/lib/table/events'
910
import { snapshotAndMutateRows, type TableRunState, tableKeys } from '@/hooks/queries/tables'
1011

@@ -74,11 +75,11 @@ export function useTableEventStream({
7475

7576
const updateRunStateCounters = (
7677
rowId: string,
77-
wasRunning: boolean,
78-
isRunning: boolean
78+
wasInFlight: boolean,
79+
isInFlight: boolean
7980
): void => {
80-
if (wasRunning === isRunning) return
81-
const delta = isRunning ? 1 : -1
81+
if (wasInFlight === isInFlight) return
82+
const delta = isInFlight ? 1 : -1
8283
queryClient.setQueryData<TableRunState>(tableKeys.activeDispatches(tableId), (prev) => {
8384
if (!prev) return prev
8485
const prevForRow = prev.runningByRowId[rowId] ?? 0
@@ -106,14 +107,18 @@ export function useTableEventStream({
106107
runningBlockIds,
107108
blockErrors,
108109
} = event
109-
let wasRunning: boolean | null = null
110+
let wasInFlight: boolean | null = null
110111
void snapshotAndMutateRows(
111112
queryClient,
112113
tableId,
113114
(row) => {
114115
if (row.id !== rowId) return null
115116
const prevExec = row.executions?.[groupId]
116-
if (wasRunning === null) wasRunning = prevExec?.status === 'running'
117+
// In-flight = queued | running | pending. Server's countRunningCells
118+
// counts all three (the gutter Run/Stop button reads this map and
119+
// needs Stop visible during queued too, else clicking Play would
120+
// re-enqueue a cell that's already queued).
121+
if (wasInFlight === null) wasInFlight = isExecInFlight(prevExec)
117122
const nextExec: RowExecutionMetadata = {
118123
status,
119124
executionId: executionId ?? null,
@@ -133,14 +138,18 @@ export function useTableEventStream({
133138
},
134139
{ cancelInFlight: false }
135140
)
136-
if (wasRunning === null) {
141+
if (wasInFlight === null) {
137142
// Row outside the loaded page slice — can't compute the delta locally.
138143
// Refetch the run-state snapshot from the server. Cheap and rare.
139144
void queryClient.invalidateQueries({
140145
queryKey: tableKeys.activeDispatches(tableId),
141146
})
142147
} else {
143-
updateRunStateCounters(rowId, wasRunning, status === 'running')
148+
updateRunStateCounters(
149+
rowId,
150+
wasInFlight,
151+
isExecInFlight({ status } as RowExecutionMetadata)
152+
)
144153
}
145154
}
146155

apps/sim/lib/table/dispatcher.ts

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -160,20 +160,27 @@ export async function insertDispatch(input: {
160160
* active dispatch's scope ahead of its cursor are rendered as queued even
161161
* before the dispatcher has reached them, so refresh during a long Run-all
162162
* doesn't lose the queued indicators. */
163-
/** Counts cells across the entire table whose execution `status === 'running'`
164-
* — the authoritative source for the "X running" badge. Cache-derived
165-
* counters miss in-flight cells on rows outside the loaded page slice. */
163+
/** Counts in-flight cells (queued / running / pending) across the entire
164+
* table — the authoritative source for the "X running" badge and the per-row
165+
* gutter Run/Stop button. All three statuses are user-cancellable, so the
166+
* gutter must surface Stop whenever any of them are present (else clicking
167+
* Play during the queued window would re-run an already-queued cell).
168+
* Hits the `(table_id, status)` partial index on table_row_executions. */
166169
export async function countRunningCells(
167170
tableId: string
168171
): Promise<{ total: number; byRowId: Record<string, number> }> {
169-
// Hits the `(table_id, status)` partial index on table_row_executions.
170172
const rows = await db
171173
.select({
172174
rowId: tableRowExecutions.rowId,
173175
runningCount: sql<number>`count(*)::int`,
174176
})
175177
.from(tableRowExecutions)
176-
.where(and(eq(tableRowExecutions.tableId, tableId), eq(tableRowExecutions.status, 'running')))
178+
.where(
179+
and(
180+
eq(tableRowExecutions.tableId, tableId),
181+
inArray(tableRowExecutions.status, ['queued', 'running', 'pending'])
182+
)
183+
)
177184
.groupBy(tableRowExecutions.rowId)
178185
let total = 0
179186
const byRowId: Record<string, number> = {}

apps/sim/lib/table/workflow-columns.ts

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -488,19 +488,31 @@ export async function runWorkflowColumn(opts: {
488488
'./dispatcher'
489489
)
490490

491-
// For table-wide manual runs (Run all rows / Run column), cancel any prior
492-
// active dispatches AND their in-flight cells before clearing. Without this:
491+
// For manual runs (Run all rows / Run column / Refresh-row / Refresh-cell),
492+
// cancel any prior active dispatches AND in-flight cells in scope before
493+
// clearing. Without this:
493494
// - Two dispatcher loops would walk overlapping rows and burn duplicate work.
494495
// - mode:'all' bulk-clear deletes in-flight sidecar rows without aborting
495496
// workers — those would keep writing into the wiped state.
496-
// Row-scoped callers (dep-edit cascade in `updateRow`) already cancel their
497-
// specific scope before calling here, so we skip to avoid cancelling
498-
// unrelated dispatches on other rows. Auto-fire (`mode:'new'`) is harmless
499-
// overlap-wise since the NOT EXISTS filter excludes already-attempted rows.
500-
const cancelPriorRuns =
501-
isManualRun && (!rowIds || rowIds.length === 0) && (mode === 'all' || mode === 'incomplete')
497+
// Scope: table-wide cancel when rowIds is empty (also cancels active
498+
// dispatches via markActiveDispatchesCancelled), per-row cancel otherwise
499+
// (no dispatch cancel — other rows' dispatches keep running). Dep-edit
500+
// cascade in `updateRow` already cancels its own scope before calling,
501+
// so the duplicate work here is a cheap no-op for that caller.
502+
// Auto-fire (`mode:'new'`) is harmless overlap-wise — the NOT EXISTS
503+
// filter excludes already-attempted rows.
504+
const cancelPriorRuns = isManualRun && (mode === 'all' || mode === 'incomplete')
502505
if (cancelPriorRuns) {
503-
await cancelWorkflowGroupRuns(tableId, undefined, { groupIds: targetGroupIds })
506+
if (!rowIds || rowIds.length === 0) {
507+
await cancelWorkflowGroupRuns(tableId, undefined, { groupIds: targetGroupIds })
508+
} else {
509+
// Per-row cancel — sequential so we don't fan out N parallel
510+
// markActiveDispatchesCancelled calls (it's a no-op when rowId is set,
511+
// but each call still touches the DB).
512+
for (const rowId of rowIds) {
513+
await cancelWorkflowGroupRuns(tableId, rowId, { groupIds: targetGroupIds })
514+
}
515+
}
504516
}
505517

506518
// Wipe targeted output cols + executions[gid] before any cells fire so the

0 commit comments

Comments
 (0)