From 6781c98536fe69c891cee7cb1c2ebb7376babf54 Mon Sep 17 00:00:00 2001 From: netanelC Date: Mon, 1 Jun 2026 09:58:27 +0300 Subject: [PATCH 1/4] feat: add jitter --- README.md | 2 +- src/constants.ts | 2 ++ src/rollout/ChangeDetector.ts | 37 +++++++++++++++----- tests/rollout.spec.ts | 66 +++++++++++++++++++++++++++++++++-- 4 files changed, 95 insertions(+), 12 deletions(-) diff --git a/README.md b/README.md index 2da1b2f..6c5bf82 100644 --- a/README.md +++ b/README.md @@ -164,7 +164,7 @@ The package supports merging configurations from multiple sources (local, remote 1. The remote configuration is fetched from the server specified by the `configServerUrl` option. 2. If the `version` is set to `'latest'`, the latest version of the configuration is fetched. Otherwise, the specified version is fetched. -3. **Continuous Polling:** If an `onChange` callback is provided, the SDK continuously polls the server using HTTP ETags (`If-None-Match`). When a `200 OK` is received (indicating a change), the configuration is automatically re-merged, validated, and the callback is triggered. `304 Not Modified` responses are silently ignored. +3. **Continuous Polling:** If an `onChange` callback is provided, the SDK continuously polls the server using HTTP ETags (`If-None-Match`). When a `200 OK` is received (indicating a change), the configuration is automatically re-merged, validated, and the callback is triggered. `304 Not Modified` responses are silently ignored. To prevent cluster-wide traffic spikes (thundering herd), a **randomized jitter of +/- 15%** is automatically applied to each polling cycle. ### Environment Variables diff --git a/src/constants.ts b/src/constants.ts index 0f459d1..8f94983 100644 --- a/src/constants.ts +++ b/src/constants.ts @@ -9,3 +9,5 @@ export const SCHEMA_DOMAIN = 'https://mapcolonies.com/'; export const SCHEMAS_PACKAGE_RESOLVED_PATH = require.resolve('@map-colonies/schemas'); export const SCHEMA_BASE_PATH = SCHEMAS_PACKAGE_RESOLVED_PATH.substring(0, SCHEMAS_PACKAGE_RESOLVED_PATH.lastIndexOf(path.sep)); + +export const JITTER_PERCENTAGE = 0.15; diff --git a/src/rollout/ChangeDetector.ts b/src/rollout/ChangeDetector.ts index 06fa814..90e3301 100644 --- a/src/rollout/ChangeDetector.ts +++ b/src/rollout/ChangeDetector.ts @@ -1,3 +1,4 @@ +import { JITTER_PERCENTAGE } from '../constants'; import { getRemoteConfig } from '../httpClient'; import { BaseOptions } from '../types'; import { createDebug } from '../utils/debug'; @@ -22,23 +23,41 @@ export class ChangeDetector { } public start(): void { - const interval = this.options.pollIntervalMs; - debug('Starting change detector with interval %d ms', interval); - - this.timer = setInterval(() => { - this.poll().catch((err) => { - debug('Error during polling: %s', (err as Error).message); - }); - }, interval); + debug('Starting change detector'); + this.scheduleNextPoll(); } public stop(): void { if (this.timer) { - clearInterval(this.timer); + clearTimeout(this.timer); this.timer = undefined; } } + private scheduleNextPoll(): void { + if (this.timer) { + clearTimeout(this.timer); + } + const baseInterval = this.options.pollIntervalMs!; + const jitter = baseInterval * JITTER_PERCENTAGE; + // eslint-disable-next-line @typescript-eslint/no-magic-numbers + const randomJitter = (Math.random() * 2 - 1) * jitter; + const nextInterval = baseInterval + randomJitter; + + debug('Scheduling next poll in %d ms', nextInterval); + this.timer = setTimeout(() => { + this.poll() + .catch((err) => { + debug('Error during polling: %s', (err as Error).message); + }) + .finally(() => { + if (this.timer !== undefined) { + this.scheduleNextPoll(); + } + }); + }, nextInterval); + } + private async poll(): Promise { debug('Polling config %s@%s with etag %s', this.options.configName, this.options.version, this.currentEtag); diff --git a/tests/rollout.spec.ts b/tests/rollout.spec.ts index abde307..046e858 100644 --- a/tests/rollout.spec.ts +++ b/tests/rollout.spec.ts @@ -3,6 +3,7 @@ import { Interceptable, MockAgent, setGlobalDispatcher } from 'undici'; import { commonDbPartialV1 } from '@map-colonies/schemas'; import { StatusCodes } from 'http-status-codes'; import { config } from '../src/config'; +import { JITTER_PERCENTAGE } from '../src/constants'; const URL = 'http://localhost:8080'; const DEFAULT_POLL_INTERVAL = 10000; @@ -74,7 +75,7 @@ describe('Continuous Polling (ChangeDetector)', () => { .reply(StatusCodes.OK, newConfigData, { headers: { etag: 'etag-2' } }); // Act (Wait for Poll) - await vi.advanceTimersByTimeAsync(DEFAULT_POLL_INTERVAL); + await vi.advanceTimersByTimeAsync(DEFAULT_POLL_INTERVAL * (1 + JITTER_PERCENTAGE)); // Assert (Updated State) expect(onChangeMock).toHaveBeenCalledTimes(1); @@ -121,7 +122,7 @@ describe('Continuous Polling (ChangeDetector)', () => { .reply(StatusCodes.NOT_MODIFIED); // Act (Wait for Poll) - await vi.advanceTimersByTimeAsync(DEFAULT_POLL_INTERVAL); + await vi.advanceTimersByTimeAsync(DEFAULT_POLL_INTERVAL * (1 + JITTER_PERCENTAGE)); // Assert expect(onChangeMock).not.toHaveBeenCalled(); @@ -166,4 +167,65 @@ describe('Continuous Polling (ChangeDetector)', () => { // Assert expect(onChangeMock).not.toHaveBeenCalled(); }); + + it('should apply randomized jitter within boundaries over 10 cycles', async () => { + // Arrange + const initialConfigData = { + configName: 'name', + schemaId: commonDbPartialV1.$id, + version: 1, + config: { host: 'initial-host' }, + createdAt: 0, + }; + + client + .intercept({ path: '/capabilities', method: 'GET' }) + .reply(StatusCodes.OK, { serverVersion: '2.0.0', schemasPackageVersion: '99.9.9', pubSubEnabled: false }); + client + .intercept({ path: `/config/name/1?shouldDereference=true&schemaId=${commonDbPartialV1.$id}`, method: 'GET' }) + .reply(StatusCodes.OK, initialConfigData, { headers: { etag: 'etag-1' } }); + + // Setup 10 mock 304 responses + client + .intercept({ + path: `/config/name/1?shouldDereference=true&schemaId=${commonDbPartialV1.$id}`, + method: 'GET', + }) + .reply(StatusCodes.NOT_MODIFIED) + .times(10); + + const setTimeoutSpy = vi.spyOn(global, 'setTimeout'); + + // Act + await config({ + configName: 'name', + version: 1, + schema: commonDbPartialV1, + configServerUrl: URL, + localConfigPath: './tests/config', + pollIntervalMs: DEFAULT_POLL_INTERVAL, + onChange: vi.fn(), + }); + + const maxJitter = DEFAULT_POLL_INTERVAL * JITTER_PERCENTAGE; + const minWait = DEFAULT_POLL_INTERVAL - maxJitter; + const maxWait = DEFAULT_POLL_INTERVAL + maxJitter; + + // Run 10 cycles + for (let i = 0; i < 10; i++) { + // Advance timers by maxWait to definitely trigger the next poll + await vi.advanceTimersByTimeAsync(maxWait + 1); + } + + // Assert + const pollTimeouts = setTimeoutSpy.mock.calls.map((call) => call[1] as number).filter((time) => time >= minWait && time <= maxWait); + + expect(pollTimeouts.length).toBeGreaterThanOrEqual(10); + pollTimeouts.forEach((time) => { + expect(time).toBeGreaterThanOrEqual(minWait); + expect(time).toBeLessThanOrEqual(maxWait); + }); + + setTimeoutSpy.mockRestore(); + }); }); From a875811fa93ef5405f180c831b872b129b92851f Mon Sep 17 00:00:00 2001 From: netanelC Date: Thu, 4 Jun 2026 19:29:14 +0300 Subject: [PATCH 2/4] test: advance the time longer than the jitter Co-authored-by: Copilot --- tests/rollout.spec.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/rollout.spec.ts b/tests/rollout.spec.ts index 4955464..2c3db78 100644 --- a/tests/rollout.spec.ts +++ b/tests/rollout.spec.ts @@ -78,7 +78,7 @@ describe('Continuous Polling (ChangeDetector)', () => { .reply(StatusCodes.OK, newConfigData, { headers: { etag: 'etag-2' } }); // Act (Wait for Poll) - await vi.advanceTimersByTimeAsync(DEFAULT_POLL_INTERVAL * (1 + JITTER_PERCENTAGE)); + await vi.advanceTimersByTimeAsync(DEFAULT_POLL_INTERVAL * 2); // Assert (Updated State) expect(onChangeMock).toHaveBeenCalledTimes(1); @@ -134,7 +134,7 @@ describe('Continuous Polling (ChangeDetector)', () => { .reply(StatusCodes.OK, newConfigData, { headers: { etag: 'etag-2' } }); // Act (Wait for Poll) - await vi.advanceTimersByTimeAsync(DEFAULT_POLL_INTERVAL); + await vi.advanceTimersByTimeAsync(DEFAULT_POLL_INTERVAL * 2); // Assert expect(exitMock).toHaveBeenCalledWith(0); @@ -190,7 +190,7 @@ describe('Continuous Polling (ChangeDetector)', () => { .reply(StatusCodes.OK, newConfigData, { headers: { etag: 'etag-2' } }); // Act (Wait for Poll) - await vi.advanceTimersByTimeAsync(DEFAULT_POLL_INTERVAL); + await vi.advanceTimersByTimeAsync(DEFAULT_POLL_INTERVAL * 2); // Assert expect(onChangeMock).toHaveBeenCalledTimes(1); @@ -237,7 +237,7 @@ describe('Continuous Polling (ChangeDetector)', () => { .reply(StatusCodes.NOT_MODIFIED); // Act (Wait for Poll) - await vi.advanceTimersByTimeAsync(DEFAULT_POLL_INTERVAL * (1 + JITTER_PERCENTAGE)); + await vi.advanceTimersByTimeAsync(DEFAULT_POLL_INTERVAL * 2); // Assert expect(onChangeMock).not.toHaveBeenCalled(); From 8826504de2026efb5d8b827031a2042cb1ecf0a3 Mon Sep 17 00:00:00 2001 From: netanelC Date: Thu, 18 Jun 2026 13:29:10 +0300 Subject: [PATCH 3/4] refactor: set proper error handling --- src/rollout/ChangeDetector.ts | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/rollout/ChangeDetector.ts b/src/rollout/ChangeDetector.ts index 6390bb9..55c850e 100644 --- a/src/rollout/ChangeDetector.ts +++ b/src/rollout/ChangeDetector.ts @@ -49,7 +49,11 @@ export class ChangeDetector { this.timer = setTimeout(() => { this.poll() .catch((err) => { - debug('Error during polling: %s', (err as Error).message); + if (isConfigError(err, 'httpResponseError') || isConfigError(err, 'httpGeneralError')) { + debug('Error during polling: %s', err.message); + } else { + debug('Unknown error during polling: %O', err); + } }) .finally(() => { if (this.timer !== undefined) { From 9a43a6ff04e2c09917b31e487a3672543773a457 Mon Sep 17 00:00:00 2001 From: netanelC Date: Thu, 18 Jun 2026 17:31:06 +0300 Subject: [PATCH 4/4] test: fix tests Co-authored-by: Copilot --- tests/rollout.spec.ts | 29 ++++++++++------------------- 1 file changed, 10 insertions(+), 19 deletions(-) diff --git a/tests/rollout.spec.ts b/tests/rollout.spec.ts index f7e41aa..4439b6c 100644 --- a/tests/rollout.spec.ts +++ b/tests/rollout.spec.ts @@ -2,6 +2,7 @@ import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; import { Interceptable, MockAgent, setGlobalDispatcher } from 'undici'; import { commonDbPartialV1 } from '@map-colonies/schemas'; import { StatusCodes } from 'http-status-codes'; +import { random } from 'lodash'; import { config } from '../src/config'; import { JITTER_PERCENTAGE } from '../src/constants'; import { createMockConfigData } from './mocks'; @@ -103,13 +104,13 @@ describe('Continuous Polling (ChangeDetector)', () => { .reply(StatusCodes.OK, newConfigData, { headers: { etag: 'etag-2' } }); // Act (Wait for First Poll) - await vi.advanceTimersByTimeAsync(DEFAULT_POLL_INTERVAL); + await vi.advanceTimersByTimeAsync(DEFAULT_POLL_INTERVAL * 2); // Assert (First change triggered) expect(onChangeMock).toHaveBeenCalledTimes(1); // Act (Advance time to see if polling continues) - await vi.advanceTimersByTimeAsync(DEFAULT_POLL_INTERVAL); + await vi.advanceTimersByTimeAsync(DEFAULT_POLL_INTERVAL * 2); // Assert (Polling stopped, no further calls) expect(onChangeMock).toHaveBeenCalledTimes(1); @@ -240,13 +241,7 @@ describe('Continuous Polling (ChangeDetector)', () => { it('should apply randomized jitter within boundaries over 10 cycles', async () => { // Arrange - const initialConfigData = { - configName: 'name', - schemaId: commonDbPartialV1.$id, - version: 1, - config: { host: 'initial-host' }, - createdAt: 0, - }; + const initialConfigData = createMockConfigData(); client .intercept({ path: '/capabilities', method: 'GET' }) @@ -255,14 +250,14 @@ describe('Continuous Polling (ChangeDetector)', () => { .intercept({ path: `/config/name/1?shouldDereference=true&schemaId=${commonDbPartialV1.$id}`, method: 'GET' }) .reply(StatusCodes.OK, initialConfigData, { headers: { etag: 'etag-1' } }); - // Setup 10 mock 304 responses + const cycles = random(1, 10); // Random number of cycles to test jitter client .intercept({ path: `/config/name/1?shouldDereference=true&schemaId=${commonDbPartialV1.$id}`, method: 'GET', }) .reply(StatusCodes.NOT_MODIFIED) - .times(10); + .times(cycles); const setTimeoutSpy = vi.spyOn(global, 'setTimeout'); @@ -271,18 +266,16 @@ describe('Continuous Polling (ChangeDetector)', () => { configName: 'name', version: 1, schema: commonDbPartialV1, - configServerUrl: URL, localConfigPath: './tests/config', - pollIntervalMs: DEFAULT_POLL_INTERVAL, - onChange: vi.fn(), + onChange: onChangeMock, }); const maxJitter = DEFAULT_POLL_INTERVAL * JITTER_PERCENTAGE; const minWait = DEFAULT_POLL_INTERVAL - maxJitter; const maxWait = DEFAULT_POLL_INTERVAL + maxJitter; - // Run 10 cycles - for (let i = 0; i < 10; i++) { + // Run the randomized number of cycles + for (let i = 0; i < cycles; i++) { // Advance timers by maxWait to definitely trigger the next poll await vi.advanceTimersByTimeAsync(maxWait + 1); } @@ -290,12 +283,10 @@ describe('Continuous Polling (ChangeDetector)', () => { // Assert const pollTimeouts = setTimeoutSpy.mock.calls.map((call) => call[1] as number).filter((time) => time >= minWait && time <= maxWait); - expect(pollTimeouts.length).toBeGreaterThanOrEqual(10); + expect(pollTimeouts.length).toBeGreaterThanOrEqual(cycles); pollTimeouts.forEach((time) => { expect(time).toBeGreaterThanOrEqual(minWait); expect(time).toBeLessThanOrEqual(maxWait); }); - - setTimeoutSpy.mockRestore(); }); });