Skip to content

Commit 69e8535

Browse files
d-csclaude
andcommitted
test(webapp): pin realtime buffered-resource resolution
The previous commit's regression coverage was thin: only the log-dedup gate was unit-tested. The load-bearing logic — synthesise a resource when PG misses but the buffer has the run, with an `id` matching what the drainer will eventually write — had no regression test, so a future change that removed the buffered fallback would put the silent- hang back into prod without anything failing in CI. Extract the resource-resolution rules from the route's findResource into `resolveRealtimeRunResource`, a pure function. Cover the branching with unit tests (PG hit, PG hit during drain race, PG miss + buffer hit, missing taskIdentifier default, both miss) and pin the full chain with a container-backed test that uses a real MollifierBuffer + the real readFallback helper and asserts the synthesised `id` matches `RunId.fromFriendlyId(friendlyId)`. That identity is what Electric's `WHERE id='<id>'` clause depends on when the drainer eventually INSERTs the row. 12 tests total across the three Phase-5.2 suites; one empirical probe run after the refactor confirmed end-to-end behaviour unchanged. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 50106dc commit 69e8535

4 files changed

Lines changed: 308 additions & 22 deletions

File tree

apps/webapp/app/routes/realtime.v1.runs.$runId.ts

Lines changed: 9 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import {
1212
isInitialBufferedSubscriptionRequest,
1313
recordRealtimeBufferedSubscription,
1414
} from "~/v3/mollifier/mollifierTelemetry.server";
15+
import { resolveRealtimeRunResource } from "~/v3/mollifier/realtimeRunResource.server";
1516

1617
const ParamsSchema = z.object({
1718
runId: z.string(),
@@ -36,7 +37,6 @@ export const loader = createLoaderApiRoute(
3637
},
3738
},
3839
});
39-
if (pgRun) return pgRun;
4040

