Skip to content

Commit 89cf52a

Browse files
fix(table): X-running count from dispatch scope so reload matches live
The "X running" badge read countRunningCells (sidecar in-flight), but the dispatcher only stamps one ~20-cell window at a time. During a 1000-row Run-all the client optimistically showed ~1000 while a reload showed ~20 — the sidecar never holds more than a window. Derive the count from the active dispatches instead: rows in scope ahead of the cursor × |groupIds| (exact for Run-all, upper bound for incomplete/new). Both scope and cursor are persisted, so a reload computes the same number. - countActiveRunCells (dispatcher.ts): dispatch-scope total, sidecar fallback when no dispatch is active. byRowId stays sidecar-based (the client overlay renders queued rows ahead of the cursor). - Live: applyDispatch re-syncs the badge from the server on every dispatch event (one per window, after its cells finish + cursor advances), so the badge steps down per window and matches reload. applyCell no longer touches runningCellCount (still keeps runningByRowId live for the gutter). - Optimistic on click: useRunColumn seeds the full run scope (totalCount × groups) so the badge is right before the first window lands.
1 parent 73a4cd2 commit 89cf52a

4 files changed

Lines changed: 87 additions & 29 deletions

File tree

apps/sim/app/api/table/[tableId]/dispatches/route.ts

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import { parseRequest } from '@/lib/api/server'
55
import { checkSessionOrInternalAuth } from '@/lib/auth/hybrid'
66
import { generateRequestId } from '@/lib/core/utils/request'
77
import { withRouteHandler } from '@/lib/core/utils/with-route-handler'
8-
import { countRunningCells, listActiveDispatches } from '@/lib/table/dispatcher'
8+
import { countActiveRunCells, listActiveDispatches } from '@/lib/table/dispatcher'
99
import { accessError, checkAccess } from '@/app/api/table/utils'
1010

