Skip to content

Commit bfb847b

Browse files
fix(table): address remaining cursor/greptile review feedback
- Mothership update_row no longer double-dispatches. updateRow already fires the auto-cascade internally; the second `mode: 'incomplete'` call here raced with it and could bulk-clear sibling-group outputs. - SSE dispatch events no longer dropped when the activeDispatches cache is cold. Seed an empty TableRunState if the initial fetch hasn't landed yet so the queued overlay doesn't lose the first dispatch event. - batchUpdateRows now runs cancel+rerun for per-row in-flight downstream groups, mirroring updateRow. Without this, dep edits in a batch left running workflows reading stale upstream values.
1 parent 01bb233 commit bfb847b

3 files changed

Lines changed: 62 additions & 25 deletions

File tree

apps/sim/app/workspace/[workspaceId]/tables/[tableId]/hooks/use-table-event-stream.ts

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -147,18 +147,25 @@ export function useTableEventStream({
147147
const applyDispatch = (event: Extract<TableEvent, { kind: 'dispatch' }>): void => {
148148
const { dispatchId, status, scope, cursor, mode, isManualRun } = event
149149
queryClient.setQueryData<TableRunState>(tableKeys.activeDispatches(tableId), (prev) => {
150-
if (!prev) return prev
151-
const list = prev.dispatches
150+
// SSE may arrive before the initial fetch lands. Seed an empty
151+
// run-state so the dispatch isn't dropped; counters are reconciled
152+
// by the subsequent fetch / per-cell SSE events.
153+
const base: TableRunState = prev ?? {
154+
dispatches: [],
155+
runningCellCount: 0,
156+
runningByRowId: {},
157+
}
158+
const list = base.dispatches
152159
// Terminal states drop the dispatch from the overlay; client renders
153160
// the row's authoritative DB exec state from here.
154161
if (status === 'complete' || status === 'cancelled') {
155162
const filtered = list.filter((d) => d.id !== dispatchId)
156-
return filtered.length === list.length ? prev : { ...prev, dispatches: filtered }
163+
return filtered.length === list.length ? base : { ...base, dispatches: filtered }
157164
}
158165
if (scope === undefined || cursor === undefined || mode === undefined) {
159166
// Defensive: a legacy emit without the new fields can't drive the
160167
// overlay. Leave existing cache alone.
161-
return prev
168+
return base
162169
}
163170
const idx = list.findIndex((d) => d.id === dispatchId)
164171
const existing = idx === -1 ? undefined : list[idx]
@@ -174,10 +181,10 @@ export function useTableEventStream({
174181
cursor,
175182
scope,
176183
}
177-
if (idx === -1) return { ...prev, dispatches: [...list, next] }
184+
if (idx === -1) return { ...base, dispatches: [...list, next] }
178185
const merged = list.slice()
179186
merged[idx] = next
180-
return { ...prev, dispatches: merged }
187+
return { ...base, dispatches: merged }
181188
})
182189
}
183190

apps/sim/lib/copilot/tools/server/table/user-table.ts

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -525,16 +525,11 @@ export const userTableServerTool: BaseServerTool<UserTableArgs, UserTableResult>
525525
// doesn't, so the guard never trips here. Defensive narrowing.
526526
return { success: false, message: 'Row update was skipped' }
527527
}
528-
void runWorkflowColumn({
529-
tableId: args.tableId,
530-
workspaceId,
531-
rowIds: [updatedRow.id],
532-
mode: 'incomplete',
533-
isManualRun: false,
534-
requestId,
535-
}).catch((err) =>
536-
logger.error(`[${requestId}] auto-dispatch (Mothership update_row) failed:`, err)
537-
)
528+
// Auto-dispatch for user edits is handled inside `updateRow`
529+
// (mode: 'new' for newly-cleared groups + cancel+rerun for in-flight
530+
// downstream groups). Firing a second mode: 'incomplete' dispatch
531+
// here would race with the internal one AND bulk-clear sibling-group
532+
// outputs (mode: 'incomplete' wipes terminal-state cells in scope).
538533

539534
return {
540535
success: true,

apps/sim/lib/table/service.ts

Lines changed: 44 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2309,21 +2309,22 @@ export async function batchUpdateRows(
23092309
mergedData: RowData
23102310
mergedExecutions: RowExecutions
23112311
executionsPatch?: Record<string, RowExecutionMetadata | null>
2312+
inFlightDownstreamGroups: string[]
23122313
}> = []
23132314
for (const update of data.updates) {
23142315
const existing = existingMap.get(update.rowId)!
23152316
const merged = { ...existing.data, ...update.data }
23162317
// Auto-clear exec records for workflow output columns the user just
23172318
// wiped AND downstream dep-changed terminal groups — same rationale as
2318-
// `updateRow`. In-flight downstream groups are dropped here (batch
2319-
// updates don't run the cancel+restart orchestration — those go through
2320-
// single-row `updateRow`).
2321-
const { executionsPatch: effectiveExecutionsPatch } = deriveExecClearsForDataPatch(
2322-
update.data,
2323-
table.schema,
2324-
existing.executions,
2325-
update.executionsPatch
2326-
)
2319+
// `updateRow`. Per-row in-flight downstream groups are surfaced so we
2320+
// can run the cancel+rerun orchestration after the batch commits.
2321+
const { executionsPatch: effectiveExecutionsPatch, inFlightDownstreamGroups } =
2322+
deriveExecClearsForDataPatch(
2323+
update.data,
2324+
table.schema,
2325+
existing.executions,
2326+
update.executionsPatch
2327+
)
23272328
const mergedExecutions = applyExecutionsPatch(existing.executions, effectiveExecutionsPatch)
23282329

23292330
const sizeValidation = validateRowSize(merged)
@@ -2341,6 +2342,7 @@ export async function batchUpdateRows(
23412342
mergedData: merged,
23422343
mergedExecutions,
23432344
executionsPatch: effectiveExecutionsPatch,
2345+
inFlightDownstreamGroups,
23442346
})
23452347
}
23462348

@@ -2404,6 +2406,39 @@ export async function batchUpdateRows(
24042406
table.schema,
24052407
requestId
24062408
)
2409+
// Per-row cancel+rerun for in-flight downstream groups whose deps just
2410+
// changed — same orchestration as single-row `updateRow`. Without this,
2411+
// batch updates would leave running workflows reading stale dep values.
2412+
// Each row needs its own cancel + manual-incomplete dispatch because
2413+
// `cancelWorkflowGroupRuns`'s `groupIds` filter is per-row.
2414+
const rowsWithInFlightDownstream = mergedUpdates.filter(
2415+
(u) => u.inFlightDownstreamGroups.length > 0
2416+
)
2417+
if (rowsWithInFlightDownstream.length > 0) {
2418+
void (async () => {
2419+
try {
2420+
for (const { rowId, inFlightDownstreamGroups } of rowsWithInFlightDownstream) {
2421+
await cancelWorkflowGroupRuns(data.tableId, rowId, {
2422+
groupIds: inFlightDownstreamGroups,
2423+
})
2424+
await runWorkflowColumn({
2425+
tableId: data.tableId,
2426+
workspaceId: data.workspaceId,
2427+
mode: 'incomplete',
2428+
isManualRun: true,
2429+
rowIds: [rowId],
2430+
groupIds: inFlightDownstreamGroups,
2431+
requestId,
2432+
})
2433+
}
2434+
} catch (err) {
2435+
logger.error(
2436+
`[${requestId}] cancel+rerun for in-flight downstream groups (batch) failed:`,
2437+
err
2438+
)
2439+
}
2440+
})()
2441+
}
24072442
void runWorkflowColumn({
24082443
tableId: table.id,
24092444
workspaceId: table.workspaceId,

0 commit comments

Comments
 (0)