Skip to content

Commit 7db8a62

Browse files
fix(tables): retry transient DB/Redis failures in cell execution and surface error causes
Workflow-group-cell runs intermittently failed on trivial DB reads/writes under heavy fan-out, stranding cells in `running`. Investigation showed the PlanetScale and ElastiCache backends were healthy at the time — the failures are transient connection-level faults that the cell (maxAttempts: 1) had no tolerance for, and the real cause was never logged (Drizzle wraps it as "Failed query: ..." and the driver cause lives in error.cause). Resilience: - Add retryTransient (lib/table/retry-transient.ts): retries only transient infra errors (reuses isRetryableInfrastructureError; adds an ioredis command-timeout match) with jittered backoff, then rethrows. Fail-fast for everything else. - Wrap the cell's getTableById/getRowById reads, the terminal write (cell-write updateRow — idempotent via the executionId guard), and the Redis cascade-lock acquire. Diagnostics: - Add describeError (lib/core/errors/retryable-infrastructure.ts): walks the .cause chain and always returns the underlying driver cause (code/errno/ syscall + causeChain), including for unclassified errors like AbortError. - Log `cause` + a `retryable` flag (and aborted/timedOut in the cell's main catch) across the cell + finalization error paths, mirroring the existing schedule-execution pattern. Logging-only; no behavior change. This lets the next recurrence reveal the real cause and whether the retry applies. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent c786ada commit 7db8a62

9 files changed

Lines changed: 349 additions & 23 deletions

File tree

apps/sim/background/workflow-column-execution.ts

