Skip to content

Commit b0e0d94

Browse files
committed
improvement(redis): strip idempotency body and cap mothership stream zsets
1 parent 4efe999 commit b0e0d94

3 files changed

Lines changed: 36 additions & 4 deletions

File tree

apps/sim/lib/copilot/request/session/buffer.test.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ describe('mothership-stream-outbox', () => {
149149
expect(replayed.map((entry) => entry.payload.text)).toEqual(['world'])
150150
})
151151

152-
it('does not trim active stream history while appending events', async () => {
152+
it('trims active stream history to eventLimit on every append', async () => {
153153
const cursor = await allocateCursor('stream-1')
154154

155155
await appendEvent(
@@ -163,7 +163,11 @@ describe('mothership-stream-outbox', () => {
163163
})
164164
)
165165

166-
expect(mockRedis.zremrangebyrank).not.toHaveBeenCalled()
166+
expect(mockRedis.zremrangebyrank).toHaveBeenCalledWith(
167+
'mothership_stream:stream-1:events',
168+
0,
169+
expect.any(Number)
170+
)
167171
})
168172

169173
it('clears persisted stream state during teardown cleanup', async () => {

apps/sim/lib/copilot/request/session/buffer.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,7 @@ export async function appendEvents(
144144
zaddArgs.push(envelope.seq, JSON.stringify(envelope))
145145
}
146146
pipeline.zadd(key, ...(zaddArgs as [number, string, ...Array<number | string>]))
147+
pipeline.zremrangebyrank(key, 0, -config.eventLimit - 1)
147148
pipeline.expire(key, config.ttlSeconds)
148149
pipeline.set(seqKey, String(envelopes[envelopes.length - 1].seq), 'EX', config.ttlSeconds)
149150
await pipeline.exec()

apps/sim/lib/core/idempotency/service.ts

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,16 @@ export interface IdempotencyConfig {
1616
namespace?: string
1717
/** When true, failed keys are deleted rather than stored so the operation is retried on the next attempt. */
1818
retryFailures?: boolean
19+
/**
20+
* When false, the operation's return value is not persisted alongside
21+
* the dedupe marker — only `{ success, status, error? }` is stored.
22+
* Duplicate calls still short-circuit, but `executeWithIdempotency`
23+
* resolves to `undefined` on the dedupe path. Use for webhook/polling
24+
* flows where the cached body is large (multi-KB execution results)
25+
* and callers don't consume the value of a duplicated delivery.
26+
* Defaults to true.
27+
*/
28+
storeResultBody?: boolean
1929
/**
2030
* Force a specific storage backend regardless of the environment's
2131
* auto-detection. Use `'database'` for correctness-critical flows
@@ -77,6 +87,7 @@ export class IdempotencyService {
7787
ttlSeconds: config.ttlSeconds ?? DEFAULT_TTL,
7888
namespace: config.namespace ?? 'default',
7989
retryFailures: config.retryFailures ?? false,
90+
storeResultBody: config.storeResultBody ?? true,
8091
}
8192
this.storageMethod = config.forceStorage ?? getStorageMethod()
8293
logger.info(`IdempotencyService using ${this.storageMethod} storage`, {
@@ -441,7 +452,9 @@ export class IdempotencyService {
441452

442453
await this.storeResult(
443454
claimResult.normalizedKey,
444-
{ success: true, result, status: 'completed' },
455+
this.config.storeResultBody
456+
? { success: true, result, status: 'completed' }
457+
: { success: true, status: 'completed' },
445458
claimResult.storageMethod
446459
)
447460

@@ -510,15 +523,29 @@ export class IdempotencyService {
510523
}
511524
}
512525

526+
/**
527+
* Webhook idempotency. We're the receiver of provider-initiated webhooks,
528+
* not the originator — duplicate deliveries from the provider's retry
529+
* machinery just need a "we saw this" marker, not a replayable response
530+
* body. `storeResultBody: false` drops the cached workflow result from
531+
* each key, eliminating the long tail of large gmail/outlook payloads
532+
* that pushed Redis Cloud into OOM on 2026-05-15.
533+
*
534+
* TTL stays at 7 days because that's the longest provider retry window
535+
* we care about (Gmail / Pub/Sub). With body-stripping the per-key cost
536+
* is ~150 bytes, so the long TTL is essentially free.
537+
*/
513538
export const webhookIdempotency = new IdempotencyService({
514539
namespace: 'webhook',
515-
ttlSeconds: 60 * 60 * 24 * 7, // 7 days
540+
ttlSeconds: 60 * 60 * 24 * 7, // 7 days — must exceed Gmail/Pub-Sub retry window
541+
storeResultBody: false,
516542
})
517543

518544
export const pollingIdempotency = new IdempotencyService({
519545
namespace: 'polling',
520546
ttlSeconds: 60 * 60 * 24 * 3, // 3 days
521547
retryFailures: true,
548+
storeResultBody: false,
522549
})
523550

524551
/**

0 commit comments

Comments
 (0)