Skip to content

Commit 73a4cd2

Browse files
fix(table): incomplete bulk-clear is per-group, not per-row
bulkClearWorkflowGroupCells in incomplete mode wiped EVERY targeted group's output data + exec on any row that wasn't fully filled across all targeted groups. So Run-row on a row with one completed group and one cancelled group wiped the completed group's outputs + exec too, and the dispatcher re-ran it. Now incomplete-mode clears per-group: only error/cancelled groups get their output columns + exec cleared; completed and in-flight groups are left intact (never-run groups have nothing to clear and run via eligibility). Combined with the classifyEligibility guard, a completed workflow is never re-run by Run-row — only Run-all re-runs it.
1 parent 802db5b commit 73a4cd2

1 file changed

Lines changed: 56 additions & 54 deletions

File tree

apps/sim/lib/table/dispatcher.ts

Lines changed: 56 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -65,67 +65,69 @@ export async function bulkClearWorkflowGroupCells(input: {
6565
// Pre-existing outputs on any other row must not be wiped by an auto-fire.
6666
if (mode === 'new') return
6767

68-
const outputCols = Array.from(new Set(groups.flatMap((g) => g.outputs.map((o) => o.columnName))))
6968
const groupIds = groups.map((g) => g.id)
70-
71-
// Step 1: clear the targeted output columns from `data` on every row in
72-
// scope. Identical chain to the previous JSONB-only path.
73-
let dataExpr: SQL = sql`coalesce(${userTableRows.data}, '{}'::jsonb)`
74-
for (const col of outputCols) dataExpr = sql`(${dataExpr}) - ${col}::text`
75-
76-
const filters: SQL[] = [eq(userTableRows.tableId, tableId)]
77-
if (rowIds && rowIds.length > 0) {
78-
filters.push(inArray(userTableRows.id, rowIds))
79-
}
80-
if (mode === 'incomplete') {
81-
// Skip rows where all output columns across all targeted groups already
82-
// have a non-empty value — those are "completed-and-filled" and the
83-
// eligibility predicate would skip them anyway.
84-
const filledChecks = outputCols.map(
85-
(col) => sql`coalesce(${userTableRows.data} ->> ${col}, '') != ''`
69+
const rowScope = rowIds && rowIds.length > 0 ? rowIds : null
70+
71+
if (mode === 'all') {
72+
// Run-all re-runs every targeted group: wipe all their output columns +
73+
// executions for the rows in scope. (Prior in-flight runs were already
74+
// cancelled by the caller.)
75+
const outputCols = Array.from(
76+
new Set(groups.flatMap((g) => g.outputs.map((o) => o.columnName)))
8677
)
87-
const allFilled = filledChecks.reduce((acc, expr) => sql`${acc} AND ${expr}`)
88-
filters.push(sql`NOT (${allFilled})`)
89-
// Also skip rows where ANY targeted group has an in-flight exec — those
90-
// belong to another dispatch and clobbering them would race. Encoded as
91-
// a NOT EXISTS subquery against the sidecar's `(table_id, status)`
92-
// partial index.
93-
filters.push(
94-
sql`NOT EXISTS (
78+
let dataExpr: SQL = sql`coalesce(${userTableRows.data}, '{}'::jsonb)`
79+
for (const col of outputCols) dataExpr = sql`(${dataExpr}) - ${col}::text`
80+
const filters: SQL[] = [eq(userTableRows.tableId, tableId)]
81+
if (rowScope) filters.push(inArray(userTableRows.id, rowScope))
82+
83+
await db.transaction(async (trx) => {
84+
await trx
85+
.update(userTableRows)
86+
.set({ data: dataExpr, updatedAt: new Date() })
87+
.where(and(...filters))
88+
const execFilters: SQL[] = [
89+
eq(tableRowExecutions.tableId, tableId),
90+
inArray(tableRowExecutions.groupId, groupIds),
91+
]
92+
if (rowScope) execFilters.push(inArray(tableRowExecutions.rowId, rowScope))
93+
await trx.delete(tableRowExecutions).where(and(...execFilters))
94+
})
95+
return
96+
}
97+
98+
// `incomplete`: clear per-group, not per-row. Only groups that are
99+
// re-runnable (`error` / `cancelled`) get their output columns + exec wiped;
100+
// `completed` and in-flight groups are left fully intact. A row-level "all
101+
// filled" check would otherwise wipe a completed group's data + exec just
102+
// because a *sibling* group on the same row is incomplete, re-running the
103+
// completed one. (`never-run` groups have no exec/output to clear — the
104+
// dispatcher runs them via eligibility.)
105+
await db.transaction(async (trx) => {
106+
for (const group of groups) {
107+
const reRunnable = sql`EXISTS (
95108
SELECT 1 FROM ${tableRowExecutions} re
96109
WHERE re.row_id = ${userTableRows.id}
97-
AND re.group_id = ANY(ARRAY[${sql.join(
98-
groupIds.map((gid) => sql`${gid}`),
99-
sql`, `
100-
)}]::text[])
101-
AND re.status IN ('queued', 'running', 'pending')
110+
AND re.group_id = ${group.id}
111+
AND re.status IN ('error', 'cancelled')
102112
)`
103-
)
104-
}
113+
const filters: SQL[] = [eq(userTableRows.tableId, tableId), reRunnable]
114+
if (rowScope) filters.push(inArray(userTableRows.id, rowScope))
105115

106-
await db.transaction(async (trx) => {
107-
await trx
108-
.update(userTableRows)
109-
.set({ data: dataExpr, updatedAt: new Date() })
110-
.where(and(...filters))
111-
112-
// Step 2: delete the targeted groups' executions for the rows in scope.
113-
// Reuse the same row-scope filter via a subquery.
114-
const execFilters: SQL[] = [
115-
eq(tableRowExecutions.tableId, tableId),
116-
inArray(tableRowExecutions.groupId, groupIds),
117-
]
118-
if (rowIds && rowIds.length > 0) {
119-
execFilters.push(inArray(tableRowExecutions.rowId, rowIds))
120-
}
121-
if (mode === 'incomplete') {
122-
// For `incomplete`, only delete entries that aren't already in-flight
123-
// — terminal states (completed/error/cancelled) get wiped so the
124-
// dispatcher re-enqueues; in-flight entries stay so we don't race
125-
// with their worker.
126-
execFilters.push(sql`${tableRowExecutions.status} NOT IN ('queued', 'running', 'pending')`)
116+
let dataExpr: SQL = sql`coalesce(${userTableRows.data}, '{}'::jsonb)`
117+
for (const out of group.outputs) dataExpr = sql`(${dataExpr}) - ${out.columnName}::text`
118+
await trx
119+
.update(userTableRows)
120+
.set({ data: dataExpr, updatedAt: new Date() })
121+
.where(and(...filters))
122+
123+
const execFilters: SQL[] = [
124+
eq(tableRowExecutions.tableId, tableId),
125+
eq(tableRowExecutions.groupId, group.id),
126+
sql`${tableRowExecutions.status} IN ('error', 'cancelled')`,
127+
]
128+
if (rowScope) execFilters.push(inArray(tableRowExecutions.rowId, rowScope))
129+
await trx.delete(tableRowExecutions).where(and(...execFilters))
127130
}
128-
await trx.delete(tableRowExecutions).where(and(...execFilters))
129131
})
130132
}
131133

0 commit comments

Comments
 (0)