Lines changed: 33 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,14 @@ import { generateId } from '@sim/utils/id'
77
import { backoffWithJitter } from '@sim/utils/retry'
88
import { task } from '@trigger.dev/sdk'
99
import { eq } from 'drizzle-orm'
10+
import { describeError } from '@/lib/core/errors/retryable-infrastructure'
1011
import { createTimeoutAbortController } from '@/lib/core/execution-limits'
1112
import { RateLimiter } from '@/lib/core/rate-limiter/rate-limiter'
1213
import { preprocessExecution } from '@/lib/execution/preprocessing'
1314
import { withCascadeLock } from '@/lib/table/cascade-lock'
1415
import { isExecCancelled } from '@/lib/table/deps'
1516
import { appendTableEvent } from '@/lib/table/events'
17+
import { isRetryableCellError, retryTransient } from '@/lib/table/retry-transient'
1618
import type {
1719
RowData,
1820
RowExecutionMetadata,
@@ -67,9 +69,15 @@ export async function executeWorkflowGroupCellJob(
6769
// marked, so stop re-driving this row.
6870
if (outcome.result === 'blocked') break
6971
if (signal?.aborted) break
70-
const freshTable = await getTableById(tableId)
72+
const freshTable = await retryTransient('cascade getTableById', () => getTableById(tableId), {
73+
signal,
74+
})
7175
if (!freshTable) break
72-
const freshRow = await getRowById(tableId, rowId, workspaceId)
76+
const freshRow = await retryTransient(
77+
'cascade getRowById',
78+
() => getRowById(tableId, rowId, workspaceId),
79+
{ signal }
80+
)
7381
if (!freshRow) break
7482
const next = pickNextEligibleGroupForRow(freshTable, freshRow)
7583
if (!next) break
@@ -113,7 +121,9 @@ export async function runRowCascadeLoop(
113121
while (true) {
114122
if (signal?.aborted) break
115123

116-
const freshTable = await getTableById(tableId)
124+
const freshTable = await retryTransient('cascade getTableById', () => getTableById(tableId), {
125+
signal,
126+
})
117127
if (!freshTable) {
118128
logger.warn(`Table ${tableId} vanished mid-cascade`)
119129
break
@@ -142,7 +152,11 @@ export async function runRowCascadeLoop(
142152
// would re-pick the still-pending queued marker and spin.
143153
if (result === 'blocked') return 'blocked'
144154

145-
const freshRow = await getRowById(tableId, rowId, workspaceId)
155+
const freshRow = await retryTransient(
156+
'cascade getRowById',
157+
() => getRowById(tableId, rowId, workspaceId),
158+
{ signal }
159+
)
146160
if (!freshRow) break
147161
const next = pickNextEligibleGroupForRow(freshTable, freshRow, currentGroupId)
148162
if (!next) break
@@ -597,8 +611,8 @@ async function runWorkflowAndWriteTerminal(
597611
})
598612
.catch((err) => {
599613
logger.warn(
600-
`Per-block partial write failed (table=${tableId} row=${rowId} group=${groupId}):`,
601-
err
614+
`Per-block partial write failed (table=${tableId} row=${rowId} group=${groupId})`,
615+
{ cause: describeError(err), retryable: isRetryableCellError(err) }
602616
)
603617
})
604618
}
@@ -720,7 +734,14 @@ async function runWorkflowAndWriteTerminal(
720734
const message = toError(err).message
721735
logger.error(
722736
`Workflow group cell execution failed (table=${tableId} row=${rowId} group=${groupId})`,
723-
{ error: message, executionId }
737+
{
738+
error: message,
739+
executionId,
740+
cause: describeError(err),
741+
retryable: isRetryableCellError(err),
742+
aborted: abortSignal.aborted,
743+
timedOut: timeoutController.isTimedOut(),
744+
}
724745
)
725746
terminalWritten = true
726747
await writeChain.catch(() => {})
@@ -735,7 +756,11 @@ async function runWorkflowAndWriteTerminal(
735756
blockErrors,
736757
})
737758
} catch (writeErr) {
738-
logger.error('Also failed to write error state', { error: toError(writeErr).message })
759+
logger.error('Also failed to write error state', {
760+
error: toError(writeErr).message,
761+
cause: describeError(writeErr),
762+
retryable: isRetryableCellError(writeErr),
763+
})
739764
}
740765
return 'error'
741766
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/**
2+
* @vitest-environment node
3+
*/
4+
import { describe, expect, it } from 'vitest'
5+
import {
6+
describeError,
7+
isRetryableInfrastructureError,
8+
} from '@/lib/core/errors/retryable-infrastructure'
9+
10+
describe('describeError', () => {
11+
it('reports name and message for a plain error, omitting causeChain', () => {
12+
const described = describeError(new Error('boom'))
13+
expect(described).toEqual({ name: 'Error', message: 'boom' })
14+
expect(described.causeChain).toBeUndefined()
15+
})
16+
17+
it('surfaces the deepest cause for a wrapped driver error', () => {
18+
const driver = Object.assign(new Error('read ECONNRESET'), {
19+
code: 'ECONNRESET',
20+
errno: 'ECONNRESET',
21+
syscall: 'read',
22+
})
23+
const wrapped = new Error('Failed query: select ...', { cause: driver })
24+
25+
const described = describeError(wrapped)
26+
expect(described.name).toBe('Error')
27+
expect(described.message).toBe('read ECONNRESET')
28+
expect(described.code).toBe('ECONNRESET')
29+
expect(described.errno).toBe('ECONNRESET')
30+
expect(described.syscall).toBe('read')
31+
expect(described.causeChain).toEqual([
32+
'Error: Failed query: select ...',
33+
'Error: read ECONNRESET',
34+
])
35+
})
36+
37+
it('always returns the cause for unclassified errors (AbortError)', () => {
38+
const aborted = Object.assign(new Error('The operation was aborted'), { name: 'AbortError' })
39+
const described = describeError(aborted)
40+
41+
expect(described.name).toBe('AbortError')
42+
expect(described.message).toBe('The operation was aborted')
43+
// The retryable classifier skips it entirely — describeError still surfaces it.
44+
expect(isRetryableInfrastructureError(aborted)).toBe(false)
45+
})
46+
47+
it('falls back to a populated description for non-Error input without throwing', () => {
48+
expect(describeError('just a string')).toEqual({ name: 'Error', message: 'just a string' })
49+
expect(() => describeError({ weird: true })).not.toThrow()
50+
})
51+
52+
it('stops walking the cause chain at depth 10 and does not loop on a cycle', () => {
53+
const a = new Error('a')
54+
const b = new Error('b')
55+
;(a as Error & { cause?: unknown }).cause = b
56+
;(b as Error & { cause?: unknown }).cause = a
57+
58+
let described: ReturnType<typeof describeError> | undefined
59+
expect(() => {
60+
described = describeError(a)
61+
}).not.toThrow()
62+
expect(described?.causeChain?.length).toBeLessThanOrEqual(10)
63+
})
64+
})

apps/sim/lib/core/errors/retryable-infrastructure.ts

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import { toError } from '@sim/utils/errors'
2+
13
const RETRYABLE_DB_ERROR_CODES = new Set([
24
'08000',
35
'08001',
@@ -76,3 +78,47 @@ export function describeRetryableInfrastructureError(
7678
export function isRetryableInfrastructureError(error: unknown): boolean {
7779
return Boolean(describeRetryableInfrastructureError(error))
7880
}
81+
82+
export interface DescribedError {
83+
name: string
84+
message: string
85+
code?: string
86+
errno?: string
87+
syscall?: string
88+
/** `"Name: message"` per link in the `.cause` chain, outermost first. Present only when the chain has more than one link. */
89+
causeChain?: string[]
90+
}
91+
92+
/**
93+
* Always-on diagnostic view of an error and its `.cause` chain.
94+
*
95+
* Unlike {@link describeRetryableInfrastructureError} — which returns
96+
* `undefined` for errors outside its retryable allowlist — this returns the
97+
* underlying cause for ANY error, including `AbortError` and otherwise
98+
* unclassified causes. Reports the fields of the DEEPEST `.cause` link, because
99+
* a wrapped driver error (e.g. Drizzle's `"Failed query: ..."` wrapping an
100+
* `ECONNRESET`) carries the real reason there, not on the outer wrapper.
101+
*
102+
* `@sim/logger` does not serialize the non-enumerable `Error.prototype.cause`,
103+
* so callers must pass the result as an explicit structured log field rather
104+
* than relying on the logger to expand a raw error.
105+
*/
106+
export function describeError(error: unknown): DescribedError {
107+
const chain = getErrorChain(error)
108+
if (chain.length === 0) {
109+
const normalized = toError(error)
110+
return { name: normalized.name, message: normalized.message }
111+
}
112+
const deepest = chain[chain.length - 1]
113+
const code = typeof deepest.code === 'string' ? deepest.code : undefined
114+
const errno = typeof deepest.errno === 'string' ? deepest.errno : undefined
115+
const syscall = typeof deepest.syscall === 'string' ? deepest.syscall : undefined
116+
return {
117+
name: deepest.name,
118+
message: deepest.message,
119+
...(code ? { code } : {}),
120+
...(errno ? { errno } : {}),
121+
...(syscall ? { syscall } : {}),
122+
...(chain.length > 1 ? { causeChain: chain.map((e) => `${e.name}: ${e.message}`) } : {}),
123+
}
124+
}

apps/sim/lib/logs/execution/logging-session.ts

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,10 @@ import { workflowExecutionLogs } from '@sim/db/schema'
33
import { createLogger } from '@sim/logger'
44
import { toError } from '@sim/utils/errors'
55
import { and, eq, sql } from 'drizzle-orm'
6+
import {
7+
describeError,
8+
isRetryableInfrastructureError,
9+
} from '@/lib/core/errors/retryable-infrastructure'
610
import { executionLogger } from '@/lib/logs/execution/logger'
711
import {
812
calculateCostSummary,
@@ -177,6 +181,8 @@ export class LoggingSession {
177181
} catch (error) {
178182
logger.error(`Failed to persist last started block for execution ${this.executionId}:`, {
179183
error: toError(error).message,
184+
cause: describeError(error),
185+
retryable: isRetryableInfrastructureError(error),
180186
})
181187
}
182188
}
@@ -193,6 +199,8 @@ export class LoggingSession {
193199
} catch (error) {
194200
logger.error(`Failed to persist last completed block for execution ${this.executionId}:`, {
195201
error: toError(error).message,
202+
cause: describeError(error),
203+
retryable: isRetryableInfrastructureError(error),
196204
})
197205
}
198206
}
@@ -411,6 +419,8 @@ export class LoggingSession {
411419
executionId: this.executionId,
412420
error: toError(error).message,
413421
stack: error instanceof Error ? error.stack : undefined,
422+
cause: describeError(error),
423+
retryable: isRetryableInfrastructureError(error),
414424
})
415425
throw error
416426
}
@@ -1057,7 +1067,11 @@ export class LoggingSession {
10571067
this.completionAttemptFailed = true
10581068
logger.error(
10591069
`[${this.requestId || 'unknown'}] Cost-only fallback also failed for execution ${this.executionId}:`,
1060-
{ error: toError(fallbackError).message }
1070+
{
1071+
error: toError(fallbackError).message,
1072+
cause: describeError(fallbackError),
1073+
retryable: isRetryableInfrastructureError(fallbackError),
1074+
}
10611075
)
10621076
}
10631077
}

apps/sim/lib/table/cascade-lock.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { createLogger } from '@sim/logger'
22
import { toError } from '@sim/utils/errors'
33
import { acquireLock, extendLock, releaseLock } from '@/lib/core/config/redis'
4+
import { retryTransient } from '@/lib/table/retry-transient'
45

56
const logger = createLogger('TableCascadeLock')
67

@@ -40,7 +41,11 @@ export async function withCascadeLock<T>(
4041
fn: () => Promise<T>
4142
): Promise<{ status: 'acquired'; result: T } | { status: 'contended' }> {
4243
const key = cascadeLockKey(tableId, rowId)
43-
const acquired = await acquireLock(key, ownerId, LOCK_TTL_SECONDS)
44+
// A timed-out/dropped Redis command here throws before the cell is picked up;
45+
// retry so a transient Redis blip doesn't fail the run outright.
46+
const acquired = await retryTransient('cascade acquireLock', () =>
47+
acquireLock(key, ownerId, LOCK_TTL_SECONDS)
48+
)
4449
if (!acquired) return { status: 'contended' }
4550

4651
const heartbeat = setInterval(() => {

apps/sim/lib/table/cell-write.ts

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import { createLogger } from '@sim/logger'
1313
import { isExecCancelled } from '@/lib/table/deps'
1414
import { appendTableEvent } from '@/lib/table/events'
15+
import { retryTransient } from '@/lib/table/retry-transient'
1516
import type { RowData, RowExecutionMetadata, RowExecutions, WorkflowGroup } from '@/lib/table/types'
1617

1718
const logger = createLogger('WorkflowCellWrite')
@@ -46,12 +47,14 @@ export async function writeWorkflowGroupState(
4647
const requestId = ctx.requestId ?? `wfgrp-${executionId}`
4748
const { getTableById, getRowById, updateRow } = await import('@/lib/table/service')
4849

49-
const table = await getTableById(tableId)
50+
const table = await retryTransient('cell-write getTableById', () => getTableById(tableId))
5051
if (!table) {
5152
logger.warn(`Table ${tableId} vanished before group state write`)
5253
return 'wrote'
5354
}
54-
const row = await getRowById(tableId, rowId, workspaceId)
55+
const row = await retryTransient('cell-write getRowById', () =>
56+
getRowById(tableId, rowId, workspaceId)
57+
)
5558
if (!row) {
5659
logger.warn(`Row ${rowId} vanished before group state write`)
5760
return 'wrote'
@@ -99,17 +102,22 @@ export async function writeWorkflowGroupState(
99102
// task writes (running/completed/error) get the SQL guard so an in-flight
100103
// partial can't clobber a stop click or a newer run that already committed.
101104
const cancellationGuard = bypassStaleWorker ? undefined : { groupId, executionId }
102-
const result = await updateRow(
103-
{
104-
tableId,
105-
rowId,
106-
data: payload.dataPatch ?? {},
107-
workspaceId,
108-
executionsPatch: { [groupId]: payload.executionState },
109-
cancellationGuard,
110-
},
111-
table,
112-
requestId
105+
// The executionId/cancellation guard makes this write idempotent — a retry
106+
// after a dropped connection re-applies the same terminal state, so retrying
107+
// is safe and is what stops a transient blip from stranding the cell.
108+
const result = await retryTransient('cell-write updateRow', () =>
109+
updateRow(
110+
{
111+
tableId,
112+
rowId,
113+
data: payload.dataPatch ?? {},
114+
workspaceId,
115+
executionsPatch: { [groupId]: payload.executionState },
116+
cancellationGuard,
117+
},
118+
table,
119+
requestId
120+
)
113121
)
114122
if (result === null) {
115123
logger.info(

0 commit comments

Comments
 (0)