Skip to content

Commit e5ffd81

Browse files
committed
fix(node-sdk): include userId when syncing company context
1 parent 6e9b203 commit e5ffd81

2 files changed

Lines changed: 211 additions & 14 deletions

File tree

packages/node-sdk/src/client.ts

Lines changed: 111 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,17 @@ import {
1212
API_TIMEOUT_MS,
1313
FLAG_EVENT_RATE_LIMITER_WINDOW_SIZE_MS,
1414
FLAGS_REFETCH_MS,
15+
PUBSUB_SSE_URL,
1516
loadConfig,
1617
REFLAG_LOG_PREFIX,
1718
SDK_VERSION,
1819
SDK_VERSION_HEADER_NAME,
1920
} from "./config";
2021
import fetchClient, { withRetry } from "./fetch-http-client";
22+
import {
23+
FlagUpdatesSSESubscription,
24+
openFlagUpdatesSSE,
25+
} from "./flag-updates-sse";
2126
import { isFlagsFallbackSnapshot } from "./flagsFallbackProvider";
2227
import { subscribe as triggerOnExit } from "./flusher";
2328
import inRequestCache from "./inRequestCache";
@@ -26,7 +31,7 @@ import { newRateLimiter } from "./rate-limiter";
2631
import type {
2732
BootstrappedFlags,
2833
CachedFlagDefinition,
29-
CacheStrategy,
34+
FlagsSyncMode,
3035
EvaluatedFlagsAPIResponse,
3136
FlagAPIResponse,
3237
FlagDefinition,
@@ -239,7 +244,8 @@ export class ReflagClient {
239244
configFile?: string;
240245
flagsFetchRetries: number;
241246
fetchTimeoutMs: number;
242-
cacheStrategy: CacheStrategy;
247+
flagsSyncMode: FlagsSyncMode;
248+
flagsPushUrl: string;
243249
};
244250
httpClient: HttpClient;
245251

@@ -257,10 +263,14 @@ export class ReflagClient {
257263

258264
private initializationFinished = false;
259265
private canLoadFlagsFallbackProvider = true;
266+
private pendingWaitForVersion: number | undefined;
267+
private realtimeRefreshPromise: Promise<void> | undefined;
268+
private flagsUpdatesSSESubscription: FlagUpdatesSSESubscription | undefined;
260269
private _initialize = once(async () => {
261270
const start = Date.now();
262271
if (!this._config.offline) {
263272
await this.flagsCache.refresh();
273+
this.startFlagsUpdatesSSE();
264274
}
265275
this.logger.info(
266276
"Reflag initialized in " +
@@ -288,7 +298,7 @@ export class ReflagClient {
288298
* @param options.configFile - The path to the config file (optional).
289299
* @param options.flagsFetchRetries - Number of retries for fetching flags (optional, defaults to 3).
290300
* @param options.fetchTimeoutMs - Timeout for fetching flags (optional, defaults to 10000ms).
291-
* @param options.cacheStrategy - The cache strategy to use for the client (optional, defaults to "periodically-update").
301+
* @param options.flagsSyncMode - How flag definitions are synchronized (optional, defaults to "polling").
292302
*
293303
* @throws An error if the options are invalid.
294304
**/
@@ -351,6 +361,21 @@ export class ReflagClient {
351361
"fetchTimeoutMs must be a non-negative integer",
352362
);
353363

364+
ok(
365+
options.flagsSyncMode === undefined ||
366+
options.flagsSyncMode === "polling" ||
367+
options.flagsSyncMode === "in-request" ||
368+
options.flagsSyncMode === "push",
369+
'flagsSyncMode must be one of "polling", "in-request", or "push"',
370+
);
371+
372+
ok(
373+
options.flagsPushUrl === undefined ||
374+
(typeof options.flagsPushUrl === "string" &&
375+
options.flagsPushUrl.length > 0),
376+
"flagsPushUrl must be a non-empty string",
377+
);
378+
354379
if (!options.configFile) {
355380
options.configFile =
356381
(process.env.REFLAG_CONFIG_FILE ??
@@ -424,6 +449,10 @@ export class ReflagClient {
424449
logger: this.logger,
425450
});
426451

452+
const flagsSyncMode: FlagsSyncMode =
453+
options.flagsSyncMode ??
454+
(options.cacheStrategy === "in-request" ? "in-request" : "polling");
455+
427456
this._config = {
428457
offline,
429458
apiBaseUrl: (config.apiBaseUrl ?? config.host) || API_BASE_URL,
@@ -441,7 +470,8 @@ export class ReflagClient {
441470
flagOverrides: baseFlagOverrides,
442471
flagsFetchRetries: options.flagsFetchRetries ?? 3,
443472
fetchTimeoutMs: options.fetchTimeoutMs ?? API_TIMEOUT_MS,
444-
cacheStrategy: options.cacheStrategy ?? "periodically-update",
473+
flagsSyncMode,
474+
flagsPushUrl: options.flagsPushUrl ?? PUBSUB_SSE_URL,
445475
};
446476
this.baseFlagOverrides = baseFlagOverrides;
447477

@@ -454,8 +484,16 @@ export class ReflagClient {
454484
}
455485

456486
const fetchFlags = async () => {
487+
const waitForVersion = this.pendingWaitForVersion;
488+
this.pendingWaitForVersion = undefined;
489+
490+
const path =
491+
waitForVersion === undefined
492+
? "features"
493+
: `features?waitForVersion=${encodeURIComponent(String(waitForVersion))}`;
494+
457495
const res = await this.get<FlagsAPIResponse>(
458-
"features",
496+
path,
459497
this._config.flagsFetchRetries,
460498
);
461499
if (!isObject(res) || !Array.isArray(res?.features)) {
@@ -467,21 +505,81 @@ export class ReflagClient {
467505
return compileFlagDefinitions(res.features);
468506
};
469507

470-
if (this._config.cacheStrategy === "periodically-update") {
471-
this.flagsCache = periodicallyUpdatingCache<CachedFlagDefinition[]>(
508+
if (this._config.flagsSyncMode === "push") {
509+
this.flagsCache = inRequestCache<CachedFlagDefinition[]>(
510+
Number.MAX_SAFE_INTEGER,
511+
this.logger,
512+
fetchFlags,
513+
);
514+
} else if (this._config.flagsSyncMode === "in-request") {
515+
this.flagsCache = inRequestCache<CachedFlagDefinition[]>(
472516
this._config.refetchInterval,
473517
this.logger,
474518
fetchFlags,
475519
);
476520
} else {
477-
this.flagsCache = inRequestCache<CachedFlagDefinition[]>(
521+
this.flagsCache = periodicallyUpdatingCache<CachedFlagDefinition[]>(
478522
this._config.refetchInterval,
479523
this.logger,
480524
fetchFlags,
481525
);
482526
}
483527
}
484528

529+
private startFlagsUpdatesSSE() {
530+
if (
531+
this._config.offline ||
532+
this._config.flagsSyncMode !== "push" ||
533+
this.flagsUpdatesSSESubscription
534+
) {
535+
return;
536+
}
537+
538+
this.flagsUpdatesSSESubscription = openFlagUpdatesSSE({
539+
url: this._config.flagsPushUrl,
540+
headers: this._config.headers,
541+
logger: this.logger,
542+
onFlagStateVersion: (version) => {
543+
this.pendingWaitForVersion =
544+
this.pendingWaitForVersion === undefined
545+
? version
546+
: Math.max(this.pendingWaitForVersion, version);
547+
this.refreshFlagsToPendingVersion();
548+
},
549+
});
550+
}
551+
552+
private refreshFlagsToPendingVersion() {
553+
if (this.realtimeRefreshPromise) {
554+
return;
555+
}
556+
557+
this.realtimeRefreshPromise = (async () => {
558+
while (true) {
559+
const pendingAtStart = this.pendingWaitForVersion;
560+
if (pendingAtStart === undefined) {
561+
break;
562+
}
563+
564+
await this.flagsCache.refresh();
565+
566+
const pendingNow = this.pendingWaitForVersion;
567+
if (pendingNow === undefined || pendingNow <= pendingAtStart) {
568+
break;
569+
}
570+
}
571+
})()
572+
.catch((error) => {
573+
this.logger.warn("failed to refresh flags from SSE update", error);
574+
})
575+
.finally(() => {
576+
this.realtimeRefreshPromise = undefined;
577+
if (this.pendingWaitForVersion !== undefined) {
578+
this.refreshFlagsToPendingVersion();
579+
}
580+
});
581+
}
582+
485583
private async loadFlagsFallbackDefinitions() {
486584
if (!this.canLoadFlagsFallbackProvider) {
487585
return undefined;
@@ -831,6 +929,7 @@ export class ReflagClient {
831929

832930
await this.batchBuffer.flush();
833931
await this.flagsCache.waitRefresh();
932+
await this.realtimeRefreshPromise;
834933
}
835934

836935
/**
@@ -857,6 +956,9 @@ export class ReflagClient {
857956
* multiple background processes from running simultaneously.
858957
*/
859958
public destroy() {
959+
this.flagsUpdatesSSESubscription?.close();
960+
this.flagsUpdatesSSESubscription = undefined;
961+
860962
this.flagsCache.destroy();
861963
this.batchBuffer.destroy();
862964
}
@@ -1228,6 +1330,7 @@ export class ReflagClient {
12281330
const { id: _, ...attributes } = options.company;
12291331
promises.push(
12301332
this.updateCompany(options.company.id, {
1333+
userId: options.user?.id,
12311334
attributes,
12321335
meta: options.meta,
12331336
}),

packages/node-sdk/test/client.test.ts

Lines changed: 100 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,17 @@ const expectedHeaders = {
100100
Authorization: `Bearer ${validOptions.secretKey}`,
101101
};
102102

103+
function createSSEMessageStream(data: object) {
104+
const encoder = new TextEncoder();
105+
return new ReadableStream<Uint8Array>({
106+
start(controller) {
107+
controller.enqueue(
108+
encoder.encode(`event: message\ndata: ${JSON.stringify(data)}\n\n`),
109+
);
110+
},
111+
});
112+
}
113+
103114
const flagDefinitions: FlagsAPIResponse = {
104115
features: [
105116
{
@@ -989,6 +1000,75 @@ describe("ReflagClient", () => {
9891000
);
9901001
});
9911002

1003+
it("should refresh features with waitForVersion when receiving SSE updates", async () => {
1004+
const fetchMock = vi.spyOn(global, "fetch").mockResolvedValue(
1005+
new Response(
1006+
createSSEMessageStream({
1007+
id: "flag-state:test:0",
1008+
timestamp: 1774605839478,
1009+
name: "flag_state_updated",
1010+
clientId: "reflag-backend",
1011+
channel: "flag-state:test",
1012+
action: 0,
1013+
encoding: "json",
1014+
data: JSON.stringify({
1015+
envId: "endbMeqfmcgLyZ",
1016+
flagStateVersion: 22,
1017+
}),
1018+
}),
1019+
{
1020+
status: 200,
1021+
headers: {
1022+
"Content-Type": "text/event-stream",
1023+
},
1024+
},
1025+
),
1026+
);
1027+
1028+
try {
1029+
httpClient.get.mockResolvedValue({
1030+
ok: true,
1031+
status: 200,
1032+
body: {
1033+
success: true,
1034+
...flagDefinitions,
1035+
},
1036+
});
1037+
1038+
const client = new ReflagClient({
1039+
...validOptions,
1040+
flagsSyncMode: "push",
1041+
});
1042+
1043+
await client.initialize();
1044+
1045+
await vi.waitFor(() => {
1046+
expect(httpClient.get).toHaveBeenCalledWith(
1047+
"https://api.example.com/features?waitForVersion=22",
1048+
expectedHeaders,
1049+
API_TIMEOUT_MS,
1050+
);
1051+
});
1052+
1053+
expect(fetchMock).toHaveBeenCalledWith(
1054+
"https://pubsub.reflag.com/sse",
1055+
{
1056+
method: "GET",
1057+
headers: {
1058+
...expectedHeaders,
1059+
Accept: "text/event-stream",
1060+
"Cache-Control": "no-cache",
1061+
},
1062+
signal: expect.any(AbortSignal),
1063+
},
1064+
);
1065+
1066+
client.destroy();
1067+
} finally {
1068+
fetchMock.mockRestore();
1069+
}
1070+
});
1071+
9921072
it("should load flag definitions from flagsFallbackProvider when live fetch fails", async () => {
9931073
const savedAt = "2026-03-09T00:00:00.000Z";
9941074
const flagsFallbackProvider: FlagsFallbackProvider = {
@@ -1083,7 +1163,7 @@ describe("ReflagClient", () => {
10831163

10841164
const client = new ReflagClient({
10851165
...validOptions,
1086-
cacheStrategy: "in-request",
1166+
flagsSyncMode: "in-request",
10871167
flagsFetchRetries: 1,
10881168
});
10891169

@@ -1348,7 +1428,7 @@ describe("ReflagClient", () => {
13481428
active: true,
13491429
},
13501430
type: "company",
1351-
userId: undefined,
1431+
userId: user.id,
13521432
},
13531433
{
13541434
attributes: {
@@ -1546,7 +1626,7 @@ describe("ReflagClient", () => {
15461626
employees: 100,
15471627
name: "Acme Inc.",
15481628
},
1549-
userId: undefined, // this is a bug, will fix in separate PR
1629+
userId: user.id,
15501630
context: undefined,
15511631
},
15521632
{
@@ -1635,7 +1715,14 @@ describe("ReflagClient", () => {
16351715
await client.initialize();
16361716
client.getFlags({ user, company, other: otherContext });
16371717

1638-
expect(isAllowedSpy).toHaveBeenCalledWith("1GHpP+QfYperQ0AtD8bWPiRE4H0=");
1718+
expect(isAllowedSpy).toHaveBeenNthCalledWith(
1719+
1,
1720+
"5Zt35h50IPRNU8yXAj/YbPME/qE=",
1721+
);
1722+
expect(isAllowedSpy).toHaveBeenNthCalledWith(
1723+
2,
1724+
"J3G4oF56f2t+T6xYzES6inc78+c=",
1725+
);
16391726
});
16401727

16411728
it("should return evaluated flags when only user is defined", async () => {
@@ -1901,7 +1988,7 @@ describe("ReflagClient", () => {
19011988
employees: 100,
19021989
name: "Acme Inc.",
19031990
},
1904-
userId: undefined, // this is a bug, will fix in separate PR
1991+
userId: user.id,
19051992
context: undefined,
19061993
},
19071994
{
@@ -2559,7 +2646,14 @@ describe("ReflagClient", () => {
25592646
await client.initialize();
25602647
client.getFlagsForBootstrap({ user, company, other: otherContext });
25612648

2562-
expect(isAllowedSpy).toHaveBeenCalledWith("1GHpP+QfYperQ0AtD8bWPiRE4H0=");
2649+
expect(isAllowedSpy).toHaveBeenNthCalledWith(
2650+
1,
2651+
"5Zt35h50IPRNU8yXAj/YbPME/qE=",
2652+
);
2653+
expect(isAllowedSpy).toHaveBeenNthCalledWith(
2654+
2,
2655+
"J3G4oF56f2t+T6xYzES6inc78+c=",
2656+
);
25632657
});
25642658

25652659
it("should work in offline mode", async () => {

0 commit comments

Comments
 (0)