4141
// Buffered fallback. If the run is sitting in the mollifier buffer
4242
// (no PG row yet), open the Electric subscription anyway: the
@@ -45,28 +45,15 @@ export const loader = createLoaderApiRoute(
4545
// Without this branch the route 404s, ShapeStream stops on the
4646
// first response, and the hook silently hangs even after the run
4747
// materialises (no auto-recovery).
48-
const synthetic = await findRunByIdWithMollifierFallback({
49-
runId: params.runId,
50-
environmentId: authentication.environment.id,
51-
organizationId: authentication.environment.organizationId,
52-
});
53-
if (!synthetic) return null;
48+
const bufferedSynthetic = pgRun
49+
? null
50+
: await findRunByIdWithMollifierFallback({
51+
runId: params.runId,
52+
environmentId: authentication.environment.id,
53+
organizationId: authentication.environment.organizationId,
54+
});
5455

55-
// Shape findResource expects: friendlyId, taskIdentifier, runTags,
56-
// batch (for authorization), and id (for streamRun's WHERE clause).
57-
// The synthetic.id is derived from friendlyId via RunId — the same
58-
// value engine.trigger will write when the drainer materialises
59-
// this run, so the Electric subscription matches on INSERT.
60-
// `__bufferedDwellMs` flags this resource as buffer-sourced for
61-
// the loader body's observability hook below.
62-
return {
63-
id: synthetic.id,
64-
friendlyId: synthetic.friendlyId,
65-
taskIdentifier: synthetic.taskIdentifier ?? "",
66-
runTags: synthetic.runTags,
67-
batch: null as { friendlyId: string } | null,
68-
__bufferedDwellMs: Date.now() - synthetic.createdAt.getTime(),
69-
};
56+
return resolveRealtimeRunResource({ pgRun, bufferedSynthetic });
7057
},
7158
authorization: {
7259
action: "read",
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
import type { SyntheticRun } from "./readFallback.server";
2+
3+
// Shape `realtime.v1.runs.$runId.ts`'s findResource hands to the route's
4+
// authorization callback + loader body. The PG-resident case is the
5+
// canonical shape (a TaskRun row with the batch join); the buffered
6+
// case below mirrors it from the synthetic run.
7+
export type RealtimeRunResource = {
8+
id: string;
9+
friendlyId: string;
10+
taskIdentifier: string;
11+
runTags: string[];
12+
batch: { friendlyId: string } | null;
13+
// Present only when this resource was resolved from the mollifier
14+
// buffer (no PG row yet). Stamped at resolve time so the loader body
15+
// can emit observability for buffered-window subscriptions. The flag
16+
// doubles as the discriminant — PG-sourced resources never carry it.
17+
__bufferedDwellMs?: number;
18+
};
19+
20+
export type RealtimeRunResourcePgRun = {
21+
id: string;
22+
friendlyId: string;
23+
taskIdentifier: string;
24+
runTags: string[];
25+
batch: { friendlyId: string } | null;
26+
};
27+
28+
// Given the results of the PG and buffer lookups, produce the resource
29+
// shape the realtime route returns from findResource. PG-first: if the
30+
// run is PG-resident, return it unchanged (the buffered fallback only
31+
// fires when no PG row exists yet). When only the buffer has the run,
32+
// synthesise a matching shape whose `id` is the deterministic value
33+
// engine.trigger will write when the drainer materialises this run —
34+
// this is what lets the Electric subscription's `WHERE id=<id>` match
35+
// the eventual INSERT.
36+
export function resolveRealtimeRunResource(input: {
37+
pgRun: RealtimeRunResourcePgRun | null;
38+
bufferedSynthetic: Pick<
39+
SyntheticRun,
40+
"id" | "friendlyId" | "taskIdentifier" | "runTags" | "createdAt"
41+
> | null;
42+
now?: () => number;
43+
}): RealtimeRunResource | null {
44+
if (input.pgRun) return input.pgRun;
45+
if (input.bufferedSynthetic) {
46+
const now = (input.now ?? Date.now)();
47+
return {
48+
id: input.bufferedSynthetic.id,
49+
friendlyId: input.bufferedSynthetic.friendlyId,
50+
taskIdentifier: input.bufferedSynthetic.taskIdentifier ?? "",
51+
runTags: input.bufferedSynthetic.runTags,
52+
batch: null,
53+
__bufferedDwellMs: now - input.bufferedSynthetic.createdAt.getTime(),
54+
};
55+
}
56+
return null;
57+
}
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
import { describe, expect, it, vi } from "vitest";
2+
3+
vi.mock("~/db.server", () => ({ prisma: {}, $replica: {} }));
4+
5+
import { resolveRealtimeRunResource } from "~/v3/mollifier/realtimeRunResource.server";
6+
7+
const pgRun = {
8+
id: "pg_internal_id",
9+
friendlyId: "run_pg_friendly",
10+
taskIdentifier: "hello-world",
11+
runTags: ["a", "b"],
12+
batch: { friendlyId: "batch_1" },
13+
};
14+
15+
const bufferedSynthetic = {
16+
id: "buffered_id",
17+
friendlyId: "run_buffered_id",
18+
taskIdentifier: "hello-world",
19+
runTags: ["c"],
20+
// Six seconds ago against the fixed `now` below.
21+
createdAt: new Date("2026-05-22T12:00:00.000Z"),
22+
};
23+
24+
const fixedNow = () => new Date("2026-05-22T12:00:06.000Z").getTime();
25+
26+
describe("resolveRealtimeRunResource", () => {
27+
it("returns the PG run unchanged when one exists", () => {
28+
// PG wins even if the buffer also has the entry — the drainer may
29+
// be racing the route call and the PG row is the canonical source.
30+
expect(
31+
resolveRealtimeRunResource({ pgRun, bufferedSynthetic: null }),
32+
).toEqual(pgRun);
33+
expect(
34+
resolveRealtimeRunResource({ pgRun, bufferedSynthetic }),
35+
).toEqual(pgRun);
36+
});
37+
38+
it("never stamps __bufferedDwellMs on a PG-sourced resource", () => {
39+
// The loader body uses __bufferedDwellMs as a discriminant for
40+
// emitting buffered-subscription observability. A PG-resident run
41+
// must never carry it or every PG subscription would over-count.
42+
const result = resolveRealtimeRunResource({ pgRun, bufferedSynthetic });
43+
expect(result).not.toHaveProperty("__bufferedDwellMs");
44+
});
45+
46+
it("synthesises a resource from the buffered entry when PG misses", () => {
47+
// Load-bearing assertion: `id` must equal `bufferedSynthetic.id`.
48+
// The realtime route hands this `id` to streamRun, which builds
49+
// Electric's `WHERE id='<id>'` clause. When the drainer materialises
50+
// the run, engine.trigger writes the row with that same id (derived
51+
// deterministically from friendlyId), and Electric streams the
52+
// INSERT to the client. If the synthesised `id` ever drifts from
53+
// what the drainer writes, the customer subscribes to a shape that
54+
// never matches and the hook silently hangs even after materialise.
55+
const result = resolveRealtimeRunResource({
56+
pgRun: null,
57+
bufferedSynthetic,
58+
now: fixedNow,
59+
});
60+
expect(result).toEqual({
61+
id: "buffered_id",
62+
friendlyId: "run_buffered_id",
63+
taskIdentifier: "hello-world",
64+
runTags: ["c"],
65+
batch: null,
66+
__bufferedDwellMs: 6000,
67+
});
68+
});
69+
70+
it("defaults a missing taskIdentifier to empty string", () => {
71+
const result = resolveRealtimeRunResource({
72+
pgRun: null,
73+
bufferedSynthetic: { ...bufferedSynthetic, taskIdentifier: undefined },
74+
now: fixedNow,
75+
});
76+
expect(result?.taskIdentifier).toBe("");
77+
});
78+
79+
it("returns null when neither PG nor buffer have the run", () => {
80+
// This is the genuine not-found case — typo'd runId, deleted run,
81+
// etc. The api-builder maps null to 404. Critically, the buffered-
82+
// fallback must NOT promote a missing run to a synthetic resource —
83+
// that would cause Electric to open a shape for a runId that may
84+
// never exist, which is also a silent-hang situation but for a
85+
// different reason.
86+
expect(
87+
resolveRealtimeRunResource({ pgRun: null, bufferedSynthetic: null }),
88+
).toBeNull();
89+
});
90+
});
Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
import { describe, expect, vi } from "vitest";
2+
import { redisTest } from "@internal/testcontainers";
3+
import { MollifierBuffer } from "@trigger.dev/redis-worker";
4+
import { RunId } from "@trigger.dev/core/v3/isomorphic";
5+
6+
vi.mock("~/db.server", () => ({ prisma: {}, $replica: {} }));
7+
8+
import { findRunByIdWithMollifierFallback } from "~/v3/mollifier/readFallback.server";
9+
import { resolveRealtimeRunResource } from "~/v3/mollifier/realtimeRunResource.server";
10+
11+
const SNAPSHOT_BASE = {
12+
friendlyId: "run_phase52e2e",
13+
taskIdentifier: "hello-world",
14+
payload: '{"x":1}',
15+
payloadType: "application/json",
16+
traceContext: { traceparent: "00-0123456789abcdef0123456789abcdef-fedcba9876543210-01" },
17+
traceId: "0123456789abcdef0123456789abcdef",
18+
spanId: "fedcba9876543210",
19+
queue: "task/hello-world",
20+
tags: ["realtime-e2e"],
21+
depth: 0,
22+
isTest: false,
23+
taskEventStore: "taskEvent",
24+
};
25+
26+
// End-to-end: a real MollifierBuffer has an entry, the real
27+
// readFallback helper deserialises it, and the resolveRealtimeRunResource
28+
// helper produces the resource shape the realtime route returns from
29+
// findResource. Regression intent: if any link in the chain breaks —
30+
// buffer interface rename, snapshot field rename, id-derivation drift,
31+
// synthetic-shape change — this test fails. The route file itself is
32+
// then a thin glue layer over tested pieces.
33+
describe("realtime buffered-subscription resource resolution (testcontainers)", () => {
34+
redisTest(
35+
"synthesises a resource whose `id` matches RunId.fromFriendlyId",
36+
async ({ redisOptions }) => {
37+
const buffer = new MollifierBuffer({ redisOptions, entryTtlSeconds: 60 });
38+
try {
39+
await buffer.accept({
40+
runId: SNAPSHOT_BASE.friendlyId,
41+
envId: "env_a",
42+
orgId: "org_1",
43+
payload: JSON.stringify(SNAPSHOT_BASE),
44+
});
45+
46+
const bufferedSynthetic = await findRunByIdWithMollifierFallback(
47+
{
48+
runId: SNAPSHOT_BASE.friendlyId,
49+
environmentId: "env_a",
50+
organizationId: "org_1",
51+
},
52+
{ getBuffer: () => buffer },
53+
);
54+
expect(bufferedSynthetic).not.toBeNull();
55+
56+
const resource = resolveRealtimeRunResource({
57+
pgRun: null,
58+
bufferedSynthetic,
59+
});
60+
61+
// The load-bearing contract: the resolved `id` MUST equal what
62+
// engine.trigger will write to PG.TaskRun.id when the drainer
63+
// materialises this run. Electric's `WHERE id='<id>'` clause
64+
// depends on this match — drift means a silent-hang regression.
65+
expect(resource?.id).toBe(RunId.fromFriendlyId(SNAPSHOT_BASE.friendlyId));
66+
expect(resource?.friendlyId).toBe(SNAPSHOT_BASE.friendlyId);
67+
expect(resource?.taskIdentifier).toBe("hello-world");
68+
expect(resource?.runTags).toEqual(["realtime-e2e"]);
69+
expect(resource?.batch).toBeNull();
70+
expect(resource?.__bufferedDwellMs).toBeTypeOf("number");
71+
expect(resource?.__bufferedDwellMs).toBeGreaterThanOrEqual(0);
72+
} finally {
73+
await buffer.close();
74+
}
75+
},
76+
);
77+
78+
redisTest(
79+
"returns null when neither PG nor the buffer have the entry",
80+
async ({ redisOptions }) => {
81+
const buffer = new MollifierBuffer({ redisOptions, entryTtlSeconds: 60 });
82+
try {
83+
const bufferedSynthetic = await findRunByIdWithMollifierFallback(
84+
{
85+
runId: "run_does_not_exist",
86+
environmentId: "env_a",
87+
organizationId: "org_1",
88+
},
89+
{ getBuffer: () => buffer },
90+
);
91+
expect(bufferedSynthetic).toBeNull();
92+
93+
const resource = resolveRealtimeRunResource({
94+
pgRun: null,
95+
bufferedSynthetic,
96+
});
97+
// The api builder relies on this null to emit a real 404 for
98+
// genuinely missing runs. If we ever promote unknown runIds to
99+
// synthetic resources here, the route opens an Electric shape
100+
// for a run that may never exist — a different silent-hang
101+
// failure mode for typos, deleted runs, etc.
102+
expect(resource).toBeNull();
103+
} finally {
104+
await buffer.close();
105+
}
106+
},
107+
);
108+
109+
redisTest(
110+
"does not fall back to buffer when PG has the row",
111+
async ({ redisOptions }) => {
112+
const buffer = new MollifierBuffer({ redisOptions, entryTtlSeconds: 60 });
113+
try {
114+
await buffer.accept({
115+
runId: SNAPSHOT_BASE.friendlyId,
116+
envId: "env_a",
117+
orgId: "org_1",
118+
payload: JSON.stringify(SNAPSHOT_BASE),
119+
});
120+
121+
// Simulate the drainer having materialised the run: PG has the
122+
// canonical row, the buffer still has its entry (would be
123+
// ack'd & removed in real ops). The resolver must return the
124+
// PG row and NOT carry the __bufferedDwellMs flag — otherwise
125+
// the loader body would emit a buffered-subscription log for a
126+
// run that's actually PG-resident, over-counting the signal.
127+
const pgRun = {
128+
id: RunId.fromFriendlyId(SNAPSHOT_BASE.friendlyId),
129+
friendlyId: SNAPSHOT_BASE.friendlyId,
130+
taskIdentifier: "hello-world",
131+
runTags: ["realtime-e2e"],
132+
batch: null,
133+
};
134+
135+
const bufferedSynthetic = await findRunByIdWithMollifierFallback(
136+
{
137+
runId: SNAPSHOT_BASE.friendlyId,
138+
environmentId: "env_a",
139+
organizationId: "org_1",
140+
},
141+
{ getBuffer: () => buffer },
142+
);
143+
144+
const resource = resolveRealtimeRunResource({ pgRun, bufferedSynthetic });
145+
expect(resource).toEqual(pgRun);
146+
expect(resource).not.toHaveProperty("__bufferedDwellMs");
147+
} finally {
148+
await buffer.close();
149+
}
150+
},
151+
);
152+
});

0 commit comments

Comments
 (0)