diff --git a/CHANGELOG.md b/CHANGELOG.md index a82ff55..c8f3587 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +## [3.8.0] - 2026-04-25 +### Added +- `pettyCache.throttle(key, { ttl }, fn)` — distributed throttle primitive. The first caller for a given key in a `ttl` window invokes the callback; subsequent calls within the window are no-ops. Errors thrown by the callback propagate to the caller. Async/await only. + ## [3.7.0] - 2026-03-12 ### Changed - Added the ability for `pettyCache.bulkGet`, `pettyCache.bulkSet`, and `pettyCache.bulkFetch` functions to support callbacks and promises. diff --git a/README.md b/README.md index 57d35b7..eb7ec57 100644 --- a/README.md +++ b/README.md @@ -26,6 +26,9 @@ Provides a distributed lock (mutex) with the ability to retry a specified number **Semaphore** Provides a pool of distributed locks with the ability to release a slot back to the pool or remove the slot from the pool so that it's not used again. +**Throttle** +Provides a distributed throttle: the first caller for a given key in a TTL window invokes the callback; subsequent calls within the window are absorbed. Errors from the callback propagate to the caller. + ## Getting Started ```javascript @@ -517,3 +520,25 @@ pettyCache.semaphore.retrieveOrCreate('key', { size: 10 }, function(err) { size: 1 || function() { const x = 1 + 1; callback(null, x); } // The number of locks to create in the semaphore's pool. Optionally, size can be a `callback(err, size)` function. } ``` + +## Throttle + +### pettyCache.throttle(key, options, fn) + +Distributed throttle: coalesces calls for the same key across multiple processes via Redis so that `fn` runs at most once per `ttl` window. The first caller in a window wins the claim and `fn` runs to completion; subsequent calls within the window are no-ops. After the window expires, the next caller can claim again. + +The returned Promise resolves only after `fn` has resolved (for the winning caller) or immediately (for absorbed callers). Errors thrown by `fn` propagate to the caller — useful for callers that need to know whether the work succeeded. + +```javascript +await pettyCache.throttle('my-key', { ttl: 5 * 60 * 1000 }, async () => { + // First caller's work — runs immediately. Subsequent calls within + // the next 5 minutes are no-ops. + await doWork(); +}); +``` + +**Parameters** + +- `key` (string) — Redis key. Callers compose their own naming convention. +- `options.ttl` (number) — throttle window in milliseconds. Required. +- `fn` (async function) — invoked once per window if this caller wins the claim. Awaited before the returned Promise resolves; errors propagate to the caller. diff --git a/index.js b/index.js index 1b0f81c..2f4c25e 100644 --- a/index.js +++ b/index.js @@ -1087,6 +1087,45 @@ function PettyCache() { } }; + /** + * Distributed throttle: invokes `fn` only if no other call has claimed the same key + * within the last `ttl` milliseconds, coalescing calls across multiple processes via + * Redis. The first caller in a window wins the claim and `fn` runs to completion; + * subsequent calls within the window are no-ops (they return immediately without + * invoking `fn`). After the window's TTL expires, the next caller can claim again. + * + * The returned Promise resolves only after `fn` has resolved (for the winning caller) + * or immediately (for absorbed callers). Errors thrown by `fn` propagate to the + * caller — useful for callers that need to know whether the work succeeded so they + * can NACK upstream messages, etc. + * + * @param {string} key - The Redis key. Callers compose their own naming convention. + * @param {Object} options + * @param {number} options.ttl - Throttle window in milliseconds. Required. + * @param {Function} fn - Async function invoked once per window if this caller wins + * the claim. Awaited before the returned Promise resolves. + * @returns {Promise} + */ + this.throttle = async (key, options, fn) => { + const ttl = options.ttl; + + const won = await new Promise((resolve, reject) => { + redisClient.set(key, '1', 'NX', 'PX', ttl, (err, res) => { + if (err) { + return reject(err); + } + + resolve(res === 'OK'); + }); + }); + + if (!won) { + return; + } + + await fn(); + }; + // Semaphore functions need to be bound to the main PettyCache object for (const method in this.semaphore) { this.semaphore[method] = this.semaphore[method].bind(this); diff --git a/package.json b/package.json index a98eedf..2fcd1dd 100644 --- a/package.json +++ b/package.json @@ -30,5 +30,5 @@ "test": "node --test --test-force-exit --test-reporter=spec", "test:only": "node --test --test-force-exit --test-only --test-reporter=spec" }, - "version": "3.7.0" + "version": "3.8.0" } diff --git a/test/index.js b/test/index.js index b4f8c7f..bd1722f 100644 --- a/test/index.js +++ b/test/index.js @@ -2333,6 +2333,83 @@ test('petty-cache', { concurrency: true }, async (t) => { }); }); + t.test('PettyCache.throttle', { concurrency: false }, async (t) => { + t.test('first call invokes fn and waits for it to complete', async () => { + const key = `throttle-${Math.random()}`; + let invocations = 0; + + await pettyCache.throttle(key, { ttl: 1000 }, async () => { + await timers.setTimeout(50); + invocations += 1; + }); + + assert.strictEqual(invocations, 1); + }); + + t.test('subsequent calls within the window are absorbed', async () => { + const key = `throttle-${Math.random()}`; + const calls = []; + + await pettyCache.throttle(key, { ttl: 1000 }, async () => calls.push('a')); + await pettyCache.throttle(key, { ttl: 1000 }, async () => calls.push('b')); + await pettyCache.throttle(key, { ttl: 1000 }, async () => calls.push('c')); + + assert.deepStrictEqual(calls, ['a']); + }); + + t.test('after the window expires, next call wins again', async () => { + const key = `throttle-${Math.random()}`; + const calls = []; + + await pettyCache.throttle(key, { ttl: 100 }, async () => calls.push('a')); + await timers.setTimeout(200); + await pettyCache.throttle(key, { ttl: 100 }, async () => calls.push('b')); + + assert.deepStrictEqual(calls, ['a', 'b']); + }); + + t.test('errors thrown by fn propagate to the caller', async () => { + const key = `throttle-${Math.random()}`; + + await assert.rejects( + pettyCache.throttle(key, { ttl: 1000 }, async () => { throw new Error('boom'); }), + /boom/ + ); + }); + + t.test('different keys are independent', async () => { + const key1 = `throttle-${Math.random()}-1`; + const key2 = `throttle-${Math.random()}-2`; + const calls = []; + + await pettyCache.throttle(key1, { ttl: 1000 }, async () => calls.push('1')); + await pettyCache.throttle(key2, { ttl: 1000 }, async () => calls.push('2')); + + assert.deepStrictEqual(calls, ['1', '2']); + }); + + t.test('absorbed callers return immediately without waiting on fn', async () => { + const key = `throttle-${Math.random()}`; + + // First caller wins; fn takes 200ms + const winner = pettyCache.throttle(key, { ttl: 1000 }, async () => { + await timers.setTimeout(200); + }); + + // Brief pause to ensure the SETNX lands + await timers.setTimeout(20); + + // Second caller is absorbed; should resolve quickly + const start = Date.now(); + await pettyCache.throttle(key, { ttl: 1000 }, async () => { + await timers.setTimeout(5000); + }); + assert.ok(Date.now() - start < 100, 'absorbed caller did not return immediately'); + + await winner; + }); + }); + t.test('redisClient', { concurrency: true }, async (t) => { t.test('redisClient.mget(falsy keys)', (t, done) => { const key1 = Math.random().toString();