From 837f24bd59be541d0a3bbf8e74cd00ec3ae8cfdd Mon Sep 17 00:00:00 2001 From: nerjs Date: Mon, 11 May 2026 18:37:59 +0200 Subject: [PATCH 1/3] Fix timekeeper races, metrics and propagate unrefTimeouts - runBatch metric on batch start (was rejectBatch) - set task.runnedAt on run - check signal.aborted before cache.set - include maxWaitingTimeMs in deduplicator timeout when limited - fix race in LimitedTimekeeper.clear() - split LimitedTimekeeper.runTask into dispatch + enqueue - register Deduplicator runner before starting it - propagate unrefTimeouts through all timers - validate BatchLoader options - BatchLoader accepts metrics; export metric interfaces - ITask fields readonly; minor cleanup, README sync --- README.md | 10 ++--- src/batch-aggregator/batch-aggregator.ts | 14 ++++--- src/batch-aggregator/interfaces.ts | 6 +++ src/batch-loader/batch-loader.ts | 52 +++++++++++++++++------- src/deduplicator/deduplicator.ts | 9 ++-- src/index.ts | 4 +- src/timekeeper/interfaces.ts | 9 ++-- src/timekeeper/limited.timekeeper.ts | 35 +++++++++++----- src/timekeeper/unlimited.timekeeper.ts | 11 +++-- src/utils/is.ts | 5 +-- src/utils/signals.ts | 0 11 files changed, 104 insertions(+), 51 deletions(-) delete mode 100644 src/utils/signals.ts diff --git a/README.md b/README.md index 447203e..9fe4339 100644 --- a/README.md +++ b/README.md @@ -251,7 +251,7 @@ Thrown when a task is manually aborted during execution, explicitly indicating t - **Request Grouping**: Groups requests into batches based on size or timeout. - **Concurrency Control**: Limits the number of tasks that can run in parallel (`concurrencyLimit`). -- **Timeout Handling**: Supports execution timeouts (`batchTimeout`) and waiting timeouts (`maxWaitingTimeMs`). +- **Timeout Handling**: Supports execution timeouts (`timeoutMs`) and waiting timeouts (`maxWaitingTimeMs`). --- @@ -263,7 +263,7 @@ interface IBatchAggregatorOptions { maxBatchSize: number; // Maximum number of requests per batch batchTimeMs: number; // Maximum time to form a batch maxWaitingTimeMs?: number; // Maximum waiting time for tasks in the queue (only if concurrencyLimit > 0) - batchTimeout?: number; // Maximum execution time for batchFn (the function passed as the first argument) + timeoutMs: number; // Maximum execution time for batchFn (the function passed as the first argument) } ``` @@ -284,7 +284,7 @@ const aggregator = new BatchAggregator( { maxBatchSize: 3, batchTimeMs: 100, - batchTimeout: 500, + timeoutMs: 500, } ) @@ -311,7 +311,7 @@ const aggregator = new BatchAggregator( { maxBatchSize: 2, batchTimeMs: 100, - batchTimeout: 500, + timeoutMs: 500, concurrencyLimit: 2, // Limit to 2 parallel tasks } ) @@ -340,7 +340,7 @@ const aggregator = new BatchAggregator( { maxBatchSize: 1, batchTimeMs: 100, - batchTimeout: 500, + timeoutMs: 500, concurrencyLimit: 1, maxWaitingTimeMs: 100, // Timeout for tasks in the queue } diff --git a/src/batch-aggregator/batch-aggregator.ts b/src/batch-aggregator/batch-aggregator.ts index 9f61cd6..69ce239 100644 --- a/src/batch-aggregator/batch-aggregator.ts +++ b/src/batch-aggregator/batch-aggregator.ts @@ -18,8 +18,8 @@ const createTimekeeperMetrics = (metrics?: IBatchAggregatorMetrics): ILimitedTim if (metrics.resolveBatch) tkMetrics.resolveTask = task => metrics.resolveBatch?.(task.data.requests.length) if (metrics.rejectBatch) tkMetrics.rejectTask = (_, task) => metrics.rejectBatch?.(task.data.requests.length) - if (metrics.parallelBatches) tkMetrics.runTask = runnedSize => metrics.parallelBatches?.(runnedSize) - if (metrics.waitingBatches) tkMetrics.waitTask = runnedSize => metrics.waitingBatches?.(runnedSize) + if (metrics.parallelBatches) tkMetrics.runTask = size => metrics.parallelBatches?.(size) + if (metrics.waitingBatches) tkMetrics.waitTask = size => metrics.waitingBatches?.(size) return tkMetrics } @@ -28,11 +28,11 @@ export class BatchAggregator { private readonly timekeeper: ITimekeeper> private readonly batchRunner = async (task: ITask>, signal: AbortSignal) => { - this.metrics?.rejectBatch?.(task.data.requests.length) + this.metrics?.runBatch?.(task.data.requests.length) debug(`Running batchRunner with a query array of length ${task.data.requests.length}. task id="${task.id}"`) const response = await this.batchLoaderFn([...task.data.requests], signal) if (!Array.isArray(response) || response.length !== task.data.requests.length) - throw new LoaderError(`The result of batchLoadFn must be an array equal in length to the query array `) + throw new LoaderError(`The result of batchLoadFn must be an array equal in length to the query array`) task.data.responses = response } @@ -42,7 +42,7 @@ export class BatchAggregator { private readonly options: IBatchAggregatorOptions, private readonly metrics?: IBatchAggregatorMetrics, ) { - const { concurrencyLimit, maxWaitingTimeMs, batchTimeMs: runMs, timeoutMs } = options + const { concurrencyLimit, maxWaitingTimeMs, batchTimeMs: runMs, timeoutMs, unrefTimeouts } = options const initialDataFactory = () => ({ requests: [], responses: [] }) this.timekeeper = concurrencyLimit && concurrencyLimit > 0 && concurrencyLimit < Infinity @@ -55,6 +55,7 @@ export class BatchAggregator { runner: this.batchRunner, timeoutMs, callRejectedTask: false, + unrefTimeouts, }, createTimekeeperMetrics(metrics), ) @@ -65,6 +66,7 @@ export class BatchAggregator { runner: this.batchRunner, timeoutMs, callRejectedTask: false, + unrefTimeouts, }, createTimekeeperMetrics(metrics), ) @@ -86,7 +88,7 @@ export class BatchAggregator { const task = this.getCurrentTask() const index = task.data.requests.length this.metrics?.loadBatchItem?.() - debug(`Load data. task id="${task.id}"; curent index="${index}"`) + debug(`Load data. task id="${task.id}"; current index="${index}"`) task.data.requests.push(request) await this.timekeeper.wait(task) diff --git a/src/batch-aggregator/interfaces.ts b/src/batch-aggregator/interfaces.ts index 9d10a12..c7f41b5 100644 --- a/src/batch-aggregator/interfaces.ts +++ b/src/batch-aggregator/interfaces.ts @@ -23,6 +23,12 @@ export interface IBatchAggregatorOptions { * @description Maximum execution time for batchFn (the function passed as the first argument) */ timeoutMs: number + + /** + * @description Allows timers to avoid blocking the event loop + * @default false + */ + unrefTimeouts?: boolean } export type BatchLoaderFn = (batchArray: T[], signal: AbortSignal) => Promise | R[] diff --git a/src/batch-loader/batch-loader.ts b/src/batch-loader/batch-loader.ts index b780138..9d096a8 100644 --- a/src/batch-loader/batch-loader.ts +++ b/src/batch-loader/batch-loader.ts @@ -1,20 +1,35 @@ import { BatchAggregator } from '../batch-aggregator/batch-aggregator' -import { BatchLoaderFn } from '../batch-aggregator/interfaces' +import { BatchLoaderFn, IBatchAggregatorMetrics } from '../batch-aggregator/interfaces' import { Deduplicator } from '../deduplicator/deduplicator' import { Key } from '../utils/interfaces' import { CacheAdapter } from './cache-adapter' import { IBatchLoaderOptions } from './interfaces' -const prepareOptions = (options: IBatchLoaderOptions) => ({ - getKey: (query: K) => `${query}`, - timeoutMs: 60_000, - unrefTimeouts: false, - concurrencyLimit: Infinity, - maxBatchSize: 1000, - batchTimeMs: 50, - maxWaitingTimeMs: 60_000, - ...options, -}) +const prepareOptions = (options: IBatchLoaderOptions) => { + const merged = { + getKey: (query: K) => `${query}`, + timeoutMs: 60_000, + unrefTimeouts: false, + concurrencyLimit: Infinity, + maxBatchSize: 1000, + batchTimeMs: 50, + maxWaitingTimeMs: 60_000, + ...options, + } + + if (!Number.isFinite(merged.timeoutMs) || merged.timeoutMs <= 0) + throw new RangeError(`timeoutMs must be a positive finite number, got ${merged.timeoutMs}`) + if (!Number.isFinite(merged.batchTimeMs) || merged.batchTimeMs < 0) + throw new RangeError(`batchTimeMs must be a non-negative finite number, got ${merged.batchTimeMs}`) + if (!Number.isInteger(merged.maxBatchSize) || merged.maxBatchSize < 1) + throw new RangeError(`maxBatchSize must be a positive integer, got ${merged.maxBatchSize}`) + if (merged.concurrencyLimit !== Infinity && (!Number.isInteger(merged.concurrencyLimit) || merged.concurrencyLimit < 1)) + throw new RangeError(`concurrencyLimit must be a positive integer or Infinity, got ${merged.concurrencyLimit}`) + if (!Number.isFinite(merged.maxWaitingTimeMs) || merged.maxWaitingTimeMs <= 0) + throw new RangeError(`maxWaitingTimeMs must be a positive finite number, got ${merged.maxWaitingTimeMs}`) + + return merged +} export class BatchLoader { private readonly cache: CacheAdapter @@ -22,18 +37,25 @@ export class BatchLoader { private readonly aggregator: BatchAggregator private readonly getKey: (query: K) => Key - constructor(batchLoaderFn: BatchLoaderFn, options: IBatchLoaderOptions) { + constructor(batchLoaderFn: BatchLoaderFn, options: IBatchLoaderOptions, metrics?: IBatchAggregatorMetrics) { const { cache, getKey, timeoutMs, unrefTimeouts, batchTimeMs, concurrencyLimit, maxBatchSize, maxWaitingTimeMs } = prepareOptions(options) + const limited = concurrencyLimit !== Infinity + const deduplicatorTimeoutMs = timeoutMs + batchTimeMs + (limited ? maxWaitingTimeMs : 0) + this.getKey = getKey this.cache = new CacheAdapter(cache) this.deduplicator = new Deduplicator(this.deduplicatorRunner, { getKey, - timeoutMs: timeoutMs + batchTimeMs, + timeoutMs: deduplicatorTimeoutMs, unrefTimeouts: !!unrefTimeouts, }) - this.aggregator = new BatchAggregator(batchLoaderFn, { timeoutMs, batchTimeMs, concurrencyLimit, maxBatchSize, maxWaitingTimeMs }) + this.aggregator = new BatchAggregator( + batchLoaderFn, + { timeoutMs, batchTimeMs, concurrencyLimit, maxBatchSize, maxWaitingTimeMs, unrefTimeouts: !!unrefTimeouts }, + metrics, + ) } private readonly deduplicatorRunner = async (query: K, signal: AbortSignal): Promise => { @@ -47,6 +69,8 @@ export class BatchLoader { const loaded = await this.aggregator.load(query) + if (signal.aborted) throw signal.reason + this.deduplicator.restartTimeout(query) await this.cache.set(key, loaded) diff --git a/src/deduplicator/deduplicator.ts b/src/deduplicator/deduplicator.ts index b1e1d17..de09a89 100644 --- a/src/deduplicator/deduplicator.ts +++ b/src/deduplicator/deduplicator.ts @@ -47,7 +47,9 @@ export class Deduplicator { } private createTimeout(key: Key): NodeJS.Timeout { - return setTimeout(() => this.callError(key, new TimeoutError(this.options.timeoutMs)), this.options.timeoutMs) + const tid = setTimeout(() => this.callError(key, new TimeoutError(this.options.timeoutMs)), this.options.timeoutMs) + if (this.options.unrefTimeouts) tid?.unref?.() + return tid } private createRunner(key: Key, query: T): Defer { @@ -56,7 +58,8 @@ export class Deduplicator { const controller = new AbortController() const tid = this.createTimeout(key) - if (this.options.unrefTimeouts) tid?.unref?.() + + this.runners.set(key, { defer, controller, tid }) this.run(query, controller.signal) .then(result => defer.resolve(result)) @@ -68,8 +71,6 @@ export class Deduplicator { }) .finally(() => this.clearRunner(key)) - this.runners.set(key, { defer, controller, tid }) - return defer } diff --git a/src/index.ts b/src/index.ts index 1a95577..c377008 100644 --- a/src/index.ts +++ b/src/index.ts @@ -7,7 +7,7 @@ export { CacheAdapter, MapCache } from './batch-loader/cache-adapter' export { DeduplicatorRunnerCallback, IDeduplicatorOptions } from './deduplicator/interfaces' export { Deduplicator } from './deduplicator/deduplicator' -export { BatchLoaderFn, IBatchAggregatorOptions } from './batch-aggregator/interfaces' +export { BatchLoaderFn, IBatchAggregatorOptions, IBatchAggregatorMetrics } from './batch-aggregator/interfaces' export { BatchAggregator } from './batch-aggregator/batch-aggregator' export { @@ -18,6 +18,8 @@ export { UnlimitedTimekeeperOptions, TaskStatus, TimekeeperRunnerCallback, + IUnlimitedTimekeeperMetrics, + ILimitedTimekeeperMetrics, } from './timekeeper/interfaces' export { UnlimitedTimekeeper } from './timekeeper/unlimited.timekeeper' export { LimitedTimekeeper } from './timekeeper/limited.timekeeper' diff --git a/src/timekeeper/interfaces.ts b/src/timekeeper/interfaces.ts index 30c3af8..be0cf27 100644 --- a/src/timekeeper/interfaces.ts +++ b/src/timekeeper/interfaces.ts @@ -2,10 +2,10 @@ export type TaskStatus = 'pending' | 'runned' | 'resolved' | 'rejected' export interface ITask { readonly id: string - status: TaskStatus - data: D - runnedAt: number | null - createdAt: number + readonly status: TaskStatus + readonly data: D + readonly runnedAt: number | null + readonly createdAt: number } export type TimekeeperRunnerCallback = (task: ITask, signal: AbortSignal) => Promise | void @@ -25,6 +25,7 @@ export interface UnlimitedTimekeeperOptions { runner: TimekeeperRunnerCallback timeoutMs: number callRejectedTask?: boolean + unrefTimeouts?: boolean } export interface LimitedOptions { diff --git a/src/timekeeper/limited.timekeeper.ts b/src/timekeeper/limited.timekeeper.ts index f8e857f..1e8ebfd 100644 --- a/src/timekeeper/limited.timekeeper.ts +++ b/src/timekeeper/limited.timekeeper.ts @@ -22,26 +22,31 @@ export class LimitedTimekeeper extends UnlimitedTimekeeper): void { - if (this.runnedTasks.size < this.limitedOptions.concurrencyLimit) { - super.runTask(task) - task.defer.promise.finally(() => this.runNextWaitingTask()).catch(() => {}) + if (this.runnedTasks.size >= this.limitedOptions.concurrencyLimit) { + this.enqueueTask(task) return } + super.runTask(task) + task.defer.promise.finally(() => this.runNextWaitingTask()).catch(() => {}) + } + + private enqueueTask(task: Task): void { const runnedTime = Date.now() - task.tid = setTimeout(() => { + const tid = setTimeout(() => { debug( `A task on the waiting list is waiting longer than it should. id="${task.id}"; time="${Date.now() - runnedTime}"; maxWaitingTimeMs="${this.limitedOptions.maxWaitingTimeMs}"`, ) this.abort(task.id, new TimeoutError(this.limitedOptions.maxWaitingTimeMs)) - }, this.limitedOptions.maxWaitingTimeMs)?.unref() + }, this.limitedOptions.maxWaitingTimeMs) + if (this.options.unrefTimeouts) tid?.unref?.() + task.tid = tid this.waitingTasks.push(task) this.metrics?.waitTask?.(this.waitingTasks.length) debug(`The task has been added to the waiting list. id="${task.id}"`) @@ -61,7 +66,15 @@ export class LimitedTimekeeper extends UnlimitedTimekeeper this.abort(task.inner, new SilentAbortError('timekeeper'))) + waiting.forEach(task => { + if (task.tid) clearTimeout(task.tid) + const error = new SilentAbortError('timekeeper') + this.metrics?.rejectTask?.(error, task.inner) + this.callAbortedRunner(task, error) + debug(`The task was rejected. id="${task.id}"`) + }) } } diff --git a/src/timekeeper/unlimited.timekeeper.ts b/src/timekeeper/unlimited.timekeeper.ts index fcfdb0e..95ae4bd 100644 --- a/src/timekeeper/unlimited.timekeeper.ts +++ b/src/timekeeper/unlimited.timekeeper.ts @@ -72,10 +72,12 @@ export class UnlimitedTimekeeper { + const tid = setTimeout(() => { debug(`The current task is started by a timer. id=${this.currentTask?.id}`) this.runCurrentTask() - }, this.options.runMs)?.unref() + }, this.options.runMs) + if (this.options.unrefTimeouts) tid?.unref?.() + this.tidRunner = tid } private clearRunnerTimeout() { @@ -103,8 +105,11 @@ export class UnlimitedTimekeeper) { task.status = 'runned' + task.runnedAt = Date.now() task.controller = task.controller || new AbortController() - task.tid = setTimeout(() => this.abort(task.id, new TimeoutError(this.options.timeoutMs)), this.options.timeoutMs)?.unref() + const taskTid = setTimeout(() => this.abort(task.id, new TimeoutError(this.options.timeoutMs)), this.options.timeoutMs) + if (this.options.unrefTimeouts) taskTid?.unref?.() + task.tid = taskTid this.runnedTasks.set(task.id, task) this.metrics?.runTask?.(this.runnedTasks.size, task.inner) diff --git a/src/utils/is.ts b/src/utils/is.ts index 0d820ac..c20aeae 100644 --- a/src/utils/is.ts +++ b/src/utils/is.ts @@ -1,5 +1,4 @@ import { LoaderError } from './errors' -const isObject = (value: any): value is Record => value && typeof value === 'object' -export const isError = (value: any): value is Error => isObject(value) && value instanceof Error -export const isLoaderError = (value: any): value is LoaderError => isError(value) && value instanceof LoaderError +export const isError = (value: any): value is Error => value instanceof Error +export const isLoaderError = (value: any): value is LoaderError => value instanceof LoaderError diff --git a/src/utils/signals.ts b/src/utils/signals.ts deleted file mode 100644 index e69de29..0000000 From 1648420024991eebf1b0e5585cd66c472b9d2a3c Mon Sep 17 00:00:00 2001 From: nerjs Date: Mon, 11 May 2026 18:52:02 +0200 Subject: [PATCH 2/3] Bump @types/node and ts-jest, tidy tsconfig - @types/node 25.0.10 -> 25.6.2 - ts-jest 29.4.6 -> 29.4.9 - tsconfig: moduleResolution "Node" -> "node10", drop unused baseUrl typescript stays on ^5.9.3: ts-jest is not yet compatible with TS 6. --- package-lock.json | 43 +++++++++++++++++++------------------------ package.json | 4 ++-- tsconfig.json | 7 +++---- 3 files changed, 24 insertions(+), 30 deletions(-) diff --git a/package-lock.json b/package-lock.json index 93f3990..c730f57 100644 --- a/package-lock.json +++ b/package-lock.json @@ -17,14 +17,14 @@ "@types/debug": "^4.1.13", "@types/eslint__js": "^9.14.0", "@types/jest": "^30.0.0", - "@types/node": "^25.0.10", + "@types/node": "^25.6.2", "eslint": "^10.1.0", "eslint-config-prettier": "^10.1.8", "eslint-plugin-prettier": "^5.5.5", "jest": "^30.3.0", "jest-watch-typeahead": "^3.0.1", "prettier": "^3.8.1", - "ts-jest": "^29.4.6", + "ts-jest": "^29.4.9", "typescript": "^5.9.3", "typescript-eslint": "^8.58.0" } @@ -1579,13 +1579,12 @@ "dev": true }, "node_modules/@types/node": { - "version": "25.0.10", - "resolved": "https://registry.npmjs.org/@types/node/-/node-25.0.10.tgz", - "integrity": "sha512-zWW5KPngR/yvakJgGOmZ5vTBemDoSqF3AcV/LrO5u5wTWyEAVVh+IT39G4gtyAkh3CtTZs8aX/yRM82OfzHJRg==", + "version": "25.6.2", + "resolved": "https://registry.npmjs.org/@types/node/-/node-25.6.2.tgz", + "integrity": "sha512-sokuT28dxf9JT5Kady1fsXOvI4HVpjZa95NKT5y9PNTIrs2AsobR4GFAA90ZG8M+nxVRLysCXsVj6eGC7Vbrlw==", "dev": true, - "license": "MIT", "dependencies": { - "undici-types": "~7.16.0" + "undici-types": "~7.19.0" } }, "node_modules/@types/stack-utils": { @@ -5484,19 +5483,18 @@ } }, "node_modules/ts-jest": { - "version": "29.4.6", - "resolved": "https://registry.npmjs.org/ts-jest/-/ts-jest-29.4.6.tgz", - "integrity": "sha512-fSpWtOO/1AjSNQguk43hb/JCo16oJDnMJf3CdEGNkqsEX3t0KX96xvyX1D7PfLCpVoKu4MfVrqUkFyblYoY4lA==", + "version": "29.4.9", + "resolved": "https://registry.npmjs.org/ts-jest/-/ts-jest-29.4.9.tgz", + "integrity": "sha512-LTb9496gYPMCqjeDLdPrKuXtncudeV1yRZnF4Wo5l3SFi0RYEnYRNgMrFIdg+FHvfzjCyQk1cLncWVqiSX+EvQ==", "dev": true, - "license": "MIT", "dependencies": { "bs-logger": "^0.2.6", "fast-json-stable-stringify": "^2.1.0", - "handlebars": "^4.7.8", + "handlebars": "^4.7.9", "json5": "^2.2.3", "lodash.memoize": "^4.1.2", "make-error": "^1.3.6", - "semver": "^7.7.3", + "semver": "^7.7.4", "type-fest": "^4.41.0", "yargs-parser": "^21.1.1" }, @@ -5513,7 +5511,7 @@ "babel-jest": "^29.0.0 || ^30.0.0", "jest": "^29.0.0 || ^30.0.0", "jest-util": "^29.0.0 || ^30.0.0", - "typescript": ">=4.3 <6" + "typescript": ">=4.3 <7" }, "peerDependenciesMeta": { "@babel/core": { @@ -5537,11 +5535,10 @@ } }, "node_modules/ts-jest/node_modules/semver": { - "version": "7.7.3", - "resolved": "https://registry.npmjs.org/semver/-/semver-7.7.3.tgz", - "integrity": "sha512-SdsKMrI9TdgjdweUSR9MweHA4EJ8YxHn8DFaDisvhVlUOe4BF1tLD7GAj0lIqWVl+dPb/rExr0Btby5loQm20Q==", + "version": "7.8.0", + "resolved": "https://registry.npmjs.org/semver/-/semver-7.8.0.tgz", + "integrity": "sha512-AcM7dV/5ul4EekoQ29Agm5vri8JNqRyj39o0qpX6vDF2GZrtutZl5RwgD1XnZjiTAfncsJhMI48QQH3sN87YNA==", "dev": true, - "license": "ISC", "bin": { "semver": "bin/semver.js" }, @@ -5609,7 +5606,6 @@ "resolved": "https://registry.npmjs.org/typescript/-/typescript-5.9.3.tgz", "integrity": "sha512-jl1vZzPDinLr9eUt3J/t7V6FgNEw9QjvBPdysz9KfQDD41fQrC2Y4vKQdiaUpFT4bXlb1RHhLpp8wtm6M5TgSw==", "dev": true, - "license": "Apache-2.0", "bin": { "tsc": "bin/tsc", "tsserver": "bin/tsserver" @@ -5657,11 +5653,10 @@ } }, "node_modules/undici-types": { - "version": "7.16.0", - "resolved": "https://registry.npmjs.org/undici-types/-/undici-types-7.16.0.tgz", - "integrity": "sha512-Zz+aZWSj8LE6zoxD+xrjh4VfkIG8Ya6LvYkZqtUQGJPZjYl53ypCaUwWqo7eI0x66KBGeRo+mlBEkMSeSZ38Nw==", - "dev": true, - "license": "MIT" + "version": "7.19.2", + "resolved": "https://registry.npmjs.org/undici-types/-/undici-types-7.19.2.tgz", + "integrity": "sha512-qYVnV5OEm2AW8cJMCpdV20CDyaN3g0AjDlOGf1OW4iaDEx8MwdtChUp4zu4H0VP3nDRF/8RKWH+IPp9uW0YGZg==", + "dev": true }, "node_modules/unrs-resolver": { "version": "1.11.1", diff --git a/package.json b/package.json index 4d573a7..97fd491 100644 --- a/package.json +++ b/package.json @@ -31,14 +31,14 @@ "@types/debug": "^4.1.13", "@types/eslint__js": "^9.14.0", "@types/jest": "^30.0.0", - "@types/node": "^25.0.10", + "@types/node": "^25.6.2", "eslint": "^10.1.0", "eslint-config-prettier": "^10.1.8", "eslint-plugin-prettier": "^5.5.5", "jest": "^30.3.0", "jest-watch-typeahead": "^3.0.1", "prettier": "^3.8.1", - "ts-jest": "^29.4.6", + "ts-jest": "^29.4.9", "typescript": "^5.9.3", "typescript-eslint": "^8.58.0" }, diff --git a/tsconfig.json b/tsconfig.json index b243a8e..a540070 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -1,15 +1,14 @@ { - "compilerOptions": { + "compilerOptions": { "module": "commonjs", "types": ["node", "jest"], - "moduleResolution": "Node", + "moduleResolution": "node10", "declaration": true, "removeComments": false, "allowSyntheticDefaultImports": true, "target": "ES2023", "sourceMap": true, "outDir": "./dist", - "baseUrl": "./src", "incremental": false, "skipLibCheck": true, "strictNullChecks": true, @@ -20,7 +19,7 @@ "strict": true, "resolveJsonModule": true, "useDefineForClassFields": false, - "noEmitOnError": true, + "noEmitOnError": true }, "include": ["src"] } \ No newline at end of file From fa25229b24fc9e886827c51a366fced40dd46d91 Mon Sep 17 00:00:00 2001 From: nerjs Date: Mon, 11 May 2026 21:38:11 +0200 Subject: [PATCH 3/3] Add input validation and improve timer handling --- src/batch-aggregator/batch-aggregator.ts | 19 ++++++++++--- src/batch-aggregator/interfaces.ts | 4 ++- src/batch-loader/batch-loader.ts | 34 +++++++++--------------- src/batch-loader/interfaces.ts | 10 ++++++- src/deduplicator/deduplicator.ts | 8 +++--- src/index.ts | 2 +- src/timekeeper/interfaces.ts | 8 ++++-- src/timekeeper/limited.timekeeper.ts | 34 +++++++++++++----------- src/timekeeper/unlimited.timekeeper.ts | 21 ++++++++------- src/utils/timer.ts | 10 +++++++ tsconfig.json | 2 +- 11 files changed, 94 insertions(+), 58 deletions(-) create mode 100644 src/utils/timer.ts diff --git a/src/batch-aggregator/batch-aggregator.ts b/src/batch-aggregator/batch-aggregator.ts index 69ce239..a71905d 100644 --- a/src/batch-aggregator/batch-aggregator.ts +++ b/src/batch-aggregator/batch-aggregator.ts @@ -2,10 +2,22 @@ import { ILimitedTimekeeperMetrics, ITask, ITimekeeper } from '../timekeeper/int import { LimitedTimekeeper } from '../timekeeper/limited.timekeeper' import { UnlimitedTimekeeper } from '../timekeeper/unlimited.timekeeper' import createDebug from 'debug' -import { BatchLoaderFn, IBatchAggregatorMetrics, IBatchAggregatorOptions } from './interfaces' +import { BatchLoaderFn, DEFAULT_MAX_WAITING_TIME_MS, IBatchAggregatorMetrics, IBatchAggregatorOptions } from './interfaces' import { LoaderError } from '../utils/errors' const debug = createDebug('batchloader:aggregator') +export const validateAggregatorOptions = (o: IBatchAggregatorOptions) => { + if (!Number.isFinite(o.timeoutMs) || o.timeoutMs <= 0) throw new RangeError(`timeoutMs must be > 0`) + if (!Number.isFinite(o.batchTimeMs) || o.batchTimeMs < 0) throw new RangeError(`batchTimeMs must be >= 0`) + if (!Number.isInteger(o.maxBatchSize) || o.maxBatchSize < 1) throw new RangeError(`maxBatchSize must be a positive integer`) + if (o.concurrencyLimit !== undefined && o.concurrencyLimit !== Infinity) { + if (!Number.isInteger(o.concurrencyLimit) || o.concurrencyLimit < 1) + throw new RangeError(`concurrencyLimit must be a positive integer or Infinity`) + if (o.maxWaitingTimeMs !== undefined && (!Number.isFinite(o.maxWaitingTimeMs) || o.maxWaitingTimeMs <= 0)) + throw new RangeError(`maxWaitingTimeMs must be > 0`) + } +} + interface TaskData { requests: T[] responses: R[] @@ -42,15 +54,16 @@ export class BatchAggregator { private readonly options: IBatchAggregatorOptions, private readonly metrics?: IBatchAggregatorMetrics, ) { + validateAggregatorOptions(options) const { concurrencyLimit, maxWaitingTimeMs, batchTimeMs: runMs, timeoutMs, unrefTimeouts } = options const initialDataFactory = () => ({ requests: [], responses: [] }) this.timekeeper = - concurrencyLimit && concurrencyLimit > 0 && concurrencyLimit < Infinity + concurrencyLimit !== undefined && concurrencyLimit !== Infinity ? new LimitedTimekeeper( { concurrencyLimit, initialDataFactory, - maxWaitingTimeMs: maxWaitingTimeMs || 60_000, + maxWaitingTimeMs: maxWaitingTimeMs ?? DEFAULT_MAX_WAITING_TIME_MS, runMs, runner: this.batchRunner, timeoutMs, diff --git a/src/batch-aggregator/interfaces.ts b/src/batch-aggregator/interfaces.ts index c7f41b5..567434b 100644 --- a/src/batch-aggregator/interfaces.ts +++ b/src/batch-aggregator/interfaces.ts @@ -1,3 +1,5 @@ +export const DEFAULT_MAX_WAITING_TIME_MS = 60_000 + export interface IBatchAggregatorOptions { /** * @description Maximum number of parallel tasks (default: unlimited) @@ -26,7 +28,7 @@ export interface IBatchAggregatorOptions { /** * @description Allows timers to avoid blocking the event loop - * @default false + * @default true */ unrefTimeouts?: boolean } diff --git a/src/batch-loader/batch-loader.ts b/src/batch-loader/batch-loader.ts index 9d096a8..9691b04 100644 --- a/src/batch-loader/batch-loader.ts +++ b/src/batch-loader/batch-loader.ts @@ -1,33 +1,25 @@ -import { BatchAggregator } from '../batch-aggregator/batch-aggregator' -import { BatchLoaderFn, IBatchAggregatorMetrics } from '../batch-aggregator/interfaces' +import { BatchAggregator, validateAggregatorOptions } from '../batch-aggregator/batch-aggregator' +import { BatchLoaderFn, DEFAULT_MAX_WAITING_TIME_MS } from '../batch-aggregator/interfaces' import { Deduplicator } from '../deduplicator/deduplicator' import { Key } from '../utils/interfaces' import { CacheAdapter } from './cache-adapter' import { IBatchLoaderOptions } from './interfaces' +// Node's setTimeout silently caps delays here and warns - clamp upstream to avoid TimeoutOverflowWarning + firing in 1ms. +const TIMEOUT_MAX = 2_147_483_647 + const prepareOptions = (options: IBatchLoaderOptions) => { const merged = { getKey: (query: K) => `${query}`, timeoutMs: 60_000, - unrefTimeouts: false, + unrefTimeouts: true, concurrencyLimit: Infinity, maxBatchSize: 1000, batchTimeMs: 50, - maxWaitingTimeMs: 60_000, + maxWaitingTimeMs: DEFAULT_MAX_WAITING_TIME_MS, ...options, } - - if (!Number.isFinite(merged.timeoutMs) || merged.timeoutMs <= 0) - throw new RangeError(`timeoutMs must be a positive finite number, got ${merged.timeoutMs}`) - if (!Number.isFinite(merged.batchTimeMs) || merged.batchTimeMs < 0) - throw new RangeError(`batchTimeMs must be a non-negative finite number, got ${merged.batchTimeMs}`) - if (!Number.isInteger(merged.maxBatchSize) || merged.maxBatchSize < 1) - throw new RangeError(`maxBatchSize must be a positive integer, got ${merged.maxBatchSize}`) - if (merged.concurrencyLimit !== Infinity && (!Number.isInteger(merged.concurrencyLimit) || merged.concurrencyLimit < 1)) - throw new RangeError(`concurrencyLimit must be a positive integer or Infinity, got ${merged.concurrencyLimit}`) - if (!Number.isFinite(merged.maxWaitingTimeMs) || merged.maxWaitingTimeMs <= 0) - throw new RangeError(`maxWaitingTimeMs must be a positive finite number, got ${merged.maxWaitingTimeMs}`) - + validateAggregatorOptions(merged) return merged } @@ -37,23 +29,23 @@ export class BatchLoader { private readonly aggregator: BatchAggregator private readonly getKey: (query: K) => Key - constructor(batchLoaderFn: BatchLoaderFn, options: IBatchLoaderOptions, metrics?: IBatchAggregatorMetrics) { - const { cache, getKey, timeoutMs, unrefTimeouts, batchTimeMs, concurrencyLimit, maxBatchSize, maxWaitingTimeMs } = + constructor(batchLoaderFn: BatchLoaderFn, options: IBatchLoaderOptions) { + const { cache, getKey, timeoutMs, unrefTimeouts, batchTimeMs, concurrencyLimit, maxBatchSize, maxWaitingTimeMs, metrics } = prepareOptions(options) const limited = concurrencyLimit !== Infinity - const deduplicatorTimeoutMs = timeoutMs + batchTimeMs + (limited ? maxWaitingTimeMs : 0) + const deduplicatorTimeoutMs = Math.min(timeoutMs + batchTimeMs + (limited ? maxWaitingTimeMs : 0), TIMEOUT_MAX) this.getKey = getKey this.cache = new CacheAdapter(cache) this.deduplicator = new Deduplicator(this.deduplicatorRunner, { getKey, timeoutMs: deduplicatorTimeoutMs, - unrefTimeouts: !!unrefTimeouts, + unrefTimeouts, }) this.aggregator = new BatchAggregator( batchLoaderFn, - { timeoutMs, batchTimeMs, concurrencyLimit, maxBatchSize, maxWaitingTimeMs, unrefTimeouts: !!unrefTimeouts }, + { timeoutMs, batchTimeMs, concurrencyLimit, maxBatchSize, maxWaitingTimeMs, unrefTimeouts }, metrics, ) } diff --git a/src/batch-loader/interfaces.ts b/src/batch-loader/interfaces.ts index e7f2449..1cc4c43 100644 --- a/src/batch-loader/interfaces.ts +++ b/src/batch-loader/interfaces.ts @@ -1,3 +1,4 @@ +import { IBatchAggregatorMetrics } from '../batch-aggregator/interfaces' import { Key } from '../utils/interfaces' export interface ICache { @@ -7,6 +8,8 @@ export interface ICache { clear(): Promise } +export interface IBatchLoaderMetrics extends IBatchAggregatorMetrics {} + export interface IBatchLoaderOptions { /** * @description Function to extract the key from a query @@ -24,7 +27,7 @@ export interface IBatchLoaderOptions { /** * @description Allows timers to avoid blocking the event loop - * @default false + * @default true */ unrefTimeouts?: boolean @@ -51,4 +54,9 @@ export interface IBatchLoaderOptions { * @default 60_000 */ maxWaitingTimeMs?: number + + /** + * @description Optional metrics hooks. Forwarded to the internal BatchAggregator. + */ + metrics?: IBatchLoaderMetrics } diff --git a/src/deduplicator/deduplicator.ts b/src/deduplicator/deduplicator.ts index de09a89..0217ca5 100644 --- a/src/deduplicator/deduplicator.ts +++ b/src/deduplicator/deduplicator.ts @@ -1,6 +1,7 @@ import { Defer } from '../utils/defer' import createDebug from 'debug' import { RejectedAbortError, SilentAbortError, TimeoutError } from '../utils/errors' +import { unrefTimer } from '../utils/timer' import { DeduplicatorRunnerCallback, IDeduplicatorOptions } from './interfaces' import { Key } from '../utils/interfaces' const debug = createDebug('batchloader:deduplicator') @@ -47,9 +48,10 @@ export class Deduplicator { } private createTimeout(key: Key): NodeJS.Timeout { - const tid = setTimeout(() => this.callError(key, new TimeoutError(this.options.timeoutMs)), this.options.timeoutMs) - if (this.options.unrefTimeouts) tid?.unref?.() - return tid + return unrefTimer( + setTimeout(() => this.callError(key, new TimeoutError(this.options.timeoutMs)), this.options.timeoutMs), + this.options.unrefTimeouts, + ) } private createRunner(key: Key, query: T): Defer { diff --git a/src/index.ts b/src/index.ts index c377008..24bf85f 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,6 +1,6 @@ export { Key } from './utils/interfaces' -export { IBatchLoaderOptions, ICache } from './batch-loader/interfaces' +export { IBatchLoaderOptions, IBatchLoaderMetrics, ICache } from './batch-loader/interfaces' export { BatchLoader } from './batch-loader/batch-loader' export { CacheAdapter, MapCache } from './batch-loader/cache-adapter' diff --git a/src/timekeeper/interfaces.ts b/src/timekeeper/interfaces.ts index be0cf27..35cbe36 100644 --- a/src/timekeeper/interfaces.ts +++ b/src/timekeeper/interfaces.ts @@ -25,6 +25,10 @@ export interface UnlimitedTimekeeperOptions { runner: TimekeeperRunnerCallback timeoutMs: number callRejectedTask?: boolean + /** + * @description Allows timers to avoid blocking the event loop + * @default true + */ unrefTimeouts?: boolean } @@ -39,10 +43,10 @@ export interface IUnlimitedTimekeeperMetrics { create?: () => void forcedRun?: () => void abort?: (task: ITask, error: unknown) => void - runTask?: (runnedSize: number, task: ITask) => void + runTask?: (size: number, task: ITask) => void resolveTask?: (task: ITask) => void rejectTask?: (error: unknown, task: ITask) => void } export interface ILimitedTimekeeperMetrics extends IUnlimitedTimekeeperMetrics { - waitTask?: (waitListSize: number) => void + waitTask?: (size: number) => void } diff --git a/src/timekeeper/limited.timekeeper.ts b/src/timekeeper/limited.timekeeper.ts index 1e8ebfd..5ebe98a 100644 --- a/src/timekeeper/limited.timekeeper.ts +++ b/src/timekeeper/limited.timekeeper.ts @@ -1,4 +1,5 @@ import { SilentAbortError, TimeoutError } from '../utils/errors' +import { unrefTimer } from '../utils/timer' import { ILimitedTimekeeperMetrics, ITask, ITimekeeper, LimitedOptions, LimitedTimekeeperOptions } from './interfaces' import { Task } from './task' import { UnlimitedTimekeeper } from './unlimited.timekeeper' @@ -38,15 +39,16 @@ export class LimitedTimekeeper extends UnlimitedTimekeeper): void { - const runnedTime = Date.now() - const tid = setTimeout(() => { - debug( - `A task on the waiting list is waiting longer than it should. id="${task.id}"; time="${Date.now() - runnedTime}"; maxWaitingTimeMs="${this.limitedOptions.maxWaitingTimeMs}"`, - ) - this.abort(task.id, new TimeoutError(this.limitedOptions.maxWaitingTimeMs)) - }, this.limitedOptions.maxWaitingTimeMs) - if (this.options.unrefTimeouts) tid?.unref?.() - task.tid = tid + const enqueuedAt = Date.now() + task.tid = unrefTimer( + setTimeout(() => { + debug( + `A task on the waiting list is waiting longer than it should. id="${task.id}"; time="${Date.now() - enqueuedAt}"; maxWaitingTimeMs="${this.limitedOptions.maxWaitingTimeMs}"`, + ) + this.abort(task.id, new TimeoutError(this.limitedOptions.maxWaitingTimeMs)) + }, this.limitedOptions.maxWaitingTimeMs), + this.options.unrefTimeouts, + ) this.waitingTasks.push(task) this.metrics?.waitTask?.(this.waitingTasks.length) debug(`The task has been added to the waiting list. id="${task.id}"`) @@ -59,6 +61,11 @@ export class LimitedTimekeeper extends UnlimitedTimekeeper, error: unknown): void { if (this.currentTask?.id === task.id) return super.rejectPendingTask(task, error) this.waitingTasks = this.waitingTasks.filter(({ id }) => id !== task.id) + this.rejectWaitingTask(task, error) + } + + private rejectWaitingTask(task: Task, error: unknown): void { + if (task.status !== 'pending') return if (task.tid) clearTimeout(task.tid) this.metrics?.rejectTask?.(error, task.inner) this.callAbortedRunner(task, error) @@ -69,12 +76,7 @@ export class LimitedTimekeeper extends UnlimitedTimekeeper { - if (task.tid) clearTimeout(task.tid) - const error = new SilentAbortError('timekeeper') - this.metrics?.rejectTask?.(error, task.inner) - this.callAbortedRunner(task, error) - debug(`The task was rejected. id="${task.id}"`) - }) + const error = new SilentAbortError('timekeeper') + waiting.forEach(task => this.rejectWaitingTask(task, error)) } } diff --git a/src/timekeeper/unlimited.timekeeper.ts b/src/timekeeper/unlimited.timekeeper.ts index 95ae4bd..504fabd 100644 --- a/src/timekeeper/unlimited.timekeeper.ts +++ b/src/timekeeper/unlimited.timekeeper.ts @@ -1,5 +1,6 @@ import { AbortError, SilentAbortError, TimeoutError } from '../utils/errors' import { isLoaderError } from '../utils/is' +import { unrefTimer } from '../utils/timer' import { ITask, ITimekeeper, IUnlimitedTimekeeperMetrics, UnlimitedTimekeeperOptions } from './interfaces' import { Task } from './task' import createDebug from 'debug' @@ -72,12 +73,13 @@ export class UnlimitedTimekeeper { - debug(`The current task is started by a timer. id=${this.currentTask?.id}`) - this.runCurrentTask() - }, this.options.runMs) - if (this.options.unrefTimeouts) tid?.unref?.() - this.tidRunner = tid + this.tidRunner = unrefTimer( + setTimeout(() => { + debug(`The current task is started by a timer. id=${this.currentTask?.id}`) + this.runCurrentTask() + }, this.options.runMs), + this.options.unrefTimeouts, + ) } private clearRunnerTimeout() { @@ -107,9 +109,10 @@ export class UnlimitedTimekeeper this.abort(task.id, new TimeoutError(this.options.timeoutMs)), this.options.timeoutMs) - if (this.options.unrefTimeouts) taskTid?.unref?.() - task.tid = taskTid + task.tid = unrefTimer( + setTimeout(() => this.abort(task.id, new TimeoutError(this.options.timeoutMs)), this.options.timeoutMs), + this.options.unrefTimeouts, + ) this.runnedTasks.set(task.id, task) this.metrics?.runTask?.(this.runnedTasks.size, task.inner) diff --git a/src/utils/timer.ts b/src/utils/timer.ts new file mode 100644 index 0000000..cd70a22 --- /dev/null +++ b/src/utils/timer.ts @@ -0,0 +1,10 @@ +/** + * Conditionally unrefs the timer. When `unrefTimeouts` is left `undefined`, + * the default is to unref - matching the historical behaviour of this library + * where pending timers never blocked process exit. Pass `false` explicitly to + * keep the event loop alive while timers are pending. + */ +export const unrefTimer = (tid: NodeJS.Timeout, unrefTimeouts: boolean | undefined): NodeJS.Timeout => { + if (unrefTimeouts !== false) tid?.unref?.() + return tid +} diff --git a/tsconfig.json b/tsconfig.json index a540070..c44b64f 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -22,4 +22,4 @@ "noEmitOnError": true }, "include": ["src"] - } \ No newline at end of file + }