4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -6,6 +6,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

## [3.8.0] - 2026-04-25
### Added
- `pettyCache.debounce(key, { wait }, fn)` — distributed debounce primitive. Rapid calls for a given key are coalesced; `fn` runs once after `wait` ms elapse with no further calls. Debounce state is cleared before `fn` runs, so a throwing callback doesn't block the next caller from retrying immediately. Async/await only.

## [3.7.0] - 2026-03-12
### Changed
- Added the ability for `pettyCache.bulkGet`, `pettyCache.bulkSet`, and `pettyCache.bulkFetch` functions to support callbacks and promises.
26 changes: 26 additions & 0 deletions README.md
@@ -23,6 +23,9 @@ Functions executed on cache misses are wrapped in double-checked locking (http:/
**Mutex**
Provides a distributed lock (mutex) with the ability to retry a specified number of times after a specified interval of time when acquiring a lock.

**Debounce**
Provides a distributed debounce primitive: rapid calls for the same key are coalesced, and the callback runs once after a quiet window elapses with no further calls. Debounce state is cleared before the callback runs, so a throwing callback doesn't block the next caller from retrying immediately.

**Semaphore**
Provides a pool of distributed locks with the ability to release a slot back to the pool or remove the slot from the pool so that it's not used again.

@@ -385,6 +388,29 @@ pettyCache.mutex.unlock('key', function(err) {
await pettyCache.mutex.unlock('key');
```

## Debounce

### pettyCache.debounce(key, options, fn)

Distributed debounce: invokes `fn` once `wait` ms have elapsed since the last call for the given key, coalescing across multiple processes via Redis. Each call resets the wait window — `fn` does not run until calls stop arriving for `wait` ms.

The returned Promise resolves immediately after the call is recorded — `fn` runs detached from the caller.

```javascript
// Three rapid calls; fn runs once, ~5 seconds after the third call
await pettyCache.debounce('my-key', { wait: 5000 }, async () => doWork());
await pettyCache.debounce('my-key', { wait: 5000 }, async () => doWork());
await pettyCache.debounce('my-key', { wait: 5000 }, async () => doWork());
```

**Parameters**

- `key` (string) — Redis key. Callers compose their own naming convention.
- `options.wait` (number) — quiet-period required before `fn` fires, in ms. Defaults to 5000 if omitted.
- `fn` (async function) — invoked once after `wait` ms of no further calls. Runs detached from the returned Promise.

**Note on durability:** the deferred execution is held in-process via `setTimeout`. If the most-recent caller's process dies before its timer fires, the work is lost. Callers needing crash-resilient deferred execution should layer a periodic safety-net job or use a durable scheduler outside this primitive.

## Semaphore

Provides a pool of distributed locks. Once a consumer acquires a lock they have the ability to release the lock back to the pool or mark the lock as "consumed" so that it's not used again.
103 changes: 102 additions & 1 deletion index.js
@@ -1,3 +1,6 @@
const crypto = require('node:crypto');
const timers = require('node:timers/promises');

const async = require('async');
const lock = require('lock').Lock();
const memoryCache = require('memory-cache');
@@ -334,6 +337,104 @@ function PettyCache() {
}
};

/**
* Distributed debounce: invokes `fn` once `wait` ms have elapsed since the last
* call for the given key, coalescing across multiple processes via Redis. Each call
* resets the wait window — `fn` does not run until calls stop arriving for `wait` ms.
*
* Mechanic: each call writes a unique id to the key (with a TTL of 2× `wait`) and
* schedules a local setTimeout for `wait` ms. When the timeout fires, the call acquires a
* distributed mutex on the key, reads it, and only invokes `fn` if the id still
* matches its own — meaning no later call superseded it. The mutex is held for the
* duration of `fn` so concurrent fires are serialized.
*
* The returned Promise resolves immediately after the id is written — `fn` runs
* detached from the caller. Errors thrown by `fn` are re-thrown on `process.nextTick`
* so they surface via `unhandledRejection` / APM.
*
* Note on durability: the deferred execution is held in-process via `setTimeout`. If
* the most-recent caller's process dies before its timer fires, the work is lost.
* Callers that need crash-resilient deferred execution should layer a periodic
* safety-net job or use a durable scheduler outside this primitive.
*
* @param {string} key - The Redis key. Callers compose their own naming convention.
* @param {Object} options
* @param {number} options.wait - Quiet-period required before `fn` fires, in ms.
* @param {Function} fn - Async function invoked once after `wait` ms of no further
* calls. Runs detached from the returned Promise.
* @returns {Promise<void>} - Resolves immediately after the id is written.
*/
this.debounce = async (key, options, fn) => {
const wait = Object.hasOwn(options, 'wait') ? options.wait : 5000;
const id = crypto.randomUUID();
const mutexKey = `mutex:${key}`;
const ttl = wait * 2;

// Write our id with a TTL of 2× wait. SET overwrites any prior value, resetting
// the debounce timer for any in-flight setTimeouts that will see a non-matching
// id on read. The TTL prevents abandoned keys from accumulating in Redis when a
// process dies between the SET and the setTimeout firing.
await new Promise((resolve, reject) => {
redisClient.set(key, id, 'PX', ttl, (err) => {
if (err) {
return reject(err);
}

resolve();
});
});

// Detached fire-after-wait — caller's Promise resolves once the id is written above
(async () => {
await timers.setTimeout(wait);

// Retry generously — when many calls fire near-simultaneously, the mutex is
// held briefly by each in turn (just GET+DEL+fn). Waiting it out is correct;
// immediate failure would silently lose our chance to fire even though we
// may be the most recent id. If we still can't acquire after retries, treat
// it as "another fire is in progress" and skip silently.
try {
const interval = Math.max(Math.floor(wait / 100), 1);
const times = Math.ceil(ttl / interval);

await this.mutex.lock(mutexKey, { retry: { interval, times }, ttl });
} catch {
return;
}

try {
const currentId = await new Promise((resolve, reject) => {
redisClient.get(key, (err, val) => {
if (err) {
return reject(err);
}

resolve(val);
});
});

// A later call superseded ours — its detached fire will handle it
if (currentId !== id) {
return;
}

await new Promise((resolve, reject) => {
redisClient.del(key, (err) => {
if (err) {
return reject(err);
}

resolve();
});
});

await fn();
} finally {
await this.mutex.unlock(mutexKey);
}
})().catch(err => process.nextTick(() => { throw err; }));
};

/**
* Deletes a key from both the memory cache and Redis. Supports both callback and promise styles.
* @param {string} key - The cache key to delete.
@@ -363,7 +464,7 @@ function PettyCache() {

/**
* Returns data from cache if available; otherwise executes func, stores the result, and returns it.
* Uses double-checked locking to prevent cache stampedes. Supports async and callback func signatures.
* Uses double-checked locking to prevent cache stampedes. Supports async and callback func signatures.
* @param {string} key - The cache key.
* @param {Function} func - Called on cache miss. Use func(callback) for callbacks or async func() for promises.
* @param {Object} [options] - Optional settings.
2 changes: 1 addition & 1 deletion package.json
@@ -30,5 +30,5 @@
"test": "node --test --test-force-exit --test-reporter=spec",
"test:only": "node --test --test-force-exit --test-only --test-reporter=spec"
},
"version": "3.7.0"
"version": "3.8.0"
}
97 changes: 97 additions & 0 deletions test/index.js
@@ -856,6 +856,103 @@ test('petty-cache', { concurrency: true }, async (t) => {
});
});

t.test('PettyCache.debounce', { concurrency: false }, async (t) => {
t.test('fn runs after wait ms of quiet — not before', async () => {
const key = `debounce-${Math.random()}`;
let invocations = 0;

await pettyCache.debounce(key, { wait: 200 }, async () => { invocations += 1; });

// Has not fired yet
assert.strictEqual(invocations, 0);

// Halfway through the wait — still has not fired
await timers.setTimeout(100);
assert.strictEqual(invocations, 0);

// Has fired after wait elapses
await timers.setTimeout(200);
assert.strictEqual(invocations, 1);
});

t.test('each call resets the timer — fn fires only after the LAST call quiets', async () => {
const key = `debounce-${Math.random()}`;
let invocations = 0;
const fn = async () => { invocations += 1; };

// t=0: first call schedules fire for t=200
await pettyCache.debounce(key, { wait: 200 }, fn);
await timers.setTimeout(100);

// t=100: second call resets the schedule — new fire at t=300
await pettyCache.debounce(key, { wait: 200 }, fn);
await timers.setTimeout(100);

// t=200: first call's setTimeout fires; sees superseded stamp; does nothing
assert.strictEqual(invocations, 0, 'first call should have been superseded by second');

// t=200: third call resets again — new fire at t=400
await pettyCache.debounce(key, { wait: 200 }, fn);
await timers.setTimeout(150);

// t=350: second call's setTimeout fired at t=300, sees superseded; did nothing
assert.strictEqual(invocations, 0, 'second call should have been superseded by third');

// t=450: third call's setTimeout fires at t=400, sees its own stamp, fires fn
await timers.setTimeout(100);
assert.strictEqual(invocations, 1, 'fn should have fired exactly once after the last call quieted');
});

t.test('coalesces three rapid calls into one fn invocation', async () => {
const key = `debounce-${Math.random()}`;
let invocations = 0;
const fn = async () => { invocations += 1; };

await pettyCache.debounce(key, { wait: 200 }, fn);
await pettyCache.debounce(key, { wait: 200 }, fn);
await pettyCache.debounce(key, { wait: 200 }, fn);

// Wait for the last timer to fire (allow extra time for mutex contention)
await timers.setTimeout(500);
assert.strictEqual(invocations, 1);
});

t.test('after fn fires, a new debounce on the same key works again', async () => {
const key = `debounce-${Math.random()}`;
let invocations = 0;

await pettyCache.debounce(key, { wait: 100 }, async () => { invocations += 1; });
await timers.setTimeout(300);

await pettyCache.debounce(key, { wait: 100 }, async () => { invocations += 1; });
await timers.setTimeout(300);

assert.strictEqual(invocations, 2);
});

t.test('different keys are independent', async () => {
const key1 = `debounce-${Math.random()}-1`;
const key2 = `debounce-${Math.random()}-2`;
const calls = [];

await pettyCache.debounce(key1, { wait: 100 }, async () => calls.push('1'));
await pettyCache.debounce(key2, { wait: 100 }, async () => calls.push('2'));

await timers.setTimeout(200);

assert.deepStrictEqual(calls.sort(), ['1', '2']);
});

t.test('debounce returns immediately — caller is not blocked by wait', async () => {
const key = `debounce-${Math.random()}`;
const startedAt = Date.now();

await pettyCache.debounce(key, { wait: 5000 }, async () => {});

assert.ok(Date.now() - startedAt < 200, 'debounce blocked the caller');
});
});

t.test('PettyCache.fetch', { concurrency: true }, async (t) => {
t.test('PettyCache.fetch', (t, done) => {
const key = Math.random().toString();