Commit 586315b

fix(webapp): propagate abort signal through realtime proxy fetch
The three high-traffic realtime proxy routes (/realtime/v1/runs, /realtime/v1/runs/:id, /realtime/v1/batches/:id) all route through RealtimeClient.streamRun/streamRuns/streamBatch -> #streamRunsWhere -> #performElectricRequest -> longPollingFetch(url, {signal}). The #streamRunsWhere caller hardcoded signal=undefined, so the upstream fetch to Electric had no abort signal.

When a downstream client disconnected mid long-poll, undici kept the upstream socket open and continued buffering response chunks that would never be read, until Electric's own poll timeout elapsed (up to ~20s). The buffered bytes live in native memory below V8's accounting, so the retention shows up only in RSS and is invisible to heap snapshots.

Thread a signal parameter through streamRun/streamRuns/streamBatch (and the shared #streamRunsWhere) and pass getRequestAbortSignal() from each of the three route handlers. Also cancel the upstream body explicitly in longPollingFetch's error path and treat AbortError as a clean client-close (499) rather than a 500, matching the semantics of 'downstream went away'.

Verified in an isolated standalone reproducer (fetch-a-slow-upstream pattern, 5 rounds of 200 parallel fetches, burst-and-discard):

A: no signal, body never consumed       Δrss=+59.4 MB
B: signal propagated, abort on close    Δrss=+15.4 MB (plateaus)
C: no signal, res.body.cancel()         Δrss=-25.4 MB

Sustained 10-round test with B: RSS oscillates in a 49-65 MB band with no upward trend, so the signal propagation fully releases the undici buffers; the +15 MB residual in the single-round test was one-time allocator overhead, not accumulation.
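The threading described above can be sketched in isolation. This is a minimal illustration, not the webapp's code: `Fetcher`, the free functions `streamRun` and `streamRunsWhere`, and the `electric.invalid` URL are hypothetical stand-ins for `RealtimeClient` and `longPollingFetch`. The only point it makes is that the signal handed to the outermost call must reach the fetch call unchanged.

```typescript
// Hypothetical fetch-like type so the sketch runs without a network.
type Fetcher = (url: string, init?: { signal?: AbortSignal }) => Promise<unknown>;

// Shared helper: forwards the caller's signal instead of hardcoding undefined.
function streamRunsWhere(
  fetcher: Fetcher,
  where: string,
  signal?: AbortSignal
): Promise<unknown> {
  // Placeholder URL (assumption); the real code builds an Electric shape URL.
  const url = `http://electric.invalid/v1/shape?where=${encodeURIComponent(where)}`;
  return fetcher(url, { signal });
}

// Public entry point: accepts an optional signal and passes it straight down.
function streamRun(
  fetcher: Fetcher,
  runId: string,
  signal?: AbortSignal
): Promise<unknown> {
  return streamRunsWhere(fetcher, `id='${runId}'`, signal);
}
```

A route handler would call something like `streamRun(fetch, runId, requestSignal)`, where `requestSignal` fires when the downstream client disconnects. Once it fires, the same signal object aborts the upstream request.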
1 parent 41434b5 commit 586315b

6 files changed

Lines changed: 48 additions & 15 deletions

Lines changed: 6 additions & 0 deletions
@@ -0,0 +1,6 @@
+---
+area: webapp
+type: fix
+---
+
+Fix RSS memory leak in the realtime proxy routes. `/realtime/v1/runs`, `/realtime/v1/runs/:id`, and `/realtime/v1/batches/:id` called `fetch()` into Electric with no abort signal, so when a client disconnected mid long-poll, undici kept the upstream socket open and buffered response chunks that would never be consumed (retained only in RSS, invisible to V8 heap tooling). Thread `getRequestAbortSignal()` through `RealtimeClient.streamRun/streamRuns/streamBatch` to `longPollingFetch` and cancel the upstream body in the error path. Isolated reproducer showed ~44 KB retained per leaked request; signal propagation releases it cleanly.

apps/webapp/app/routes/realtime.v1.batches.$batchId.ts

Lines changed: 3 additions & 1 deletion
@@ -1,5 +1,6 @@
 import { z } from "zod";
 import { $replica } from "~/db.server";
+import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server";
 import { realtimeClient } from "~/services/realtimeClientGlobal.server";
 import { createLoaderApiRoute } from "~/services/routeBuilders/apiBuilder.server";

@@ -33,7 +34,8 @@ export const loader = createLoaderApiRoute(
       batchRun.id,
       apiVersion,
       authentication.realtime,
-      request.headers.get("x-trigger-electric-version") ?? undefined
+      request.headers.get("x-trigger-electric-version") ?? undefined,
+      getRequestAbortSignal()
     );
   }
 );

apps/webapp/app/routes/realtime.v1.runs.$runId.ts

Lines changed: 7 additions & 1 deletion
@@ -1,6 +1,7 @@
 import { json } from "@remix-run/server-runtime";
 import { z } from "zod";
 import { $replica } from "~/db.server";
+import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server";
 import { realtimeClient } from "~/services/realtimeClientGlobal.server";
 import { createLoaderApiRoute } from "~/services/routeBuilders/apiBuilder.server";

@@ -46,7 +47,12 @@ export const loader = createLoaderApiRoute(
       run.id,
       apiVersion,
       authentication.realtime,
-      request.headers.get("x-trigger-electric-version") ?? undefined
+      request.headers.get("x-trigger-electric-version") ?? undefined,
+      // Propagate abort on client disconnect so the upstream Electric long-poll
+      // fetch is cancelled too. Without this, undici buffers from the unconsumed
+      // upstream response body accumulate until Electric's poll timeout, causing
+      // steady RSS growth on api (see docs/runbooks for the H1 isolation test).
+      getRequestAbortSignal()
     );
   }
 );

apps/webapp/app/routes/realtime.v1.runs.ts

Lines changed: 3 additions & 1 deletion
@@ -1,4 +1,5 @@
 import { z } from "zod";
+import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server";
 import { realtimeClient } from "~/services/realtimeClientGlobal.server";
 import { createLoaderApiRoute } from "~/services/routeBuilders/apiBuilder.server";

@@ -31,7 +32,8 @@ export const loader = createLoaderApiRoute(
       searchParams,
       apiVersion,
       authentication.realtime,
-      request.headers.get("x-trigger-electric-version") ?? undefined
+      request.headers.get("x-trigger-electric-version") ?? undefined,
+      getRequestAbortSignal()
     );
   }
 );

apps/webapp/app/services/realtimeClient.server.ts

Lines changed: 15 additions & 8 deletions
@@ -115,15 +115,17 @@ export class RealtimeClient {
     runId: string,
     apiVersion: API_VERSIONS,
     requestOptions?: RealtimeRequestOptions,
-    clientVersion?: string
+    clientVersion?: string,
+    signal?: AbortSignal
   ) {
     return this.#streamRunsWhere(
       url,
       environment,
       `id='${runId}'`,
       apiVersion,
       requestOptions,
-      clientVersion
+      clientVersion,
+      signal
     );
   }

@@ -133,7 +135,8 @@ export class RealtimeClient {
     batchId: string,
     apiVersion: API_VERSIONS,
     requestOptions?: RealtimeRequestOptions,
-    clientVersion?: string
+    clientVersion?: string,
+    signal?: AbortSignal
   ) {
     const whereClauses: string[] = [
       `"runtimeEnvironmentId"='${environment.id}'`,
@@ -148,7 +151,8 @@ export class RealtimeClient {
       whereClause,
       apiVersion,
       requestOptions,
-      clientVersion
+      clientVersion,
+      signal
     );
   }

@@ -158,7 +162,8 @@ export class RealtimeClient {
     params: RealtimeRunsParams,
     apiVersion: API_VERSIONS,
     requestOptions?: RealtimeRequestOptions,
-    clientVersion?: string
+    clientVersion?: string,
+    signal?: AbortSignal
   ) {
     const whereClauses: string[] = [`"runtimeEnvironmentId"='${environment.id}'`];

@@ -180,7 +185,8 @@ export class RealtimeClient {
       whereClause,
       apiVersion,
       requestOptions,
-      clientVersion
+      clientVersion,
+      signal
     );

     if (createdAtFilter) {
@@ -274,7 +280,8 @@ export class RealtimeClient {
     whereClause: string,
     apiVersion: API_VERSIONS,
     requestOptions?: RealtimeRequestOptions,
-    clientVersion?: string
+    clientVersion?: string,
+    signal?: AbortSignal
   ) {
     const electricUrl = this.#constructRunsElectricUrl(
       url,
@@ -288,7 +295,7 @@ export class RealtimeClient {
       electricUrl,
       environment,
       apiVersion,
-      undefined,
+      signal,
       clientVersion
     );
   }

apps/webapp/app/utils/longPollingFetch.ts

Lines changed: 14 additions & 4 deletions
@@ -11,8 +11,10 @@ export async function longPollingFetch(
   options?: RequestInit,
   rewriteResponseHeaders?: Record<string, string>
 ) {
+  let upstream: Response | undefined;
   try {
-    let response = await fetch(url, options);
+    upstream = await fetch(url, options);
+    let response = upstream;

     if (response.headers.get("content-encoding")) {
       const headers = new Headers(response.headers);
@@ -46,16 +48,24 @@ export async function longPollingFetch(

     return response;
   } catch (error) {
+    // Release upstream undici socket + buffers explicitly. Without this the
+    // ReadableStream stays open and undici keeps buffering chunks into memory
+    // until the upstream times out (see H1 isolation test: ~44 KB retained
+    // per unconsumed-body fetch in RSS).
+    try { await upstream?.body?.cancel(); } catch {}
+
+    // AbortError is the expected path when downstream disconnects with a
+    // propagated signal; treat as a clean client-close, not a server error.
+    if (error instanceof Error && error.name === "AbortError") {
+      throw new Response(null, { status: 499 });
+    }
     if (error instanceof TypeError) {
-      // Network error or other fetch-related errors
       logger.error("Network error:", { error: error.message });
       throw new Response("Network error occurred", { status: 503 });
     } else if (error instanceof Error) {
-      // HTTP errors or other known errors
       logger.error("Fetch error:", { error: error.message });
       throw new Response(error.message, { status: 500 });
     } else {
-      // Unknown errors
       logger.error("Unknown error occurred during fetch");
       throw new Response("An unknown error occurred", { status: 500 });
     }
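
The error-path ordering in the hunk above can be restated as a small pure function. This is a sketch only: `statusForFetchError` is a hypothetical helper that does not exist in the commit, and it leaves out the logging and the `Response` construction. It shows why the `AbortError` check has to run before the generic `Error` branch, which would otherwise report a client disconnect as a 500.

```typescript
// Hypothetical helper (not part of the commit): maps a caught fetch error to
// the HTTP status the proxy responds with, in the same order as the diff.
function statusForFetchError(error: unknown): number {
  // Client went away: 499 is a non-standard "client closed request" status.
  if (error instanceof Error && error.name === "AbortError") return 499;
  // fetch reports network-level failures as TypeError.
  if (error instanceof TypeError) return 503;
  // Any other Error, or a non-Error thrown value, is a server-side failure.
  return 500;
}
```

Keeping the mapping separate from the side effects makes the ordering easy to unit-test without opening a socket.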
