Skip to content

Commit 03bc55c

Browse files
committed
fix(webapp): make native realtime change publishing fail-safe
1 parent 911a1cf commit 03bc55c

4 files changed

Lines changed: 44 additions & 3 deletions

File tree

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
area: webapp
3+
type: improvement
4+
---
5+
6+
Harden the native realtime backend's run-change publishing so that a publish can never throw into a run lifecycle operation and commands are never buffered in memory during a pub/sub Redis outage.

apps/webapp/app/redis.server.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,10 @@ export type RedisWithClusterOptions = {
1111
clusterMode?: boolean;
1212
clusterOptions?: Omit<ClusterOptions, "redisOptions">;
1313
keyPrefix?: string;
14+
/** When `false`, commands issued while the connection isn't ready are rejected immediately instead of being buffered in memory. When omitted, ioredis's default (`true`, i.e. buffer) applies. */
15+
enableOfflineQueue?: boolean;
16+
/** Number of reconnection attempts after which pending commands are rejected; `null` waits indefinitely. When omitted, ioredis's default (20) applies. */
17+
maxRetriesPerRequest?: number | null;
1418
};
1519

1620
export type RedisClient = Redis | Cluster;
@@ -37,13 +41,19 @@ export function createRedisClient(
3741

3842
redis = new Redis.Cluster(nodes, {
3943
...options.clusterOptions,
44+
...(options.enableOfflineQueue !== undefined
45+
? { enableOfflineQueue: options.enableOfflineQueue }
46+
: {}),
4047
redisOptions: {
4148
connectionName,
4249
keyPrefix: options.keyPrefix,
4350
username: options.username,
4451
password: options.password,
4552
enableAutoPipelining: true,
4653
reconnectOnError: defaultReconnectOnError,
54+
...(options.maxRetriesPerRequest !== undefined
55+
? { maxRetriesPerRequest: options.maxRetriesPerRequest }
56+
: {}),
4757
...(options.tlsDisabled
4858
? {
4959
checkServerIdentity: () => {
@@ -72,6 +82,12 @@ export function createRedisClient(
7282
enableAutoPipelining: true,
7383
keyPrefix: options.keyPrefix,
7484
reconnectOnError: defaultReconnectOnError,
85+
...(options.enableOfflineQueue !== undefined
86+
? { enableOfflineQueue: options.enableOfflineQueue }
87+
: {}),
88+
...(options.maxRetriesPerRequest !== undefined
89+
? { maxRetriesPerRequest: options.maxRetriesPerRequest }
90+
: {}),
7591
...(options.tlsDisabled ? {} : { tls: {} }),
7692
});
7793
}

apps/webapp/app/services/realtime/runChangeNotifier.server.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,13 @@ export class RunChangeNotifier {
225225

226226
#ensurePublisher(): RedisClient {
227227
if (!this.#publisher) {
228-
this.#publisher = createRedisClient(`${this.#connectionName}:pub`, this.options.redis);
228+
// Publishes are fire-and-forget with a consumer-side backstop, so a dropped publish is
229+
// latency-only. Fail fast rather than buffer commands in memory during a pub/sub outage.
230+
this.#publisher = createRedisClient(`${this.#connectionName}:pub`, {
231+
...this.options.redis,
232+
enableOfflineQueue: false,
233+
maxRetriesPerRequest: 1,
234+
});
229235
}
230236
return this.#publisher;
231237
}

apps/webapp/app/services/realtime/runChangeNotifierInstance.server.ts

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { getMeter } from "@internal/tracing";
22
import { env } from "~/env.server";
33
import { singleton } from "~/utils/singleton";
4+
import { logger } from "../logger.server";
45
import { RunChangeNotifier, type ChangeRecordInput } from "./runChangeNotifier.server";
56

67
/**
@@ -74,12 +75,24 @@ export function publishChangeRecord(input: ChangeRecordInput): void {
7475
if (!nativeBackendEnabled) {
7576
return;
7677
}
77-
getRunChangeNotifier().publish(input);
78+
// Publish runs on the run-engine event bus / metadata flush loop; lazy init + encoding happen
79+
// before the notifier's own try/catch, so guard the whole call — it must never throw into its caller.
80+
try {
81+
getRunChangeNotifier().publish(input);
82+
} catch (error) {
83+
logger.error("[runChangeNotifier] publishChangeRecord threw; dropping notification", { error });
84+
}
7885
}
7986

8087
export function publishManyChangeRecords(inputs: ChangeRecordInput[]): void {
8188
if (!nativeBackendEnabled) {
8289
return;
8390
}
84-
getRunChangeNotifier().publishMany(inputs);
91+
try {
92+
getRunChangeNotifier().publishMany(inputs);
93+
} catch (error) {
94+
logger.error("[runChangeNotifier] publishManyChangeRecords threw; dropping notifications", {
95+
error,
96+
});
97+
}
8598
}

0 commit comments

Comments
 (0)