Skip to content

Commit d4f49b3

Browse files
feat(rate-limiter): make hosted-key queue waits abort-interruptible
Replace the plain capped sleeps in the queue-head and bucket-capacity wait loops with an interruptibleSleep that resolves early when the execution AbortSignal fires (timeout or cancellation), cleaning up its own timer and listener. Previously a cancelled/timed-out run could overshoot by up to the heartbeat cap (~10s) before the loop re-checked its budget; now it wakes within a tick. The cap remains for heartbeat renewal.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 71cb2c0 commit d4f49b3

2 files changed

Lines changed: 63 additions & 6 deletions

File tree

apps/sim/lib/core/rate-limiter/hosted-key/hosted-key-rate-limiter.test.ts

Lines changed: 30 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -393,6 +393,36 @@ describe('HostedKeyRateLimiter', () => {
393393
expect(mockAdapter.consumeTokens).toHaveBeenCalledTimes(1)
394394
})
395395

396+
it('stops waiting promptly when the signal aborts mid-sleep', async () => {
397+
// Bucket reports a long refill, so the wait sleeps up to the heartbeat cap (10s).
398+
// Aborting mid-sleep must wake the wait within a tick, not after the full interval.
399+
const blocked: ConsumeResult = {
400+
allowed: false,
401+
tokensRemaining: 0,
402+
resetAt: new Date(Date.now() + 10_000),
403+
}
404+
mockAdapter.consumeTokens.mockResolvedValue(blocked)
405+
406+
const controller = new AbortController()
407+
const start = Date.now()
408+
const promise = rateLimiter.acquireKey(
409+
testProvider,
410+
envKeyPrefix,
411+
perRequestRateLimit,
412+
'workspace-1',
413+
controller.signal
414+
)
415+
// Let the first bucket check run and the sleep begin, then abort.
416+
await new Promise((resolve) => setTimeout(resolve, 20))
417+
controller.abort()
418+
const result = await promise
419+
420+
expect(result.success).toBe(false)
421+
expect(result.billingActorRateLimited).toBe(true)
422+
// Resolved well before the 10s capped sleep would otherwise have elapsed.
423+
expect(Date.now() - start).toBeLessThan(2000)
424+
})
425+
396426
it('keeps waiting past the no-signal fallback cap while the signal is live', async () => {
397427
// A live (non-aborted) signal means the run still has budget, so the wait must not
398428
// 429 at the 5-minute MAX_QUEUE_WAIT_MS fallback. The bucket frees up after ~7 min.

apps/sim/lib/core/rate-limiter/hosted-key/hosted-key-rate-limiter.ts

Lines changed: 33 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -53,6 +53,28 @@ const MIN_QUEUE_RETRY_DELAY_MS = 50
5353
*/
5454
const QUEUE_HEAD_POLL_MS = 200
5555

56+
/**
57+
* Sleep for `ms`, resolving early if `signal` aborts. Cleans up its own timer and listener
58+
* so neither leaks. Callers don't need to distinguish an early (aborted) return from a normal
59+
* one — the surrounding wait loop re-checks its budget immediately after and bails when the
60+
* signal has fired. Falls back to a plain sleep when no signal is provided.
61+
*/
62+
function interruptibleSleep(ms: number, signal?: AbortSignal): Promise<void> {
63+
if (!signal) return sleep(ms)
64+
if (signal.aborted) return Promise.resolve()
65+
return new Promise<void>((resolve) => {
66+
const onAbort = () => {
67+
clearTimeout(timer)
68+
resolve()
69+
}
70+
const timer = setTimeout(() => {
71+
signal.removeEventListener('abort', onAbort)
72+
resolve()
73+
}, ms)
74+
signal.addEventListener('abort', onAbort, { once: true })
75+
})
76+
}
77+
5678
/**
5779
* Resolves env var names for a numbered key prefix using a `{PREFIX}_COUNT` env var.
5880
* E.g. with `EXA_API_KEY_COUNT=5`, returns `['EXA_API_KEY_1', ..., 'EXA_API_KEY_5']`.
@@ -416,21 +438,24 @@ export class HostedKeyRateLimiter {
416438
* is capped at {@link HEARTBEAT_REFRESH_INTERVAL_MS} so that no single wait can outlive the
417439
* heartbeat TTL — even when the bucket reports a long `retryAfterMs` (e.g. low-RPM
418440
* providers). Without this cap a multi-second sleep could let the heartbeat lapse, the
419-
* head get reaped as dead, and a second caller advance and race us for the bucket.
441+
* head get reaped as dead, and a second caller advance and race us for the bucket. The
442+
* sleep also resolves early if `signal` aborts, so a cancelled/timed-out run stops waiting
443+
* promptly rather than overshooting by up to the cap.
420444
*/
421445
private async heartbeatAwareSleep(
422446
provider: string,
423447
billingActorId: string,
424448
ticketId: string,
425449
desiredMs: number,
426-
waitState: WaitState
450+
waitState: WaitState,
451+
signal?: AbortSignal
427452
): Promise<void> {
428453
await this.maybeRefreshHeartbeat(provider, billingActorId, ticketId, waitState)
429454
const sleepMs = Math.min(
430455
Math.max(MIN_QUEUE_RETRY_DELAY_MS, desiredMs),
431456
HEARTBEAT_REFRESH_INTERVAL_MS
432457
)
433-
await sleep(sleepMs)
458+
await interruptibleSleep(sleepMs, signal)
434459
}
435460

436461
/**
@@ -463,7 +488,7 @@ export class HostedKeyRateLimiter {
463488
}
464489

465490
await this.maybeRefreshHeartbeat(provider, billingActorId, ticketId, waitState)
466-
await sleep(QUEUE_HEAD_POLL_MS)
491+
await interruptibleSleep(QUEUE_HEAD_POLL_MS, signal)
467492
}
468493
}
469494

@@ -507,7 +532,8 @@ export class HostedKeyRateLimiter {
507532
billingActorId,
508533
ticketId,
509534
result.retryAfterMs,
510-
waitState
535+
waitState,
536+
signal
511537
)
512538
}
513539
}
@@ -558,7 +584,8 @@ export class HostedKeyRateLimiter {
558584
billingActorId,
559585
ticketId,
560586
result.retryAfterMs,
561-
waitState
587+
waitState,
588+
signal
562589
)
563590
}
564591
}

0 commit comments

Comments
 (0)