Skip to content

Commit fb311b5

Browse files
fix(rate-limiter): hosted-key queue follow-ups from #4756 review
1 parent 92fd17c commit fb311b5

4 files changed

Lines changed: 36 additions & 8 deletions

File tree

apps/sim/background/workflow-column-execution.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,10 @@ export async function executeWorkflowGroupCellJob(
7070
...currentPayload,
7171
groupId: next.id,
7272
workflowId: next.workflowId,
73+
// Re-derive from the target group rather than inheriting the prior group's
74+
// value via the spread: a workflow group following an enrichment group would
75+
// otherwise carry a stale enrichmentId.
76+
enrichmentId: next.enrichmentId,
7377
executionId: generateId(),
7478
}
7579
}

apps/sim/lib/core/rate-limiter/hosted-key/hosted-key-rate-limiter.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,13 +65,18 @@ function interruptibleSleep(ms: number, signal?: AbortSignal): Promise<void> {
6565
return new Promise<void>((resolve) => {
6666
const onAbort = () => {
6767
clearTimeout(timer)
68+
signal.removeEventListener('abort', onAbort)
6869
resolve()
6970
}
7071
const timer = setTimeout(() => {
7172
signal.removeEventListener('abort', onAbort)
7273
resolve()
7374
}, ms)
7475
signal.addEventListener('abort', onAbort, { once: true })
76+
// Re-check after registering: if the signal fired between the guard above and
77+
// addEventListener, the 'abort' event already dispatched and our listener would
78+
// never run, leaving the sleep to run its full duration. onAbort is idempotent.
79+
if (signal.aborted) onAbort()
7580
})
7681
}
7782

apps/sim/lib/core/rate-limiter/hosted-key/queue.test.ts

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -170,24 +170,32 @@ describe('HostedKeyQueue', () => {
170170
})
171171

172172
describe('refreshHeartbeat', () => {
173-
it('writes the heartbeat key with TTL', async () => {
174-
mockRedis.set.mockResolvedValueOnce('OK')
173+
it('writes the heartbeat key with TTL and re-extends the queue list TTL', async () => {
174+
mockRedis.pipeline.exec.mockResolvedValueOnce([
175+
[null, 'OK'],
176+
[null, 1],
177+
])
175178

176179
await queue.refreshHeartbeat(provider, workspaceId, ticketId)
177180

178-
expect(mockRedis.set).toHaveBeenCalledWith(
181+
expect(mockRedis.pipeline.set).toHaveBeenCalledWith(
179182
'hosted-queue-tkt:exa:workspace-1:ticket-1',
180183
'1',
181184
'EX',
182185
expect.any(Number)
183186
)
187+
expect(mockRedis.pipeline.expire).toHaveBeenCalledWith(
188+
'hosted-queue:exa:workspace-1',
189+
expect.any(Number)
190+
)
191+
expect(mockRedis.pipeline.exec).toHaveBeenCalledTimes(1)
184192
})
185193

186194
it('is a no-op when Redis is unavailable', async () => {
187195
redisConfigMockFns.mockGetRedisClient.mockReturnValueOnce(null)
188196

189197
await expect(queue.refreshHeartbeat(provider, workspaceId, ticketId)).resolves.toBeUndefined()
190-
expect(mockRedis.set).not.toHaveBeenCalled()
198+
expect(mockRedis.multi).not.toHaveBeenCalled()
191199
})
192200
})
193201

apps/sim/lib/core/rate-limiter/hosted-key/queue.ts

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,11 @@ const TICKET_HEARTBEAT_TTL_SECONDS = 30
1515
export const HEARTBEAT_REFRESH_INTERVAL_MS = 10_000
1616

1717
/**
18-
* TTL on the queue list itself. Set on every enqueue. Prevents abandoned queues
19-
* (whole workspace went silent) from sticking around forever in Redis.
18+
* TTL on the queue list itself. Set on every enqueue and re-extended by the head's
19+
* heartbeat while it actively waits, so a long-waiting head (whose budget can exceed
20+
* this TTL for enterprise async runs) never lets the list expire out from under the
21+
* waiters behind it. Prevents abandoned queues (whole workspace went silent) from
22+
* sticking around forever in Redis.
2023
*/
2124
const QUEUE_LIST_TTL_SECONDS = 600
2225

@@ -148,7 +151,11 @@ export class HostedKeyQueue {
148151

149152
/**
150153
* Refresh the ticket's heartbeat. Called periodically by the head while it's
151-
* waiting on the bucket so it doesn't get reaped as dead.
154+
* waiting on the bucket so it doesn't get reaped as dead. Also re-extends the
155+
* queue list TTL: a head whose wait outlives {@link QUEUE_LIST_TTL_SECONDS}
156+
* (possible for long enterprise async budgets) would otherwise let the list
157+
* expire with no new enqueue to refresh it, dropping every waiter to "missing"
158+
* and collapsing FIFO ordering into concurrent bucket racing.
152159
*/
153160
async refreshHeartbeat(
154161
provider: string,
@@ -158,9 +165,13 @@ export class HostedKeyQueue {
158165
const redis = getRedisClient()
159166
if (!redis) return
160167

168+
const listKey = queueListKey(provider, billingActorId)
161169
const hbKey = heartbeatKey(provider, billingActorId, ticketId)
162170
try {
163-
await redis.set(hbKey, '1', 'EX', TICKET_HEARTBEAT_TTL_SECONDS)
171+
const pipeline = redis.multi()
172+
pipeline.set(hbKey, '1', 'EX', TICKET_HEARTBEAT_TTL_SECONDS)
173+
pipeline.expire(listKey, QUEUE_LIST_TTL_SECONDS)
174+
await pipeline.exec()
164175
} catch (error) {
165176
logger.warn(`Queue heartbeat refresh failed for ${hbKey}`, {
166177
error: toError(error).message,

0 commit comments

Comments
 (0)