Skip to content

Commit 50106dc

Browse files
d-csclaude
andcommitted
fix(webapp): keep useRealtimeRun stream open across the buffered window
Customers subscribing to a freshly-triggered run via useRealtimeRun silently hung when the gate diverted the run into the mollifier buffer. The route's findResource looked up the PG TaskRun by friendlyId, found nothing, and returned 404. Electric SQL's ShapeStream treats the initial 404 as terminal — no retry, no error surfaced to the hook, and crucially no recovery after the drainer eventually INSERTed the PG row. The customer's component shows the empty state indefinitely even though the run is alive and progressing. When the PG lookup misses but the buffer has the run, return a synthetic resource whose `id` is derived from the friendlyId — the same value engine.trigger will write when the drainer materialises this run. The route then opens the Electric subscription against `WHERE id='<id>'`, Electric streams an empty initial snapshot, and the SDK long-polls until the drainer's INSERT propagates through. Empirically validated end-to-end: trigger a buffered run, open the subscription, simulate the drainer's PG INSERT + UPDATE, and the SDK iterator yields the QUEUED and EXECUTING events in real time. Adds a `mollifier.realtime_subscriptions.buffered` counter and a structured log line. The observability gate fires once per cold subscription (Electric's `handle` query param is the dedup signal), not on every ~20s long-poll reconnect; that gate is unit-tested. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 1ece983 commit 50106dc

5 files changed

Lines changed: 140 additions & 2 deletions

File tree

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
area: webapp
3+
type: fix
4+
---
5+
6+
`useRealtimeRun` / `subscribeToRun` previously hung silently when the run was still in the mollifier buffer: the realtime route returned 404, Electric's `ShapeStream` stopped on the first response, and the hook never recovered even after the drainer materialised the run. Open the Electric shape stream against a synthetic resource derived from the buffer entry instead — the stream returns an empty initial snapshot and streams the `INSERT` to the client when the drainer creates the PG row. Adds a `mollifier.realtime_subscriptions.buffered` counter and a structured log line on the initial connect for visibility into how often customers subscribe inside the buffered window.

apps/webapp/app/routes/realtime.v1.runs.$runId.ts

Lines changed: 54 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import { json } from "@remix-run/server-runtime";
21
import { z } from "zod";
32
import { $replica } from "~/db.server";
43
import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server";
@@ -7,6 +6,12 @@ import {
76
anyResource,
87
createLoaderApiRoute,
98
} from "~/services/routeBuilders/apiBuilder.server";
9+
import { logger } from "~/services/logger.server";
10+
import { findRunByIdWithMollifierFallback } from "~/v3/mollifier/readFallback.server";
11+
import {
12+
isInitialBufferedSubscriptionRequest,
13+
recordRealtimeBufferedSubscription,
14+
} from "~/v3/mollifier/mollifierTelemetry.server";
1015