1111
const logger = createLogger('TableDispatchesAPI')
@@ -37,10 +37,8 @@ export const GET = withRouteHandler(async (request: NextRequest, { params }: Rou
3737
const result = await checkAccess(tableId, authResult.userId, 'read')
3838
if (!result.ok) return accessError(result, requestId, tableId)
3939

40-
const [rows, running] = await Promise.all([
41-
listActiveDispatches(tableId),
42-
countRunningCells(tableId),
43-
])
40+
const rows = await listActiveDispatches(tableId)
41+
const running = await countActiveRunCells(tableId, rows)
4442
const dispatches: ActiveDispatch[] = rows.map((r) => ({
4543
id: r.id,
4644
status: r.status as 'pending' | 'dispatching',

apps/sim/app/workspace/[workspaceId]/tables/[tableId]/hooks/use-table-event-stream.ts

Lines changed: 12 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -73,11 +73,11 @@ export function useTableEventStream({
7373
let lastEventId = loadPointer(tableId)
7474
let reconnectAttempt = 0
7575

76-
const updateRunStateCounters = (
77-
rowId: string,
78-
wasInFlight: boolean,
79-
isInFlight: boolean
80-
): void => {
76+
// Keeps the per-row gutter (`runningByRowId`) live between dispatch events.
77+
// `runningCellCount` (the "X running" badge) is NOT touched here — it's the
78+
// server's dispatch-scope count, seeded optimistically on click and
79+
// re-synced by `applyDispatch` on every window, so live matches reload.
80+
const updateRunningByRow = (rowId: string, wasInFlight: boolean, isInFlight: boolean): void => {
8181
if (wasInFlight === isInFlight) return
8282
const delta = isInFlight ? 1 : -1
8383
queryClient.setQueryData<TableRunState>(tableKeys.activeDispatches(tableId), (prev) => {
@@ -87,11 +87,7 @@ export function useTableEventStream({
8787
const nextByRow = { ...prev.runningByRowId }
8888
if (nextForRow === 0) delete nextByRow[rowId]
8989
else nextByRow[rowId] = nextForRow
90-
return {
91-
...prev,
92-
runningCellCount: Math.max(0, prev.runningCellCount + delta),
93-
runningByRowId: nextByRow,
94-
}
90+
return { ...prev, runningByRowId: nextByRow }
9591
})
9692
}
9793

@@ -145,11 +141,7 @@ export function useTableEventStream({
145141
queryKey: tableKeys.activeDispatches(tableId),
146142
})
147143
} else {
148-
updateRunStateCounters(
149-
rowId,
150-
wasInFlight,
151-
isExecInFlight({ status } as RowExecutionMetadata)
152-
)
144+
updateRunningByRow(rowId, wasInFlight, isExecInFlight({ status } as RowExecutionMetadata))
153145
}
154146
}
155147

@@ -195,6 +187,11 @@ export function useTableEventStream({
195187
merged[idx] = next
196188
return { ...base, dispatches: merged }
197189
})
190+
// The dispatcher emits this once per window (after the window's cells
191+
// finish + the cursor advances) and on completion. Re-sync the
192+
// dispatch-scope `runningCellCount` from the server so the badge steps
193+
// down per window and matches a reload exactly.
194+
void queryClient.invalidateQueries({ queryKey: tableKeys.activeDispatches(tableId) })
198195
}
199196

200197
const handlePrune = (payload: PrunedEvent): void => {

apps/sim/hooks/queries/tables.ts

Lines changed: 39 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -234,17 +234,40 @@ function countNewlyInFlight(before: RowExecutions, after: RowExecutions): number
234234
return n
235235
}
236236

237-
/** Add optimistically-stamped cells to the run-state counter so the "X running"
238-
* badge + per-row gutter Stop reflect them instantly (the optimistic stamp
239-
* eats the dispatcher's `pending` SSE, so `applyCell` never bumps the count).
240-
* Returns the prior snapshot for rollback, or `null` when nothing was bumped. */
237+
/** First-page `totalCount` from the rows infinite-query cache (or `null` when
238+
* rows haven't loaded yet). Lets a Run-all estimate the full-table cell count
239+
* even though only a page of rows is in memory. */
240+
function readTotalRowCount(
241+
queryClient: ReturnType<typeof useQueryClient>,
242+
tableId: string
243+
): number | null {
244+
const entries = queryClient.getQueriesData<InfiniteData<TableRowsResponse, number>>({
245+
queryKey: tableKeys.rowsRoot(tableId),
246+
exact: false,
247+
})
248+
for (const [, data] of entries) {
249+
const tc = data?.pages?.[0]?.totalCount
250+
if (typeof tc === 'number') return tc
251+
}
252+
return null
253+
}
254+
255+
/** Optimistically reflect a run on the "X running" badge + per-row gutter Stop
256+
* instantly (the optimistic stamp eats the dispatcher's `pending` SSE, so
257+
* `applyCell` never bumps the count, and the server's dispatch-scope count
258+
* isn't live until the first window). `stampedByRow` drives the per-row gutter
259+
* (loaded rows only); `cellCountDelta` is the badge delta — pass the full run
260+
* scope (rows × groups) for Run-all so it matches the server, or omit to use
261+
* the stamped total. Returns the prior snapshot for rollback. */
241262
function bumpRunState(
242263
queryClient: ReturnType<typeof useQueryClient>,
243264
tableId: string,
244-
stampedByRow: Record<string, number>
265+
stampedByRow: Record<string, number>,
266+
cellCountDelta?: number
245267
): { snapshot: TableRunState | undefined } | null {
246-
const total = Object.values(stampedByRow).reduce((s, n) => s + n, 0)
247-
if (total === 0) return null
268+
const stampedTotal = Object.values(stampedByRow).reduce((s, n) => s + n, 0)
269+
const countDelta = cellCountDelta ?? stampedTotal
270+
if (countDelta === 0 && stampedTotal === 0) return null
248271
const snapshot = queryClient.getQueryData<TableRunState>(tableKeys.activeDispatches(tableId))
249272
queryClient.setQueryData<TableRunState>(tableKeys.activeDispatches(tableId), (prev) => {
250273
const base = prev ?? { dispatches: [], runningCellCount: 0, runningByRowId: {} }
@@ -254,7 +277,7 @@ function bumpRunState(
254277
}
255278
return {
256279
...base,
257-
runningCellCount: base.runningCellCount + total,
280+
runningCellCount: base.runningCellCount + countDelta,
258281
runningByRowId: nextByRow,
259282
}
260283
})
@@ -1439,7 +1462,14 @@ export function useRunColumn({ workspaceId, tableId }: RowMutationContext) {
14391462
return { ...r, data: nextData, executions: next }
14401463
})
14411464

1442-
const bumped = bumpRunState(queryClient, tableId, stampedByRow)
1465+
// Badge counts the whole run scope (rows × groups), matching the server's
1466+
// dispatch-scope count — not just the loaded rows we could stamp. For
1467+
// Run-all that's the table's totalCount; for a scoped run, the rowIds.
1468+
const scopeRowCount = targetRowIds
1469+
? targetRowIds.size
1470+
: (readTotalRowCount(queryClient, tableId) ?? Object.keys(stampedByRow).length)
1471+
const cellCountDelta = scopeRowCount * targetGroupIds.size
1472+
const bumped = bumpRunState(queryClient, tableId, stampedByRow, cellCountDelta)
14431473
return { snapshots, runStateSnapshot: bumped?.snapshot, didBumpRunState: bumped !== null }
14441474
},
14451475
onError: (_err, _variables, context) => {

apps/sim/lib/table/dispatcher.ts

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,39 @@ export async function countRunningCells(
195195
return { total, byRowId }
196196
}
197197

198+
/** Authoritative "cells queued or running" count for the table, derived from
199+
* active dispatches so it survives reload and matches the live count. For each
200+
* active dispatch every row in scope ahead of the cursor still has to run each
201+
* targeted group, so remaining work = (rows ahead of cursor) × |groupIds|.
202+
* Exact for Run-all; an upper bound for incomplete/new (rows the eligibility
203+
* filter later skips are still counted). Falls back to the sidecar in-flight
204+
* count when no dispatch is active (orphan stragglers). `byRowId` stays
205+
* sidecar-based — the client overlay renders queued rows ahead of the cursor. */
206+
export async function countActiveRunCells(
207+
tableId: string,
208+
dispatches?: DispatchRow[]
209+
): Promise<{ total: number; byRowId: Record<string, number> }> {
210+
const active = dispatches ?? (await listActiveDispatches(tableId))
211+
const sidecar = await countRunningCells(tableId)
212+
if (active.length === 0) return sidecar
213+
214+
let total = 0
215+
for (const d of active) {
216+
const groupCount = d.scope.groupIds.length
217+
if (groupCount === 0) continue
218+
const filters = [eq(userTableRows.tableId, tableId), gt(userTableRows.position, d.cursor)]
219+
if (d.scope.rowIds && d.scope.rowIds.length > 0) {
220+
filters.push(inArray(userTableRows.id, d.scope.rowIds))
221+
}
222+
const [row] = await db
223+
.select({ rowsAhead: sql<number>`count(*)::int` })
224+
.from(userTableRows)
225+
.where(and(...filters))
226+
total += (row?.rowsAhead ?? 0) * groupCount
227+
}
228+
return { total, byRowId: sidecar.byRowId }
229+
}
230+
198231
export async function listActiveDispatches(tableId: string): Promise<DispatchRow[]> {
199232
const rows = await db
200233
.select()

0 commit comments

Comments
 (0)