Skip to content

Commit 8812154

Browse files
fix(tables): address review feedback on cell retry resilience
- retryTransient: re-check the abort signal after the backoff sleep so a cancellation during the sleep stops the next attempt (don't run/return work for an already-cancelled task).
- isRetryableRedisError: walk the .cause chain (mirroring the infra classifier) so wrapped Redis timeouts are recognized; drop "Connection is in subscriber mode" — that's a connection-state programming error, not a transient drop, and would just fail identically on every retry.
- cascade-lock: stop wrapping acquireLock in retryTransient. acquireLock is a non-idempotent SET NX, so retrying after a timed-out-but-applied first SET returns false (key already ours) and yields a false `contended` that skips the cascade. A transient Redis blip here just fails the run before pickup (no stranded cell); the dispatcher re-drives it.
- Tests: cause-chain Redis match, subscriber-mode exclusion, abort-during-sleep.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent 91f00bd commit 8812154

3 files changed

Lines changed: 47 additions & 10 deletions

File tree

apps/sim/lib/table/cascade-lock.ts

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import { createLogger } from '@sim/logger'
22
import { toError } from '@sim/utils/errors'
33
import { acquireLock, extendLock, releaseLock } from '@/lib/core/config/redis'
4-
import { retryTransient } from '@/lib/table/retry-transient'
54

65
const logger = createLogger('TableCascadeLock')
76

@@ -41,11 +40,12 @@ export async function withCascadeLock<T>(
4140
fn: () => Promise<T>
4241
): Promise<{ status: 'acquired'; result: T } | { status: 'contended' }> {
4342
const key = cascadeLockKey(tableId, rowId)
44-
// A timed-out/dropped Redis command here throws before the cell is picked up;
45-
// retry so a transient Redis blip doesn't fail the run outright.
46-
const acquired = await retryTransient('cascade acquireLock', () =>
47-
acquireLock(key, ownerId, LOCK_TTL_SECONDS)
48-
)
43+
// NOT wrapped in retryTransient: acquireLock is a non-idempotent `SET NX`, so
44+
// a retry after a timed-out-but-applied first SET would see the key already
45+
// present and return false — a false `contended` that skips the cascade. A
46+
// transient Redis blip here just fails the run before pickup (no stranded
47+
// cell); the dispatcher/re-drive recovers it.
48+
const acquired = await acquireLock(key, ownerId, LOCK_TTL_SECONDS)
4949
if (!acquired) return { status: 'contended' }
5050

5151
const heartbeat = setInterval(() => {

apps/sim/lib/table/retry-transient.test.ts

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,14 @@ describe('isRetryableCellError', () => {
2727
expect(isRetryableCellError(new Error('Connection is closed'))).toBe(true)
2828
})
2929

30+
it('matches a Redis timeout wrapped in a cause chain', () => {
31+
expect(isRetryableCellError(new Error('outer wrapper', { cause: redisTimeout() }))).toBe(true)
32+
})
33+
34+
it('does not retry Redis connection-state programming errors', () => {
35+
expect(isRetryableCellError(new Error('Connection is in subscriber mode'))).toBe(false)
36+
})
37+
3038
it('does not retry application/logic errors', () => {
3139
expect(isRetryableCellError(new Error('row not found'))).toBe(false)
3240
})
@@ -80,4 +88,19 @@ describe('retryTransient', () => {
8088
expect(fn).toHaveBeenCalledTimes(1)
8189
expect(mockSleep).not.toHaveBeenCalled()
8290
})
91+
92+
it('stops retrying if the signal aborts during backoff sleep', async () => {
93+
const controller = new AbortController()
94+
// Cancellation fires mid-backoff, after the first failure but before the
95+
// next attempt would run.
96+
mockSleep.mockImplementationOnce(async () => {
97+
controller.abort()
98+
})
99+
const fn = vi.fn().mockRejectedValue(connError())
100+
await expect(retryTransient('t', fn, { signal: controller.signal })).rejects.toThrow(
101+
'Failed query'
102+
)
103+
expect(fn).toHaveBeenCalledTimes(1)
104+
expect(mockSleep).toHaveBeenCalledTimes(1)
105+
})
83106
})

apps/sim/lib/table/retry-transient.ts

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,16 +11,26 @@ const logger = createLogger('TableRetryTransient')
1111
* connection drop without risking duplicate side effects. */
1212
const DEFAULT_MAX_ATTEMPTS = 4
1313

14+
/** Redis-side equivalents of a dropped socket. Excludes connection-STATE
15+
* programming errors (e.g. "Connection is in subscriber mode"), which are
16+
* misconfigurations that would just fail identically on every retry. */
17+
const RETRYABLE_REDIS_MESSAGE = /Command timed out|Connection is closed|Stream isn't writeable/i
18+
1419
/**
1520
* ioredis surfaces command timeouts and severed connections as plain `Error`s
1621
* with no `code`/`errno`, so the SQLSTATE/errno-based
1722
* {@link isRetryableInfrastructureError} classifier misses them. Match those by
18-
* message instead — these are the Redis-side equivalents of a dropped socket.
23+
* message instead, walking the `.cause` chain (depth-bounded) so a wrapped
24+
* Redis failure is still recognized — mirroring how the infra classifier walks
25+
* causes.
1926
*/
2027
function isRetryableRedisError(error: unknown): boolean {
21-
return /Command timed out|Connection is closed|Stream isn't writeable|Connection is in subscriber mode/i.test(
22-
getErrorMessage(error)
23-
)
28+
let current: unknown = error
29+
for (let depth = 0; depth < 10 && current instanceof Error; depth++) {
30+
if (RETRYABLE_REDIS_MESSAGE.test(current.message)) return true
31+
current = current.cause
32+
}
33+
return false
2434
}
2535

2636
/**
@@ -68,6 +78,10 @@ export async function retryTransient<T>(
6878
{ error: getErrorMessage(error) }
6979
)
7080
await sleep(waitMs)
81+
// Re-check after backoff: if cancellation fired during the sleep, don't
82+
// run another attempt (and don't return a success from work that started
83+
// after the task was already cancelled).
84+
if (options.signal?.aborted) throw error
7185
}
7286
}
7387
}

0 commit comments

Comments
 (0)