1116
const ParamsSchema = z.object({
1217
runId: z.string(),
@@ -18,7 +23,7 @@ export const loader = createLoaderApiRoute(
1823
allowJWT: true,
1924
corsStrategy: "all",
2025
findResource: async (params, authentication) => {
21-
return $replica.taskRun.findFirst({
26+
const pgRun = await $replica.taskRun.findFirst({
2227
where: {
2328
friendlyId: params.runId,
2429
runtimeEnvironmentId: authentication.environment.id,
@@ -31,6 +36,37 @@ export const loader = createLoaderApiRoute(
3136
},
3237
},
3338
});
39+
if (pgRun) return pgRun;
40+
41+
// Buffered fallback. If the run is sitting in the mollifier buffer
42+
// (no PG row yet), open the Electric subscription anyway: the
43+
// shape stream returns an empty initial snapshot, and when the
44+
// drainer INSERTs the PG row Electric streams it to the client.
45+
// Without this branch the route 404s, ShapeStream stops on the
46+
// first response, and the hook silently hangs even after the run
47+
// materialises (no auto-recovery).
48+
const synthetic = await findRunByIdWithMollifierFallback({
49+
runId: params.runId,
50+
environmentId: authentication.environment.id,
51+
organizationId: authentication.environment.organizationId,
52+
});
53+
if (!synthetic) return null;
54+
55+
// Shape findResource expects: friendlyId, taskIdentifier, runTags,
56+
// batch (for authorization), and id (for streamRun's WHERE clause).
57+
// The synthetic.id is derived from friendlyId via RunId — the same
58+
// value engine.trigger will write when the drainer materialises
59+
// this run, so the Electric subscription matches on INSERT.
60+
// `__bufferedDwellMs` flags this resource as buffer-sourced for
61+
// the loader body's observability hook below.
62+
return {
63+
id: synthetic.id,
64+
friendlyId: synthetic.friendlyId,
65+
taskIdentifier: synthetic.taskIdentifier ?? "",
66+
runTags: synthetic.runTags,
67+
batch: null as { friendlyId: string } | null,
68+
__bufferedDwellMs: Date.now() - synthetic.createdAt.getTime(),
69+
};
3470
},
3571
authorization: {
3672
action: "read",
@@ -48,6 +84,22 @@ export const loader = createLoaderApiRoute(
4884
},
4985
},
5086
async ({ authentication, request, resource: run, apiVersion }) => {
87+
// Observability for buffered-window subscriptions. The gate keeps
88+
// the counter at one tick per subscription instead of one tick per
89+
// ~20s live-poll iteration (see `isInitialBufferedSubscriptionRequest`).
90+
const bufferedDwellMs = (run as { __bufferedDwellMs?: number }).__bufferedDwellMs;
91+
if (
92+
typeof bufferedDwellMs === "number" &&
93+
isInitialBufferedSubscriptionRequest(request.url)
94+
) {
95+
recordRealtimeBufferedSubscription(authentication.environment.id);
96+
logger.info("mollifier.realtime.buffered_subscription", {
97+
runId: run.friendlyId,
98+
envId: authentication.environment.id,
99+
bufferDwellMs: bufferedDwellMs,
100+
});
101+
}
102+
51103
return realtimeClient.streamRun(
52104
request.url,
53105
authentication.environment,

apps/webapp/app/v3/mollifier/mollifierTelemetry.server.ts

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,3 +15,30 @@ export function recordDecision(outcome: DecisionOutcome, reason?: DecisionReason
1515
...(reason ? { reason } : {}),
1616
});
1717
}
18+
19+
// Counts subscriptions hitting `/realtime/v1/runs/<id>` for a run that
20+
// lives only in the mollifier buffer (no PG row yet). The route opens
21+
// the Electric stream anyway so the eventual drainer-INSERT propagates
22+
// to the client; this counter is the signal of how often customers
23+
// subscribe inside the buffered window.
24+
export const realtimeBufferedSubscriptionsCounter = meter.createCounter(
25+
"mollifier.realtime_subscriptions.buffered",
26+
{
27+
description:
28+
"Realtime subscriptions opened against a runId that exists only in the mollifier buffer",
29+
},
30+
);
31+
32+
export function recordRealtimeBufferedSubscription(envId: string): void {
33+
realtimeBufferedSubscriptionsCounter.add(1, { envId });
34+
}
35+
36+
// Electric SQL's shape-stream protocol adds a `handle=` query param on
37+
// every reconnect after the initial GET. Gating the realtime-buffered
38+
// log/counter on its absence keeps the signal at one tick per
39+
// subscription instead of one tick per ~20s live-poll iteration —
40+
// without it the counter would over-count by the long-poll factor.
41+
export function isInitialBufferedSubscriptionRequest(url: string | URL): boolean {
42+
const u = typeof url === "string" ? new URL(url) : url;
43+
return !u.searchParams.has("handle");
44+
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
import { describe, expect, it, vi } from "vitest";
2+
3+
vi.mock("~/db.server", () => ({
4+
prisma: {},
5+
$replica: {},
6+
}));
7+
8+
import { isInitialBufferedSubscriptionRequest } from "~/v3/mollifier/mollifierTelemetry.server";
9+
10+
describe("isInitialBufferedSubscriptionRequest", () => {
11+
// Electric's shape-stream protocol returns a `handle=<shape-id>` in
12+
// the first response. The SDK echoes that handle on every reconnect /
13+
// live-poll iteration thereafter. The realtime route logs +
14+
// increments the mollifier.realtime_subscriptions.buffered counter
15+
// only on the initial connect (handle absent) so each subscription
16+
// produces a single observability event instead of one per
17+
// long-poll round-trip (~20s).
18+
it("returns true for the SDK's initial GET (no handle param)", () => {
19+
expect(
20+
isInitialBufferedSubscriptionRequest(
21+
"http://localhost:3030/realtime/v1/runs/run_x?log=full&offset=-1",
22+
),
23+
).toBe(true);
24+
});
25+
26+
it("returns false for Electric's reconnects (handle present)", () => {
27+
expect(
28+
isInitialBufferedSubscriptionRequest(
29+
"http://localhost:3030/realtime/v1/runs/run_x?handle=100344308-1779&log=full&offset=0_0",
30+
),
31+
).toBe(false);
32+
});
33+
34+
it("returns false for Electric live-poll reconnects (handle + cursor)", () => {
35+
expect(
36+
isInitialBufferedSubscriptionRequest(
37+
"http://localhost:3030/realtime/v1/runs/run_x?cursor=51020980&handle=100344308&live=true&log=full&offset=0_inf",
38+
),
39+
).toBe(false);
40+
});
41+
42+
it("accepts a URL instance as well as a string", () => {
43+
const url = new URL("http://localhost:3030/realtime/v1/runs/run_x?log=full");
44+
expect(isInitialBufferedSubscriptionRequest(url)).toBe(true);
45+
});
46+
});

docs/realtime/react-hooks/subscribe.mdx

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,13 @@ Trigger a task and immediately subscribe to its run. Details in the [triggering]
2121

2222
The `useRealtimeRun` hook allows you to subscribe to a run by its ID.
2323

24+
<Note>
25+
During sustained traffic bursts the platform may briefly buffer new triggers before
26+
materialising them. `useRealtimeRun` keeps the subscription open across this window and
27+
begins streaming as soon as the run is materialised — typically sub-second.
28+
</Note>
29+
30+
2431
```tsx
2532
"use client"; // This is needed for Next.js App Router or other RSC frameworks
2633

0 commit comments

Comments
 (0)