11import { db } from '@sim/db'
2- import { tableRunDispatches , userTableRows } from '@sim/db/schema'
2+ import { tableRowExecutions , tableRunDispatches , userTableRows } from '@sim/db/schema'
33import { createLogger } from '@sim/logger'
44import { toError } from '@sim/utils/errors'
55import { generateId } from '@sim/utils/id'
66import { and , asc , eq , gt , inArray , type SQL , sql } from 'drizzle-orm'
77import { getJobQueue } from '@/lib/core/async-jobs/config'
88import { writeWorkflowGroupState } from '@/lib/table/cell-write'
99import { appendTableEvent } from '@/lib/table/events'
10- import type { TableRow } from '@/lib/table/types'
10+ import type { RowExecutionMetadata , RowExecutions , TableRow } from '@/lib/table/types'
1111import {
1212 buildEnqueueItems ,
1313 buildPendingRuns ,
14- type WorkflowGroupCellPayload ,
1514 TABLE_CONCURRENCY_LIMIT ,
1615 toTableRow ,
16+ type WorkflowGroupCellPayload ,
1717} from './workflow-columns'
1818
1919const logger = createLogger ( 'TableRunDispatcher' )
@@ -68,11 +68,10 @@ export async function bulkClearWorkflowGroupCells(input: {
6868 const outputCols = Array . from ( new Set ( groups . flatMap ( ( g ) => g . outputs . map ( ( o ) => o . columnName ) ) ) )
6969 const groupIds = groups . map ( ( g ) => g . id )
7070
71- // Build `data - 'col1' - 'col2' - ...` and `executions - 'gid1' - 'gid2' - ...`.
71+ // Step 1: clear the targeted output columns from `data` on every row in
72+ // scope. Identical chain to the previous JSONB-only path.
7273 let dataExpr : SQL = sql `coalesce(${ userTableRows . data } , '{}'::jsonb)`
7374 for ( const col of outputCols ) dataExpr = sql `(${ dataExpr } ) - ${ col } ::text`
74- let execExpr : SQL = sql `coalesce(${ userTableRows . executions } , '{}'::jsonb)`
75- for ( const gid of groupIds ) execExpr = sql `(${ execExpr } ) - ${ gid } ::text`
7675
7776 const filters : SQL [ ] = [ eq ( userTableRows . tableId , tableId ) ]
7877 if ( rowIds && rowIds . length > 0 ) {
@@ -87,22 +86,47 @@ export async function bulkClearWorkflowGroupCells(input: {
8786 )
8887 const allFilled = filledChecks . reduce ( ( acc , expr ) => sql `${ acc } AND ${ expr } ` )
8988 filters . push ( sql `NOT (${ allFilled } )` )
90- // Also skip rows where ANY targeted group has an in-flight exec from
91- // another dispatch — clobbering its `executions[gid]` would race with
92- // the in-flight worker. An `incomplete` run by definition shouldn't
93- // touch rows another dispatch is actively working on.
94- const inFlightChecks = groupIds . map (
95- ( gid ) =>
96- sql `${ userTableRows . executions } -> ${ gid } ::text ->> 'status' IN ('queued', 'running', 'pending')`
89+ // Also skip rows where ANY targeted group has an in-flight exec — those
90+ // belong to another dispatch and clobbering them would race. Encoded as
91+ // a NOT EXISTS subquery against the sidecar's `(table_id, status)`
92+ // partial index.
93+ filters . push (
94+ sql `NOT EXISTS (
95+ SELECT 1 FROM ${ tableRowExecutions } re
96+ WHERE re.row_id = ${ userTableRows . id }
97+ AND re.group_id = ANY(ARRAY[${ sql . join (
98+ groupIds . map ( ( gid ) => sql `${ gid } ` ) ,
99+ sql `, `
100+ ) } ]::text[])
101+ AND re.status IN ('queued', 'running', 'pending')
102+ )`
97103 )
98- const anyInFlight = inFlightChecks . reduce ( ( acc , expr ) => sql `${ acc } OR ${ expr } ` )
99- filters . push ( sql `NOT (${ anyInFlight } )` )
100104 }
101105
102- await db
103- . update ( userTableRows )
104- . set ( { data : dataExpr , executions : execExpr , updatedAt : new Date ( ) } )
105- . where ( and ( ...filters ) )
106+ await db . transaction ( async ( trx ) => {
107+ await trx
108+ . update ( userTableRows )
109+ . set ( { data : dataExpr , updatedAt : new Date ( ) } )
110+ . where ( and ( ...filters ) )
111+
112+ // Step 2: delete the targeted groups' executions for the rows in scope.
113+ // The table / group / row-id scope is rebuilt below against the sidecar table's own columns (no subquery on `userTableRows`).
114+ const execFilters : SQL [ ] = [
115+ eq ( tableRowExecutions . tableId , tableId ) ,
116+ inArray ( tableRowExecutions . groupId , groupIds ) ,
117+ ]
118+ if ( rowIds && rowIds . length > 0 ) {
119+ execFilters . push ( inArray ( tableRowExecutions . rowId , rowIds ) )
120+ }
121+ if ( mode === 'incomplete' ) {
122+ // For `incomplete`, only delete entries that aren't already in-flight
123+ // — terminal states (completed/error/cancelled) get wiped so the
124+ // dispatcher re-enqueues; in-flight entries stay so we don't race
125+ // with their worker.
126+ execFilters . push ( sql `${ tableRowExecutions . status } NOT IN ('queued', 'running', 'pending')` )
127+ }
128+ await trx . delete ( tableRowExecutions ) . where ( and ( ...execFilters ) )
129+ } )
106130}
107131
108132export async function insertDispatch ( input : {
@@ -142,27 +166,20 @@ export async function insertDispatch(input: {
142166export async function countRunningCells (
143167 tableId : string
144168) : Promise < { total : number ; byRowId : Record < string , number > } > {
169+ // Hits the `(table_id, status)` partial index on table_row_executions.
145170 const rows = await db
146171 . select ( {
147- id : userTableRows . id ,
148- runningCount : sql < number > `(
149- SELECT count(*)::int FROM jsonb_each(${ userTableRows . executions } ) e
150- WHERE e.value->>'status' = 'running'
151- )` ,
172+ rowId : tableRowExecutions . rowId ,
173+ runningCount : sql < number > `count(*)::int` ,
152174 } )
153- . from ( userTableRows )
154- . where (
155- and (
156- eq ( userTableRows . tableId , tableId ) ,
157- sql `${ userTableRows . executions } IS NOT NULL` ,
158- sql `${ userTableRows . executions } != '{}'::jsonb`
159- )
160- )
175+ . from ( tableRowExecutions )
176+ . where ( and ( eq ( tableRowExecutions . tableId , tableId ) , eq ( tableRowExecutions . status , 'running' ) ) )
177+ . groupBy ( tableRowExecutions . rowId )
161178 let total = 0
162179 const byRowId : Record < string , number > = { }
163180 for ( const r of rows ) {
164181 if ( r . runningCount > 0 ) {
165- byRowId [ r . id ] = r . runningCount
182+ byRowId [ r . rowId ] = r . runningCount
166183 total += r . runningCount
167184 }
168185 }
@@ -265,19 +282,22 @@ export async function dispatcherStep(dispatchId: string): Promise<DispatcherStep
265282 filters . push ( inArray ( userTableRows . id , dispatch . scope . rowIds ) )
266283 }
267284 // `'new'` mode targets only rows whose targeted groups haven't been
268- // attempted. Exclude a row only when EVERY targeted group already has an
269- // `executions[gid]` entry — if any one is missing, the row still has work
270- // to do and per-group JS filtering in `classifyEligibility` handles the
271- // rest. `jsonb_exists_all` is the function form of `?&` — safer than the
272- // operator, which collides with prepared-statement placeholder parsing in
273- // some drivers. Drizzle interpolates a JS array as a tuple of placeholders,
274- // not a Postgres array — emit `ARRAY[...]` literally.
285+ // attempted. Exclude a row only when EVERY targeted group already has a
286+ // sidecar entry — if any one is missing, the row still has work to do
287+ // and per-group JS filtering in `classifyEligibility` handles the rest.
275288 if ( dispatch . mode === 'new' && dispatch . scope . groupIds . length > 0 ) {
289+ const gids = dispatch . scope . groupIds
276290 filters . push (
277- sql `NOT jsonb_exists_all(coalesce(${ userTableRows . executions } , '{}'::jsonb), ARRAY[${ sql . join (
278- dispatch . scope . groupIds . map ( ( gid ) => sql `${ gid } ` ) ,
279- sql `, `
280- ) } ]::text[])`
291+ sql `NOT EXISTS (
292+ SELECT 1 FROM ${ tableRowExecutions } re
293+ WHERE re.row_id = ${ userTableRows . id }
294+ AND re.group_id = ANY(ARRAY[${ sql . join (
295+ gids . map ( ( gid ) => sql `${ gid } ` ) ,
296+ sql `, `
297+ ) } ]::text[])
298+ GROUP BY re.row_id
299+ HAVING count(DISTINCT re.group_id) = ${ gids . length }
300+ )`
281301 )
282302 }
283303
@@ -303,12 +323,40 @@ export async function dispatcherStep(dispatchId: string): Promise<DispatcherStep
303323 return 'done'
304324 }
305325
326+ // Pre-fetch executions for the chunk so per-row eligibility doesn't fan
327+ // out into one query per row. Returns `Map<rowId, RowExecutions>`.
328+ const chunkRowIds = chunk . map ( ( r ) => r . id )
329+ const execRows = await db
330+ . select ( )
331+ . from ( tableRowExecutions )
332+ . where ( inArray ( tableRowExecutions . rowId , chunkRowIds ) )
333+ const executionsByRow = new Map < string , RowExecutions > ( )
334+ for ( const r of execRows ) {
335+ const existing = executionsByRow . get ( r . rowId ) ?? { }
336+ const meta : RowExecutionMetadata = {
337+ status : r . status as RowExecutionMetadata [ 'status' ] ,
338+ executionId : r . executionId ?? null ,
339+ jobId : r . jobId ?? null ,
340+ workflowId : r . workflowId ,
341+ error : r . error ?? null ,
342+ ...( r . runningBlockIds && r . runningBlockIds . length > 0
343+ ? { runningBlockIds : r . runningBlockIds }
344+ : { } ) ,
345+ ...( r . blockErrors && Object . keys ( r . blockErrors as Record < string , string > ) . length > 0
346+ ? { blockErrors : r . blockErrors as Record < string , string > }
347+ : { } ) ,
348+ ...( r . cancelledAt ? { cancelledAt : r . cancelledAt . toISOString ( ) } : { } ) ,
349+ }
350+ existing [ r . groupId ] = meta
351+ executionsByRow . set ( r . rowId , existing )
352+ }
353+
306354 // Strip rows the user cancelled mid-cascade (post-dispatch tombstones)
307355 // before running the shared eligibility filter — `buildPendingRuns`
308356 // doesn't know about the per-dispatch cancel tombstone.
309357 const tombstoneFiltered : TableRow [ ] = [ ]
310358 for ( const r of chunk ) {
311- const tableRow = toTableRow ( r )
359+ const tableRow = toTableRow ( r , executionsByRow . get ( r . id ) ?? { } )
312360 const tombstoned = dispatch . scope . groupIds . some ( ( gid ) => {
313361 const exec = tableRow . executions ?. [ gid ]
314362 if ( ! exec ?. cancelledAt ) return false
0 commit comments