Commit 563fc37

refactor(table): unify trigger.dev and inline dispatcher paths
`runWorkflowColumn` now always inserts a `table_run_dispatches` row and drives the dispatcher state machine. The trigger.dev / in-process branch narrows to a single line: trigger.dev fires `tableRunDispatcherTask` (which calls the new `runDispatcherToCompletion`), and the inline path calls the same helper fire-and-forget. Deletes `scheduleRunsForRows` and `stampQueuedOrCancel`: the inline fallback no longer duplicates window walking, SSE emission, or cancel.

The dispatcher's window-execute call goes through `JobQueueBackend`:
- New `batchEnqueueAndWait` interface method.
- Trigger.dev impl wraps `tasks.batchTriggerAndWait` behind a `taskContext.isInsideTask` guard (clear error if called from outside a task).
- Database impl skips `async_jobs` entirely: `Promise.all` over `options.runner(payload, signal)` per item, with per-cell AbortControllers tracked by `cancelKey` for cancel.

`cancelInlineRun` moves to the interface as `cancelByKey` so `cancelWorkflowGroupRuns` no longer reaches into the database backend.

Fix `mode: 'new'` SQL filter:
- `${array}::text[]` was interpolated as a tuple cast, which Postgres rejected ("cannot cast type record to text[]"), so every inline dispatch silently failed. Switched to `ARRAY[${sql.join(...)}]::text[]`.
- Predicate was `jsonb_exists_any` ("any one targeted group present"), which excluded rows that needed at least one group re-run after a downstream output was deleted. Switched to `jsonb_exists_all`; per-group JS eligibility handles the rest.

Cascade-loop workflowId bug: `runRowCascadeLoop` was not threading the new group's `workflowId` when advancing across groups. The cell-task ran the previous group's workflow against the next group's cell, terminating `completed` with empty `accumulatedData`. Fixed by tracking `currentWorkflowId` alongside `currentGroupId` / `currentExecutionId`.

Client optimistic-patch tightening:
- `useRunColumn.onMutate` mirrors server eligibility: skip cells with unmet deps so unmet rows don't flash Queued and get stuck (no SSE will arrive for cells the server skipped).
- `resolveCellExec` overlay synthesizes a virtual `pending` only when `areGroupDepsSatisfied` is true. Rows with unmet deps render Waiting, matching the dispatcher's actual behavior.

Cleanup from /simplify pass:
- Use `generateShortId(20)` instead of `generateId().replace(/-/g, '').slice(0, 20)`.
- Inline `batchEnqueueAndWait` no longer allocates synthetic ids (returned `string[]` is unused).
- Flattened the per-cell `tracked` array: only push entries that registered controllers, drop the null placeholders.
- Extracted `runDispatcherToCompletion` to share the loop between the trigger.dev wrapper and the in-process path.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 99e623a commit 563fc37

9 files changed

Lines changed: 260 additions & 189 deletions
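
The predicate change in the `mode: 'new'` filter fix can be illustrated with a small sketch. It assumes the SQL prefilter drops a row when the predicate is true and models a row's `executions` JSONB as a plain object keyed by group id. `isExcludedByAny`, `isExcludedByAll`, and the sample row are hypothetical names for illustration, not code from this commit.

```typescript
// Hypothetical model: a row's `executions` JSONB as an object keyed by group id.
type Executions = Record<string, unknown>

const hasKey = (obj: Executions, key: string): boolean =>
  Object.prototype.hasOwnProperty.call(obj, key)

// Same semantics as Postgres jsonb_exists_any (the `?|` operator):
// true if ANY of the keys is present.
function isExcludedByAny(executions: Executions, targetGroupIds: string[]): boolean {
  return targetGroupIds.some((id) => hasKey(executions, id))
}

// Same semantics as Postgres jsonb_exists_all (the `?&` operator):
// true only if ALL of the keys are present.
function isExcludedByAll(executions: Executions, targetGroupIds: string[]): boolean {
  return targetGroupIds.every((id) => hasKey(executions, id))
}

// Assumed scenario: group "a" has an execution entry, group "b" does not.
const sampleRow: Executions = { a: { status: 'completed' } }
const targets = ['a', 'b']

console.log(isExcludedByAny(sampleRow, targets)) // true: the row is dropped, so "b" never runs
console.log(isExcludedByAll(sampleRow, targets)) // false: the row survives for per-group checks
```

With the all-keys predicate the SQL filter only discards rows where every targeted group already has an entry, which leaves the finer per-group decision to the JS eligibility check mentioned in the commit message.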

apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/table-grid/utils.ts

Lines changed: 8 additions & 5 deletions
@@ -6,7 +6,7 @@ import type {
   TableRow as TableRowType,
   WorkflowGroup,
 } from '@/lib/table'
-import { areOutputsFilled } from '@/lib/table/deps'
+import { areGroupDepsSatisfied, areOutputsFilled } from '@/lib/table/deps'
 import type { DeletedRowSnapshot } from '@/stores/table/types'
 import type { DisplayColumn } from './types'

@@ -170,10 +170,12 @@ export function readExecution(

 /**
  * Resolves a cell's execution state with the "about to run" overlay applied:
- * for cells in an active dispatch's scope ahead of its cursor, returns a
- * synthetic `pending` exec so the renderer shows `Queued`. Cells with a real
- * DB exec always win — the overlay only fills the gap between dispatch start
- * and the dispatcher's per-row pending stamp.
+ * for cells in an active dispatch's scope ahead of its cursor whose deps are
+ * already satisfied, returns a synthetic `pending` exec so the renderer
+ * shows `Queued`. Cells with a real DB exec always win — the overlay only
+ * fills the gap between dispatch start and the dispatcher's per-row pending
+ * stamp. Cells with unmet deps still render as `Waiting` (the renderer
+ * computes that from `waitingOnLabels`).
  */
 export function resolveCellExec(
   row: TableRowType,
@@ -185,6 +187,7 @@ export function resolveCellExec(
   if (real) return real
   if (!activeDispatches || activeDispatches.length === 0) return undefined
   if (areOutputsFilled(group, row)) return undefined
+  if (!areGroupDepsSatisfied(group, row)) return undefined
   for (const d of activeDispatches) {
     if (!d.scope.groupIds.includes(group.id)) continue
     if (d.scope.rowIds && !d.scope.rowIds.includes(row.id)) continue
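
The dependency gate added to `resolveCellExec` can be sketched in isolation. The shapes below (`Row`, `Group`, `Dispatch`) and the two predicate stand-ins are simplified assumptions, not the real `TableRowType` / `WorkflowGroup` types or the real `areOutputsFilled` / `areGroupDepsSatisfied`, and the cursor check described in the doc comment is left out.

```typescript
// Simplified, hypothetical shapes; the real types carry more fields.
interface Exec { status: string }
interface Row { id: string; data: Record<string, unknown>; executions: Record<string, Exec | undefined> }
interface Group { id: string; dependsOn: string[]; outputs: string[] }
interface Dispatch { groupIds: string[]; rowIds?: string[] }

const isFilled = (v: unknown): boolean => v !== null && v !== undefined && v !== ''

// Assumed stand-ins for areOutputsFilled / areGroupDepsSatisfied.
const outputsFilled = (g: Group, r: Row): boolean => g.outputs.every((c) => isFilled(r.data[c]))
const depsSatisfied = (g: Group, r: Row): boolean => g.dependsOn.every((c) => isFilled(r.data[c]))

function resolveExecSketch(row: Row, group: Group, dispatches: Dispatch[]): Exec | undefined {
  const real = row.executions[group.id]
  if (real) return real // a persisted exec always wins
  if (dispatches.length === 0) return undefined
  if (outputsFilled(group, row)) return undefined
  if (!depsSatisfied(group, row)) return undefined // no overlay; the renderer shows Waiting
  for (const d of dispatches) {
    if (!d.groupIds.includes(group.id)) continue
    if (d.rowIds && !d.rowIds.includes(row.id)) continue
    return { status: 'pending' } // synthetic overlay, rendered as Queued
  }
  return undefined
}

const scoreGroup: Group = { id: 'g2', dependsOn: ['summary'], outputs: ['score'] }
const active: Dispatch[] = [{ groupIds: ['g2'] }]
const readyRow: Row = { id: 'r1', data: { summary: 'text' }, executions: {} }
const blockedRow: Row = { id: 'r2', data: {}, executions: {} }

console.log(resolveExecSketch(readyRow, scoreGroup, active)?.status) // "pending"
console.log(resolveExecSketch(blockedRow, scoreGroup, active)) // undefined
```

The point of the ordering is that the dependency check runs before any dispatch is consulted, so a row with unmet deps never receives the synthetic `pending` exec regardless of which dispatches are active.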

apps/sim/background/table-run-dispatcher.ts

Lines changed: 2 additions & 5 deletions
@@ -1,7 +1,7 @@
 import { createLogger } from '@sim/logger'
 import { toError } from '@sim/utils/errors'
 import { task } from '@trigger.dev/sdk'
-import { dispatcherStep } from '@/lib/table/dispatcher'
+import { runDispatcherToCompletion } from '@/lib/table/dispatcher'

 const logger = createLogger('TableRunDispatcherTask')

@@ -28,10 +28,7 @@ export const tableRunDispatcherTask = task({
   run: async (payload: TableRunDispatcherPayload) => {
     const { dispatchId } = payload
     try {
-      while (true) {
-        const result = await dispatcherStep(dispatchId)
-        if (result === 'done') return
-      }
+      await runDispatcherToCompletion(dispatchId)
     } catch (err) {
       logger.error(`[${dispatchId}] dispatcher loop failed`, { error: toError(err).message })
       throw err
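
The body of `runDispatcherToCompletion` is not part of the files shown here. Judging from the loop this hunk removes, it plausibly looks like the sketch below. The injected `step` parameter, the `'continue'` result, the returned step count, and `makeFakeStep` are assumptions made so the example runs on its own.

```typescript
// 'done' appears in the diff; 'continue' is a hypothetical non-terminal result.
type StepResult = 'done' | 'continue'
type Step = (dispatchId: string) => Promise<StepResult>

// Drives the step function until it reports completion and returns how many
// steps it took. Errors thrown by a step propagate to the caller.
async function runToCompletion(step: Step, dispatchId: string): Promise<number> {
  let steps = 0
  while (true) {
    steps += 1
    if ((await step(dispatchId)) === 'done') return steps
  }
}

// Fake step that reports 'done' on its n-th call.
function makeFakeStep(n: number): Step {
  let remaining = n
  return async () => {
    remaining -= 1
    return remaining <= 0 ? 'done' : 'continue'
  }
}

// Task-style caller: waits for the loop and lets errors surface.
runToCompletion(makeFakeStep(3), 'dispatch-1').then((n) => {
  console.log(`finished after ${n} steps`) // finished after 3 steps
})

// Fire-and-forget caller: starts the loop and only logs failures.
void runToCompletion(makeFakeStep(1), 'dispatch-2').catch((err) => {
  console.error('dispatcher loop failed', err)
})
```

Sharing one helper means the task wrapper and the in-process path differ only in whether the returned promise is awaited.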

apps/sim/background/workflow-column-execution.ts

Lines changed: 8 additions & 1 deletion
@@ -43,6 +43,7 @@ export async function runRowCascadeLoop(
   const { pickNextEligibleGroupForRow } = await import('@/lib/table/workflow-columns')

   let currentGroupId = payload.groupId
+  let currentWorkflowId = payload.workflowId
   // Fresh executionId per iteration: SQL guard rejects writes whose id ≠
   // row.executions[gid].executionId, so we need a new claim per group.
   let currentExecutionId = payload.executionId
@@ -62,7 +63,12 @@ export async function runRowCascadeLoop(
     }

     const result = await runWorkflowAndWriteTerminal(
-      { ...payload, groupId: currentGroupId, executionId: currentExecutionId },
+      {
+        ...payload,
+        groupId: currentGroupId,
+        workflowId: currentWorkflowId,
+        executionId: currentExecutionId,
+      },
       signal,
       freshTable,
       currentGroup
@@ -75,6 +81,7 @@ export async function runRowCascadeLoop(
     const next = pickNextEligibleGroupForRow(freshTable, freshRow, currentGroupId)
     if (!next) break
     currentGroupId = next.id
+    currentWorkflowId = next.workflowId
     currentExecutionId = generateId()
   }
 }
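
The effect of threading `workflowId` through the cascade can be shown with a toy loop. `CascadeGroup`, `CellRun`, and `cascade` are hypothetical; picking "the next array element" stands in for `pickNextEligibleGroupForRow`, and the `threadWorkflowId` flag exists only to contrast the old and new behavior.

```typescript
// Hypothetical, simplified shapes for illustration only.
interface CascadeGroup { id: string; workflowId: string }
interface CellRun { groupId: string; workflowId: string }

// Walks the groups in order and records which workflow would run for each one.
// Passing `threadWorkflowId = false` reproduces the bug from the commit message:
// the workflow id is taken from the initial payload and never updated.
function cascade(groups: CascadeGroup[], threadWorkflowId: boolean): CellRun[] {
  const runs: CellRun[] = []
  const [first] = groups
  if (!first) return runs
  let index = 0
  let currentGroupId = first.id
  let currentWorkflowId = first.workflowId
  while (true) {
    runs.push({ groupId: currentGroupId, workflowId: currentWorkflowId })
    const next = groups[index + 1] // stand-in for pickNextEligibleGroupForRow
    if (!next) break
    index += 1
    currentGroupId = next.id
    if (threadWorkflowId) currentWorkflowId = next.workflowId
  }
  return runs
}

const demoGroups: CascadeGroup[] = [
  { id: 'g1', workflowId: 'wf-a' },
  { id: 'g2', workflowId: 'wf-b' },
]

console.log(cascade(demoGroups, false)[1]) // { groupId: 'g2', workflowId: 'wf-a' } (wrong workflow)
console.log(cascade(demoGroups, true)[1]) // { groupId: 'g2', workflowId: 'wf-b' }
```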

apps/sim/hooks/queries/tables.ts

Lines changed: 7 additions & 5 deletions
@@ -72,6 +72,7 @@ import type {
   WorkflowGroupOutput,
 } from '@/lib/table'
 import {
+  areGroupDepsSatisfied,
   areOutputsFilled,
   isExecInFlight,
   optimisticallyScheduleNewlyEligibleGroups,
@@ -1331,22 +1332,23 @@ export function useRunColumn({ workspaceId, tableId }: RowMutationContext) {
       for (const groupId of targetGroupIds) {
         const exec = executions[groupId] as RowExecutionMetadata | undefined
         if (isExecInFlight(exec)) continue
+        const group = groupsById.get(groupId)
+        // Mirror server eligibility: rows with unmet deps are skipped by the
+        // dispatcher regardless of mode. Stamping pending here would leave
+        // the cell flashing Queued indefinitely (no SSE event will arrive).
+        if (group && !areGroupDepsSatisfied(group, r)) continue
         // Mirror server eligibility for `mode: 'incomplete'`: skip cells whose
         // outputs are filled, regardless of exec status. A cancelled/error
         // cell with a leftover value from a prior run was rendering as filled
         // but flipping to "queued" optimistically here even though the server
         // would skip it.
-        if (runMode === 'incomplete') {
-          const group = groupsById.get(groupId)
-          if (group && areOutputsFilled(group, r)) continue
-        }
+        if (runMode === 'incomplete' && group && areOutputsFilled(group, r)) continue
         next[groupId] = buildPendingExec(exec)
         // Mirror the server-side bulk clear: wipe output values so the cell
         // doesn't render the stale completed value behind a pending badge.
         // Without this the cell-render path's "value wins" branch keeps
         // showing the previous run's output and the Queued/Running pill
         // never appears.
-        const group = groupsById.get(groupId)
         if (group) {
           for (const o of group.outputs) {
             if (o.columnName in nextData) nextData[o.columnName] = null

apps/sim/lib/core/async-jobs/backends/database.ts

Lines changed: 57 additions & 3 deletions
@@ -1,7 +1,7 @@
 import { asyncJobs, db } from '@sim/db'
 import { createLogger } from '@sim/logger'
 import { toError } from '@sim/utils/errors'
-import { generateId } from '@sim/utils/id'
+import { generateShortId } from '@sim/utils/id'
 import { eq, sql } from 'drizzle-orm'
 import {
   type EnqueueOptions,
@@ -37,6 +37,14 @@ function rowToJob(row: AsyncJobRow): Job {

 const inlineAbortControllers = new Map<string, AbortController>()

+/**
+ * Per-cancel-key abort controllers for the `batchEnqueueAndWait` direct-call
+ * path. Distinct from `inlineAbortControllers` (which keys by jobId) — this
+ * map keys by the domain `cancelKey` callers pass in, since the await-blocking
+ * path skips `async_jobs` entirely and has no jobId to cancel by.
+ */
+const inlineCancelKeyControllers = new Map<string, AbortController>()
+
 interface Semaphore {
   available: number
   waiters: Array<() => void>
@@ -73,7 +81,7 @@ export class DatabaseJobQueue implements JobQueueBackend {
     payload: TPayload,
     options?: EnqueueOptions
   ): Promise<string> {
-    const jobId = options?.jobId ?? `run_${generateId().replace(/-/g, '').slice(0, 20)}`
+    const jobId = options?.jobId ?? `run_${generateShortId(20)}`
     const now = new Date()

     await db
@@ -112,7 +120,7 @@ export class DatabaseJobQueue implements JobQueueBackend {
     if (items.length === 0) return []
     const now = new Date()
     const rows = items.map(({ payload, options }) => ({
-      id: `run_${generateId().replace(/-/g, '').slice(0, 20)}`,
+      id: `run_${generateShortId(20)}`,
       type,
       payload: payload as Record<string, unknown>,
       status: JOB_STATUS.PENDING,
@@ -144,6 +152,44 @@ export class DatabaseJobQueue implements JobQueueBackend {
     return rows.map((r) => r.id)
   }

+  /** Skips `async_jobs` entirely — ids are returned empty since callers can't
+   * look up rows that don't exist. Cancel goes through `cancelByKey`. */
+  async batchEnqueueAndWait<TPayload>(
+    type: JobType,
+    items: Array<{ payload: TPayload; options?: EnqueueOptions }>
+  ): Promise<string[]> {
+    if (items.length === 0) return []
+    const tracked: Array<{ key: string; controller: AbortController }> = []
+    const runs = items.map((item) => {
+      const runner = item.options?.runner
+      if (!runner) return Promise.resolve()
+      const controller = new AbortController()
+      const cancelKey = item.options?.cancelKey
+      if (cancelKey) {
+        inlineCancelKeyControllers.set(cancelKey, controller)
+        tracked.push({ key: cancelKey, controller })
+      }
+      return runner(item.payload, controller.signal).catch((err) => {
+        logger.error(`[${type}] Inline run failed`, {
+          cancelKey,
+          error: toError(err).message,
+        })
+      })
+    })
+    try {
+      await Promise.all(runs)
+    } finally {
+      // Compare-and-delete guards against a re-enqueue under the same key
+      // racing with our cleanup.
+      for (const t of tracked) {
+        if (inlineCancelKeyControllers.get(t.key) === t.controller) {
+          inlineCancelKeyControllers.delete(t.key)
+        }
+      }
+    }
+    return items.map(() => '')
+  }
+
   async getJob(jobId: string): Promise<Job | null> {
     const [row] = await db.select().from(asyncJobs).where(eq(asyncJobs.id, jobId)).limit(1)

@@ -224,6 +270,14 @@ export class DatabaseJobQueue implements JobQueueBackend {
     logger.debug('Marked job as cancelled (DB queue)', { jobId, abortedInline: aborted })
   }

+  cancelByKey(cancelKey: string): boolean {
+    const controller = inlineCancelKeyControllers.get(cancelKey)
+    if (!controller) return false
+    controller.abort('Cancelled')
+    inlineCancelKeyControllers.delete(cancelKey)
+    return true
+  }
+
   /**
    * Fire-and-forget IIFE that owns the lifecycle for an inline job: registers
    * the abort controller (so `cancelJob` can interrupt mid-flight), acquires
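
The compare-and-delete cleanup and `cancelByKey` interact in a way that is easier to see outside the class. The sketch below is a stand-alone registry that mirrors the `Map`-based bookkeeping in the hunks above. `controllers`, `register`, and `release` are hypothetical names; only `cancelByKey` follows the diff directly.

```typescript
// Hypothetical stand-alone registry keyed by a domain cancel key.
const controllers = new Map<string, AbortController>()

function register(key: string): AbortController {
  const controller = new AbortController()
  controllers.set(key, controller) // a later registration under the same key replaces this one
  return controller
}

// Cleanup after a run: delete only if the map still holds this run's controller.
function release(key: string, controller: AbortController): void {
  if (controllers.get(key) === controller) controllers.delete(key)
}

function cancelByKey(key: string): boolean {
  const controller = controllers.get(key)
  if (!controller) return false
  controller.abort('Cancelled')
  controllers.delete(key)
  return true
}

const key = 'table:row:group'
const firstRun = register(key)
const secondRun = register(key) // re-enqueue before the first run has cleaned up
release(key, firstRun) // no-op: the entry now belongs to the second run

console.log(cancelByKey(key)) // true
console.log(secondRun.signal.aborted) // true
console.log(firstRun.signal.aborted) // false
console.log(cancelByKey(key)) // false: nothing left to cancel
```

Without the identity comparison in `release`, the first run's cleanup would delete the second run's controller and a later cancel would silently miss it.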

apps/sim/lib/core/async-jobs/backends/trigger-dev.ts

Lines changed: 52 additions & 0 deletions
@@ -1,4 +1,5 @@
 import { createLogger } from '@sim/logger'
+import { taskContext } from '@trigger.dev/core/v3'
 import { runs, type TriggerOptions, tasks } from '@trigger.dev/sdk'
 import {
   type EnqueueOptions,
@@ -103,6 +104,50 @@ export class TriggerDevJobQueue implements JobQueueBackend {
     return ids
   }

+  async batchEnqueueAndWait<TPayload>(
+    type: JobType,
+    items: Array<{ payload: TPayload; options?: EnqueueOptions }>
+  ): Promise<string[]> {
+    if (items.length === 0) return []
+    // The SDK's checkpoint-and-resume requires task runtime context. The only
+    // caller (`dispatcherStep` invoked by `tableRunDispatcherTask.run`) is
+    // always inside a task; check defensively so misuse fails at the boundary
+    // instead of as a confusing SDK internal error.
+    if (!taskContext.isInsideTask) {
+      throw new Error(
+        'batchEnqueueAndWait requires trigger.dev task runtime context — call from within a registered task'
+      )
+    }
+
+    const taskId = JOB_TYPE_TO_TASK_ID[type]
+    if (!taskId) throw new Error(`Unknown job type: ${type}`)
+
+    const batchItems = items.map(({ payload, options }) => {
+      const enrichedPayload =
+        options?.metadata && typeof payload === 'object' && payload !== null
+          ? { ...payload, ...options.metadata }
+          : payload
+      const tags = buildTags(options)
+      const batchItem: {
+        payload: unknown
+        options?: { concurrencyKey?: string; tags?: string[] }
+      } = { payload: enrichedPayload }
+      const batchOpts: { concurrencyKey?: string; tags?: string[] } = {}
+      if (options?.concurrencyKey) batchOpts.concurrencyKey = options.concurrencyKey
+      if (tags.length > 0) batchOpts.tags = tags
+      if (Object.keys(batchOpts).length > 0) batchItem.options = batchOpts
+      return batchItem
+    })
+
+    const result = await tasks.batchTriggerAndWait(taskId, batchItems)
+    logger.debug('batchTriggerAndWait completed', {
+      type,
+      taskId,
+      runCount: result.runs.length,
+    })
+    return result.runs.map((r) => r.id)
+  }
+
   async getJob(jobId: string): Promise<Job | null> {
     try {
       const run = await runs.retrieve(jobId)
@@ -168,6 +213,13 @@ export class TriggerDevJobQueue implements JobQueueBackend {
       throw error
     }
   }
+
+  cancelByKey(_cancelKey: string): boolean {
+    // No in-process AbortControllers to abort — trigger.dev runs are cancelled
+    // by jobId or via tag sweep (see `cancelCellRunsByTags`). Callers that
+    // need both surfaces should fan out themselves.
+    return false
+  }
 }

 /**

apps/sim/lib/core/async-jobs/types.ts

Lines changed: 39 additions & 0 deletions
@@ -96,6 +96,14 @@ export interface EnqueueOptions {
    * payload and an `AbortSignal` driven by `cancelJob`.
    */
   runner?: <TPayload>(payload: TPayload, signal: AbortSignal) => Promise<void>
+  /**
+   * Stable identity for cancellation lookups on the database backend's
+   * `batchEnqueueAndWait` path (which skips `async_jobs` entirely, so there
+   * is no jobId to cancel by). Lets callers map a domain identity (e.g.
+   * `tableId:rowId:groupId`) to the in-flight `AbortController`. Ignored
+   * by trigger.dev — runs there are cancelled by tag or jobId.
+   */
+  cancelKey?: string
 }

 /**
@@ -118,6 +126,28 @@ export interface JobQueueBackend {
     items: Array<{ payload: TPayload; options?: EnqueueOptions }>
   ): Promise<string[]>

+  /**
+   * Enqueue a batch and block until every job has reached a terminal state
+   * (completed, failed, or cancelled). The caller — typically a dispatcher
+   * walking work in windows — uses this to gate window N+1 on window N's
+   * completion.
+   *
+   * Backend implementations:
+   * - Trigger.dev: wraps `tasks.batchTriggerAndWait`. MUST be called from
+   *   inside a registered trigger.dev task (the SDK's checkpoint-and-resume
+   *   requires task runtime context). Backends guard with
+   *   `taskContext.isInsideTask` and throw a clear error otherwise.
+   * - Database (in-process): bypasses `async_jobs` entirely. Since the
+   *   caller is awaiting in-process, the row would serve no live purpose
+   *   (no cross-process recovery, no by-id lookup, no semaphore needed —
+   *   window size IS the concurrency cap). Calls the runner directly via
+   *   `Promise.all` and resolves on the runner's exit.
+   */
+  batchEnqueueAndWait<TPayload>(
+    type: JobType,
+    items: Array<{ payload: TPayload; options?: EnqueueOptions }>
+  ): Promise<string[]>
+
   /**
    * Get a job by ID
    */
@@ -144,6 +174,15 @@ export interface JobQueueBackend {
    * should resolve quietly so callers can drive cancel from possibly-stale state.
    */
   cancelJob(jobId: string): Promise<void>
+
+  /**
+   * Cancel an in-flight job by its `cancelKey` (the domain identity callers
+   * stamped on enqueue via `EnqueueOptions.cancelKey`). Used by
+   * `batchEnqueueAndWait` paths that skip per-job ids; the trigger.dev
+   * backend has no in-process AbortControllers to abort and returns `false`.
+   * Returns `true` if a matching controller was found and aborted.
+   */
+  cancelByKey(cancelKey: string): boolean
 }

 export type AsyncBackendType = 'trigger-dev' | 'database'
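
The doc comment on `batchEnqueueAndWait` says a dispatcher gates window N+1 on window N's completion and that the window size acts as the concurrency cap on the in-process path. The sketch below shows that gating under simplifying assumptions. `runInWindows` and `demoRunner` are hypothetical, and the real dispatcher (not shown in these files) presumably also tracks a cursor and cancellation.

```typescript
// Hypothetical helper: runs `items` in fixed-size windows and awaits each
// window before starting the next. Runner failures are swallowed per item,
// as in the database backend's inline path, so one failing cell does not
// reject the whole window.
async function runInWindows<T>(
  items: T[],
  windowSize: number,
  runner: (item: T) => Promise<void>
): Promise<number> {
  if (windowSize < 1) throw new Error('windowSize must be at least 1')
  let windows = 0
  for (let start = 0; start < items.length; start += windowSize) {
    const batch = items.slice(start, start + windowSize)
    await Promise.all(batch.map((item) => runner(item).catch(() => undefined)))
    windows += 1
  }
  return windows
}

// Demo runner that tracks how many cells are in flight at once.
let active = 0
let peak = 0
const demoRunner = async (_cell: number): Promise<void> => {
  active += 1
  peak = Math.max(peak, active)
  await new Promise<void>((resolve) => setTimeout(resolve, 5))
  active -= 1
}

runInWindows([1, 2, 3, 4, 5], 2, demoRunner).then((windows) => {
  console.log(windows, peak) // 3 2
})
```

Because each window is awaited as a unit, no more than `windowSize` runners are ever in flight, which is why the in-process path can skip a separate semaphore.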
