Skip to content

Commit 67ceef4

Browse files
refactor(tables): drop in-process retry, keep cause diagnostics only
In-process retry is the wrong layer for this path: the cell task is maxAttempts:1 by design, retrying on a possibly-degraded worker may not help, and it masks the very transient-failure signal we're trying to capture before we understand the root cause. Removed retryTransient entirely (file + all wrapping in cell-write, the cascade reads, and the lock acquire) and kept only the diagnostic logging. - Deleted lib/table/retry-transient.ts (+ test); cell-write and the cascade reads call getTableById/getRowById/updateRow directly again, fail-fast. - Kept describeError + `cause`/`retryable` fields across the cell + finalization catch blocks; the cell-path `retryable` flag now sources from isRetryableInfrastructureError (the canonical classifier) for consistency. Diagnostics-first: surface the real driver cause on the next recurrence, then decide the actual fix (e.g. task-level maxAttempts, or addressing the worker-side cause) from evidence rather than a speculative in-process retry. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent e0882cd commit 67ceef4

5 files changed

Lines changed: 24 additions & 240 deletions

File tree

apps/sim/background/workflow-column-execution.ts

Lines changed: 11 additions & 21 deletions
Original file line number | Diff line number | Diff line change
@@ -7,14 +7,16 @@ import { generateId } from '@sim/utils/id'
77
import { backoffWithJitter } from '@sim/utils/retry'
88
import { task } from '@trigger.dev/sdk'
99
import { eq } from 'drizzle-orm'
10-
import { describeError } from '@/lib/core/errors/retryable-infrastructure'
10+
import {
11+
describeError,
12+
isRetryableInfrastructureError,
13+
} from '@/lib/core/errors/retryable-infrastructure'
1114
import { createTimeoutAbortController } from '@/lib/core/execution-limits'
1215
import { RateLimiter } from '@/lib/core/rate-limiter/rate-limiter'
1316
import { preprocessExecution } from '@/lib/execution/preprocessing'
1417
import { withCascadeLock } from '@/lib/table/cascade-lock'
1518
import { isExecCancelled } from '@/lib/table/deps'
1619
import { appendTableEvent } from '@/lib/table/events'
17-
import { isRetryableCellError, retryTransient } from '@/lib/table/retry-transient'
1820
import type {
1921
RowData,
2022
RowExecutionMetadata,
@@ -69,15 +71,9 @@ export async function executeWorkflowGroupCellJob(
6971
// marked, so stop re-driving this row.
7072
if (outcome.result === 'blocked') break
7173
if (signal?.aborted) break
72-
const freshTable = await retryTransient('cascade getTableById', () => getTableById(tableId), {
73-
signal,
74-
})
74+
const freshTable = await getTableById(tableId)
7575
if (!freshTable) break
76-
const freshRow = await retryTransient(
77-
'cascade getRowById',
78-
() => getRowById(tableId, rowId, workspaceId),
79-
{ signal }
80-
)
76+
const freshRow = await getRowById(tableId, rowId, workspaceId)
8177
if (!freshRow) break
8278
const next = pickNextEligibleGroupForRow(freshTable, freshRow)
8379
if (!next) break
@@ -121,9 +117,7 @@ export async function runRowCascadeLoop(
121117
while (true) {
122118
if (signal?.aborted) break
123119

124-
const freshTable = await retryTransient('cascade getTableById', () => getTableById(tableId), {
125-
signal,
126-
})
120+
const freshTable = await getTableById(tableId)
127121
if (!freshTable) {
128122
logger.warn(`Table ${tableId} vanished mid-cascade`)
129123
break
@@ -152,11 +146,7 @@ export async function runRowCascadeLoop(
152146
// would re-pick the still-pending queued marker and spin.
153147
if (result === 'blocked') return 'blocked'
154148

155-
const freshRow = await retryTransient(
156-
'cascade getRowById',
157-
() => getRowById(tableId, rowId, workspaceId),
158-
{ signal }
159-
)
149+
const freshRow = await getRowById(tableId, rowId, workspaceId)
160150
if (!freshRow) break
161151
const next = pickNextEligibleGroupForRow(freshTable, freshRow, currentGroupId)
162152
if (!next) break
@@ -612,7 +602,7 @@ async function runWorkflowAndWriteTerminal(
612602
.catch((err) => {
613603
logger.warn(
614604
`Per-block partial write failed (table=${tableId} row=${rowId} group=${groupId})`,
615-
{ cause: describeError(err), retryable: isRetryableCellError(err) }
605+
{ cause: describeError(err), retryable: isRetryableInfrastructureError(err) }
616606
)
617607
})
618608
}
@@ -738,7 +728,7 @@ async function runWorkflowAndWriteTerminal(
738728
error: message,
739729
executionId,
740730
cause: describeError(err),
741-
retryable: isRetryableCellError(err),
731+
retryable: isRetryableInfrastructureError(err),
742732
}
743733
)
744734
terminalWritten = true
@@ -757,7 +747,7 @@ async function runWorkflowAndWriteTerminal(
757747
logger.error('Also failed to write error state', {
758748
error: toError(writeErr).message,
759749
cause: describeError(writeErr),
760-
retryable: isRetryableCellError(writeErr),
750+
retryable: isRetryableInfrastructureError(writeErr),
761751
})
762752
}
763753
return 'error'

apps/sim/lib/table/cascade-lock.ts

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,11 +40,6 @@ export async function withCascadeLock<T>(
4040
fn: () => Promise<T>
4141
): Promise<{ status: 'acquired'; result: T } | { status: 'contended' }> {
4242
const key = cascadeLockKey(tableId, rowId)
43-
// NOT wrapped in retryTransient: acquireLock is a non-idempotent `SET NX`, so
44-
// a retry after a timed-out-but-applied first SET would see the key already
45-
// present and return false — a false `contended` that skips the cascade. A
46-
// transient Redis blip here just fails the run before pickup (no stranded
47-
// cell); the dispatcher/re-drive recovers it.
4843
const acquired = await acquireLock(key, ownerId, LOCK_TTL_SECONDS)
4944
if (!acquired) return { status: 'contended' }
5045

apps/sim/lib/table/cell-write.ts

Lines changed: 13 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
import { createLogger } from '@sim/logger'
1313
import { isExecCancelled } from '@/lib/table/deps'
1414
import { appendTableEvent } from '@/lib/table/events'
15-
import { retryTransient } from '@/lib/table/retry-transient'
1615
import type { RowData, RowExecutionMetadata, RowExecutions, WorkflowGroup } from '@/lib/table/types'
1716

1817
const logger = createLogger('WorkflowCellWrite')
@@ -47,14 +46,12 @@ export async function writeWorkflowGroupState(
4746
const requestId = ctx.requestId ?? `wfgrp-${executionId}`
4847
const { getTableById, getRowById, updateRow } = await import('@/lib/table/service')
4948

50-
const table = await retryTransient('cell-write getTableById', () => getTableById(tableId))
49+
const table = await getTableById(tableId)
5150
if (!table) {
5251
logger.warn(`Table ${tableId} vanished before group state write`)
5352
return 'wrote'
5453
}
55-
const row = await retryTransient('cell-write getRowById', () =>
56-
getRowById(tableId, rowId, workspaceId)
57-
)
54+
const row = await getRowById(tableId, rowId, workspaceId)
5855
if (!row) {
5956
logger.warn(`Row ${rowId} vanished before group state write`)
6057
return 'wrote'
@@ -102,22 +99,17 @@ export async function writeWorkflowGroupState(
10299
// task writes (running/completed/error) get the SQL guard so an in-flight
103100
// partial can't clobber a stop click or a newer run that already committed.
104101
const cancellationGuard = bypassStaleWorker ? undefined : { groupId, executionId }
105-
// The executionId/cancellation guard makes this write idempotent — a retry
106-
// after a dropped connection re-applies the same terminal state, so retrying
107-
// is safe and is what stops a transient blip from stranding the cell.
108-
const result = await retryTransient('cell-write updateRow', () =>
109-
updateRow(
110-
{
111-
tableId,
112-
rowId,
113-
data: payload.dataPatch ?? {},
114-
workspaceId,
115-
executionsPatch: { [groupId]: payload.executionState },
116-
cancellationGuard,
117-
},
118-
table,
119-
requestId
120-
)
102+
const result = await updateRow(
103+
{
104+
tableId,
105+
rowId,
106+
data: payload.dataPatch ?? {},
107+
workspaceId,
108+
executionsPatch: { [groupId]: payload.executionState },
109+
cancellationGuard,
110+
},
111+
table,
112+
requestId
121113
)
122114
if (result === null) {
123115
logger.info(

apps/sim/lib/table/retry-transient.test.ts

Lines changed: 0 additions & 106 deletions
This file was deleted.

apps/sim/lib/table/retry-transient.ts

Lines changed: 0 additions & 87 deletions
This file was deleted.

0 commit comments

Comments
 (0)