diff --git a/flagsmith-core.ts b/flagsmith-core.ts index d81d05e..a386703 100644 --- a/flagsmith-core.ts +++ b/flagsmith-core.ts @@ -9,6 +9,8 @@ import { IFlagsmithResponse, IFlagsmithTrait, IInitConfig, + IPipelineEvent, + IPipelineEventBatch, ISentryClient, IState, ITraits, @@ -64,6 +66,7 @@ type Config = { const FLAGSMITH_CONFIG_ANALYTICS_KEY = "flagsmith_value_"; const FLAGSMITH_FLAG_ANALYTICS_KEY = "flagsmith_enabled_"; const FLAGSMITH_TRAIT_ANALYTICS_KEY = "flagsmith_trait_"; +const DEFAULT_PIPELINE_FLUSH_INTERVAL = 10000; const Flagsmith = class { _trigger?:(()=>void)|null= null @@ -265,6 +268,47 @@ const Flagsmith = class { } }; + flushPipelineAnalytics = async () => { + const isEvaluationEnabled = this.evaluationAnalyticsUrl && this.evaluationContext.environment; + const isReadyToFlush = this.pipelineEvents.length > 0 && (!this.isPipelineFlushing || this.pipelineFlushInterval === 0); + if (!isEvaluationEnabled || !isReadyToFlush) { + return; + } + + const environmentKey = this.evaluationContext.environment!.apiKey; + this.isPipelineFlushing = true; + const eventsToSend = this.pipelineEvents; + this.pipelineEvents = []; + this.pipelineRecordedKeys.clear(); + + const batch: IPipelineEventBatch = { + events: eventsToSend, + environment_key: environmentKey, + }; + + try { + const res = await _fetch(this.evaluationAnalyticsUrl + 'v1/analytics/batch', { + method: 'POST', + body: JSON.stringify(batch), + headers: { + 'Content-Type': 'application/json; charset=utf-8', + 'X-Environment-Key': environmentKey, + ...(SDK_VERSION ? { 'Flagsmith-SDK-User-Agent': `flagsmith-js-sdk/${SDK_VERSION}` } : {}), + }, + }); + if (!res.status || res.status < 200 || res.status >= 300) { + throw new Error(`Pipeline analytics: unexpected status ${res.status}`); + } + this.log('Pipeline analytics: flush successful'); + } catch (err) { + this.pipelineEvents = eventsToSend.concat(this.pipelineEvents); + this.trimPipelineBuffer(); + this.log('Pipeline analytics: flush failed, events re-queued', err); + } finally { + this.isPipelineFlushing = false; + } + }; + datadogRum: IDatadogRum | null = null; loadingState: LoadingState = {isLoading: true, isFetching: true, error: null, source: FlagSource.NONE} canUseStorage = false @@ -290,6 +334,12 @@ const Flagsmith = class { sentryClient: ISentryClient | null = null withTraits?: ITraits|null= null cacheOptions = {ttl:0, skipAPI: false, loadStale: false, storageKey: undefined as string|undefined} + evaluationAnalyticsUrl: string | null = null + evaluationAnalyticsMaxBuffer: number = 1000 + pipelineEvents: IPipelineEvent[] = [] + pipelineAnalyticsInterval: ReturnType | null = null + isPipelineFlushing = false + pipelineRecordedKeys: Map = new Map() async init(config: IInitConfig) { const evaluationContext = toEvaluationContext(config.evaluationContext || this.evaluationContext); try { @@ -308,6 +358,7 @@ const Flagsmith = class { enableDynatrace, enableLogs, environmentID, + evaluationAnalyticsConfig, eventSourceUrl= "https://realtime.flagsmith.com/", fetch: fetchImplementation, headers, @@ -441,6 +492,12 @@ const Flagsmith = class { } } + if (evaluationAnalyticsConfig) { + this.initPipelineAnalytics(evaluationAnalyticsConfig); + } else { + this.stopPipelineAnalytics(); + } + //If the user specified default flags emit a changed event immediately if (cacheFlags) { if (AsyncStorage && this.canUseStorage) { @@ -916,9 +973,82 @@ const Flagsmith = class { } this.evaluationEvent[this.evaluationContext.environment.apiKey][key] += 1; } + + if (this.evaluationAnalyticsUrl) { + this.recordPipelineEvent(key); + } + this.updateEventStorage(); }; + private pipelineFlushInterval: number = DEFAULT_PIPELINE_FLUSH_INTERVAL; + + private initPipelineAnalytics(config: NonNullable) { + this.stopPipelineAnalytics(); + this.evaluationAnalyticsUrl = ensureTrailingSlash(config.analyticsServerUrl); + this.evaluationAnalyticsMaxBuffer = config.maxBuffer ?? 1000; + this.pipelineFlushInterval = config.flushInterval ?? DEFAULT_PIPELINE_FLUSH_INTERVAL; + this.pipelineEvents = []; + if (this.pipelineFlushInterval > 0) { + this.pipelineAnalyticsInterval = setInterval( + this.flushPipelineAnalytics, + this.pipelineFlushInterval, + ); + this.pipelineAnalyticsInterval?.unref?.(); + } + } + + private stopPipelineAnalytics() { + if (this.pipelineAnalyticsInterval) { + clearInterval(this.pipelineAnalyticsInterval); + this.pipelineAnalyticsInterval = null; + } + this.evaluationAnalyticsUrl = null; + this.pipelineEvents = []; + this.pipelineRecordedKeys.clear(); + } + + private trimPipelineBuffer() { + if (this.pipelineEvents.length > this.evaluationAnalyticsMaxBuffer) { + const excess = this.pipelineEvents.length - this.evaluationAnalyticsMaxBuffer; + this.pipelineEvents = this.pipelineEvents.slice(excess); + } + } + + // Pipeline event schema — must match the pipeline server's Event struct. + // To update: 1) IPipelineEvent in types.d.ts 2) event object below 3) tests in test/analytics-pipeline.test.ts + private recordPipelineEvent(key: string) { + const flagKey = key.toLowerCase().replace(/ /g, '_'); + const flag = this.flags && this.flags[flagKey]; + const fingerprint = `${this.evaluationContext.identity?.identifier ?? 'none'}|${flag?.enabled ?? false}|${flag?.value ?? 'null'}`; + if (this.pipelineRecordedKeys.get(flagKey) === fingerprint) { + return; + } + this.pipelineRecordedKeys.set(flagKey, fingerprint); + const event: IPipelineEvent = { + event_id: flagKey, + event_type: 'flag_evaluation', + evaluated_at: Date.now(), + identity_identifier: this.evaluationContext.identity?.identifier ?? null, + enabled: flag ? flag.enabled : null, + value: flag ? flag.value : null, + traits: this.evaluationContext.identity?.traits + ? { ...this.evaluationContext.identity.traits } + : null, + metadata: { + ...(flag ? { id: flag.id } : {}), + ...(typeof window !== 'undefined' && window.location ? { page_url: window.location.href } : {}), + ...(SDK_VERSION ? { sdk_version: SDK_VERSION } : {}), + }, + }; + this.pipelineEvents.push(event); + this.trimPipelineBuffer(); + + if (this.pipelineFlushInterval === 0) { + this.flushPipelineAnalytics(); + } + } + private setLoadingState(loadingState: LoadingState) { if (!deepEqual(loadingState, this.loadingState)) { this.loadingState = { ...loadingState }; diff --git a/test/analytics-pipeline.test.ts b/test/analytics-pipeline.test.ts new file mode 100644 index 0000000..6158bfd --- /dev/null +++ b/test/analytics-pipeline.test.ts @@ -0,0 +1,215 @@ +import { getFlagsmith, environmentID, testIdentity } from './test-constants'; + +const pipelineUrl = 'https://analytics.flagsmith.com/'; + +function getPipelineCalls(mockFetch: jest.Mock) { + return mockFetch.mock.calls.filter( + ([url]: [string]) => url.includes('v1/analytics/batch') + ); +} + +describe('Pipeline Analytics', () => { + test('should not send pipeline events when evaluationAnalyticsConfig is not set', async () => { + const { flagsmith, initConfig, mockFetch } = getFlagsmith(); + await flagsmith.init(initConfig); + + flagsmith.getValue('hero'); + flagsmith.hasFeature('font_size'); + + expect(getPipelineCalls(mockFetch)).toHaveLength(0); + // @ts-ignore + expect(flagsmith.pipelineEvents).toHaveLength(0); + }); + + test('should buffer events and flush with correct shape and headers', async () => { + const { flagsmith, initConfig, mockFetch } = getFlagsmith({ + evaluationAnalyticsConfig: { + analyticsServerUrl: pipelineUrl, + flushInterval: 60000, + }, + }); + await flagsmith.init(initConfig); + + flagsmith.getValue('font_size'); + flagsmith.hasFeature('hero'); + + // @ts-ignore + await flagsmith.flushPipelineAnalytics(); + + const calls = getPipelineCalls(mockFetch); + expect(calls).toHaveLength(1); + + const body = JSON.parse(calls[0][1].body); + expect(body.environment_key).toBe(environmentID); + expect(body.events).toHaveLength(2); + + const valueEvent = body.events[0]; + expect(valueEvent.event_id).toBe('font_size'); + expect(valueEvent.event_type).toBe('flag_evaluation'); + expect(valueEvent.value).toBe(16); + expect(valueEvent.enabled).toBe(true); + expect(valueEvent.identity_identifier).toBeNull(); + expect(valueEvent.evaluated_at).toBeDefined(); + expect(valueEvent.metadata).toEqual(expect.objectContaining({ id: 6149 })); + + const enabledEvent = body.events[1]; + expect(enabledEvent.event_id).toBe('hero'); + expect(enabledEvent.event_type).toBe('flag_evaluation'); + expect(enabledEvent.enabled).toBe(true); + expect(enabledEvent.value).toBe(flagsmith.getValue('hero')); + + const headers = calls[0][1].headers; + expect(headers['X-Environment-Key']).toBe(environmentID); + expect(headers['Content-Type']).toBe('application/json; charset=utf-8'); + expect(headers['Flagsmith-SDK-User-Agent']).toMatch(/^flagsmith-js-sdk\//); + }); + + test('should include identity and full traits when identified', async () => { + const { flagsmith, initConfig, mockFetch } = getFlagsmith({ + evaluationAnalyticsConfig: { + analyticsServerUrl: pipelineUrl, + flushInterval: 60000, + }, + identity: testIdentity, + }); + await flagsmith.init(initConfig); + + flagsmith.getValue('hero'); + + // @ts-ignore + await flagsmith.flushPipelineAnalytics(); + + const calls = getPipelineCalls(mockFetch); + const event = JSON.parse(calls[0][1].body).events[0]; + + expect(event.identity_identifier).toBe(testIdentity); + expect(event.traits).toEqual({ + number_trait: { value: 1 }, + string_trait: { value: 'Example' }, + }); + }); + + test('should cap buffer at maxBuffer and skip events when skipAnalytics is used', async () => { + const { flagsmith, initConfig } = getFlagsmith({ + evaluationAnalyticsConfig: { + analyticsServerUrl: pipelineUrl, + maxBuffer: 3, + flushInterval: 60000, + }, + }); + await flagsmith.init(initConfig); + + flagsmith.getValue('hero', { skipAnalytics: true }); + flagsmith.hasFeature('font_size', { skipAnalytics: true }); + // @ts-ignore + expect(flagsmith.pipelineEvents).toHaveLength(0); + + flagsmith.getValue('hero'); + flagsmith.getValue('font_size'); + flagsmith.getValue('json_value'); + flagsmith.getValue('number_value'); + flagsmith.getValue('off_value'); + + // @ts-ignore + expect(flagsmith.pipelineEvents).toHaveLength(3); + // @ts-ignore + expect(flagsmith.pipelineEvents[0].event_id).toBe('json_value'); + // @ts-ignore + expect(flagsmith.pipelineEvents[2].event_id).toBe('off_value'); + }); + + test('should deduplicate repeated evaluations with same result per flush window', async () => { + const { flagsmith, initConfig, mockFetch } = getFlagsmith({ + evaluationAnalyticsConfig: { + analyticsServerUrl: pipelineUrl, + flushInterval: 60000, + }, + identity: testIdentity, + }); + await flagsmith.init(initConfig); + + flagsmith.getValue('font_size'); + flagsmith.getValue('font_size'); + flagsmith.getValue('font_size'); + flagsmith.hasFeature('font_size'); + flagsmith.hasFeature('font_size'); + + // @ts-ignore + expect(flagsmith.pipelineEvents).toHaveLength(1); + // @ts-ignore + expect(flagsmith.pipelineEvents[0].event_id).toBe('font_size'); + + // @ts-ignore + await flagsmith.flushPipelineAnalytics(); + flagsmith.getValue('font_size'); + // @ts-ignore + expect(flagsmith.pipelineEvents).toHaveLength(1); + + flagsmith.getValue('hero'); + // @ts-ignore + expect(flagsmith.pipelineEvents).toHaveLength(2); + }); + + test('should record new event when evaluation result changes for same key', async () => { + const { flagsmith, initConfig } = getFlagsmith({ + evaluationAnalyticsConfig: { + analyticsServerUrl: pipelineUrl, + flushInterval: 60000, + }, + }); + await flagsmith.init(initConfig); + + flagsmith.getValue('font_size'); + // @ts-ignore + expect(flagsmith.pipelineEvents).toHaveLength(1); + + flagsmith.getValue('font_size'); + // @ts-ignore + expect(flagsmith.pipelineEvents).toHaveLength(1); + + await flagsmith.identify(testIdentity); + flagsmith.getValue('font_size'); + // @ts-ignore + expect(flagsmith.pipelineEvents).toHaveLength(2); + // @ts-ignore + expect(flagsmith.pipelineEvents[1].identity_identifier).toBe(testIdentity); + }); + + test('should re-queue on failure and coexist with standard analytics', async () => { + const { flagsmith, initConfig, mockFetch } = getFlagsmith({ + enableAnalytics: true, + evaluationAnalyticsConfig: { + analyticsServerUrl: pipelineUrl, + flushInterval: 60000, + }, + }); + + const original = mockFetch.getMockImplementation() as jest.Mock; + mockFetch.mockImplementation(async (url: string, options: any) => { + if (url.includes('v1/analytics/batch')) { + return { status: 500, text: () => Promise.resolve('Server Error') }; + } + return original(url, options); + }); + + await flagsmith.init(initConfig); + + flagsmith.getValue('hero'); + flagsmith.getValue('font_size'); + + // @ts-ignore + expect(flagsmith.evaluationEvent[environmentID]['hero']).toBe(1); + // @ts-ignore + expect(flagsmith.evaluationEvent[environmentID]['font_size']).toBe(1); + + // @ts-ignore + expect(flagsmith.pipelineEvents).toHaveLength(2); + + // @ts-ignore + await flagsmith.flushPipelineAnalytics(); + // @ts-ignore + expect(flagsmith.pipelineEvents).toHaveLength(2); + // @ts-ignore + expect(flagsmith.pipelineEvents[0].event_id).toBe('hero'); + }); +}); diff --git a/test/functions.test.ts b/test/functions.test.ts index e84e2ac..36dc5a3 100644 --- a/test/functions.test.ts +++ b/test/functions.test.ts @@ -7,7 +7,7 @@ describe('Flagsmith.functions', () => { }); test('should use a fallback when the feature is undefined', async () => { const onChange = jest.fn() - const {flagsmith,initConfig, AsyncStorage,mockFetch} = getFlagsmith({onChange}) + const { flagsmith,initConfig } = getFlagsmith({onChange}) await flagsmith.init(initConfig); expect(flagsmith.getValue("deleted_feature",{fallback:"foo"})).toBe("foo"); diff --git a/test/test-constants.ts b/test/test-constants.ts index 9cbf08d..c9bde1c 100644 --- a/test/test-constants.ts +++ b/test/test-constants.ts @@ -76,6 +76,12 @@ export function getFlagsmith(config: Partial = {}) { const flagsmith = createFlagsmithInstance(); const AsyncStorage = new MockAsyncStorage(); const mockFetch = jest.fn(async (url, options) => { + if (url.includes('v1/analytics/batch')) { + return {status: 202, text: () => Promise.resolve('')} + } + if (url.includes('analytics/flags')) { + return {status: 200, text: () => Promise.resolve('{}')} + } switch (url) { case 'https://edge.api.flagsmith.com/api/v1/flags/': return {status: 200, text: () => fs.readFile('./test/data/flags.json', 'utf8')} diff --git a/types.d.ts b/types.d.ts index f6c8f8d..de3c682 100644 --- a/types.d.ts +++ b/types.d.ts @@ -131,6 +131,34 @@ export interface IInitConfig = string, T * Customer application metadata */ applicationMetadata?: ApplicationMetadata; + /** + * @experimental Internal use only — API may change without notice. + * Configuration for the evaluation analytics pipeline. When provided, + * individual flag evaluation events are buffered and sent to the pipeline endpoint. + * @hidden + */ + /** @internal */ + evaluationAnalyticsConfig?: { + analyticsServerUrl: string; + maxBuffer?: number; + flushInterval?: number; + }; +} + +export interface IPipelineEvent { + event_id: string; // flag_name or event_name + event_type: 'flag_evaluation' | 'custom_event'; + evaluated_at: number; + identity_identifier: string | null; + enabled?: boolean | null; + value: IFlagsmithValue; + traits?: { [key: string]: null | TraitEvaluationContext } | null; + metadata?: Record | null; +} + +export interface IPipelineEventBatch { + events: IPipelineEvent[]; + environment_key: string; } export interface IFlagsmithResponse {