Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

## [3.8.0] - 2026-04-25
### Added
- `pettyCache.throttle(key, { ttl }, fn)` — distributed throttle primitive. The first caller for a given key in a `ttl` window invokes the callback; subsequent calls within the window are no-ops. Errors thrown by the callback propagate to the caller. Async/await only.

## [3.7.0] - 2026-03-12
### Changed
- Added the ability for `pettyCache.bulkGet`, `pettyCache.bulkSet`, and `pettyCache.bulkFetch` functions to support callbacks and promises.
Expand Down
25 changes: 25 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ Provides a distributed lock (mutex) with the ability to retry a specified number
**Semaphore**
Provides a pool of distributed locks with the ability to release a slot back to the pool or remove the slot from the pool so that it's not used again.

**Throttle**
Provides a distributed throttle: the first caller for a given key in a TTL window invokes the callback; subsequent calls within the window are absorbed. Errors from the callback propagate to the caller.

## Getting Started

```javascript
Expand Down Expand Up @@ -517,3 +520,25 @@ pettyCache.semaphore.retrieveOrCreate('key', { size: 10 }, function(err) {
size: 1 || function() { const x = 1 + 1; callback(null, x); } // The number of locks to create in the semaphore's pool. Optionally, size can be a `callback(err, size)` function.
}
```

## Throttle

### pettyCache.throttle(key, options, fn)

Distributed throttle: coalesces calls for the same key across multiple processes via Redis so that `fn` runs at most once per `ttl` window. The first caller in a window wins the claim and `fn` runs to completion; subsequent calls within the window are no-ops. After the window expires, the next caller can claim again.

The returned Promise resolves after `fn` has resolved (for the winning caller) or immediately (for absorbed callers). Errors thrown by `fn` propagate to the caller — useful for callers that need to know whether the work succeeded. Note that a failed `fn` still consumes the window: the key is claimed before `fn` runs and is not released on error, so retries within the same `ttl` are absorbed until the key expires.

```javascript
await pettyCache.throttle('my-key', { ttl: 5 * 60 * 1000 }, async () => {
// First caller's work — runs immediately. Subsequent calls within
// the next 5 minutes are no-ops.
await doWork();
});
```

**Parameters**

- `key` (string) — Redis key. Callers compose their own naming convention.
- `options.ttl` (number) — throttle window in milliseconds. Required.
- `fn` (async function) — invoked once per window if this caller wins the claim. Awaited before the returned Promise resolves; errors propagate to the caller.
39 changes: 39 additions & 0 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -1087,6 +1087,45 @@ function PettyCache() {
}
};

/**
* Distributed throttle: invokes `fn` only if no other call has claimed the same key
* within the last `ttl` milliseconds, coalescing calls across multiple processes via
* Redis. The first caller in a window wins the claim and `fn` runs to completion;
* subsequent calls within the window are no-ops (they return immediately without
* invoking `fn`). After the window's TTL expires, the next caller can claim again.
*
* The returned Promise resolves only after `fn` has resolved (for the winning caller)
* or immediately (for absorbed callers). Errors thrown by `fn` propagate to the
* caller — useful for callers that need to know whether the work succeeded so they
* can NACK upstream messages, etc.
*
* @param {string} key - The Redis key. Callers compose their own naming convention.
* @param {Object} options
* @param {number} options.ttl - Throttle window in milliseconds. Required.
* @param {Function} fn - Async function invoked once per window if this caller wins
* the claim. Awaited before the returned Promise resolves.
* @returns {Promise<void>}
*/
this.throttle = async (key, options, fn) => {
const ttl = options.ttl;

const won = await new Promise((resolve, reject) => {
redisClient.set(key, '1', 'NX', 'PX', ttl, (err, res) => {
if (err) {
return reject(err);
}

resolve(res === 'OK');
});
});

if (!won) {
return;
}

await fn();
};

// Semaphore functions need to be bound to the main PettyCache object
for (const method in this.semaphore) {
this.semaphore[method] = this.semaphore[method].bind(this);
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,5 @@
"test": "node --test --test-force-exit --test-reporter=spec",
"test:only": "node --test --test-force-exit --test-only --test-reporter=spec"
},
"version": "3.7.0"
"version": "3.8.0"
}
77 changes: 77 additions & 0 deletions test/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2333,6 +2333,83 @@ test('petty-cache', { concurrency: true }, async (t) => {
});
});

t.test('PettyCache.throttle', { concurrency: false }, async (t) => {
t.test('first call invokes fn and waits for it to complete', async () => {
const key = `throttle-${Math.random()}`;
let invocations = 0;

await pettyCache.throttle(key, { ttl: 1000 }, async () => {
await timers.setTimeout(50);
invocations += 1;
});

assert.strictEqual(invocations, 1);
});

t.test('subsequent calls within the window are absorbed', async () => {
const key = `throttle-${Math.random()}`;
const calls = [];

await pettyCache.throttle(key, { ttl: 1000 }, async () => calls.push('a'));
await pettyCache.throttle(key, { ttl: 1000 }, async () => calls.push('b'));
await pettyCache.throttle(key, { ttl: 1000 }, async () => calls.push('c'));

assert.deepStrictEqual(calls, ['a']);
});

t.test('after the window expires, next call wins again', async () => {
const key = `throttle-${Math.random()}`;
const calls = [];

await pettyCache.throttle(key, { ttl: 100 }, async () => calls.push('a'));
await timers.setTimeout(200);
await pettyCache.throttle(key, { ttl: 100 }, async () => calls.push('b'));

assert.deepStrictEqual(calls, ['a', 'b']);
});

t.test('errors thrown by fn propagate to the caller', async () => {
const key = `throttle-${Math.random()}`;

await assert.rejects(
pettyCache.throttle(key, { ttl: 1000 }, async () => { throw new Error('boom'); }),
/boom/
);
});

t.test('different keys are independent', async () => {
const key1 = `throttle-${Math.random()}-1`;
const key2 = `throttle-${Math.random()}-2`;
const calls = [];

await pettyCache.throttle(key1, { ttl: 1000 }, async () => calls.push('1'));
await pettyCache.throttle(key2, { ttl: 1000 }, async () => calls.push('2'));

assert.deepStrictEqual(calls, ['1', '2']);
});

t.test('absorbed callers return immediately without waiting on fn', async () => {
const key = `throttle-${Math.random()}`;

// First caller wins; fn takes 200ms
const winner = pettyCache.throttle(key, { ttl: 1000 }, async () => {
await timers.setTimeout(200);
});

// Brief pause to ensure the SETNX lands
await timers.setTimeout(20);

// Second caller is absorbed; should resolve quickly
const start = Date.now();
await pettyCache.throttle(key, { ttl: 1000 }, async () => {
await timers.setTimeout(5000);
});
assert.ok(Date.now() - start < 100, 'absorbed caller did not return immediately');

await winner;
});
});

t.test('redisClient', { concurrency: true }, async (t) => {
t.test('redisClient.mget(falsy keys)', (t, done) => {
const key1 = Math.random().toString();
Expand Down