From 2f094a96a7ee0eb0a6e5c7023b0744ce8fc4ea9e Mon Sep 17 00:00:00 2001 From: netanelC Date: Mon, 1 Jun 2026 08:22:10 +0300 Subject: [PATCH 01/14] feat: add polls for detecting changes --- README.md | 35 +++++++-- src/config.ts | 97 ++++++++++++++++-------- src/httpClient.ts | 23 ++++-- src/options.ts | 2 + src/rollout/ChangeDetector.ts | 56 ++++++++++++++ src/types.ts | 17 ++++- tests/config.spec.ts | 2 + tests/httpClient.spec.ts | 2 +- tests/options.spec.ts | 1 + tests/rollout.spec.ts | 139 ++++++++++++++++++++++++++++++++++ 10 files changed, 329 insertions(+), 45 deletions(-) create mode 100644 src/rollout/ChangeDetector.ts create mode 100644 tests/rollout.spec.ts diff --git a/README.md b/README.md index bd11ae9..16b0553 100644 --- a/README.md +++ b/README.md @@ -24,7 +24,12 @@ const configInstance = await config({ configServerUrl: 'http://localhost:8080', schema: commonBoilerplateV4, version: 'latest', - offlineMode: false + offlineMode: false, + pollIntervalMs: 30000, + onChange: (updatedConfig) => { + console.log('Configuration updated:', updatedConfig); + // Re-initialize DB connections, etc. + } }); const port = configInstance.get('server.port'); @@ -36,26 +41,26 @@ This section describes the API provided by the package for interacting with the ### `ConfigInstance` -The `ConfigInstance` interface represents the your way to interact with the configuration. It provides methods to retrieve configuration values and parts. +The `ConfigInstance` interface represents your way to interact with the configuration. When hot-reloading is enabled, this instance acts as a **live state machine**, updating its internal state dynamically. `T` is the typescript type associated with the chosen schema. it can be imported from the `@map-colonies/schemas` package. #### Methods ##### `get(path: TPath): _.GetFieldType` -- **Description**: Retrieves the value at the specified path from the configuration object. Note that the type of returned object is based on the path in the schema. +- **Description**: Retrieves the value at the specified path from the configuration object. If hot-reloading is active, this returns the value from the **most recent** configuration update. - **Parameters**: - `path` (`TPath`): The path to the desired value. - **Returns**: The value at the specified path. ##### `getAll(): T` -- **Description**: Retrieves the entire configuration object. +- **Description**: Retrieves the entire configuration object. If hot-reloading is active, this returns the **most recent** configuration state. - **Returns**: The entire configuration object. ##### `getConfigParts(): { localConfig: object; config: object; envConfig: object }` -- **Description**: Retrieves different parts of the configuration object before being merged and validated. Useful for debugging. +- **Description**: Retrieves different parts of the configuration object before being merged and validated. If hot-reloading is active, the `config` part reflects the **latest remote payload**. - **Returns**: An object containing the `localConfig`, `config`, and `envConfig` parts of the configuration. - `localConfig`: The local configuration object. - `config`: The remote configuration object. @@ -121,6 +126,18 @@ This package allows you to configure various options for loading and managing co - **Default**: `./config` - **Description**: The path to the local configuration folder. +### `pollIntervalMs` +- **Type**: `number` +- **Optional**: `true` +- **Default**: `30000` +- **Description**: The polling interval in milliseconds for hot-reloading. +- **Environment Variable**: `CONFIG_POLL_INTERVAL_MS` + +### `onChange` +- **Type**: `(config: T) => void | Promise` +- **Optional**: `true` +- **Description**: A callback function triggered when a configuration change is detected. + ## Environment Variable Configuration The following environment variables can be used to configure the options: @@ -130,6 +147,7 @@ The following environment variables can be used to configure the options: - `CONFIG_SERVER_URL`: Sets the `configServerUrl` option. - `CONFIG_OFFLINE_MODE`: Sets the `offlineMode` option. - `CONFIG_IGNORE_SERVER_IS_OLDER_VERSION_ERROR`: Sets the `ignoreServerIsOlderVersionError` option. +- `CONFIG_POLL_INTERVAL_MS`: Sets the `pollIntervalMs` option. ## Configuration Merging and Validation @@ -143,6 +161,7 @@ The package supports merging configurations from multiple sources (local, remote 1. The remote configuration is fetched from the server specified by the `configServerUrl` option. 2. If the `version` is set to `'latest'`, the latest version of the configuration is fetched. Otherwise, the specified version is fetched. +3. **Continuous Polling:** If an `onChange` callback is provided, the SDK continuously polls the server using HTTP ETags (`If-None-Match`). When a `200 OK` is received (indicating a change), the configuration is automatically re-merged, validated, and the callback is triggered. `304 Not Modified` responses are silently ignored. ### Environment Variables @@ -158,13 +177,15 @@ If the value of the `x-env-format` key is `json`, the environment variable value - Local configuration 2. If a configuration option is specified in multiple sources, the value from the source with higher precedence (as listed above) is used. +3. When a hot-reload occurs, the **Remote configuration** part is updated, and the merge is re-calculated, ensuring environment variables still win. ### Validation -1. After merging, the final configuration is validated against the defined schema using ajv. +1. After merging (and after every hot-reload), the final configuration is validated against the defined schema using ajv. 2. The validation ensures that all required properties are present, and the types and values of properties conform to the schema. 3. Any default value according to the schema is added to the final object. -4. If the validation fails, an error is thrown, indicating the invalid properties and their issues. +4. If the validation fails, an error is thrown (for initial boot) or logged (for background updates), indicating the invalid properties and their issues. +5. **Atomic Updates:** The `ConfigInstance` only updates its internal state if the new configuration passes validation. If validation fails during a hot-reload, the previous valid state is preserved. # Error handling diff --git a/src/config.ts b/src/config.ts index 625558e..727843f 100644 --- a/src/config.ts +++ b/src/config.ts @@ -15,6 +15,7 @@ import { LOCAL_SCHEMAS_PACKAGE_VERSION } from './constants'; import { createConfigError } from './errors'; import { initializeMetrics as initializeMetricsInternal } from './metrics'; import { deepFreeze } from './utils/helpers'; +import { ChangeDetector } from './rollout/ChangeDetector'; const debug = createDebug('config'); @@ -25,22 +26,56 @@ const semverSatisfies = '2.x'; /** * Retrieves the configuration based on the provided options. * + * If `onChange` is provided in the options and `offlineMode` is not enabled, the SDK starts a background + * polling mechanism. The returned `ConfigInstance` serves as a live state machine; its `get` and `getAll` + * methods will return the most recent configuration retrieved from the server during hot-reloads. + * * @template T - The type of the configuration schema. * @param {ConfigOptions} options - The options for retrieving the configuration. - * @returns {Promise>} - A promise that resolves to the configuration object. + * @returns {Promise>} - A promise that resolves to the configuration object. */ export async function config( options: ConfigOptions ): Promise> { // handle package options debug('config called with options: %j', { ...options, schema: options.schema.$id }); - const { schema: baseSchema, metricsRegistry, ...unvalidatedOptions } = options; - const { configName, offlineMode, version, ignoreServerIsOlderVersionError } = initializeOptions(unvalidatedOptions); + const { schema: baseSchema, metricsRegistry, onChange, ...unvalidatedOptions } = options; + const initOptions = initializeOptions(unvalidatedOptions); + const { configName, offlineMode, version, ignoreServerIsOlderVersionError } = initOptions; - let remoteConfig: object | T = {}; + // Load Local and Env Configs First (Independent of remote state) + const dereferencedSchema = await loadSchema(baseSchema); + const localConfig = configPkg.util.loadFileConfigs(options.localConfigPath) as { [key: string]: unknown }; + debug('local config: %j', localConfig); + const envConfig = getEnvValues(dereferencedSchema); + debug('env config: %j', envConfig); + /** + * Function to merge local, remote, and environment configs and validate the merged result against the schema. + * The precedence order for merging is: local < remote < environment. + */ + function mergeAndValidate(remoteConfig: object | T): unknown { + const mergedConfig = deepmerge.all([localConfig, remoteConfig, envConfig], { arrayMerge }); + debug('merged config: %j', mergedConfig); + + // validate the merged config + const [errors, validatedConfig] = validate(ajvConfigValidator, dereferencedSchema, mergedConfig); + if (errors) { + debug('config validation error: %j', errors); + throw createConfigError('configValidationError', 'Config validation error', errors); + } + + debug('freezing validated config'); + // freeze the merged config so it can't be modified by the package user and to ensure that the config instance always returns the same reference for the same config, which is important for change detection and to prevent unnecessary re-renders in case the config is used in a React application and the user is using the getConfigParts method to get the config and pass it to their components + deepFreeze(validatedConfig); + return validatedConfig; + } + + let remoteConfig: object | T = {}; let serverConfigResponse: Config | undefined = undefined; - // handle remote config + let validatedConfig: ReturnType; + + // Handle Remote Config and Polling if (offlineMode !== true) { debug('handling fetching remote data'); // check if the server is using an older version of the schemas package @@ -70,8 +105,10 @@ export async function config( ); } - // get the remote config - serverConfigResponse = await getRemoteConfig(configName, options.schema.$id, version); + // get the initial remote config + const remoteResponse = await getRemoteConfig(configName, options.schema.$id, version); + serverConfigResponse = remoteResponse.config!; + const currentEtag = remoteResponse.etag; if (serverConfigResponse.schemaId !== baseSchema.$id) { debug('schema version mismatch. local: %s, remote: %s', baseSchema.$id, serverConfigResponse.schemaId); @@ -86,31 +123,29 @@ export async function config( } remoteConfig = serverConfigResponse.config; + debug('remote config: %j', remoteConfig); + + validatedConfig = mergeAndValidate(remoteConfig); + + // Setup polling + if (onChange) { + const changeDetector = new ChangeDetector( + baseSchema.$id, + initOptions, + async (newRemoteConfig: object) => { + const newlyValidatedConfig = mergeAndValidate(newRemoteConfig); + validatedConfig = newlyValidatedConfig; + remoteConfig = newRemoteConfig; + await onChange(newlyValidatedConfig); + }, + currentEtag + ); + changeDetector.start(); + } + } else { + // If offline, bypass remote and just merge local/env + validatedConfig = mergeAndValidate({}); } - debug('remote config: %j', remoteConfig); - - const dereferencedSchema = await loadSchema(baseSchema); - - const localConfig = configPkg.util.loadFileConfigs(options.localConfigPath) as { [key: string]: unknown }; - debug('local config: %j', localConfig); - - const envConfig = getEnvValues(dereferencedSchema); - debug('env config: %j', envConfig); - - // merge all the configs into one object with the following priority: localConfig < remoteConfig < envConfig - const mergedConfig = deepmerge.all([localConfig, remoteConfig, envConfig], { arrayMerge }); - debug('merged config: %j', mergedConfig); - - // validate the merged config - const [errors, validatedConfig] = validate(ajvConfigValidator, dereferencedSchema, mergedConfig); - if (errors) { - debug('config validation error: %j', errors); - throw createConfigError('configValidationError', 'Config validation error', errors); - } - - debug('freezing validated config'); - // freeze the merged config so it can't be modified by the package user - deepFreeze(validatedConfig); function get(path: TPath): GetFieldType { debug('get called with path: %s', path); diff --git a/src/httpClient.ts b/src/httpClient.ts index eef4279..e3f5053 100644 --- a/src/httpClient.ts +++ b/src/httpClient.ts @@ -15,10 +15,10 @@ async function createHttpErrorPayload(res: Dispatcher.ResponseData): Promise): Promise { +async function requestWrapper(url: string, options: Parameters>[1] = undefined): Promise { debug('Making request to %s', url); try { - const res = await request(url, { query }); + const res = await request(url, options); if (res.statusCode > statusCodes.NOT_FOUND) { debug('Failed to fetch config. Status code: %d', res.statusCode); throw createConfigError('httpResponseError', 'Failed to fetch', await createHttpErrorPayload(res)); @@ -33,12 +33,25 @@ async function requestWrapper(url: string, query?: Record): Pro } } -export async function getRemoteConfig(configName: string, schemaId: string, version: number | 'latest'): Promise { +export async function getRemoteConfig( + configName: string, + schemaId: string, + version: number | 'latest', + etag?: string +): Promise<{ config: Config | null; etag: string }> { debug('Fetching remote config %s@%s', configName, version); const { configServerUrl } = getOptions(); const url = `${configServerUrl}/config/${configName}/${version}`; - const res = await requestWrapper(url, { shouldDereference: true, schemaId }); + const headers = etag !== undefined ? { 'If-None-Match': etag } : undefined; + const queryParams = { schemaId, shouldDereference: 'true' }; + + const res = await requestWrapper(url, { query: queryParams, headers }); + + if (res.statusCode === statusCodes.NOT_MODIFIED) { + debug('Config was not modified'); + return { config: null, etag: etag! }; + } if (res.statusCode === statusCodes.BAD_REQUEST) { debug('Invalid request to getConfig'); @@ -51,7 +64,7 @@ export async function getRemoteConfig(configName: string, schemaId: string, vers } debug('Config fetched successfully'); - return (await res.body.json()) as Config; + return { config: (await res.body.json()) as Config, etag: res.headers.etag as string }; } export async function getServerCapabilities(): Promise { diff --git a/src/options.ts b/src/options.ts index 6ef024d..81459fe 100644 --- a/src/options.ts +++ b/src/options.ts @@ -11,6 +11,7 @@ const defaultOptions: BaseOptions = { configName: PACKAGE_NAME, configServerUrl: 'http://localhost:8080', version: 'latest', + pollIntervalMs: 30000, }; const envOptions: Partial> = { @@ -19,6 +20,7 @@ const envOptions: Partial> = { version: process.env.CONFIG_VERSION, offlineMode: process.env.CONFIG_OFFLINE_MODE, ignoreServerIsOlderVersionError: process.env.CONFIG_IGNORE_SERVER_IS_OLDER_VERSION_ERROR, + pollIntervalMs: process.env.CONFIG_POLL_INTERVAL_MS, }; // in order to merge correctly the keys should not exist, undefined is not enough diff --git a/src/rollout/ChangeDetector.ts b/src/rollout/ChangeDetector.ts new file mode 100644 index 0000000..06fa814 --- /dev/null +++ b/src/rollout/ChangeDetector.ts @@ -0,0 +1,56 @@ +import { getRemoteConfig } from '../httpClient'; +import { BaseOptions } from '../types'; +import { createDebug } from '../utils/debug'; + +const debug = createDebug('changeDetector'); + +/** + * Class responsible for detecting changes in the remote configuration by periodically polling the server and comparing ETags. + * If a change is detected, it invokes the provided callback with the new configuration. + */ +export class ChangeDetector { + private currentEtag: string; + private timer?: NodeJS.Timeout; + + public constructor( + private readonly schemaId: string, + private readonly options: BaseOptions, + private readonly onConfigUpdate: (newRemoteConfig: object) => void | Promise, + initialEtag: string + ) { + this.currentEtag = initialEtag; + } + + public start(): void { + const interval = this.options.pollIntervalMs; + debug('Starting change detector with interval %d ms', interval); + + this.timer = setInterval(() => { + this.poll().catch((err) => { + debug('Error during polling: %s', (err as Error).message); + }); + }, interval); + } + + public stop(): void { + if (this.timer) { + clearInterval(this.timer); + this.timer = undefined; + } + } + + private async poll(): Promise { + debug('Polling config %s@%s with etag %s', this.options.configName, this.options.version, this.currentEtag); + + const response = await getRemoteConfig(this.options.configName, this.schemaId, this.options.version, this.currentEtag); + + if (response.config === null) { + debug('No config changes detected'); + return; + } + + debug('Config change detected'); + this.currentEtag = response.etag!; + await this.onConfigUpdate(response.config.config); + } +} diff --git a/src/types.ts b/src/types.ts index 8854437..6e04eda 100644 --- a/src/types.ts +++ b/src/types.ts @@ -66,6 +66,11 @@ export interface BaseOptions { * @default './config' */ localConfigPath?: string; + /** + * The polling interval in milliseconds. + * @default 30000 + */ + pollIntervalMs?: number; } /** @@ -82,6 +87,10 @@ export type ConfigOptions = Prettify< * Depends on the prom-client package being installed. */ metricsRegistry?: Registry; + /** + * The callback function that is triggered when the configuration changes. + */ + onChange?: (config: unknown) => void | Promise; } >; @@ -101,16 +110,20 @@ export const optionsSchema: JSONSchemaType = { offlineMode: { type: 'boolean', nullable: true }, ignoreServerIsOlderVersionError: { type: 'boolean', nullable: true }, localConfigPath: { type: 'string', default: './config', nullable: true }, + pollIntervalMs: { type: 'integer', default: 30000, nullable: true }, }, }; /** - * Represents the schema of the configuration object. + * Represents a live configuration instance. + * When hot-reloading is enabled, this instance acts as a state machine that updates its internal + * configuration state dynamically. * @template T - The type of the configuration schema. */ export interface ConfigInstance { /** * Retrieves the value at the specified path from the configuration object. + * If hot-reloading is active, this returns the value from the most recent configuration update. * @template TPath - The type of the path. * @param path - The path to the desired value. * @returns The value at the specified path. @@ -119,12 +132,14 @@ export interface ConfigInstance { /** * Retrieves the entire configuration object. + * If hot-reloading is active, this returns the most recent configuration state. * @returns The entire configuration object. */ getAll: () => T; /** * Retrieves different parts of the configuration object before being merged and validated. + * If hot-reloading is active, 'config' reflects the latest remote payload. * @returns An object containing the localConfig, config, and envConfig parts of the configuration. */ getConfigParts: () => { diff --git a/tests/config.spec.ts b/tests/config.spec.ts index 8dbb0de..fb4b411 100644 --- a/tests/config.spec.ts +++ b/tests/config.spec.ts @@ -210,6 +210,7 @@ describe('config', () => { configServerUrl: URL, localConfigPath: './tests/config', offlineMode: true, + pollIntervalMs: 3000, }); const options = configInstance.getResolvedOptions(); @@ -220,6 +221,7 @@ describe('config', () => { configServerUrl: URL, localConfigPath: './tests/config', offlineMode: true, + pollIntervalMs: 3000, }); }); diff --git a/tests/httpClient.spec.ts b/tests/httpClient.spec.ts index fbb7772..61af7c8 100644 --- a/tests/httpClient.spec.ts +++ b/tests/httpClient.spec.ts @@ -62,7 +62,7 @@ describe('httpClient', () => { client.intercept({ path: '/config/name/1?shouldDereference=true&schemaId=schema', method: 'GET' }).reply(StatusCodes.OK, config); const result = await getRemoteConfig('name', 'schema', 1); - expect(result).toEqual(config); + expect(result).toEqual({ config: config, etag: undefined }); }); it('should throw an error if the response is bad request', async () => { diff --git a/tests/options.spec.ts b/tests/options.spec.ts index b228eae..2fbaf93 100644 --- a/tests/options.spec.ts +++ b/tests/options.spec.ts @@ -73,6 +73,7 @@ describe('options', () => { ['version', 'CONFIG_VERSION', 'latest', 'latest'], ['offlineMode', 'CONFIG_OFFLINE_MODE', 'true', true], ['ignoreServerIsOlderVersionError', 'CONFIG_IGNORE_SERVER_IS_OLDER_VERSION_ERROR', 'true', true], + ['pollIntervalMs', 'CONFIG_POLL_INTERVAL_MS', '10000', 10000], ])('should initialize options and override with provided environment variable %s', async (key, envKey, envValue, expected) => { process.env[envKey] = envValue; diff --git a/tests/rollout.spec.ts b/tests/rollout.spec.ts new file mode 100644 index 0000000..5f741da --- /dev/null +++ b/tests/rollout.spec.ts @@ -0,0 +1,139 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; +import { Interceptable, MockAgent, setGlobalDispatcher } from 'undici'; +import { commonDbPartialV1 } from '@map-colonies/schemas'; +import { StatusCodes } from 'http-status-codes'; +import { config } from '../src/config'; + +const URL = 'http://localhost:8080'; + +describe('Continuous Polling (ChangeDetector)', () => { + let client: Interceptable; + + beforeEach(() => { + vi.useFakeTimers(); + const agent = new MockAgent(); + agent.disableNetConnect(); + + setGlobalDispatcher(agent); + client = agent.get(URL); + }); + + afterEach(() => { + vi.restoreAllMocks(); + }); + + it('should trigger onChange when polling returns a new config (200 OK)', async () => { + const initialConfigData = { + configName: 'name', + schemaId: commonDbPartialV1.$id, + version: 1, + config: { + host: 'initial-host', + }, + createdAt: 0, + }; + + const newConfigData = { + configName: 'name', + schemaId: commonDbPartialV1.$id, + version: 1, + config: { + host: 'updated-host', + }, + createdAt: 1, + }; + + // Initial capabilities fetch + client + .intercept({ path: '/capabilities', method: 'GET' }) + .reply(StatusCodes.OK, { serverVersion: '2.0.0', schemasPackageVersion: '99.9.9', pubSubEnabled: false }); + + // Initial config fetch + client + .intercept({ path: `/config/name/1?shouldDereference=true&schemaId=${commonDbPartialV1.$id}`, method: 'GET' }) + .reply(StatusCodes.OK, initialConfigData, { headers: { etag: 'etag-1' } }); + + const onChangeMock = vi.fn(); + + const configInstance = await config({ + configName: 'name', + version: 1, + schema: commonDbPartialV1, + configServerUrl: URL, + localConfigPath: './tests/config', + pollIntervalMs: 10000, + onChange: onChangeMock, + }); + + expect(configInstance.get('host')).toBe('initial-host'); + expect(onChangeMock).not.toHaveBeenCalled(); + + // Setup next poll response + client + .intercept({ + path: `/config/name/1?shouldDereference=true&schemaId=${commonDbPartialV1.$id}`, + method: 'GET', + headers: { 'if-none-match': 'etag-1' }, + }) + .reply(StatusCodes.OK, newConfigData, { headers: { etag: 'etag-2' } }); + + // Advance time to trigger poll + await vi.advanceTimersByTimeAsync(10000); + + expect(onChangeMock).toHaveBeenCalledTimes(1); + expect(onChangeMock).toHaveBeenCalledWith( + expect.objectContaining({ + host: 'updated-host', + }) + ); + }); + + it('should not trigger onChange when polling returns 304 Not Modified', async () => { + const initialConfigData = { + configName: 'name', + schemaId: commonDbPartialV1.$id, + version: 1, + config: { + host: 'initial-host', + }, + createdAt: 0, + }; + + // Initial capabilities fetch + client + .intercept({ path: '/capabilities', method: 'GET' }) + .reply(StatusCodes.OK, { serverVersion: '2.0.0', schemasPackageVersion: '99.9.9', pubSubEnabled: false }); + + // Initial config fetch + client + .intercept({ path: `/config/name/1?shouldDereference=true&schemaId=${commonDbPartialV1.$id}`, method: 'GET' }) + .reply(StatusCodes.OK, initialConfigData, { headers: { etag: 'etag-1' } }); + + const onChangeMock = vi.fn(); + + const configInstance = await config({ + configName: 'name', + version: 1, + schema: commonDbPartialV1, + configServerUrl: URL, + localConfigPath: './tests/config', + pollIntervalMs: 10000, + onChange: onChangeMock, + }); + + // Setup next poll response (304) + client + .intercept({ + path: `/config/name/1?shouldDereference=true&schemaId=${commonDbPartialV1.$id}`, + method: 'GET', + headers: { 'if-none-match': 'etag-1' }, + }) + .reply(StatusCodes.NOT_MODIFIED); + + // Advance time to trigger poll + await vi.advanceTimersByTimeAsync(10000); + + expect(onChangeMock).not.toHaveBeenCalled(); + expect(configInstance.get('host')).toBe('initial-host'); + }); +}); From 85ac040a5c7386781e13e5cdf4577a63e164357f Mon Sep 17 00:00:00 2001 From: netanelC Date: Mon, 1 Jun 2026 09:34:24 +0300 Subject: [PATCH 02/14] feat: add stop function --- README.md | 3 ++ src/config.ts | 12 +++++- src/types.ts | 5 +++ tests/rollout.spec.ts | 88 +++++++++++++++++++++++++++++-------------- 4 files changed, 77 insertions(+), 31 deletions(-) diff --git a/README.md b/README.md index 16b0553..2da1b2f 100644 --- a/README.md +++ b/README.md @@ -76,6 +76,9 @@ The `ConfigInstance` interface represents your way to interact with the configur - **Parameters**: - `registry` (`promClient.Registry`): The prometheus registry to use for the metrics. +##### `stop(): void` +- **Description**: Stops any background processes (like hot-reloading polling). Use this during application teardown or in tests to prevent memory leaks and hanging processes. + # Configuration Options This package allows you to configure various options for loading and managing configurations. Below are the available options and their descriptions. diff --git a/src/config.ts b/src/config.ts index 727843f..967e4d3 100644 --- a/src/config.ts +++ b/src/config.ts @@ -71,6 +71,7 @@ export async function config( return validatedConfig; } + let changeDetector: ChangeDetector | undefined = undefined; let remoteConfig: object | T = {}; let serverConfigResponse: Config | undefined = undefined; let validatedConfig: ReturnType; @@ -129,7 +130,7 @@ export async function config( // Setup polling if (onChange) { - const changeDetector = new ChangeDetector( + changeDetector = new ChangeDetector( baseSchema.$id, initOptions, async (newRemoteConfig: object) => { @@ -175,5 +176,12 @@ export async function config( initializeMetricsInternal(registry, baseSchema.$id, serverConfigResponse?.version); } - return { get, getAll, getConfigParts, getResolvedOptions, initializeMetrics }; + function stop(): void { + debug('stop called'); + if (changeDetector) { + changeDetector.stop(); + } + } + + return { get, getAll, getConfigParts, getResolvedOptions, initializeMetrics, stop }; } diff --git a/src/types.ts b/src/types.ts index 6e04eda..e6c4eaa 100644 --- a/src/types.ts +++ b/src/types.ts @@ -159,4 +159,9 @@ export interface ConfigInstance { * @param registry - The registry for the metrics. */ initializeMetrics: (registry: Registry) => void; + + /** + * Stops any background processes (like hot-reloading polling). + */ + stop: () => void; } diff --git a/tests/rollout.spec.ts b/tests/rollout.spec.ts index 5f741da..abde307 100644 --- a/tests/rollout.spec.ts +++ b/tests/rollout.spec.ts @@ -5,6 +5,7 @@ import { StatusCodes } from 'http-status-codes'; import { config } from '../src/config'; const URL = 'http://localhost:8080'; +const DEFAULT_POLL_INTERVAL = 10000; describe('Continuous Polling (ChangeDetector)', () => { let client: Interceptable; @@ -23,52 +24,47 @@ describe('Continuous Polling (ChangeDetector)', () => { }); it('should trigger onChange when polling returns a new config (200 OK)', async () => { + // Arrange const initialConfigData = { configName: 'name', schemaId: commonDbPartialV1.$id, version: 1, - config: { - host: 'initial-host', - }, + config: { host: 'initial-host' }, createdAt: 0, }; - const newConfigData = { configName: 'name', schemaId: commonDbPartialV1.$id, version: 1, - config: { - host: 'updated-host', - }, + config: { host: 'updated-host' }, createdAt: 1, }; - // Initial capabilities fetch client .intercept({ path: '/capabilities', method: 'GET' }) .reply(StatusCodes.OK, { serverVersion: '2.0.0', schemasPackageVersion: '99.9.9', pubSubEnabled: false }); - - // Initial config fetch client .intercept({ path: `/config/name/1?shouldDereference=true&schemaId=${commonDbPartialV1.$id}`, method: 'GET' }) .reply(StatusCodes.OK, initialConfigData, { headers: { etag: 'etag-1' } }); const onChangeMock = vi.fn(); + // Act const configInstance = await config({ configName: 'name', version: 1, schema: commonDbPartialV1, configServerUrl: URL, localConfigPath: './tests/config', - pollIntervalMs: 10000, + pollIntervalMs: DEFAULT_POLL_INTERVAL, onChange: onChangeMock, }); + // Assert (Initial State) expect(configInstance.get('host')).toBe('initial-host'); expect(onChangeMock).not.toHaveBeenCalled(); - // Setup next poll response + // Arrange (Next Poll) client .intercept({ path: `/config/name/1?shouldDereference=true&schemaId=${commonDbPartialV1.$id}`, @@ -77,51 +73,45 @@ describe('Continuous Polling (ChangeDetector)', () => { }) .reply(StatusCodes.OK, newConfigData, { headers: { etag: 'etag-2' } }); - // Advance time to trigger poll - await vi.advanceTimersByTimeAsync(10000); + // Act (Wait for Poll) + await vi.advanceTimersByTimeAsync(DEFAULT_POLL_INTERVAL); + // Assert (Updated State) expect(onChangeMock).toHaveBeenCalledTimes(1); - expect(onChangeMock).toHaveBeenCalledWith( - expect.objectContaining({ - host: 'updated-host', - }) - ); + expect(onChangeMock).toHaveBeenCalledWith(expect.objectContaining({ host: 'updated-host' })); }); it('should not trigger onChange when polling returns 304 Not Modified', async () => { + // Arrange const initialConfigData = { configName: 'name', schemaId: commonDbPartialV1.$id, version: 1, - config: { - host: 'initial-host', - }, + config: { host: 'initial-host' }, createdAt: 0, }; - // Initial capabilities fetch client .intercept({ path: '/capabilities', method: 'GET' }) .reply(StatusCodes.OK, { serverVersion: '2.0.0', schemasPackageVersion: '99.9.9', pubSubEnabled: false }); - - // Initial config fetch client .intercept({ path: `/config/name/1?shouldDereference=true&schemaId=${commonDbPartialV1.$id}`, method: 'GET' }) .reply(StatusCodes.OK, initialConfigData, { headers: { etag: 'etag-1' } }); const onChangeMock = vi.fn(); + // Act const configInstance = await config({ configName: 'name', version: 1, schema: commonDbPartialV1, configServerUrl: URL, localConfigPath: './tests/config', - pollIntervalMs: 10000, + pollIntervalMs: DEFAULT_POLL_INTERVAL, onChange: onChangeMock, }); - // Setup next poll response (304) + // Arrange (Setup 304 response) client .intercept({ path: `/config/name/1?shouldDereference=true&schemaId=${commonDbPartialV1.$id}`, @@ -130,10 +120,50 @@ describe('Continuous Polling (ChangeDetector)', () => { }) .reply(StatusCodes.NOT_MODIFIED); - // Advance time to trigger poll - await vi.advanceTimersByTimeAsync(10000); + // Act (Wait for Poll) + await vi.advanceTimersByTimeAsync(DEFAULT_POLL_INTERVAL); + // Assert expect(onChangeMock).not.toHaveBeenCalled(); expect(configInstance.get('host')).toBe('initial-host'); }); + + it('should stop polling when stop is called', async () => { + // Arrange + const initialConfigData = { + configName: 'name', + schemaId: commonDbPartialV1.$id, + version: 1, + config: { host: 'initial-host' }, + createdAt: 0, + }; + + client + .intercept({ path: '/capabilities', method: 'GET' }) + .reply(StatusCodes.OK, { serverVersion: '2.0.0', schemasPackageVersion: '99.9.9', pubSubEnabled: false }); + client + .intercept({ path: `/config/name/1?shouldDereference=true&schemaId=${commonDbPartialV1.$id}`, method: 'GET' }) + .reply(StatusCodes.OK, initialConfigData, { headers: { etag: 'etag-1' } }); + + const onChangeMock = vi.fn(); + + const configInstance = await config({ + configName: 'name', + version: 1, + schema: commonDbPartialV1, + configServerUrl: URL, + localConfigPath: './tests/config', + pollIntervalMs: DEFAULT_POLL_INTERVAL, + onChange: onChangeMock, + }); + + // Act + configInstance.stop(); + + // Advance time beyond the polling interval + await vi.advanceTimersByTimeAsync(DEFAULT_POLL_INTERVAL * 2); + + // Assert + expect(onChangeMock).not.toHaveBeenCalled(); + }); }); From f2363e806c842709f613a9baa8e9172ccac05cb0 Mon Sep 17 00:00:00 2001 From: netanelC Date: Mon, 1 Jun 2026 09:58:27 +0300 Subject: [PATCH 03/14] feat: add jitter --- README.md | 2 +- src/constants.ts | 2 ++ src/rollout/ChangeDetector.ts | 37 +++++++++++++++----- tests/rollout.spec.ts | 66 +++++++++++++++++++++++++++++++++-- 4 files changed, 95 insertions(+), 12 deletions(-) diff --git a/README.md b/README.md index 2da1b2f..6c5bf82 100644 --- a/README.md +++ b/README.md @@ -164,7 +164,7 @@ The package supports merging configurations from multiple sources (local, remote 1. The remote configuration is fetched from the server specified by the `configServerUrl` option. 2. If the `version` is set to `'latest'`, the latest version of the configuration is fetched. Otherwise, the specified version is fetched. -3. **Continuous Polling:** If an `onChange` callback is provided, the SDK continuously polls the server using HTTP ETags (`If-None-Match`). When a `200 OK` is received (indicating a change), the configuration is automatically re-merged, validated, and the callback is triggered. `304 Not Modified` responses are silently ignored. +3. **Continuous Polling:** If an `onChange` callback is provided, the SDK continuously polls the server using HTTP ETags (`If-None-Match`). When a `200 OK` is received (indicating a change), the configuration is automatically re-merged, validated, and the callback is triggered. `304 Not Modified` responses are silently ignored. To prevent cluster-wide traffic spikes (thundering herd), a **randomized jitter of +/- 15%** is automatically applied to each polling cycle. ### Environment Variables diff --git a/src/constants.ts b/src/constants.ts index 0f459d1..8f94983 100644 --- a/src/constants.ts +++ b/src/constants.ts @@ -9,3 +9,5 @@ export const SCHEMA_DOMAIN = 'https://mapcolonies.com/'; export const SCHEMAS_PACKAGE_RESOLVED_PATH = require.resolve('@map-colonies/schemas'); export const SCHEMA_BASE_PATH = SCHEMAS_PACKAGE_RESOLVED_PATH.substring(0, SCHEMAS_PACKAGE_RESOLVED_PATH.lastIndexOf(path.sep)); + +export const JITTER_PERCENTAGE = 0.15; diff --git a/src/rollout/ChangeDetector.ts b/src/rollout/ChangeDetector.ts index 06fa814..90e3301 100644 --- a/src/rollout/ChangeDetector.ts +++ b/src/rollout/ChangeDetector.ts @@ -1,3 +1,4 @@ +import { JITTER_PERCENTAGE } from '../constants'; import { getRemoteConfig } from '../httpClient'; import { BaseOptions } from '../types'; import { createDebug } from '../utils/debug'; @@ -22,23 +23,41 @@ export class ChangeDetector { } public start(): void { - const interval = this.options.pollIntervalMs; - debug('Starting change detector with interval %d ms', interval); - - this.timer = setInterval(() => { - this.poll().catch((err) => { - debug('Error during polling: %s', (err as Error).message); - }); - }, interval); + debug('Starting change detector'); + this.scheduleNextPoll(); } public stop(): void { if (this.timer) { - clearInterval(this.timer); + clearTimeout(this.timer); this.timer = undefined; } } + private scheduleNextPoll(): void { + if (this.timer) { + clearTimeout(this.timer); + } + const baseInterval = this.options.pollIntervalMs!; + const jitter = baseInterval * JITTER_PERCENTAGE; + // eslint-disable-next-line @typescript-eslint/no-magic-numbers + const randomJitter = (Math.random() * 2 - 1) * jitter; + const nextInterval = baseInterval + randomJitter; + + debug('Scheduling next poll in %d ms', nextInterval); + this.timer = setTimeout(() => { + this.poll() + .catch((err) => { + debug('Error during polling: %s', (err as Error).message); + }) + .finally(() => { + if (this.timer !== undefined) { + this.scheduleNextPoll(); + } + }); + }, nextInterval); + } + private async poll(): Promise { debug('Polling config %s@%s with etag %s', this.options.configName, this.options.version, this.currentEtag); diff --git a/tests/rollout.spec.ts b/tests/rollout.spec.ts index abde307..046e858 100644 --- a/tests/rollout.spec.ts +++ b/tests/rollout.spec.ts @@ -3,6 +3,7 @@ import { Interceptable, MockAgent, setGlobalDispatcher } from 'undici'; import { commonDbPartialV1 } from '@map-colonies/schemas'; import { StatusCodes } from 'http-status-codes'; import { config } from '../src/config'; +import { JITTER_PERCENTAGE } from '../src/constants'; const URL = 'http://localhost:8080'; const DEFAULT_POLL_INTERVAL = 10000; @@ -74,7 +75,7 @@ describe('Continuous Polling (ChangeDetector)', () => { .reply(StatusCodes.OK, newConfigData, { headers: { etag: 'etag-2' } }); // Act (Wait for Poll) - await vi.advanceTimersByTimeAsync(DEFAULT_POLL_INTERVAL); + await vi.advanceTimersByTimeAsync(DEFAULT_POLL_INTERVAL * (1 + JITTER_PERCENTAGE)); // Assert (Updated State) expect(onChangeMock).toHaveBeenCalledTimes(1); @@ -121,7 +122,7 @@ describe('Continuous Polling (ChangeDetector)', () => { .reply(StatusCodes.NOT_MODIFIED); // Act (Wait for Poll) - await vi.advanceTimersByTimeAsync(DEFAULT_POLL_INTERVAL); + await vi.advanceTimersByTimeAsync(DEFAULT_POLL_INTERVAL * (1 + JITTER_PERCENTAGE)); // Assert expect(onChangeMock).not.toHaveBeenCalled(); @@ -166,4 +167,65 @@ describe('Continuous Polling (ChangeDetector)', () => { // Assert expect(onChangeMock).not.toHaveBeenCalled(); }); + + it('should apply randomized jitter within boundaries over 10 cycles', async () => { + // Arrange + const initialConfigData = { + configName: 'name', + schemaId: commonDbPartialV1.$id, + version: 1, + config: { host: 'initial-host' }, + createdAt: 0, + }; + + client + .intercept({ path: '/capabilities', method: 'GET' }) + .reply(StatusCodes.OK, { serverVersion: '2.0.0', schemasPackageVersion: '99.9.9', pubSubEnabled: false }); + client + .intercept({ path: `/config/name/1?shouldDereference=true&schemaId=${commonDbPartialV1.$id}`, method: 'GET' }) + .reply(StatusCodes.OK, initialConfigData, { headers: { etag: 'etag-1' } }); + + // Setup 10 mock 304 responses + client + .intercept({ + path: `/config/name/1?shouldDereference=true&schemaId=${commonDbPartialV1.$id}`, + method: 'GET', + }) + .reply(StatusCodes.NOT_MODIFIED) + .times(10); + + const setTimeoutSpy = vi.spyOn(global, 'setTimeout'); + + // Act + await config({ + configName: 'name', + version: 1, + schema: commonDbPartialV1, + configServerUrl: URL, + localConfigPath: './tests/config', + pollIntervalMs: DEFAULT_POLL_INTERVAL, + onChange: vi.fn(), + }); + + const maxJitter = DEFAULT_POLL_INTERVAL * JITTER_PERCENTAGE; + const minWait = DEFAULT_POLL_INTERVAL - maxJitter; + const maxWait = DEFAULT_POLL_INTERVAL + maxJitter; + + // Run 10 cycles + for (let i = 0; i < 10; i++) { + // Advance timers by maxWait to definitely trigger the next poll + await vi.advanceTimersByTimeAsync(maxWait + 1); + } + + // Assert + const pollTimeouts = setTimeoutSpy.mock.calls.map((call) => call[1] as number).filter((time) => time >= minWait && time <= maxWait); + + expect(pollTimeouts.length).toBeGreaterThanOrEqual(10); + pollTimeouts.forEach((time) => { + expect(time).toBeGreaterThanOrEqual(minWait); + expect(time).toBeLessThanOrEqual(maxWait); + }); + + setTimeoutSpy.mockRestore(); + }); }); From 98351fb62302489ed325bebac9219efab01ba924 Mon Sep 17 00:00:00 2001 From: netanelC Date: Mon, 1 Jun 2026 11:47:26 +0300 Subject: [PATCH 04/14] feat: add disableHotReload param --- README.md | 9 +++++++++ src/config.ts | 15 +++++++++----- src/errors.ts | 1 + src/metrics.ts | 2 +- src/options.ts | 5 +++++ src/rollout/ChangeDetector.ts | 2 +- src/types.ts | 23 +++++++++++++-------- tests/config.spec.ts | 8 ++++++++ tests/rollout.spec.ts | 38 +++++++++++++++++++++++++++++++++++ 9 files changed, 88 insertions(+), 15 deletions(-) diff --git a/README.md b/README.md index 6c5bf82..a86d02b 100644 --- a/README.md +++ b/README.md @@ -120,6 +120,7 @@ This package allows you to configure various options for loading and managing co ### `ignoreServerIsOlderVersionError` - **Type**: `boolean` - **Optional**: `true` +- **Default**: `false` - **Description**: Indicates whether to ignore the error when the server version is older than the requested version. - **Environment Variable**: `CONFIG_IGNORE_SERVER_IS_OLDER_VERSION_ERROR` @@ -129,6 +130,13 @@ This package allows you to configure various options for loading and managing co - **Default**: `./config` - **Description**: The path to the local configuration folder. +### `disableHotReload` +- **Type**: `boolean` +- **Optional**: `true` +- **Default**: `false` +- **Description**: Indicates whether hot-reloading should be disabled. If true, the SDK fetches the remote configuration exactly once upon startup and never starts the background polling loop. +- **Environment Variable**: `CONFIG_DISABLE_HOT_RELOAD` + ### `pollIntervalMs` - **Type**: `number` - **Optional**: `true` @@ -151,6 +159,7 @@ The following environment variables can be used to configure the options: - `CONFIG_OFFLINE_MODE`: Sets the `offlineMode` option. - `CONFIG_IGNORE_SERVER_IS_OLDER_VERSION_ERROR`: Sets the `ignoreServerIsOlderVersionError` option. - `CONFIG_POLL_INTERVAL_MS`: Sets the `pollIntervalMs` option. +- `CONFIG_DISABLE_HOT_RELOAD`: Sets the `disableHotReload` option. ## Configuration Merging and Validation diff --git a/src/config.ts b/src/config.ts index 967e4d3..012e32a 100644 --- a/src/config.ts +++ b/src/config.ts @@ -41,7 +41,7 @@ export async function config( debug('config called with options: %j', { ...options, schema: options.schema.$id }); const { schema: baseSchema, metricsRegistry, onChange, ...unvalidatedOptions } = options; const initOptions = initializeOptions(unvalidatedOptions); - const { configName, offlineMode, version, ignoreServerIsOlderVersionError } = initOptions; + const { configName, offlineMode, version, ignoreServerIsOlderVersionError, disableHotReload } = initOptions; // Load Local and Env Configs First (Independent of remote state) const dereferencedSchema = await loadSchema(baseSchema); @@ -77,7 +77,12 @@ export async function config( let validatedConfig: ReturnType; // Handle Remote Config and Polling - if (offlineMode !== true) { + if (!offlineMode) { + if (!disableHotReload && onChange === undefined) { + debug('Hot reload is enabled but no onChange callback was provided'); + throw createConfigError('onChangeCallbackMissingError', `Hot reload is enabled but no 'onChange' callback was provided`, {}); + } + debug('handling fetching remote data'); // check if the server is using an older version of the schemas package const capabilitiesResponse = await getServerCapabilities(); @@ -90,7 +95,7 @@ export async function config( satisfies: semverSatisfies, }); } - if (ignoreServerIsOlderVersionError !== true && gt(LOCAL_SCHEMAS_PACKAGE_VERSION, capabilitiesResponse.schemasPackageVersion)) { + if (!ignoreServerIsOlderVersionError && gt(LOCAL_SCHEMAS_PACKAGE_VERSION, capabilitiesResponse.schemasPackageVersion)) { debug( 'server is using an older version of the schemas package. local: %s, remote: %s', LOCAL_SCHEMAS_PACKAGE_VERSION, @@ -129,7 +134,7 @@ export async function config( validatedConfig = mergeAndValidate(remoteConfig); // Setup polling - if (onChange) { + if (!disableHotReload) { changeDetector = new ChangeDetector( baseSchema.$id, initOptions, @@ -137,7 +142,7 @@ export async function config( const newlyValidatedConfig = mergeAndValidate(newRemoteConfig); validatedConfig = newlyValidatedConfig; remoteConfig = newRemoteConfig; - await onChange(newlyValidatedConfig); + await onChange!(newlyValidatedConfig); }, currentEtag ); diff --git a/src/errors.ts b/src/errors.ts index bcb1d37..d0732a9 100644 --- a/src/errors.ts +++ b/src/errors.ts @@ -11,6 +11,7 @@ const configErrors = { schemaVersionMismatchError: { code: 7, payload: {} as { remoteSchemaVersion: string; localSchemaVersion: string } }, promClientNotInstalledError: { code: 8, payload: {} as { message: string } }, serverVersionMismatchError: { code: 9, payload: {} as { remoteServerVersion: string; localServerVersion: string; satisfies: string } }, + onChangeCallbackMissingError: { code: 10, payload: {} }, // eslint-disable-next-line @typescript-eslint/no-explicit-any } as const satisfies Record; diff --git a/src/metrics.ts b/src/metrics.ts index d45b686..cc8547b 100644 --- a/src/metrics.ts +++ b/src/metrics.ts @@ -46,7 +46,7 @@ export function initializeMetrics(registry: Registry, schemaId: string, actualVe name: configName, request_version: version, actual_version: actualVersion, - offline_mode: String(offlineMode ?? false), + offline_mode: String(offlineMode), schemas_package_version: LOCAL_SCHEMAS_PACKAGE_VERSION, package_version: PACKAGE_VERSION, schema_id: schemaId, diff --git a/src/options.ts b/src/options.ts index 81459fe..421b74c 100644 --- a/src/options.ts +++ b/src/options.ts @@ -12,6 +12,10 @@ const defaultOptions: BaseOptions = { configServerUrl: 'http://localhost:8080', version: 'latest', pollIntervalMs: 30000, + offlineMode: false, + ignoreServerIsOlderVersionError: false, + localConfigPath: './config', + disableHotReload: false, }; const envOptions: Partial> = { @@ -21,6 +25,7 @@ const envOptions: Partial> = { offlineMode: process.env.CONFIG_OFFLINE_MODE, ignoreServerIsOlderVersionError: process.env.CONFIG_IGNORE_SERVER_IS_OLDER_VERSION_ERROR, pollIntervalMs: process.env.CONFIG_POLL_INTERVAL_MS, + disableHotReload: process.env.CONFIG_DISABLE_HOT_RELOAD, }; // in order to merge correctly the keys should not exist, undefined is not enough diff --git a/src/rollout/ChangeDetector.ts b/src/rollout/ChangeDetector.ts index 90e3301..2d8ad58 100644 --- a/src/rollout/ChangeDetector.ts +++ b/src/rollout/ChangeDetector.ts @@ -38,7 +38,7 @@ export class ChangeDetector { if (this.timer) { clearTimeout(this.timer); } - const baseInterval = this.options.pollIntervalMs!; + const baseInterval = this.options.pollIntervalMs; const jitter = baseInterval * JITTER_PERCENTAGE; // eslint-disable-next-line @typescript-eslint/no-magic-numbers const randomJitter = (Math.random() * 2 - 1) * jitter; diff --git a/src/types.ts b/src/types.ts index e6c4eaa..4d4acaa 100644 --- a/src/types.ts +++ b/src/types.ts @@ -56,21 +56,27 @@ export interface BaseOptions { /** * Indicates whether the configuration should be loaded in offline mode. */ - offlineMode?: boolean; + offlineMode: boolean; /** * Indicates whether to ignore the error when the server version is older than the requested version. */ - ignoreServerIsOlderVersionError?: boolean; + ignoreServerIsOlderVersionError: boolean; /** * The path to the local configuration folder. * @default './config' */ - localConfigPath?: string; + localConfigPath: string; /** * The polling interval in milliseconds. * @default 30000 */ - pollIntervalMs?: number; + pollIntervalMs: number; + /** + * Indicates whether hot-reloading should be disabled. + * If true, the SDK fetches the remote configuration exactly once upon startup. + * @default false + */ + disableHotReload: boolean; } /** @@ -107,10 +113,11 @@ export const optionsSchema: JSONSchemaType = { ], }, configServerUrl: { type: 'string' }, - offlineMode: { type: 'boolean', nullable: true }, - ignoreServerIsOlderVersionError: { type: 'boolean', nullable: true }, - localConfigPath: { type: 'string', default: './config', nullable: true }, - pollIntervalMs: { type: 'integer', default: 30000, nullable: true }, + offlineMode: { type: 'boolean' }, + ignoreServerIsOlderVersionError: { type: 'boolean' }, + localConfigPath: { type: 'string', default: './config' }, + pollIntervalMs: { type: 'integer', default: 30000 }, + disableHotReload: { type: 'boolean', default: false }, }, }; diff --git a/tests/config.spec.ts b/tests/config.spec.ts index fb4b411..a85bfab 100644 --- a/tests/config.spec.ts +++ b/tests/config.spec.ts @@ -40,6 +40,7 @@ describe('config', () => { schema: commonDbPartialV1, configServerUrl: URL, localConfigPath: './tests/config', + onChange: async () => {}, }); const conf = configInstance.getAll(); @@ -125,6 +126,7 @@ describe('config', () => { schema: commonDbPartialV1, configServerUrl: URL, localConfigPath: './tests/config', + onChange: async () => {}, }); const conf = configInstance.getAll(); @@ -185,6 +187,7 @@ describe('config', () => { schema: commonDbPartialV1, configServerUrl: URL, localConfigPath: './tests/config', + onChange: async () => {}, }); const parts = configInstance.getConfigParts(); @@ -222,6 +225,8 @@ describe('config', () => { localConfigPath: './tests/config', offlineMode: true, pollIntervalMs: 3000, + disableHotReload: false, + ignoreServerIsOlderVersionError: false, }); }); @@ -249,6 +254,7 @@ describe('config', () => { schema: commonS3PartialV1, configServerUrl: URL, localConfigPath: './tests/config', + onChange: async () => {}, }); await expect(promise).rejects.toThrow('The schema version of the remote config does not match the schema version of the local config'); @@ -280,6 +286,7 @@ describe('config', () => { schema: commonDbPartialV1, configServerUrl: URL, localConfigPath: './tests/config', + onChange: async () => {}, }); await expect(promise).rejects.toThrow('Config validation error'); @@ -378,6 +385,7 @@ describe('config', () => { schema: commonDbPartialV1, configServerUrl: URL, localConfigPath: './tests/config', + onChange: async () => {}, }); await expect(promise).rejects.toThrow('The server version does not satisfy the required version.'); diff --git a/tests/rollout.spec.ts b/tests/rollout.spec.ts index 046e858..be36cdd 100644 --- a/tests/rollout.spec.ts +++ b/tests/rollout.spec.ts @@ -228,4 +228,42 @@ describe('Continuous Polling (ChangeDetector)', () => { setTimeoutSpy.mockRestore(); }); + + it('should not start polling if disableHotReload is true', async () => { + // Arrange + const initialConfigData = { + configName: 'name', + schemaId: commonDbPartialV1.$id, + version: 1, + config: { host: 'initial-host' }, + createdAt: 0, + }; + + client + .intercept({ path: '/capabilities', method: 'GET' }) + .reply(StatusCodes.OK, { serverVersion: '2.0.0', schemasPackageVersion: '99.9.9', pubSubEnabled: false }); + client + .intercept({ path: `/config/name/1?shouldDereference=true&schemaId=${commonDbPartialV1.$id}`, method: 'GET' }) + .reply(StatusCodes.OK, initialConfigData, { headers: { etag: 'etag-1' } }); + + const onChangeMock = vi.fn(); + + // Act + await config({ + configName: 'name', + version: 1, + schema: commonDbPartialV1, + configServerUrl: URL, + localConfigPath: './tests/config', + pollIntervalMs: DEFAULT_POLL_INTERVAL, + onChange: onChangeMock, + disableHotReload: true, + }); + + // Advance time beyond the polling interval + await vi.advanceTimersByTimeAsync(DEFAULT_POLL_INTERVAL * (1 + JITTER_PERCENTAGE) + 1); + + // Assert + expect(onChangeMock).not.toHaveBeenCalled(); + }); }); From c879dadac5b47d8bdfeafe36adaf524565d2d6f6 Mon Sep 17 00:00:00 2001 From: netanelC Date: Wed, 3 Jun 2026 09:15:42 +0300 Subject: [PATCH 05/14] feat: add lock --- README.md | 5 ++ src/config.ts | 3 + src/httpClient.ts | 56 +++++++++++++++ src/options.ts | 7 ++ src/rollout/ChangeDetector.ts | 14 +++- src/rollout/LockCoordinator.ts | 39 +++++++++++ src/types.ts | 18 +++++ tests/config.spec.ts | 4 ++ tests/lock.spec.ts | 123 +++++++++++++++++++++++++++++++++ tests/rollout.spec.ts | 8 ++- 10 files changed, 274 insertions(+), 3 deletions(-) create mode 100644 src/rollout/LockCoordinator.ts create mode 100644 tests/lock.spec.ts diff --git a/README.md b/README.md index a86d02b..650d063 100644 --- a/README.md +++ b/README.md @@ -174,6 +174,11 @@ The package supports merging configurations from multiple sources (local, remote 1. The remote configuration is fetched from the server specified by the `configServerUrl` option. 2. If the `version` is set to `'latest'`, the latest version of the configuration is fetched. Otherwise, the specified version is fetched. 3. **Continuous Polling:** If an `onChange` callback is provided, the SDK continuously polls the server using HTTP ETags (`If-None-Match`). When a `200 OK` is received (indicating a change), the configuration is automatically re-merged, validated, and the callback is triggered. `304 Not Modified` responses are silently ignored. To prevent cluster-wide traffic spikes (thundering herd), a **randomized jitter of +/- 15%** is automatically applied to each polling cycle. +4. **Distributed Semaphore Locking:** To control rollout concurrency across a cluster, the SDK implements a distributed locking mechanism. + - **Lock Acquisition:** Before triggering the `onChange` callback during a hot-reload, the SDK attempts to acquire a lock from the configuration server. + - **Lock Release:** The lock is automatically released after the `onChange` callback completes (whether it succeeds or throws). + - **423 Locked & Retry-After:** If the server returns `423 Locked`, it indicates the rollout limit has been reached. The SDK will respect the `Retry-After` header provided by the server, waiting for the specified duration before attempting to acquire the lock again. + - **Cold-Start Bypass:** Distributed locking is **only** active during hot-reloads. Initial configuration fetches during application startup (cold-start) always bypass the lock to ensure immediate availability. ### Environment Variables diff --git a/src/config.ts b/src/config.ts index 012e32a..861e8a9 100644 --- a/src/config.ts +++ b/src/config.ts @@ -16,6 +16,7 @@ import { createConfigError } from './errors'; import { initializeMetrics as initializeMetricsInternal } from './metrics'; import { deepFreeze } from './utils/helpers'; import { ChangeDetector } from './rollout/ChangeDetector'; +import { LockCoordinator } from './rollout/LockCoordinator'; const debug = createDebug('config'); @@ -135,9 +136,11 @@ export async function config( // Setup polling if (!disableHotReload) { + const lockCoordinator = new LockCoordinator(initOptions); changeDetector = new ChangeDetector( baseSchema.$id, initOptions, + lockCoordinator, async (newRemoteConfig: object) => { const newlyValidatedConfig = mergeAndValidate(newRemoteConfig); validatedConfig = newlyValidatedConfig; diff --git a/src/httpClient.ts b/src/httpClient.ts index e3f5053..2c5bb6f 100644 --- a/src/httpClient.ts +++ b/src/httpClient.ts @@ -76,3 +76,59 @@ export async function getServerCapabilities(): Promise { debug('Server capabilities fetched successfully'); return (await body.json()) as ServerCapabilities; } + +export async function acquireLock( + rolloutKey: string, + rolloutLimit: number, + lockTtlSeconds: number +): Promise<{ acquired: boolean; retryAfter?: number }> { + debug('Acquiring lock for key %s with limit %d and ttl %d', rolloutKey, rolloutLimit, lockTtlSeconds); + const { configServerUrl } = getOptions(); + const url = `${configServerUrl}/locks`; + + const res = await request(url, { + method: 'POST', + body: JSON.stringify({ rolloutKey, rolloutLimit, lockTtlSeconds }), + headers: { 'Content-Type': 'application/json' }, + }); + + if (res.statusCode === statusCodes.CREATED) { + debug('Lock acquired successfully'); + return { acquired: true }; + } + + if (res.statusCode === statusCodes.LOCKED) { + const retryAfterHeader = res.headers['retry-after']; + const retryAfter = retryAfterHeader !== undefined ? parseInt(retryAfterHeader as string, 10) : undefined; + debug('Lock is already held. Retry-after: %d', retryAfter); + return { acquired: false, retryAfter }; + } + + if (res.statusCode > statusCodes.NOT_FOUND) { + debug('Failed to acquire lock. Status code: %d', res.statusCode); + throw createConfigError('httpResponseError', 'Failed to acquire lock', await createHttpErrorPayload(res)); + } + + debug('Unexpected status code while acquiring lock: %d', res.statusCode); + return { acquired: false }; +} + +export async function releaseLock(rolloutKey: string): Promise { + debug('Releasing lock for key %s', rolloutKey); + const { configServerUrl } = getOptions(); + const url = `${configServerUrl}/locks/${rolloutKey}`; + + const res = await request(url, { method: 'DELETE' }); + + if (res.statusCode === statusCodes.NO_CONTENT || res.statusCode === statusCodes.NOT_FOUND) { + debug('Lock released successfully'); + return; + } + + if (res.statusCode > statusCodes.NOT_FOUND) { + debug('Failed to release lock. Status code: %d', res.statusCode); + throw createConfigError('httpResponseError', 'Failed to release lock', await createHttpErrorPayload(res)); + } + + debug('Unexpected status code while releasing lock: %d', res.statusCode); +} diff --git a/src/options.ts b/src/options.ts index 421b74c..4c215d5 100644 --- a/src/options.ts +++ b/src/options.ts @@ -1,3 +1,4 @@ +import { hostname } from 'os'; import deepmerge from 'deepmerge'; import { BaseOptions, optionsSchema } from './types'; import { ajvOptionsValidator, validate } from './validator'; @@ -16,6 +17,9 @@ const defaultOptions: BaseOptions = { ignoreServerIsOlderVersionError: false, localConfigPath: './config', disableHotReload: false, + rolloutKey: hostname(), + rolloutLimit: 1, + lockTtlSeconds: 20, }; const envOptions: Partial> = { @@ -26,6 +30,9 @@ const envOptions: Partial> = { ignoreServerIsOlderVersionError: process.env.CONFIG_IGNORE_SERVER_IS_OLDER_VERSION_ERROR, pollIntervalMs: process.env.CONFIG_POLL_INTERVAL_MS, disableHotReload: process.env.CONFIG_DISABLE_HOT_RELOAD, + rolloutKey: process.env.CONFIG_ROLLOUT_KEY, + rolloutLimit: process.env.CONFIG_ROLLOUT_LIMIT, + lockTtlSeconds: process.env.CONFIG_LOCK_TTL_SECONDS, }; // in order to merge correctly the keys should not exist, undefined is not enough diff --git a/src/rollout/ChangeDetector.ts b/src/rollout/ChangeDetector.ts index 2d8ad58..5623fc1 100644 --- a/src/rollout/ChangeDetector.ts +++ b/src/rollout/ChangeDetector.ts @@ -2,6 +2,7 @@ import { JITTER_PERCENTAGE } from '../constants'; import { getRemoteConfig } from '../httpClient'; import { BaseOptions } from '../types'; import { createDebug } from '../utils/debug'; +import { LockCoordinator } from './LockCoordinator'; const debug = createDebug('changeDetector'); @@ -16,6 +17,7 @@ export class ChangeDetector { public constructor( private readonly schemaId: string, private readonly options: BaseOptions, + private readonly lockCoordinator: LockCoordinator, private readonly onConfigUpdate: (newRemoteConfig: object) => void | Promise, initialEtag: string ) { @@ -69,7 +71,15 @@ export class ChangeDetector { } debug('Config change detected'); - this.currentEtag = response.etag!; - await this.onConfigUpdate(response.config.config); + await this.lockCoordinator.acquire(); + try { + await this.onConfigUpdate(response.config.config); + this.currentEtag = response.etag; + } catch (err) { + debug('Error during callback execution: %s', (err as Error).message); + } + finally { + await this.lockCoordinator.release(); + } } } diff --git a/src/rollout/LockCoordinator.ts b/src/rollout/LockCoordinator.ts new file mode 100644 index 0000000..4867034 --- /dev/null +++ b/src/rollout/LockCoordinator.ts @@ -0,0 +1,39 @@ +import { acquireLock, releaseLock } from '../httpClient'; +import { BaseOptions } from '../types'; +import { createDebug } from '../utils/debug'; + +const debug = createDebug('lockCoordinator'); + +/** + * Class responsible for coordinating distributed locks to control rollout concurrency. + */ +export class LockCoordinator { + public constructor(private readonly options: BaseOptions) {} + + /** + * Acquires a distributed lock. If the lock is already held, it waits according to the Retry-After header. + */ + public async acquire(): Promise { + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition + while (true) { + const { acquired, retryAfter } = await acquireLock(this.options.rolloutKey, this.options.rolloutLimit, this.options.lockTtlSeconds); + + if (acquired) { + return; + } + + // If not acquired, wait for retryAfter (in seconds) or a default value of 1 second + // eslint-disable-next-line @typescript-eslint/no-magic-numbers + const waitTime = retryAfter! * 1000; + debug('Lock not acquired, waiting for %d ms', waitTime); + await new Promise((resolve) => setTimeout(resolve, waitTime)); + } + } + + /** + * Releases the distributed lock. + */ + public async release(): Promise { + await releaseLock(this.options.rolloutKey); + } +} diff --git a/src/types.ts b/src/types.ts index 4d4acaa..7d82581 100644 --- a/src/types.ts +++ b/src/types.ts @@ -77,6 +77,21 @@ export interface BaseOptions { * @default false */ disableHotReload: boolean; + /** + * The key used for the distributed lock. + * @default os.hostname() + */ + rolloutKey: string; + /** + * The maximum number of concurrent rollouts allowed. + * @default 1 + */ + rolloutLimit: number; + /** + * The time-to-live for the lock in seconds. + * @default 20 + */ + lockTtlSeconds: number; } /** @@ -118,6 +133,9 @@ export const optionsSchema: JSONSchemaType = { localConfigPath: { type: 'string', default: './config' }, pollIntervalMs: { type: 'integer', default: 30000 }, disableHotReload: { type: 'boolean', default: false }, + rolloutKey: { type: 'string' }, + rolloutLimit: { type: 'integer', minimum: 1, default: 1 }, + lockTtlSeconds: { type: 'integer', minimum: 1, default: 20 }, }, }; diff --git a/tests/config.spec.ts b/tests/config.spec.ts index a85bfab..4549697 100644 --- a/tests/config.spec.ts +++ b/tests/config.spec.ts @@ -1,3 +1,4 @@ +import { hostname } from 'node:os'; import { beforeEach, describe, expect, it, vi } from 'vitest'; import { Interceptable, MockAgent, setGlobalDispatcher } from 'undici'; import { commonDbPartialV1, commonS3PartialV1 } from '@map-colonies/schemas'; @@ -227,6 +228,9 @@ describe('config', () => { pollIntervalMs: 3000, disableHotReload: false, ignoreServerIsOlderVersionError: false, + lockTtlSeconds: 20, + rolloutKey: hostname(), + rolloutLimit: 1, }); }); diff --git a/tests/lock.spec.ts b/tests/lock.spec.ts new file mode 100644 index 0000000..5fcaade --- /dev/null +++ b/tests/lock.spec.ts @@ -0,0 +1,123 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; +import { Interceptable, MockAgent, setGlobalDispatcher } from 'undici'; +import { commonDbPartialV1 } from '@map-colonies/schemas'; +import { StatusCodes } from 'http-status-codes'; +import { config } from '../src/config'; +import { JITTER_PERCENTAGE } from '../src/constants'; + +const URL = 'http://localhost:8080'; +const DEFAULT_POLL_INTERVAL = 10000; + +describe('Distributed Semaphore Locking', () => { + let client: Interceptable; + + beforeEach(() => { + vi.useFakeTimers(); + const agent = new MockAgent(); + agent.disableNetConnect(); + + setGlobalDispatcher(agent); + client = agent.get(URL); + }); + + afterEach(() => { + vi.restoreAllMocks(); + }); + + it('should acquire lock before onChange and release it after (during hot-reload)', async () => { + // Arrange + const initialConfigData = { + configName: 'name', + schemaId: commonDbPartialV1.$id, + version: 1, + config: { host: 'initial-host' }, + createdAt: 0, + }; + const newConfigData = { + configName: 'name', + schemaId: commonDbPartialV1.$id, + version: 1, + config: { host: 'updated-host' }, + createdAt: 1, + }; + + // Cold-start requests + client + .intercept({ path: '/capabilities', method: 'GET' }) + .reply(StatusCodes.OK, { serverVersion: '2.0.0', schemasPackageVersion: '99.9.9', pubSubEnabled: false }); + client + .intercept({ path: `/config/name/1?shouldDereference=true&schemaId=${commonDbPartialV1.$id}`, method: 'GET' }) + .reply(StatusCodes.OK, initialConfigData, { headers: { etag: 'etag-1' } }); + + const onChangeMock = vi.fn(); + + // Act + await config({ + configName: 'name', + version: 1, + schema: commonDbPartialV1, + configServerUrl: URL, + localConfigPath: './tests/config', + pollIntervalMs: DEFAULT_POLL_INTERVAL, + onChange: onChangeMock, + rolloutKey: 'my-lock', + }); + + // Arrange (Hot-reload triggers) + client + .intercept({ + path: `/config/name/1?shouldDereference=true&schemaId=${commonDbPartialV1.$id}`, + method: 'GET', + headers: { 'if-none-match': 'etag-1' }, + }) + .reply(StatusCodes.OK, newConfigData, { headers: { etag: 'etag-2' } }); + // Mock Lock Acquisition + client + .intercept({ + path: '/locks', + method: 'POST', + body: JSON.stringify({ rolloutKey: 'my-lock', rolloutLimit: 1, lockTtlSeconds: 20 }), + }) + .reply(StatusCodes.CREATED); + // Mock Lock Release + client.intercept({ path: '/locks/my-lock', method: 'DELETE' }).reply(StatusCodes.NO_CONTENT); + + // Act (Wait for Poll) + await vi.advanceTimersByTimeAsync(DEFAULT_POLL_INTERVAL * (1 + JITTER_PERCENTAGE)); + + // Assert + expect(onChangeMock).toHaveBeenCalledTimes(1); + }); + + it('should bypass lock during initial cold-start', async () => { + // Arrange + const initialConfigData = { + configName: 'name', + schemaId: commonDbPartialV1.$id, + version: 1, + config: { host: 'initial-host' }, + createdAt: 0, + }; + + client + .intercept({ path: '/capabilities', method: 'GET' }) + .reply(StatusCodes.OK, { serverVersion: '2.0.0', schemasPackageVersion: '99.9.9', pubSubEnabled: false }); + client + .intercept({ path: `/config/name/1?shouldDereference=true&schemaId=${commonDbPartialV1.$id}`, method: 'GET' }) + .reply(StatusCodes.OK, initialConfigData, { headers: { etag: 'etag-1' } }); + + // Act + const configInstance = await config({ + configName: 'name', + version: 1, + schema: commonDbPartialV1, + configServerUrl: URL, + localConfigPath: './tests/config', + pollIntervalMs: DEFAULT_POLL_INTERVAL, + onChange: vi.fn(), + }); + + // Assert + expect(configInstance.get('host')).toBe('initial-host'); + }); +}); diff --git a/tests/rollout.spec.ts b/tests/rollout.spec.ts index be36cdd..c348302 100644 --- a/tests/rollout.spec.ts +++ b/tests/rollout.spec.ts @@ -74,11 +74,17 @@ describe('Continuous Polling (ChangeDetector)', () => { }) .reply(StatusCodes.OK, newConfigData, { headers: { etag: 'etag-2' } }); + // Mock Lock Acquisition and Release + client.intercept({ path: '/locks', method: 'POST' }).reply(StatusCodes.CREATED); + client.intercept({ path: /\/locks\/.*/, method: 'DELETE' }).reply(StatusCodes.NO_CONTENT); + // Act (Wait for Poll) await vi.advanceTimersByTimeAsync(DEFAULT_POLL_INTERVAL * (1 + JITTER_PERCENTAGE)); + // Use waitFor to allow async promises to resolve without running future timers + await vi.waitFor(() => expect(onChangeMock).toHaveBeenCalledTimes(1)); + // Assert (Updated State) - expect(onChangeMock).toHaveBeenCalledTimes(1); expect(onChangeMock).toHaveBeenCalledWith(expect.objectContaining({ host: 'updated-host' })); }); From f8f34c78fa24f56df77bd6f8e8654391033dd0f7 Mon Sep 17 00:00:00 2001 From: netanelC Date: Wed, 3 Jun 2026 19:59:51 +0300 Subject: [PATCH 06/14] fix: add callerID and key properly Co-authored-by: Copilot --- README.md | 4 ++ src/httpClient.ts | 16 +++----- src/options.ts | 4 +- src/rollout/LockCoordinator.ts | 9 +++- src/types.ts | 7 +++- tests/config.spec.ts | 4 +- tests/lock.spec.ts | 75 ++++++++++++++++++++++++++++++++-- 7 files changed, 101 insertions(+), 18 deletions(-) diff --git a/README.md b/README.md index 650d063..96a8273 100644 --- a/README.md +++ b/README.md @@ -160,6 +160,10 @@ The following environment variables can be used to configure the options: - `CONFIG_IGNORE_SERVER_IS_OLDER_VERSION_ERROR`: Sets the `ignoreServerIsOlderVersionError` option. - `CONFIG_POLL_INTERVAL_MS`: Sets the `pollIntervalMs` option. - `CONFIG_DISABLE_HOT_RELOAD`: Sets the `disableHotReload` option. +- `CONFIG_ROLLOUT_KEY`: Sets the `rolloutKey` option. +- `CONFIG_CALLER_ID`: Sets the `callerId` option. +- `CONFIG_ROLLOUT_LIMIT`: Sets the `rolloutLimit` option. +- `CONFIG_LOCK_TTL_SECONDS`: Sets the `lockTtlSeconds` option. ## Configuration Merging and Validation diff --git a/src/httpClient.ts b/src/httpClient.ts index 2c5bb6f..9d53ec3 100644 --- a/src/httpClient.ts +++ b/src/httpClient.ts @@ -77,18 +77,14 @@ export async function getServerCapabilities(): Promise { return (await body.json()) as ServerCapabilities; } -export async function acquireLock( - rolloutKey: string, - rolloutLimit: number, - lockTtlSeconds: number -): Promise<{ acquired: boolean; retryAfter?: number }> { - debug('Acquiring lock for key %s with limit %d and ttl %d', rolloutKey, rolloutLimit, lockTtlSeconds); +export async function acquireLock(key: string, callerId: string, limit: number, ttl: number): Promise<{ acquired: boolean; retryAfter?: number }> { + debug('Acquiring lock for key %s (caller: %s) with limit %d and ttl %d', key, callerId, limit, ttl); const { configServerUrl } = getOptions(); const url = `${configServerUrl}/locks`; const res = await request(url, { method: 'POST', - body: JSON.stringify({ rolloutKey, rolloutLimit, lockTtlSeconds }), + body: JSON.stringify({ key, callerId, limit, ttl }), headers: { 'Content-Type': 'application/json' }, }); @@ -113,10 +109,10 @@ export async function acquireLock( return { acquired: false }; } -export async function releaseLock(rolloutKey: string): Promise { - debug('Releasing lock for key %s', rolloutKey); +export async function releaseLock(key: string, callerId: string): Promise { + debug('Releasing lock for key %s (caller: %s)', key, callerId); const { configServerUrl } = getOptions(); - const url = `${configServerUrl}/locks/${rolloutKey}`; + const url = `${configServerUrl}/locks/${key}/${callerId}`; const res = await request(url, { method: 'DELETE' }); diff --git a/src/options.ts b/src/options.ts index 4c215d5..36d384f 100644 --- a/src/options.ts +++ b/src/options.ts @@ -17,9 +17,10 @@ const defaultOptions: BaseOptions = { ignoreServerIsOlderVersionError: false, localConfigPath: './config', disableHotReload: false, - rolloutKey: hostname(), + rolloutKey: PACKAGE_NAME, rolloutLimit: 1, lockTtlSeconds: 20, + callerId: hostname(), }; const envOptions: Partial> = { @@ -33,6 +34,7 @@ const envOptions: Partial> = { rolloutKey: process.env.CONFIG_ROLLOUT_KEY, rolloutLimit: process.env.CONFIG_ROLLOUT_LIMIT, lockTtlSeconds: process.env.CONFIG_LOCK_TTL_SECONDS, + callerId: process.env.CONFIG_CALLER_ID, }; // in order to merge correctly the keys should not exist, undefined is not enough diff --git a/src/rollout/LockCoordinator.ts b/src/rollout/LockCoordinator.ts index 4867034..019ba24 100644 --- a/src/rollout/LockCoordinator.ts +++ b/src/rollout/LockCoordinator.ts @@ -16,7 +16,12 @@ export class LockCoordinator { public async acquire(): Promise { // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition while (true) { - const { acquired, retryAfter } = await acquireLock(this.options.rolloutKey, this.options.rolloutLimit, this.options.lockTtlSeconds); + const { acquired, retryAfter } = await acquireLock( + this.options.rolloutKey, + this.options.callerId, + this.options.rolloutLimit, + this.options.lockTtlSeconds + ); if (acquired) { return; @@ -34,6 +39,6 @@ export class LockCoordinator { * Releases the distributed lock. */ public async release(): Promise { - await releaseLock(this.options.rolloutKey); + await releaseLock(this.options.rolloutKey, this.options.callerId); } } diff --git a/src/types.ts b/src/types.ts index 7d82581..3717697 100644 --- a/src/types.ts +++ b/src/types.ts @@ -79,7 +79,6 @@ export interface BaseOptions { disableHotReload: boolean; /** * The key used for the distributed lock. - * @default os.hostname() */ rolloutKey: string; /** @@ -92,6 +91,11 @@ export interface BaseOptions { * @default 20 */ lockTtlSeconds: number; + /** + * The ID of the caller requesting the lock. + * @default os.hostname() + */ + callerId: string; } /** @@ -134,6 +138,7 @@ export const optionsSchema: JSONSchemaType = { pollIntervalMs: { type: 'integer', default: 30000 }, disableHotReload: { type: 'boolean', default: false }, rolloutKey: { type: 'string' }, + callerId: { type: 'string' }, rolloutLimit: { type: 'integer', minimum: 1, default: 1 }, lockTtlSeconds: { type: 'integer', minimum: 1, default: 20 }, }, diff --git a/tests/config.spec.ts b/tests/config.spec.ts index 4549697..68b7baa 100644 --- a/tests/config.spec.ts +++ b/tests/config.spec.ts @@ -4,6 +4,7 @@ import { Interceptable, MockAgent, setGlobalDispatcher } from 'undici'; import { commonDbPartialV1, commonS3PartialV1 } from '@map-colonies/schemas'; import { StatusCodes } from 'http-status-codes'; import { config } from '../src/config'; +import { PACKAGE_NAME } from '../src/constants'; const URL = 'http://localhost:8080'; describe('config', () => { @@ -229,7 +230,8 @@ describe('config', () => { disableHotReload: false, ignoreServerIsOlderVersionError: false, lockTtlSeconds: 20, - rolloutKey: hostname(), + rolloutKey: PACKAGE_NAME, + callerId: hostname(), rolloutLimit: 1, }); }); diff --git a/tests/lock.spec.ts b/tests/lock.spec.ts index 5fcaade..6ac66cf 100644 --- a/tests/lock.spec.ts +++ b/tests/lock.spec.ts @@ -61,6 +61,7 @@ describe('Distributed Semaphore Locking', () => { pollIntervalMs: DEFAULT_POLL_INTERVAL, onChange: onChangeMock, rolloutKey: 'my-lock', + callerId: 'my-caller', }); // Arrange (Hot-reload triggers) @@ -76,17 +77,19 @@ describe('Distributed Semaphore Locking', () => { .intercept({ path: '/locks', method: 'POST', - body: JSON.stringify({ rolloutKey: 'my-lock', rolloutLimit: 1, lockTtlSeconds: 20 }), + body: JSON.stringify({ key: 'my-lock', callerId: 'my-caller', limit: 1, ttl: 20 }), }) .reply(StatusCodes.CREATED); + // Mock Lock Release - client.intercept({ path: '/locks/my-lock', method: 'DELETE' }).reply(StatusCodes.NO_CONTENT); + client.intercept({ path: '/locks/my-lock/my-caller', method: 'DELETE' }).reply(StatusCodes.NO_CONTENT); // Act (Wait for Poll) await vi.advanceTimersByTimeAsync(DEFAULT_POLL_INTERVAL * (1 + JITTER_PERCENTAGE)); + await vi.waitFor(() => expect(onChangeMock).toHaveBeenCalledTimes(1)); // Assert - expect(onChangeMock).toHaveBeenCalledTimes(1); + expect(onChangeMock).toHaveBeenCalledWith(expect.objectContaining({ host: 'updated-host' })); }); it('should bypass lock during initial cold-start', async () => { @@ -120,4 +123,70 @@ describe('Distributed Semaphore Locking', () => { // Assert expect(configInstance.get('host')).toBe('initial-host'); }); + + it('should wait and retry if lock acquisition returns 423 Locked with Retry-After', async () => { + // Arrange + const initialConfigData = { + configName: 'name', + schemaId: commonDbPartialV1.$id, + version: 1, + config: { host: 'initial-host' }, + createdAt: 0, + }; + const newConfigData = { + configName: 'name', + schemaId: commonDbPartialV1.$id, + version: 1, + config: { host: 'updated-host' }, + createdAt: 1, + }; + + client + .intercept({ path: '/capabilities', method: 'GET' }) + .reply(StatusCodes.OK, { serverVersion: '2.0.0', schemasPackageVersion: '99.9.9', pubSubEnabled: false }); + client + .intercept({ path: `/config/name/1?shouldDereference=true&schemaId=${commonDbPartialV1.$id}`, method: 'GET' }) + .reply(StatusCodes.OK, initialConfigData, { headers: { etag: 'etag-1' } }); + + const onChangeMock = vi.fn(); + + await config({ + configName: 'name', + version: 1, + schema: commonDbPartialV1, + configServerUrl: URL, + localConfigPath: './tests/config', + pollIntervalMs: DEFAULT_POLL_INTERVAL, + onChange: onChangeMock, + rolloutKey: 'my-lock', + callerId: 'my-caller', + }); + + // Arrange (Hot-reload) + client + .intercept({ + path: `/config/name/1?shouldDereference=true&schemaId=${commonDbPartialV1.$id}`, + method: 'GET', + }) + .reply(StatusCodes.OK, newConfigData, { headers: { etag: 'etag-2' } }); + + // Mock first lock attempt failing with 423 + client.intercept({ path: '/locks', method: 'POST' }).reply(StatusCodes.LOCKED, {}, { headers: { 'retry-after': '2' } }); + + // Mock second lock attempt succeeding + client.intercept({ path: '/locks', method: 'POST' }).reply(StatusCodes.CREATED); + + client.intercept({ path: '/locks/my-lock/my-caller', method: 'DELETE' }).reply(StatusCodes.NO_CONTENT); + + // Act (Wait for Poll) + await vi.advanceTimersByTimeAsync(DEFAULT_POLL_INTERVAL * (1 + JITTER_PERCENTAGE)); + + // Wait for the retry interval (2 seconds) + await vi.advanceTimersByTimeAsync(2001); + + await vi.waitFor(() => expect(onChangeMock).toHaveBeenCalledTimes(1)); + + // Assert + expect(onChangeMock).toHaveBeenCalledWith(expect.objectContaining({ host: 'updated-host' })); + }); }); From 08ff25ba251e7351b8a1be0262c1a52d15b26775 Mon Sep 17 00:00:00 2001 From: netanelC Date: Thu, 4 Jun 2026 15:43:29 +0300 Subject: [PATCH 07/14] fix: tests --- README.md | 36 ++++++++++++-- src/httpClient.ts | 19 ++++---- src/rollout/ChangeDetector.ts | 24 ++++++--- src/types.ts | 12 ++--- tests/lock.spec.ts | 22 ++++++--- tests/rollout.spec.ts | 92 ++++++++++++++++++++++++++++++++--- 6 files changed, 165 insertions(+), 40 deletions(-) diff --git a/README.md b/README.md index 96a8273..e476988 100644 --- a/README.md +++ b/README.md @@ -28,7 +28,8 @@ const configInstance = await config({ pollIntervalMs: 30000, onChange: (updatedConfig) => { console.log('Configuration updated:', updatedConfig); - // Re-initialize DB connections, etc. + // Note: The SDK will forcefully terminate the process + // immediately after this callback completes to trigger a fresh restart. } }); @@ -144,6 +145,34 @@ This package allows you to configure various options for loading and managing co - **Description**: The polling interval in milliseconds for hot-reloading. - **Environment Variable**: `CONFIG_POLL_INTERVAL_MS` +### `rolloutKey` +- **Type**: `string` +- **Optional**: `true` +- **Default**: `PACKAGE_NAME` +- **Description**: The key used for the distributed lock (opaque identifier for the resource). +- **Environment Variable**: `CONFIG_ROLLOUT_KEY` + +### `callerId` +- **Type**: `string` +- **Optional**: `true` +- **Default**: `os.hostname()` +- **Description**: The unique ID of the instance holding the lock. +- **Environment Variable**: `CONFIG_CALLER_ID` + +### `rolloutLimit` +- **Type**: `number` +- **Optional**: `true` +- **Default**: `1` +- **Description**: The maximum number of concurrent rollouts allowed. +- **Environment Variable**: `CONFIG_ROLLOUT_LIMIT` + +### `lockTtlSeconds` +- **Type**: `number` +- **Optional**: `true` +- **Default**: `20` +- **Description**: The time-to-live for the lock in seconds. +- **Environment Variable**: `CONFIG_LOCK_TTL_SECONDS` + ### `onChange` - **Type**: `(config: T) => void | Promise` - **Optional**: `true` @@ -177,8 +206,9 @@ The package supports merging configurations from multiple sources (local, remote 1. The remote configuration is fetched from the server specified by the `configServerUrl` option. 2. If the `version` is set to `'latest'`, the latest version of the configuration is fetched. Otherwise, the specified version is fetched. -3. **Continuous Polling:** If an `onChange` callback is provided, the SDK continuously polls the server using HTTP ETags (`If-None-Match`). When a `200 OK` is received (indicating a change), the configuration is automatically re-merged, validated, and the callback is triggered. `304 Not Modified` responses are silently ignored. To prevent cluster-wide traffic spikes (thundering herd), a **randomized jitter of +/- 15%** is automatically applied to each polling cycle. -4. **Distributed Semaphore Locking:** To control rollout concurrency across a cluster, the SDK implements a distributed locking mechanism. +3. **Continuous Polling:** If an `onChange` callback is provided, the SDK continuously polls the server using HTTP ETags (`If-None-Match`). When a `200 OK` is received (indicating a change), the configuration is automatically re-merged and the callback is triggered. `304 Not Modified` responses are silently ignored. To prevent cluster-wide traffic spikes (thundering herd), a **randomized jitter of +/- 15%** is automatically applied to each polling cycle. +4. **Hard Termination Lifecycle:** The SDK follows a "Stupid Client" philosophy. Upon a successful configuration fetch and lock acquisition, it executes the `onChange` callback. Regardless of whether the callback succeeds or fails, the SDK will gracefully release the lock and then **immediately terminate the process (`process.exit(0)`)**. This delegates state reconciliation and service recovery to the container orchestrator (e.g., Kubernetes). +5. **Distributed Semaphore Locking:** To control rollout concurrency across a cluster, the SDK implements a distributed locking mechanism using four parameters: `key` (opaque identifier), `callerId` (instance ID), `ttl` (lock duration), and `limit` (max concurrent locks). - **Lock Acquisition:** Before triggering the `onChange` callback during a hot-reload, the SDK attempts to acquire a lock from the configuration server. - **Lock Release:** The lock is automatically released after the `onChange` callback completes (whether it succeeds or throws). - **423 Locked & Retry-After:** If the server returns `423 Locked`, it indicates the rollout limit has been reached. The SDK will respect the `Retry-After` header provided by the server, waiting for the specified duration before attempting to acquire the lock again. diff --git a/src/httpClient.ts b/src/httpClient.ts index 9d53ec3..d38586a 100644 --- a/src/httpClient.ts +++ b/src/httpClient.ts @@ -114,17 +114,16 @@ export async function releaseLock(key: string, callerId: string): Promise const { configServerUrl } = getOptions(); const url = `${configServerUrl}/locks/${key}/${callerId}`; - const res = await request(url, { method: 'DELETE' }); + try { + const res = await request(url, { method: 'DELETE' }); - if (res.statusCode === statusCodes.NO_CONTENT || res.statusCode === statusCodes.NOT_FOUND) { - debug('Lock released successfully'); - return; - } + if (res.statusCode === statusCodes.NO_CONTENT || res.statusCode === statusCodes.NOT_FOUND) { + debug('Lock released successfully'); + return; + } - if (res.statusCode > statusCodes.NOT_FOUND) { - debug('Failed to release lock. Status code: %d', res.statusCode); - throw createConfigError('httpResponseError', 'Failed to release lock', await createHttpErrorPayload(res)); + debug('Unexpected status code while releasing lock: %d', res.statusCode); + } catch (error) { + debug('Error during best-effort lock release (swallowed): %s', (error as Error).message); } - - debug('Unexpected status code while releasing lock: %d', res.statusCode); } diff --git a/src/rollout/ChangeDetector.ts b/src/rollout/ChangeDetector.ts index 5623fc1..bf77cbb 100644 --- a/src/rollout/ChangeDetector.ts +++ b/src/rollout/ChangeDetector.ts @@ -11,7 +11,7 @@ const debug = createDebug('changeDetector'); * If a change is detected, it invokes the provided callback with the new configuration. */ export class ChangeDetector { - private currentEtag: string; + private readonly currentEtag: string; private timer?: NodeJS.Timeout; public constructor( @@ -71,15 +71,23 @@ export class ChangeDetector { } debug('Config change detected'); - await this.lockCoordinator.acquire(); try { - await this.onConfigUpdate(response.config.config); - this.currentEtag = response.etag; + await this.lockCoordinator.acquire(); + try { + await this.onConfigUpdate(response.config.config); + } catch (err) { + debug('Error during onChange callback: %s', (err as Error).message); + } finally { + try { + await this.lockCoordinator.release(); + } catch (releaseErr) { + debug('Best-effort lock release failed: %s', (releaseErr as Error).message); + } + debug('Hard termination triggered. Exiting process.'); + process.exit(0); + } } catch (err) { - debug('Error during callback execution: %s', (err as Error).message); - } - finally { - await this.lockCoordinator.release(); + debug('Error during lock acquisition: %s', (err as Error).message); } } } diff --git a/src/types.ts b/src/types.ts index 3717697..cff2b3d 100644 --- a/src/types.ts +++ b/src/types.ts @@ -78,9 +78,14 @@ export interface BaseOptions { */ disableHotReload: boolean; /** - * The key used for the distributed lock. + * The key used for the distributed lock (opaque identifier for the resource). */ rolloutKey: string; + /** + * The unique ID of the instance holding the lock. + * @default os.hostname() + */ + callerId: string; /** * The maximum number of concurrent rollouts allowed. * @default 1 @@ -91,11 +96,6 @@ export interface BaseOptions { * @default 20 */ lockTtlSeconds: number; - /** - * The ID of the caller requesting the lock. - * @default os.hostname() - */ - callerId: string; } /** diff --git a/tests/lock.spec.ts b/tests/lock.spec.ts index 6ac66cf..8cd34e1 100644 --- a/tests/lock.spec.ts +++ b/tests/lock.spec.ts @@ -1,4 +1,4 @@ -import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; +import { afterEach, beforeEach, describe, expect, it, MockInstance, vi } from 'vitest'; import { Interceptable, MockAgent, setGlobalDispatcher } from 'undici'; import { commonDbPartialV1 } from '@map-colonies/schemas'; import { StatusCodes } from 'http-status-codes'; @@ -10,9 +10,12 @@ const DEFAULT_POLL_INTERVAL = 10000; describe('Distributed Semaphore Locking', () => { let client: Interceptable; + let exitSpy: MockInstance; + const instances: { stop: () => void }[] = []; beforeEach(() => { vi.useFakeTimers(); + exitSpy = vi.spyOn(process, 'exit').mockImplementation(() => undefined as never); const agent = new MockAgent(); agent.disableNetConnect(); @@ -21,7 +24,10 @@ describe('Distributed Semaphore Locking', () => { }); afterEach(() => { + instances.forEach((instance) => instance.stop()); + instances.length = 0; vi.restoreAllMocks(); + vi.clearAllMocks(); }); it('should acquire lock before onChange and release it after (during hot-reload)', async () => { @@ -52,7 +58,7 @@ describe('Distributed Semaphore Locking', () => { const onChangeMock = vi.fn(); // Act - await config({ + const configInstance = await config({ configName: 'name', version: 1, schema: commonDbPartialV1, @@ -63,6 +69,7 @@ describe('Distributed Semaphore Locking', () => { rolloutKey: 'my-lock', callerId: 'my-caller', }); + instances.push(configInstance); // Arrange (Hot-reload triggers) client @@ -72,6 +79,7 @@ describe('Distributed Semaphore Locking', () => { headers: { 'if-none-match': 'etag-1' }, }) .reply(StatusCodes.OK, newConfigData, { headers: { etag: 'etag-2' } }); + // Mock Lock Acquisition client .intercept({ @@ -86,7 +94,7 @@ describe('Distributed Semaphore Locking', () => { // Act (Wait for Poll) await vi.advanceTimersByTimeAsync(DEFAULT_POLL_INTERVAL * (1 + JITTER_PERCENTAGE)); - await vi.waitFor(() => expect(onChangeMock).toHaveBeenCalledTimes(1)); + await vi.waitFor(() => expect(exitSpy).toHaveBeenCalledWith(0)); // Assert expect(onChangeMock).toHaveBeenCalledWith(expect.objectContaining({ host: 'updated-host' })); @@ -119,9 +127,10 @@ describe('Distributed Semaphore Locking', () => { pollIntervalMs: DEFAULT_POLL_INTERVAL, onChange: vi.fn(), }); - + instances.push(configInstance); // Assert expect(configInstance.get('host')).toBe('initial-host'); + expect(exitSpy).not.toHaveBeenCalled(); }); it('should wait and retry if lock acquisition returns 423 Locked with Retry-After', async () => { @@ -150,7 +159,7 @@ describe('Distributed Semaphore Locking', () => { const onChangeMock = vi.fn(); - await config({ + const configInstance = await config({ configName: 'name', version: 1, schema: commonDbPartialV1, @@ -161,6 +170,7 @@ describe('Distributed Semaphore Locking', () => { rolloutKey: 'my-lock', callerId: 'my-caller', }); + instances.push(configInstance); // Arrange (Hot-reload) client @@ -184,7 +194,7 @@ describe('Distributed Semaphore Locking', () => { // Wait for the retry interval (2 seconds) await vi.advanceTimersByTimeAsync(2001); - await vi.waitFor(() => expect(onChangeMock).toHaveBeenCalledTimes(1)); + await vi.waitFor(() => expect(exitSpy).toHaveBeenCalledWith(0)); // Assert expect(onChangeMock).toHaveBeenCalledWith(expect.objectContaining({ host: 'updated-host' })); diff --git a/tests/rollout.spec.ts b/tests/rollout.spec.ts index c348302..28371ae 100644 --- a/tests/rollout.spec.ts +++ b/tests/rollout.spec.ts @@ -1,4 +1,4 @@ -import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; +import { afterEach, beforeEach, describe, expect, it, MockInstance, vi } from 'vitest'; import { Interceptable, MockAgent, setGlobalDispatcher } from 'undici'; import { commonDbPartialV1 } from '@map-colonies/schemas'; import { StatusCodes } from 'http-status-codes'; @@ -10,9 +10,12 @@ const DEFAULT_POLL_INTERVAL = 10000; describe('Continuous Polling (ChangeDetector)', () => { let client: Interceptable; + let exitSpy: MockInstance; + const instances: { stop: () => void }[] = []; beforeEach(() => { vi.useFakeTimers(); + exitSpy = vi.spyOn(process, 'exit').mockImplementation(() => undefined as never); const agent = new MockAgent(); agent.disableNetConnect(); @@ -21,10 +24,13 @@ describe('Continuous Polling (ChangeDetector)', () => { }); afterEach(() => { + instances.forEach((instance) => instance.stop()); + instances.length = 0; vi.restoreAllMocks(); + vi.clearAllMocks(); }); - it('should trigger onChange when polling returns a new config (200 OK)', async () => { + it('should trigger onChange and exit process when polling returns a new config (200 OK)', async () => { // Arrange const initialConfigData = { configName: 'name', @@ -60,6 +66,7 @@ describe('Continuous Polling (ChangeDetector)', () => { pollIntervalMs: DEFAULT_POLL_INTERVAL, onChange: onChangeMock, }); + instances.push(configInstance); // Assert (Initial State) expect(configInstance.get('host')).toBe('initial-host'); @@ -81,14 +88,15 @@ describe('Continuous Polling (ChangeDetector)', () => { // Act (Wait for Poll) await vi.advanceTimersByTimeAsync(DEFAULT_POLL_INTERVAL * (1 + JITTER_PERCENTAGE)); - // Use waitFor to allow async promises to resolve without running future timers + // Use waitFor to allow async promises to resolve await vi.waitFor(() => expect(onChangeMock).toHaveBeenCalledTimes(1)); - // Assert (Updated State) + // Assert (Updated State & Hard Termination) expect(onChangeMock).toHaveBeenCalledWith(expect.objectContaining({ host: 'updated-host' })); + expect(exitSpy).toHaveBeenCalledWith(0); }); - it('should not trigger onChange when polling returns 304 Not Modified', async () => { + it('should not trigger onChange or exit when polling returns 304 Not Modified', async () => { // Arrange const initialConfigData = { configName: 'name', @@ -117,6 +125,7 @@ describe('Continuous Polling (ChangeDetector)', () => { pollIntervalMs: DEFAULT_POLL_INTERVAL, onChange: onChangeMock, }); + instances.push(configInstance); // Arrange (Setup 304 response) client @@ -132,6 +141,7 @@ describe('Continuous Polling (ChangeDetector)', () => { // Assert expect(onChangeMock).not.toHaveBeenCalled(); + expect(exitSpy).not.toHaveBeenCalled(); expect(configInstance.get('host')).toBe('initial-host'); }); @@ -163,6 +173,7 @@ describe('Continuous Polling (ChangeDetector)', () => { pollIntervalMs: DEFAULT_POLL_INTERVAL, onChange: onChangeMock, }); + instances.push(configInstance); // Act configInstance.stop(); @@ -172,6 +183,7 @@ describe('Continuous Polling (ChangeDetector)', () => { // Assert expect(onChangeMock).not.toHaveBeenCalled(); + expect(exitSpy).not.toHaveBeenCalled(); }); it('should apply randomized jitter within boundaries over 10 cycles', async () => { @@ -203,7 +215,7 @@ describe('Continuous Polling (ChangeDetector)', () => { const setTimeoutSpy = vi.spyOn(global, 'setTimeout'); // Act - await config({ + const configInstance = await config({ configName: 'name', version: 1, schema: commonDbPartialV1, @@ -212,6 +224,7 @@ describe('Continuous Polling (ChangeDetector)', () => { pollIntervalMs: DEFAULT_POLL_INTERVAL, onChange: vi.fn(), }); + instances.push(configInstance); const maxJitter = DEFAULT_POLL_INTERVAL * JITTER_PERCENTAGE; const minWait = DEFAULT_POLL_INTERVAL - maxJitter; @@ -255,7 +268,7 @@ describe('Continuous Polling (ChangeDetector)', () => { const onChangeMock = vi.fn(); // Act - await config({ + const configInstance = await config({ configName: 'name', version: 1, schema: commonDbPartialV1, @@ -265,11 +278,76 @@ describe('Continuous Polling (ChangeDetector)', () => { onChange: onChangeMock, disableHotReload: true, }); + instances.push(configInstance); // Advance time beyond the polling interval await vi.advanceTimersByTimeAsync(DEFAULT_POLL_INTERVAL * (1 + JITTER_PERCENTAGE) + 1); // Assert expect(onChangeMock).not.toHaveBeenCalled(); + expect(exitSpy).not.toHaveBeenCalled(); + }); + + it('should exit with 0 even if onChange throws an error', async () => { + // Arrange + const initialConfigData = { + configName: 'name', + schemaId: commonDbPartialV1.$id, + version: 1, + config: { host: 'initial-host' }, + createdAt: 0, + }; + const badConfigData = { + configName: 'name', + schemaId: commonDbPartialV1.$id, + version: 1, + config: { host: 'bad-host' }, + createdAt: 1, + }; + + client + .intercept({ path: '/capabilities', method: 'GET' }) + .reply(StatusCodes.OK, { serverVersion: '2.0.0', schemasPackageVersion: '99.9.9', pubSubEnabled: false }); + client + .intercept({ path: `/config/name/1?shouldDereference=true&schemaId=${commonDbPartialV1.$id}`, method: 'GET' }) + .reply(StatusCodes.OK, initialConfigData, { headers: { etag: 'initial-etag' } }); + + const onChangeMock = vi.fn().mockImplementation(() => { + throw new Error('Boom!'); + }); + + // Act + const configInstance = await config({ + configName: 'name', + version: 1, + schema: commonDbPartialV1, + configServerUrl: URL, + localConfigPath: './tests/config', + pollIntervalMs: DEFAULT_POLL_INTERVAL, + onChange: onChangeMock, + }); + instances.push(configInstance); + + // Act (Trigger poll with bad config) + client + .intercept({ + path: `/config/name/1?shouldDereference=true&schemaId=${commonDbPartialV1.$id}`, + method: 'GET', + headers: { 'if-none-match': 'initial-etag' }, + }) + .reply(StatusCodes.OK, badConfigData, { headers: { etag: 'bad-etag' } }); + + // Mock Lock Acquisition and Release + client.intercept({ path: '/locks', method: 'POST' }).reply(StatusCodes.CREATED); + client.intercept({ path: /\/locks\/.*/, method: 'DELETE' }).reply(StatusCodes.NO_CONTENT); + + // Act (Wait for Poll) + await vi.advanceTimersByTimeAsync(DEFAULT_POLL_INTERVAL * (1 + JITTER_PERCENTAGE)); + + // Use waitFor to allow async promises to resolve + await vi.waitFor(() => expect(exitSpy).toHaveBeenCalledWith(0)); + + // Assert (Fails once but still exits) + expect(onChangeMock).toHaveBeenCalledTimes(1); }); }); From c5d7f3a48fb2769c553cde8f37cad888864a8fb7 Mon Sep 17 00:00:00 2001 From: netanelC Date: Sat, 6 Jun 2026 22:50:40 +0300 Subject: [PATCH 08/14] fix: server responses Co-authored-by: Copilot --- src/httpClient.ts | 13 +++++++++---- tests/lock.spec.ts | 13 +++++-------- tests/rollout.spec.ts | 6 +++--- 3 files changed, 17 insertions(+), 15 deletions(-) diff --git a/src/httpClient.ts b/src/httpClient.ts index d38586a..67f6883 100644 --- a/src/httpClient.ts +++ b/src/httpClient.ts @@ -88,7 +88,7 @@ export async function acquireLock(key: string, callerId: string, limit: number, headers: { 'Content-Type': 'application/json' }, }); - if (res.statusCode === statusCodes.CREATED) { + if (res.statusCode === statusCodes.OK) { debug('Lock acquired successfully'); return { acquired: true }; } @@ -100,8 +100,8 @@ export async function acquireLock(key: string, callerId: string, limit: number, return { acquired: false, retryAfter }; } - if (res.statusCode > statusCodes.NOT_FOUND) { - debug('Failed to acquire lock. Status code: %d', res.statusCode); + if (res.statusCode === statusCodes.BAD_REQUEST) { + debug('Failed to acquire lock. Bad request'); throw createConfigError('httpResponseError', 'Failed to acquire lock', await createHttpErrorPayload(res)); } @@ -117,11 +117,16 @@ export async function releaseLock(key: string, callerId: string): Promise try { const res = await request(url, { method: 'DELETE' }); - if (res.statusCode === statusCodes.NO_CONTENT || res.statusCode === statusCodes.NOT_FOUND) { + if (res.statusCode === statusCodes.NO_CONTENT) { debug('Lock released successfully'); return; } + if (res.statusCode === statusCodes.BAD_REQUEST) { + debug('Failed to release lock. Bad request'); + throw createConfigError('httpResponseError', 'Failed to release lock', await createHttpErrorPayload(res)); + } + debug('Unexpected status code while releasing lock: %d', res.statusCode); } catch (error) { debug('Error during best-effort lock release (swallowed): %s', (error as Error).message); diff --git a/tests/lock.spec.ts b/tests/lock.spec.ts index d5ea89f..d22e644 100644 --- a/tests/lock.spec.ts +++ b/tests/lock.spec.ts @@ -7,9 +7,6 @@ import { JITTER_PERCENTAGE } from '../src/constants'; const URL = 'http://localhost:8080'; const DEFAULT_POLL_INTERVAL = 10000; -const LOCK_TTL_SECONDS = 20; -const RETRY_AFTER_HEADER_VALUE = '2'; -const RETRY_AFTER_WAIT_MS = 2001; describe('Distributed Semaphore Locking', () => { let client: Interceptable; @@ -88,9 +85,9 @@ describe('Distributed Semaphore Locking', () => { .intercept({ path: '/locks', method: 'POST', - body: JSON.stringify({ key: 'my-lock', callerId: 'my-caller', limit: 1, ttl: LOCK_TTL_SECONDS }), + body: JSON.stringify({ key: 'my-lock', callerId: 'my-caller', limit: 1, ttl: 20 }), }) - .reply(StatusCodes.CREATED); + .reply(StatusCodes.OK); // Mock Lock Release client.intercept({ path: '/locks/my-lock/my-caller', method: 'DELETE' }).reply(StatusCodes.NO_CONTENT); @@ -185,16 +182,16 @@ describe('Distributed Semaphore Locking', () => { .reply(StatusCodes.OK, newConfigData, { headers: { etag: 'etag-2' } }); // Mock first lock attempt failing with 423 - client.intercept({ path: '/locks', method: 'POST' }).reply(StatusCodes.LOCKED, {}, { headers: { 'retry-after': RETRY_AFTER_HEADER_VALUE } }); + client.intercept({ path: '/locks', method: 'POST' }).reply(StatusCodes.LOCKED, {}, { headers: { 'retry-after': '2' } }); // Mock second lock attempt succeeding - client.intercept({ path: '/locks', method: 'POST' }).reply(StatusCodes.CREATED); + client.intercept({ path: '/locks', method: 'POST' }).reply(StatusCodes.OK); client.intercept({ path: '/locks/my-lock/my-caller', method: 'DELETE' }).reply(StatusCodes.NO_CONTENT); // Act (Wait for Poll) await vi.advanceTimersByTimeAsync(DEFAULT_POLL_INTERVAL * (1 + JITTER_PERCENTAGE)); // Wait for the retry interval (2 seconds) - await vi.advanceTimersByTimeAsync(RETRY_AFTER_WAIT_MS); + await vi.advanceTimersByTimeAsync(2001); await vi.waitFor(() => expect(exitSpy).toHaveBeenCalledWith(0)); // Assert diff --git a/tests/rollout.spec.ts b/tests/rollout.spec.ts index 5d90d53..036337c 100644 --- a/tests/rollout.spec.ts +++ b/tests/rollout.spec.ts @@ -82,7 +82,7 @@ describe('Continuous Polling (ChangeDetector)', () => { .reply(StatusCodes.OK, newConfigData, { headers: { etag: 'etag-2' } }); // Mock Lock Acquisition and Release - client.intercept({ path: '/locks', method: 'POST' }).reply(StatusCodes.CREATED); + client.intercept({ path: '/locks', method: 'POST' }).reply(StatusCodes.OK); client.intercept({ path: /\/locks\/.*/, method: 'DELETE' }).reply(StatusCodes.NO_CONTENT); // Act (Wait for Poll) @@ -333,7 +333,7 @@ describe('Continuous Polling (ChangeDetector)', () => { .reply(StatusCodes.OK, newConfigData, { headers: { etag: 'etag-2' } }); // Mock Lock Acquisition and Release - client.intercept({ path: '/locks', method: 'POST' }).reply(StatusCodes.CREATED); + client.intercept({ path: '/locks', method: 'POST' }).reply(StatusCodes.OK); client.intercept({ path: /\/locks\/.*/, method: 'DELETE' }).reply(StatusCodes.NO_CONTENT); // Act (Wait for Poll) @@ -396,7 +396,7 @@ describe('Continuous Polling (ChangeDetector)', () => { .reply(StatusCodes.OK, badConfigData, { headers: { etag: 'bad-etag' } }); // Mock Lock Acquisition and Release - client.intercept({ path: '/locks', method: 'POST' }).reply(StatusCodes.CREATED); + client.intercept({ path: '/locks', method: 'POST' }).reply(StatusCodes.OK); client.intercept({ path: /\/locks\/.*/, method: 'DELETE' }).reply(StatusCodes.NO_CONTENT); // Act (Wait for Poll) From 5c7c50f1df1809529ad5de3b8b434c70b134b35b Mon Sep 17 00:00:00 2001 From: netanelC Date: Thu, 18 Jun 2026 14:53:52 +0300 Subject: [PATCH 09/14] chore: wip --- src/config.ts | 3 ++- src/rollout/ChangeDetector.ts | 20 ++++---------------- src/types.ts | 2 ++ tests/lock.spec.ts | 15 +++++++++------ tests/rollout.spec.ts | 14 +++++++++++--- 5 files changed, 28 insertions(+), 26 deletions(-) diff --git a/src/config.ts b/src/config.ts index a199155..9e14551 100644 --- a/src/config.ts +++ b/src/config.ts @@ -31,7 +31,7 @@ const semverSatisfies = '2.x'; * polling mechanism. When a configuration change is detected, the SDK * will execute the `onChange` callback (if provided). * Depending on the `terminatePod` option (default `true`), it will either - * stop polling and wait for the user to terminate the process, or continue polling. + * stop polling and wait for the user to terminate the process, or release the lock and continue polling. * * @template T - The type of the configuration schema. * @param {ConfigOptions} options - The options for retrieving the configuration. @@ -134,6 +134,7 @@ export async function config( // Setup polling if (!disableHotReload) { const lockCoordinator = new LockCoordinator(initOptions); + await lockCoordinator.release(); changeDetector = new ChangeDetector(baseSchema.$id, initOptions, currentEtag, lockCoordinator, onChange); changeDetector.start(); } diff --git a/src/rollout/ChangeDetector.ts b/src/rollout/ChangeDetector.ts index dee679c..3e87fe9 100644 --- a/src/rollout/ChangeDetector.ts +++ b/src/rollout/ChangeDetector.ts @@ -77,23 +77,10 @@ export class ChangeDetector { debug('Config change detected. Stopping polling. New etag: %s', response.etag); this.stop(); + await this.lockCoordinator.acquire(); try { - await this.lockCoordinator.acquire(); - try { - if (this.onConfigUpdate) { - await this.onConfigUpdate(); - } - } catch (err) { - debug('Error during onChange callback: %s', (err as Error).message); - } finally { - try { - await this.lockCoordinator.release(); - } catch (releaseErr) { - debug('Best-effort lock release failed: %s', (releaseErr as Error).message); - } - debug('Hard termination triggered. Exiting process.'); - this.stop(); - process.exit(0); + if (this.onConfigUpdate) { + await this.onConfigUpdate(); } } catch (err) { debug('Error during lock acquisition: %s', (err as Error).message); @@ -109,6 +96,7 @@ export class ChangeDetector { this.currentEtag = response.etag; debug('Pod termination is not expected.'); this.start(); + await this.lockCoordinator.release(); } } } diff --git a/src/types.ts b/src/types.ts index a9162b5..aeac89a 100644 --- a/src/types.ts +++ b/src/types.ts @@ -98,6 +98,8 @@ export interface BaseOptions { lockTtlSeconds: number; /** * Indicates whether the pod will be terminated after an update. + * If true, the SDK will stop polling and not release the lock. + * If false, the SDK will release the lock and continue polling. * @default true */ terminatePod: boolean; diff --git a/tests/lock.spec.ts b/tests/lock.spec.ts index d22e644..b4d3df0 100644 --- a/tests/lock.spec.ts +++ b/tests/lock.spec.ts @@ -1,4 +1,4 @@ -import { afterEach, beforeEach, describe, expect, it, MockInstance, vi } from 'vitest'; +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; import { Interceptable, MockAgent, setGlobalDispatcher } from 'undici'; import { commonDbPartialV1 } from '@map-colonies/schemas'; import { StatusCodes } from 'http-status-codes'; @@ -10,17 +10,21 @@ const DEFAULT_POLL_INTERVAL = 10000; describe('Distributed Semaphore Locking', () => { let client: Interceptable; - let exitSpy: MockInstance; const instances: { stop: () => void }[] = []; beforeEach(() => { vi.useFakeTimers(); - exitSpy = vi.spyOn(process, 'exit').mockImplementation(() => undefined as never); const agent = new MockAgent(); agent.disableNetConnect(); setGlobalDispatcher(agent); client = agent.get(URL); + + // Add default mock for lock release on startup + client + .intercept({ path: /\/locks\/.*/, method: 'DELETE' }) + .reply(StatusCodes.NO_CONTENT) + .persist(); }); afterEach(() => { @@ -94,7 +98,7 @@ describe('Distributed Semaphore Locking', () => { // Act (Wait for Poll) await vi.advanceTimersByTimeAsync(DEFAULT_POLL_INTERVAL * (1 + JITTER_PERCENTAGE)); - await vi.waitFor(() => expect(exitSpy).toHaveBeenCalledWith(0)); + await vi.waitFor(() => expect(onChangeMock).toHaveBeenCalled()); // Assert expect(onChangeMock).toHaveBeenCalled(); @@ -131,7 +135,6 @@ describe('Distributed Semaphore Locking', () => { // Assert expect(configInstance.get('host')).toBe('initial-host'); - expect(exitSpy).not.toHaveBeenCalled(); }); it('should wait and retry if lock acquisition returns 423 Locked with Retry-After', async () => { @@ -192,7 +195,7 @@ describe('Distributed Semaphore Locking', () => { // Wait for the retry interval (2 seconds) await vi.advanceTimersByTimeAsync(2001); - await vi.waitFor(() => expect(exitSpy).toHaveBeenCalledWith(0)); + await vi.waitFor(() => expect(onChangeMock).toHaveBeenCalled()); // Assert expect(onChangeMock).toHaveBeenCalled(); diff --git a/tests/rollout.spec.ts b/tests/rollout.spec.ts index 6a6e125..683946d 100644 --- a/tests/rollout.spec.ts +++ b/tests/rollout.spec.ts @@ -19,6 +19,12 @@ describe('Continuous Polling (ChangeDetector)', () => { setGlobalDispatcher(agent); client = agent.get(URL); + + // Add default mock for lock release on startup + client + .intercept({ path: /\/locks\/.*/, method: 'DELETE' }) + .reply(StatusCodes.NO_CONTENT) + .persist(); }); afterEach(() => { @@ -86,9 +92,11 @@ describe('Continuous Polling (ChangeDetector)', () => { // Act (Wait for Poll) await vi.advanceTimersByTimeAsync(DEFAULT_POLL_INTERVAL * (1 + JITTER_PERCENTAGE)); - // Assert (Updated State) - expect(onChangeMock).toHaveBeenCalledTimes(1); - expect(onChangeMock).toHaveBeenCalledWith(); + // Use waitFor to allow async promises to resolve + await vi.waitFor(() => expect(onChangeMock).toHaveBeenCalled()); + + // Assert (Updated State & Hard Termination) + expect(onChangeMock).toHaveBeenCalled(); }); it('should not trigger onChange when polling returns 304 Not Modified', async () => { From 07112635f5491c224a8a4b99bd9cf42a647329f5 Mon Sep 17 00:00:00 2001 From: netanelC Date: Thu, 18 Jun 2026 18:47:09 +0300 Subject: [PATCH 10/14] test: add tests --- tests/lock.spec.ts | 129 ++++++++++++++++++++++++------------------ tests/rollout.spec.ts | 124 ++++++++++++---------------------------- 2 files changed, 108 insertions(+), 145 deletions(-) diff --git a/tests/lock.spec.ts b/tests/lock.spec.ts index b4d3df0..3512bae 100644 --- a/tests/lock.spec.ts +++ b/tests/lock.spec.ts @@ -4,13 +4,15 @@ import { commonDbPartialV1 } from '@map-colonies/schemas'; import { StatusCodes } from 'http-status-codes'; import { config } from '../src/config'; import { JITTER_PERCENTAGE } from '../src/constants'; +import * as httpClient from '../src/httpClient'; +import { createMockConfigData } from './mocks'; const URL = 'http://localhost:8080'; -const DEFAULT_POLL_INTERVAL = 10000; +const DEFAULT_POLL_INTERVAL = 30000; describe('Distributed Semaphore Locking', () => { let client: Interceptable; - const instances: { stop: () => void }[] = []; + const onChangeMock = vi.fn(); beforeEach(() => { vi.useFakeTimers(); @@ -28,28 +30,13 @@ describe('Distributed Semaphore Locking', () => { }); afterEach(() => { - instances.forEach((instance) => instance.stop()); - instances.length = 0; - vi.restoreAllMocks(); vi.clearAllMocks(); }); - it('should acquire lock before onChange and release it after (during hot-reload)', async () => { + it('when terminatePod is false, should acquire lock before onChange and release it after', async () => { // Arrange - const initialConfigData = { - configName: 'name', - schemaId: commonDbPartialV1.$id, - version: 1, - config: { host: 'initial-host' }, - createdAt: 0, - }; - const newConfigData = { - configName: 'name', - schemaId: commonDbPartialV1.$id, - version: 1, - config: { host: 'updated-host' }, - createdAt: 1, - }; + const initialConfigData = createMockConfigData(); + const newConfigData = createMockConfigData({ config: { host: 'updated-host' }, createdAt: 1 }); // Cold-start requests client @@ -59,21 +46,17 @@ describe('Distributed Semaphore Locking', () => { .intercept({ path: `/config/name/1?shouldDereference=true&schemaId=${commonDbPartialV1.$id}`, method: 'GET' }) .reply(StatusCodes.OK, initialConfigData, { headers: { etag: 'etag-1' } }); - const onChangeMock = vi.fn(); - // Act - const configInstance = await config({ + await config({ configName: 'name', version: 1, schema: commonDbPartialV1, - configServerUrl: URL, localConfigPath: './tests/config', - pollIntervalMs: DEFAULT_POLL_INTERVAL, onChange: onChangeMock, rolloutKey: 'my-lock', callerId: 'my-caller', + terminatePod: false, }); - instances.push(configInstance); // Arrange (Hot-reload triggers) client @@ -104,15 +87,64 @@ describe('Distributed Semaphore Locking', () => { expect(onChangeMock).toHaveBeenCalled(); }); - it('should bypass lock during initial cold-start', async () => { + it('when terminatePod is true, should acquire lock before onChange and not release it after', async () => { // Arrange - const initialConfigData = { + const initialConfigData = createMockConfigData(); + const newConfigData = createMockConfigData({ config: { host: 'updated-host' }, createdAt: 1 }); + + // Cold-start requests + client + .intercept({ path: '/capabilities', method: 'GET' }) + .reply(StatusCodes.OK, { serverVersion: '2.0.0', schemasPackageVersion: '99.9.9', pubSubEnabled: false }); + client + .intercept({ path: `/config/name/1?shouldDereference=true&schemaId=${commonDbPartialV1.$id}`, method: 'GET' }) + .reply(StatusCodes.OK, initialConfigData, { headers: { etag: 'etag-1' } }); + + const releaseSpy = vi.spyOn(httpClient, 'releaseLock'); + + // Act + await config({ configName: 'name', - schemaId: commonDbPartialV1.$id, version: 1, - config: { host: 'initial-host' }, - createdAt: 0, - }; + schema: commonDbPartialV1, + localConfigPath: './tests/config', + onChange: onChangeMock, + rolloutKey: 'my-lock', + callerId: 'my-caller', + terminatePod: true, + }); + + // Arrange (Hot-reload triggers) + client + .intercept({ + path: `/config/name/1?shouldDereference=true&schemaId=${commonDbPartialV1.$id}`, + method: 'GET', + headers: { 'if-none-match': 'etag-1' }, + }) + .reply(StatusCodes.OK, newConfigData, { headers: { etag: 'etag-2' } }); + + // Mock Lock Acquisition + client + .intercept({ + path: '/locks', + method: 'POST', + body: JSON.stringify({ key: 'my-lock', callerId: 'my-caller', limit: 1, ttl: 20 }), + }) + .reply(StatusCodes.OK); + + // Act (Wait for Poll) + await vi.advanceTimersByTimeAsync(DEFAULT_POLL_INTERVAL * (1 + JITTER_PERCENTAGE)); + await vi.waitFor(() => expect(onChangeMock).toHaveBeenCalled()); + + // Assert + expect(onChangeMock).toHaveBeenCalled(); + // Verify release was only called once (during cold-start) + expect(releaseSpy).toHaveBeenCalledTimes(1); + }); + + it('should release the lock during initial cold-start', async () => { + // Arrange + const initialConfigData = createMockConfigData({ config: { host: 'initial-host' } }); client .intercept({ path: '/capabilities', method: 'GET' }) @@ -121,38 +153,28 @@ describe('Distributed Semaphore Locking', () => { .intercept({ path: `/config/name/1?shouldDereference=true&schemaId=${commonDbPartialV1.$id}`, method: 'GET' }) .reply(StatusCodes.OK, initialConfigData, { headers: { etag: 'etag-1' } }); + const releaseSpy = vi.spyOn(httpClient, 'releaseLock'); + // Act const configInstance = await config({ configName: 'name', version: 1, schema: commonDbPartialV1, - configServerUrl: URL, localConfigPath: './tests/config', - pollIntervalMs: DEFAULT_POLL_INTERVAL, - onChange: vi.fn(), + onChange: onChangeMock, + rolloutKey: 'my-lock', + callerId: 'my-caller', }); - instances.push(configInstance); // Assert expect(configInstance.get('host')).toBe('initial-host'); + expect(releaseSpy).toHaveBeenCalledWith('my-lock', 'my-caller'); }); it('should wait and retry if lock acquisition returns 423 Locked with Retry-After', async () => { // Arrange - const initialConfigData = { - configName: 'name', - schemaId: commonDbPartialV1.$id, - version: 1, - config: { host: 'initial-host' }, - createdAt: 0, - }; - const newConfigData = { - configName: 'name', - schemaId: commonDbPartialV1.$id, - version: 1, - config: { host: 'updated-host' }, - createdAt: 1, - }; + const initialConfigData = createMockConfigData({ config: { host: 'initial-host' } }); + const newConfigData = createMockConfigData({ config: { host: 'updated-host' }, createdAt: 1 }); client .intercept({ path: '/capabilities', method: 'GET' }) @@ -161,20 +183,15 @@ describe('Distributed Semaphore Locking', () => { .intercept({ path: `/config/name/1?shouldDereference=true&schemaId=${commonDbPartialV1.$id}`, method: 'GET' }) .reply(StatusCodes.OK, initialConfigData, { headers: { etag: 'etag-1' } }); - const onChangeMock = vi.fn(); - - const configInstance = await config({ + await config({ configName: 'name', version: 1, schema: commonDbPartialV1, - configServerUrl: URL, localConfigPath: './tests/config', - pollIntervalMs: DEFAULT_POLL_INTERVAL, onChange: onChangeMock, rolloutKey: 'my-lock', callerId: 'my-caller', }); - instances.push(configInstance); // Arrange (Hot-reload triggers) client diff --git a/tests/rollout.spec.ts b/tests/rollout.spec.ts index 78456ae..61ff3f6 100644 --- a/tests/rollout.spec.ts +++ b/tests/rollout.spec.ts @@ -22,17 +22,18 @@ describe('Continuous Polling (ChangeDetector)', () => { setGlobalDispatcher(agent); client = agent.get(URL); - // Add default mock for lock release on startup client - .intercept({ path: /\/locks\/.*/, method: 'DELETE' }) + .intercept({ path: /\/locks.*/, method: 'POST' }) + .reply(StatusCodes.OK) + .persist(); + client + .intercept({ path: /\/locks.*/, method: 'DELETE' }) .reply(StatusCodes.NO_CONTENT) .persist(); }); afterEach(() => { - vi.restoreAllMocks(); vi.clearAllMocks(); - onChangeMock.mockReset(); }); it('should trigger onChange when polling returns a new config (200 OK)', async () => { @@ -146,14 +147,38 @@ describe('Continuous Polling (ChangeDetector)', () => { onChange: onChangeMock, }); - // Act (Wait for Poll) - await vi.advanceTimersByTimeAsync(DEFAULT_POLL_INTERVAL * (1 + JITTER_PERCENTAGE)); + // Assert (Initial State) + expect(onChangeMock).not.toHaveBeenCalled(); + + // Arrange (First config change) + client + .intercept({ + path: `/config/name/1?shouldDereference=true&schemaId=${commonDbPartialV1.$id}`, + method: 'GET', + headers: { 'if-none-match': 'etag-1' }, + }) + .reply(StatusCodes.OK, newConfigData1, { headers: { etag: 'etag-2' } }); - // Use waitFor to allow async promises to resolve - await vi.waitFor(() => expect(onChangeMock).toHaveBeenCalled()); + // Act (Wait for First Poll) + await vi.advanceTimersByTimeAsync(DEFAULT_POLL_INTERVAL * 2); - // Assert (Updated State & Hard Termination) - expect(onChangeMock).toHaveBeenCalled(); + // Assert (First change triggered) + expect(onChangeMock).toHaveBeenCalledTimes(1); + + // Arrange (Second config change to verify polling resumed) + client + .intercept({ + path: `/config/name/1?shouldDereference=true&schemaId=${commonDbPartialV1.$id}`, + method: 'GET', + headers: { 'if-none-match': 'etag-2' }, + }) + .reply(StatusCodes.OK, newConfigData2, { headers: { etag: 'etag-3' } }); + + // Act (Wait for Second Poll) + await vi.advanceTimersByTimeAsync(DEFAULT_POLL_INTERVAL * 2); + + // Assert (Second change triggered) + expect(onChangeMock).toHaveBeenCalledTimes(2); }); it('should not trigger onChange when polling returns 304 Not Modified', async () => { @@ -300,83 +325,4 @@ describe('Continuous Polling (ChangeDetector)', () => { // Assert expect(onChangeMock).not.toHaveBeenCalled(); }); - - it('should trigger onChange and resume polling when polling returns a new config and terminatePod is false', async () => { - // Arrange - const initialConfigData = { - configName: 'name', - schemaId: commonDbPartialV1.$id, - version: 1, - config: { host: 'initial-host' }, - createdAt: 0, - }; - const newConfigData1 = { - configName: 'name', - schemaId: commonDbPartialV1.$id, - version: 1, - config: { host: 'updated-host' }, - createdAt: 1, - }; - const newConfigData2 = { - configName: 'name', - schemaId: commonDbPartialV1.$id, - version: 1, - config: { host: 'updated-host-again' }, - createdAt: 2, - }; - - client - .intercept({ path: '/capabilities', method: 'GET' }) - .reply(StatusCodes.OK, { serverVersion: '2.0.0', schemasPackageVersion: '99.9.9', pubSubEnabled: false }); - client - .intercept({ path: `/config/name/1?shouldDereference=true&schemaId=${commonDbPartialV1.$id}`, method: 'GET' }) - .reply(StatusCodes.OK, initialConfigData, { headers: { etag: 'etag-1' } }); - - const onChangeMock = vi.fn(); - - // Act - await config({ - configName: 'name', - version: 1, - schema: commonDbPartialV1, - configServerUrl: URL, - localConfigPath: './tests/config', - pollIntervalMs: DEFAULT_POLL_INTERVAL, - terminatePod: false, - onChange: onChangeMock, - }); - - // Assert (Initial State) - expect(onChangeMock).not.toHaveBeenCalled(); - - // Arrange (First config change) - client - .intercept({ - path: `/config/name/1?shouldDereference=true&schemaId=${commonDbPartialV1.$id}`, - method: 'GET', - headers: { 'if-none-match': 'etag-1' }, - }) - .reply(StatusCodes.OK, newConfigData1, { headers: { etag: 'etag-2' } }); - - // Act (Wait for First Poll) - await vi.advanceTimersByTimeAsync(DEFAULT_POLL_INTERVAL * 2); - - // Assert (First change triggered) - expect(onChangeMock).toHaveBeenCalledTimes(1); - - // Arrange (Second config change to verify polling resumed) - client - .intercept({ - path: `/config/name/1?shouldDereference=true&schemaId=${commonDbPartialV1.$id}`, - method: 'GET', - headers: { 'if-none-match': 'etag-2' }, - }) - .reply(StatusCodes.OK, newConfigData2, { headers: { etag: 'etag-3' } }); - - // Act (Wait for Second Poll) - await vi.advanceTimersByTimeAsync(DEFAULT_POLL_INTERVAL * 2); - - // Assert (Second change triggered) - expect(onChangeMock).toHaveBeenCalledTimes(2); - }); }); From 8338de83bd661f620c40a2ce610d5dba6d3043e9 Mon Sep 17 00:00:00 2001 From: netanelC Date: Mon, 22 Jun 2026 10:14:08 +0300 Subject: [PATCH 11/14] deps: update schema package --- package-lock.json | 8 ++++---- package.json | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/package-lock.json b/package-lock.json index 643a156..eb5b61f 100644 --- a/package-lock.json +++ b/package-lock.json @@ -51,7 +51,7 @@ "node": ">=24.0.0" }, "peerDependencies": { - "@map-colonies/schemas": "^1.9.0", + "@map-colonies/schemas": "^1.22.0", "prom-client": "^15.0.0" }, "peerDependenciesMeta": { @@ -1059,9 +1059,9 @@ } }, "node_modules/@map-colonies/schemas": { - "version": "1.9.0", - "resolved": "https://registry.npmjs.org/@map-colonies/schemas/-/schemas-1.9.0.tgz", - "integrity": "sha512-Xr7bWXfUAQNUAap8U4BnbGgOjW2LwMLd5JTCxIgZ9S4Vussw0ZMvT6rh5r2jIg6dcpu3w2JKZf32vlS7laK4Ug==", + "version": "1.22.0", + "resolved": "https://registry.npmjs.org/@map-colonies/schemas/-/schemas-1.22.0.tgz", + "integrity": "sha512-yieLkagKcCSHiAy3/V6OC+Zo8ougtAVuyP2poFGFJFAUXH17yu8CgxHXTARVuQCKCXxcH/Rem8QWfW3EdTggPA==", "license": "MIT", "peer": true }, diff --git a/package.json b/package.json index ce25e98..395e9c9 100644 --- a/package.json +++ b/package.json @@ -65,7 +65,7 @@ "undici": "^7.3.0" }, "peerDependencies": { - "@map-colonies/schemas": "^1.9.0", + "@map-colonies/schemas": "^1.22.0", "prom-client": "^15.0.0" }, "peerDependenciesMeta": { From 006a1c767cd0e39fdc80725c65e91fa186743012 Mon Sep 17 00:00:00 2001 From: netanelC Date: Mon, 22 Jun 2026 12:31:36 +0300 Subject: [PATCH 12/14] chore: remove unnecessary log --- src/rollout/ChangeDetector.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/src/rollout/ChangeDetector.ts b/src/rollout/ChangeDetector.ts index 3e87fe9..fe48e40 100644 --- a/src/rollout/ChangeDetector.ts +++ b/src/rollout/ChangeDetector.ts @@ -83,7 +83,6 @@ export class ChangeDetector { await this.onConfigUpdate(); } } catch (err) { - debug('Error during lock acquisition: %s', (err as Error).message); if (isConfigError(err, 'httpResponseError') || isConfigError(err, 'httpGeneralError')) { debug('Error during onChange callback: %s', err.message); } else { From 93209f0a5ba14b02c094cebc6c579ab87d0cbd75 Mon Sep 17 00:00:00 2001 From: netanelC Date: Thu, 25 Jun 2026 12:56:17 +0300 Subject: [PATCH 13/14] refactor: change lock to 120 seconds --- src/options.ts | 2 +- tests/config.spec.ts | 2 +- tests/lock.spec.ts | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/options.ts b/src/options.ts index fc2e302..1cc1de2 100644 --- a/src/options.ts +++ b/src/options.ts @@ -19,7 +19,7 @@ const defaultOptions: BaseOptions = { disableHotReload: false, rolloutKey: PACKAGE_NAME, rolloutLimit: 1, - lockTtlSeconds: 20, + lockTtlSeconds: 120, callerId: hostname(), terminatePod: true, }; diff --git a/tests/config.spec.ts b/tests/config.spec.ts index f7bd397..7d10fab 100644 --- a/tests/config.spec.ts +++ b/tests/config.spec.ts @@ -219,7 +219,7 @@ describe('config', () => { pollIntervalMs: 3000, disableHotReload: false, ignoreServerIsOlderVersionError: false, - lockTtlSeconds: 20, + lockTtlSeconds: 120, rolloutKey: PACKAGE_NAME, callerId: hostname(), rolloutLimit: 1, diff --git a/tests/lock.spec.ts b/tests/lock.spec.ts index 3512bae..4893a65 100644 --- a/tests/lock.spec.ts +++ b/tests/lock.spec.ts @@ -72,7 +72,7 @@ describe('Distributed Semaphore Locking', () => { .intercept({ path: '/locks', method: 'POST', - body: JSON.stringify({ key: 'my-lock', callerId: 'my-caller', limit: 1, ttl: 20 }), + body: JSON.stringify({ key: 'my-lock', callerId: 'my-caller', limit: 1, ttl: 120 }), }) .reply(StatusCodes.OK); @@ -128,7 +128,7 @@ describe('Distributed Semaphore Locking', () => { .intercept({ path: '/locks', method: 'POST', - body: JSON.stringify({ key: 'my-lock', callerId: 'my-caller', limit: 1, ttl: 20 }), + body: JSON.stringify({ key: 'my-lock', callerId: 'my-caller', limit: 1, ttl: 120 }), }) .reply(StatusCodes.OK); From 2377193c69fdbefb309ef6cf94e6ed0d8c2b6a6d Mon Sep 17 00:00:00 2001 From: netanelC Date: Mon, 29 Jun 2026 10:32:33 +0300 Subject: [PATCH 14/14] refactor: better code --- src/rollout/LockCoordinator.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/rollout/LockCoordinator.ts b/src/rollout/LockCoordinator.ts index 019ba24..92f9e51 100644 --- a/src/rollout/LockCoordinator.ts +++ b/src/rollout/LockCoordinator.ts @@ -27,9 +27,9 @@ export class LockCoordinator { return; } + const MS_IN_SECOND = 1000; // If not acquired, wait for retryAfter (in seconds) or a default value of 1 second - // eslint-disable-next-line @typescript-eslint/no-magic-numbers - const waitTime = retryAfter! * 1000; + const waitTime = MS_IN_SECOND * (retryAfter ?? 1); debug('Lock not acquired, waiting for %d ms', waitTime); await new Promise((resolve) => setTimeout(resolve, waitTime)); }