Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ Thrown when a task is manually aborted during execution, explicitly indicating t

- **Request Grouping**: Groups requests into batches based on size or timeout.
- **Concurrency Control**: Limits the number of tasks that can run in parallel (`concurrencyLimit`).
- **Timeout Handling**: Supports execution timeouts (`batchTimeout`) and waiting timeouts (`maxWaitingTimeMs`).
- **Timeout Handling**: Supports execution timeouts (`timeoutMs`) and waiting timeouts (`maxWaitingTimeMs`).

---

Expand All @@ -263,7 +263,7 @@ interface IBatchAggregatorOptions {
maxBatchSize: number; // Maximum number of requests per batch
batchTimeMs: number; // Maximum time to form a batch
maxWaitingTimeMs?: number; // Maximum waiting time for tasks in the queue (only if concurrencyLimit > 0)
batchTimeout?: number; // Maximum execution time for batchFn (the function passed as the first argument)
timeoutMs: number; // Maximum execution time for batchFn (the function passed as the first argument)
}
```

Expand All @@ -284,7 +284,7 @@ const aggregator = new BatchAggregator<number, number>(
{
maxBatchSize: 3,
batchTimeMs: 100,
batchTimeout: 500,
timeoutMs: 500,
}
)

Expand All @@ -311,7 +311,7 @@ const aggregator = new BatchAggregator<number, number>(
{
maxBatchSize: 2,
batchTimeMs: 100,
batchTimeout: 500,
timeoutMs: 500,
concurrencyLimit: 2, // Limit to 2 parallel tasks
}
)
Expand Down Expand Up @@ -340,7 +340,7 @@ const aggregator = new BatchAggregator<number, number>(
{
maxBatchSize: 1,
batchTimeMs: 100,
batchTimeout: 500,
timeoutMs: 500,
concurrencyLimit: 1,
maxWaitingTimeMs: 100, // Timeout for tasks in the queue
}
Expand Down
43 changes: 19 additions & 24 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,14 @@
"@types/debug": "^4.1.13",
"@types/eslint__js": "^9.14.0",
"@types/jest": "^30.0.0",
"@types/node": "^25.0.10",
"@types/node": "^25.6.2",
"eslint": "^10.1.0",
"eslint-config-prettier": "^10.1.8",
"eslint-plugin-prettier": "^5.5.5",
"jest": "^30.3.0",
"jest-watch-typeahead": "^3.0.1",
"prettier": "^3.8.1",
"ts-jest": "^29.4.6",
"ts-jest": "^29.4.9",
"typescript": "^5.9.3",
"typescript-eslint": "^8.58.0"
},
Expand Down
33 changes: 24 additions & 9 deletions src/batch-aggregator/batch-aggregator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,22 @@ import { ILimitedTimekeeperMetrics, ITask, ITimekeeper } from '../timekeeper/int
import { LimitedTimekeeper } from '../timekeeper/limited.timekeeper'
import { UnlimitedTimekeeper } from '../timekeeper/unlimited.timekeeper'
import createDebug from 'debug'
import { BatchLoaderFn, IBatchAggregatorMetrics, IBatchAggregatorOptions } from './interfaces'
import { BatchLoaderFn, DEFAULT_MAX_WAITING_TIME_MS, IBatchAggregatorMetrics, IBatchAggregatorOptions } from './interfaces'
import { LoaderError } from '../utils/errors'
const debug = createDebug('batchloader:aggregator')

export const validateAggregatorOptions = (o: IBatchAggregatorOptions) => {
if (!Number.isFinite(o.timeoutMs) || o.timeoutMs <= 0) throw new RangeError(`timeoutMs must be > 0`)
if (!Number.isFinite(o.batchTimeMs) || o.batchTimeMs < 0) throw new RangeError(`batchTimeMs must be >= 0`)
if (!Number.isInteger(o.maxBatchSize) || o.maxBatchSize < 1) throw new RangeError(`maxBatchSize must be a positive integer`)
if (o.concurrencyLimit !== undefined && o.concurrencyLimit !== Infinity) {
if (!Number.isInteger(o.concurrencyLimit) || o.concurrencyLimit < 1)
throw new RangeError(`concurrencyLimit must be a positive integer or Infinity`)
if (o.maxWaitingTimeMs !== undefined && (!Number.isFinite(o.maxWaitingTimeMs) || o.maxWaitingTimeMs <= 0))
throw new RangeError(`maxWaitingTimeMs must be > 0`)
}
}

interface TaskData<T, R> {
requests: T[]
responses: R[]
Expand All @@ -18,8 +30,8 @@ const createTimekeeperMetrics = (metrics?: IBatchAggregatorMetrics): ILimitedTim

if (metrics.resolveBatch) tkMetrics.resolveTask = task => metrics.resolveBatch?.(task.data.requests.length)
if (metrics.rejectBatch) tkMetrics.rejectTask = (_, task) => metrics.rejectBatch?.(task.data.requests.length)
if (metrics.parallelBatches) tkMetrics.runTask = runnedSize => metrics.parallelBatches?.(runnedSize)
if (metrics.waitingBatches) tkMetrics.waitTask = runnedSize => metrics.waitingBatches?.(runnedSize)
if (metrics.parallelBatches) tkMetrics.runTask = size => metrics.parallelBatches?.(size)
if (metrics.waitingBatches) tkMetrics.waitTask = size => metrics.waitingBatches?.(size)

return tkMetrics
}
Expand All @@ -28,11 +40,11 @@ export class BatchAggregator<T, R> {
private readonly timekeeper: ITimekeeper<TaskData<T, R>>

private readonly batchRunner = async (task: ITask<TaskData<T, R>>, signal: AbortSignal) => {
this.metrics?.rejectBatch?.(task.data.requests.length)
this.metrics?.runBatch?.(task.data.requests.length)
debug(`Running batchRunner with a query array of length ${task.data.requests.length}. task id="${task.id}"`)
const response = await this.batchLoaderFn([...task.data.requests], signal)
if (!Array.isArray(response) || response.length !== task.data.requests.length)
throw new LoaderError(`The result of batchLoadFn must be an array equal in length to the query array `)
throw new LoaderError(`The result of batchLoadFn must be an array equal in length to the query array`)

task.data.responses = response
}
Expand All @@ -42,19 +54,21 @@ export class BatchAggregator<T, R> {
private readonly options: IBatchAggregatorOptions,
private readonly metrics?: IBatchAggregatorMetrics,
) {
const { concurrencyLimit, maxWaitingTimeMs, batchTimeMs: runMs, timeoutMs } = options
validateAggregatorOptions(options)
const { concurrencyLimit, maxWaitingTimeMs, batchTimeMs: runMs, timeoutMs, unrefTimeouts } = options
const initialDataFactory = () => ({ requests: [], responses: [] })
this.timekeeper =
concurrencyLimit && concurrencyLimit > 0 && concurrencyLimit < Infinity
concurrencyLimit !== undefined && concurrencyLimit !== Infinity
? new LimitedTimekeeper(
{
concurrencyLimit,
initialDataFactory,
maxWaitingTimeMs: maxWaitingTimeMs || 60_000,
maxWaitingTimeMs: maxWaitingTimeMs ?? DEFAULT_MAX_WAITING_TIME_MS,
runMs,
runner: this.batchRunner,
timeoutMs,
callRejectedTask: false,
unrefTimeouts,
},
createTimekeeperMetrics(metrics),
)
Expand All @@ -65,6 +79,7 @@ export class BatchAggregator<T, R> {
runner: this.batchRunner,
timeoutMs,
callRejectedTask: false,
unrefTimeouts,
},
createTimekeeperMetrics(metrics),
)
Expand All @@ -86,7 +101,7 @@ export class BatchAggregator<T, R> {
const task = this.getCurrentTask()
const index = task.data.requests.length
this.metrics?.loadBatchItem?.()
debug(`Load data. task id="${task.id}"; curent index="${index}"`)
debug(`Load data. task id="${task.id}"; current index="${index}"`)
task.data.requests.push(request)
await this.timekeeper.wait(task)

Expand Down
8 changes: 8 additions & 0 deletions src/batch-aggregator/interfaces.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
export const DEFAULT_MAX_WAITING_TIME_MS = 60_000

export interface IBatchAggregatorOptions {
/**
* @description Maximum number of parallel tasks (default: unlimited)
Expand All @@ -23,6 +25,12 @@ export interface IBatchAggregatorOptions {
* @description Maximum execution time for batchFn (the function passed as the first argument)
*/
timeoutMs: number

/**
* @description Allows timers to avoid blocking the event loop
* @default true
*/
unrefTimeouts?: boolean
}

export type BatchLoaderFn<T, R> = (batchArray: T[], signal: AbortSignal) => Promise<R[]> | R[]
Expand Down
48 changes: 32 additions & 16 deletions src/batch-loader/batch-loader.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,27 @@
import { BatchAggregator } from '../batch-aggregator/batch-aggregator'
import { BatchLoaderFn } from '../batch-aggregator/interfaces'
import { BatchAggregator, validateAggregatorOptions } from '../batch-aggregator/batch-aggregator'
import { BatchLoaderFn, DEFAULT_MAX_WAITING_TIME_MS } from '../batch-aggregator/interfaces'
import { Deduplicator } from '../deduplicator/deduplicator'
import { Key } from '../utils/interfaces'
import { CacheAdapter } from './cache-adapter'
import { IBatchLoaderOptions } from './interfaces'

const prepareOptions = <K, R>(options: IBatchLoaderOptions<K, R>) => ({
getKey: (query: K) => `${query}`,
timeoutMs: 60_000,
unrefTimeouts: false,
concurrencyLimit: Infinity,
maxBatchSize: 1000,
batchTimeMs: 50,
maxWaitingTimeMs: 60_000,
...options,
})
// Node's setTimeout silently caps delays here and warns - clamp upstream to avoid TimeoutOverflowWarning + firing in 1ms.
const TIMEOUT_MAX = 2_147_483_647

const prepareOptions = <K, R>(options: IBatchLoaderOptions<K, R>) => {
const merged = {
getKey: (query: K) => `${query}`,
timeoutMs: 60_000,
unrefTimeouts: true,
concurrencyLimit: Infinity,
maxBatchSize: 1000,
batchTimeMs: 50,
maxWaitingTimeMs: DEFAULT_MAX_WAITING_TIME_MS,
...options,
}
validateAggregatorOptions(merged)
return merged
}

export class BatchLoader<K, R> {
private readonly cache: CacheAdapter<R>
Expand All @@ -23,17 +30,24 @@ export class BatchLoader<K, R> {
private readonly getKey: (query: K) => Key

constructor(batchLoaderFn: BatchLoaderFn<K, R>, options: IBatchLoaderOptions<K, R>) {
const { cache, getKey, timeoutMs, unrefTimeouts, batchTimeMs, concurrencyLimit, maxBatchSize, maxWaitingTimeMs } =
const { cache, getKey, timeoutMs, unrefTimeouts, batchTimeMs, concurrencyLimit, maxBatchSize, maxWaitingTimeMs, metrics } =
prepareOptions(options)

const limited = concurrencyLimit !== Infinity
const deduplicatorTimeoutMs = Math.min(timeoutMs + batchTimeMs + (limited ? maxWaitingTimeMs : 0), TIMEOUT_MAX)

this.getKey = getKey
this.cache = new CacheAdapter(cache)
this.deduplicator = new Deduplicator<K, R>(this.deduplicatorRunner, {
getKey,
timeoutMs: timeoutMs + batchTimeMs,
unrefTimeouts: !!unrefTimeouts,
timeoutMs: deduplicatorTimeoutMs,
unrefTimeouts,
})
this.aggregator = new BatchAggregator(batchLoaderFn, { timeoutMs, batchTimeMs, concurrencyLimit, maxBatchSize, maxWaitingTimeMs })
this.aggregator = new BatchAggregator(
batchLoaderFn,
{ timeoutMs, batchTimeMs, concurrencyLimit, maxBatchSize, maxWaitingTimeMs, unrefTimeouts },
metrics,
)
}

private readonly deduplicatorRunner = async (query: K, signal: AbortSignal): Promise<R> => {
Expand All @@ -47,6 +61,8 @@ export class BatchLoader<K, R> {

const loaded = await this.aggregator.load(query)

if (signal.aborted) throw signal.reason

this.deduplicator.restartTimeout(query)
await this.cache.set(key, loaded)

Expand Down
10 changes: 9 additions & 1 deletion src/batch-loader/interfaces.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { IBatchAggregatorMetrics } from '../batch-aggregator/interfaces'
import { Key } from '../utils/interfaces'

export interface ICache<T> {
Expand All @@ -7,6 +8,8 @@ export interface ICache<T> {
clear(): Promise<void>
}

export interface IBatchLoaderMetrics extends IBatchAggregatorMetrics {}

export interface IBatchLoaderOptions<K, R> {
/**
* @description Function to extract the key from a query
Expand All @@ -24,7 +27,7 @@ export interface IBatchLoaderOptions<K, R> {

/**
* @description Allows timers to avoid blocking the event loop
* @default false
* @default true
*/
unrefTimeouts?: boolean

Expand All @@ -51,4 +54,9 @@ export interface IBatchLoaderOptions<K, R> {
* @default 60_000
*/
maxWaitingTimeMs?: number

/**
* @description Optional metrics hooks. Forwarded to the internal BatchAggregator.
*/
metrics?: IBatchLoaderMetrics
}
Loading
Loading