Skip to content

Commit dea1c7c

Browse files
committed
feat(webapp,redis-worker): mutateWithFallback helper (Phase B5)
Composes PG-first (replica) lookup, MollifierBuffer.mutateSnapshot, and writer-side spin-wait into the Q3 wait-and-bounce flow. Returns a discriminated outcome rather than throwing Response, so the helper stays route-agnostic and unit-testable. Phase C mutation endpoints (tags, metadata-put, reschedule, cancel) consume this in upcoming commits. Wait knobs default to safetyNetMs=2000, pollStepMs=20, pgTimeoutMs=50 per Q3. Each PG poll is bounded by pgTimeoutMs via Promise.race so a slow query can't burn the whole safety-net budget. Abort signal is respected between polls (callers should pass getRequestAbortSignal() when running in a request handler). Also exports SnapshotPatch and MutateSnapshotResult from @trigger.dev/redis-worker so webapp consumers can type-check their callers of mutateSnapshot.
1 parent 3650812 commit dea1c7c

5 files changed

Lines changed: 384 additions & 1 deletion

File tree

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@trigger.dev/redis-worker": patch
3+
---
4+
5+
Export `SnapshotPatch` and `MutateSnapshotResult` types from `@trigger.dev/redis-worker` so webapp consumers can type-check their callers of `MollifierBuffer.mutateSnapshot`.
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
area: webapp
3+
type: improvement
4+
---
5+
6+
Add `mutateWithFallback` helper in `app/v3/mollifier/mutateWithFallback.server.ts`. Composes PG-first (replica) lookup, `MollifierBuffer.mutateSnapshot`, and writer-side spin-wait into the Q3 wait-and-bounce flow. Returns a discriminated outcome (`pg` / `snapshot` / `not_found` / `timed_out`) without throwing Response objects, keeping the helper route-agnostic and unit-testable. Wait knobs (`safetyNetMs=2000`, `pollStepMs=20`, `pgTimeoutMs=50`) are overridable for tests. Each PG poll is bounded by `pgTimeoutMs` via `Promise.race` so a slow query can't burn the safety net. Phase C mutation endpoints (tags, metadata-put, reschedule, cancel) will consume this helper.
Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
import type {
2+
MollifierBuffer,
3+
MutateSnapshotResult,
4+
SnapshotPatch,
5+
} from "@trigger.dev/redis-worker";
6+
import type { TaskRun } from "@trigger.dev/database";
7+
import { prisma, $replica } from "~/db.server";
8+
import { logger } from "~/services/logger.server";
9+
import { getMollifierBuffer } from "./mollifierBuffer.server";
10+
11+
// Wait/retry knobs per Q3 design. Exported for tests.

/** Fallback for `safetyNetMs`: total time (ms) the busy-wait loop may run. */
export const DEFAULT_SAFETY_NET_MS = 2000;

/** Fallback for `pollStepMs`: pause (ms) between consecutive writer polls. */
export const DEFAULT_POLL_STEP_MS = 20;

/** Fallback for `pgTimeoutMs`: upper bound (ms) on a single writer poll. */
export const DEFAULT_PG_TIMEOUT_MS = 50;
15+
16+
/**
 * Arguments accepted by `mutateWithFallback`.
 *
 * `TResponse` is the caller-defined body type produced by either
 * `pgMutation` or `synthesisedResponse`.
 */
export type MutateWithFallbackInput<TResponse> = {
  /** Matched against `TaskRun.friendlyId` and passed to `mutateSnapshot`. */
  runId: string;
  /** Matched against `TaskRun.runtimeEnvironmentId` in every PG lookup. */
  environmentId: string;
  /**
   * Not read by the helper body in this file — presumably carried for
   * callers; confirm before relying on it.
   */
  organizationId: string;
  /** Patch handed to `MollifierBuffer.mutateSnapshot` when PG misses. */
  bufferPatch: SnapshotPatch;
  /**
   * Called when a PG row exists (either replica-hit or post-wait
   * writer-hit). Receives the full TaskRun shape and returns the
   * customer-visible body.
   */
  pgMutation: (pgRow: TaskRun) => Promise<TResponse>;
  /**
   * Called when the patch landed cleanly on the buffer snapshot. The
   * drainer will see the patched payload on its next pop.
   */
  synthesisedResponse: () => TResponse;
  /** Checked at the top of each wait-loop iteration. */
  abortSignal?: AbortSignal;

  // Override defaults for tests.
  safetyNetMs?: number;
  pollStepMs?: number;
  pgTimeoutMs?: number;

  // Test injection.
  getBuffer?: () => MollifierBuffer | null;
  prismaWriter?: TaskRunReader;
  prismaReplica?: TaskRunReader;
  sleep?: (ms: number) => Promise<void>;
  now?: () => number;
};
39+
40+
/**
 * Result of `mutateWithFallback`, discriminated on `kind` so callers can
 * map each case to their own response.
 */
export type MutateWithFallbackOutcome<TResponse> =
  // A PG row was found (replica or writer) and `pgMutation` produced the body.
  | { kind: "pg"; response: TResponse }
  // The patch was applied to the buffer snapshot; body from `synthesisedResponse`.
  | { kind: "snapshot"; response: TResponse }
  // Neither PG nor the buffer knows the run.
  | { kind: "not_found" }
  // The wait loop ended (deadline or abort) without a PG row appearing.
  | { kind: "timed_out" };
45+
46+
/**
 * PG-first → buffer mutateSnapshot → wait-and-bounce. Implements the Q3
 * design (`_plans/2026-05-19-mollifier-mutation-race-design.md`). The
 * caller decides how to translate the outcome into an HTTP response —
 * this helper never throws Response objects so it remains route-agnostic
 * and unit-testable in isolation.
 *
 * Flow:
 *  1. Look the run up on the replica; on a hit, run `pgMutation`.
 *  2. Otherwise, if a buffer is available, try `mutateSnapshot`.
 *     - `applied_to_snapshot`: return `synthesisedResponse()`.
 *     - `not_found`: re-check on the writer to rule out replica lag.
 *     - anything else (`busy`): poll the writer until the row appears,
 *       the safety-net deadline passes, or the abort signal fires.
 *
 * @param input Identifiers, callbacks, wait knobs and test injection points.
 * @returns A discriminated outcome: `pg`, `snapshot`, `not_found` or `timed_out`.
 *   Rejections from the PG lookups, `mutateSnapshot` or `pgMutation` are not
 *   caught here and propagate to the caller.
 */
export async function mutateWithFallback<TResponse>(
  input: MutateWithFallbackInput<TResponse>,
): Promise<MutateWithFallbackOutcome<TResponse>> {
  const replica = input.prismaReplica ?? $replica;
  const writer = input.prismaWriter ?? prisma;
  const buffer = (input.getBuffer ?? getMollifierBuffer)();
  const sleep = input.sleep ?? defaultSleep;
  const now = input.now ?? Date.now;

  // Path 1 — PG is already canonical.
  const replicaRow = await findRunInPg(replica, input.runId, input.environmentId);
  if (replicaRow) {
    const response = await input.pgMutation(replicaRow);
    return { kind: "pg", response };
  }

  if (!buffer) {
    // No buffer configured (mollifier disabled or boot-time error). PG
    // missed; nothing else to consult.
    return { kind: "not_found" };
  }

  // Path 2 — buffer snapshot mutation.
  const result: MutateSnapshotResult = await buffer.mutateSnapshot(
    input.runId,
    input.bufferPatch,
  );

  if (result === "applied_to_snapshot") {
    return { kind: "snapshot", response: input.synthesisedResponse() };
  }

  if (result === "not_found") {
    // Disambiguate a genuine 404 from a replica-lag miss: ask the writer
    // directly. If the row just appeared post-drain we route through the
    // PG mutation path.
    const writerRow = await findRunInPg(writer, input.runId, input.environmentId);
    if (writerRow) {
      const response = await input.pgMutation(writerRow);
      return { kind: "pg", response };
    }
    return { kind: "not_found" };
  }

  // result === "busy" — entry is DRAINING / FAILED / materialised. Wait
  // for the drainer to terminate the entry into PG (success or
  // SYSTEM_FAILURE) and route through pgMutation.
  const safetyNetMs = input.safetyNetMs ?? DEFAULT_SAFETY_NET_MS;
  const pollStepMs = input.pollStepMs ?? DEFAULT_POLL_STEP_MS;
  const pgTimeoutMs = input.pgTimeoutMs ?? DEFAULT_PG_TIMEOUT_MS;
  const deadline = now() + safetyNetMs;

  while (true) {
    const budgetBeforePollMs = deadline - now();
    if (budgetBeforePollMs <= 0) break;

    // Abort is only observed between polls; an in-flight poll or sleep is
    // allowed to finish first.
    if (input.abortSignal?.aborted) {
      return { kind: "timed_out" };
    }

    // Clamp the per-poll timeout to what is left of the safety net so a
    // poll started just before the deadline cannot overrun it by a full
    // pgTimeoutMs.
    const row = await findRunInPgWithTimeout(
      writer,
      input.runId,
      input.environmentId,
      Math.min(pgTimeoutMs, budgetBeforePollMs),
    );
    if (row) {
      const response = await input.pgMutation(row);
      return { kind: "pg", response };
    }

    const budgetBeforeSleepMs = deadline - now();
    if (budgetBeforeSleepMs <= 0) break;
    // Same clamp for the pause: never sleep past the deadline.
    await sleep(Math.min(pollStepMs, budgetBeforeSleepMs));
  }

  logger.warn("mollifier mutate-with-fallback: drainer resolution timed out", {
    runId: input.runId,
    safetyNetMs,
  });
  return { kind: "timed_out" };
}
129+
130+
/**
 * Structural reader interface — accepts both the writer (`prisma`) and the
 * replica (`$replica`), which differ slightly in their generated Prisma
 * types but share the `findFirst` surface used here.
 */
type TaskRunReader = {
  taskRun: {
    // Only the friendlyId + environment filter is needed by this module.
    findFirst(args: {
      where: { friendlyId: string; runtimeEnvironmentId: string };
    }): Promise<TaskRun | null>;
  };
};
140+
141+
/**
 * Looks up a single run through the given client.
 *
 * @param client Reader exposing `taskRun.findFirst`.
 * @param friendlyId Value matched against `friendlyId`.
 * @param environmentId Value matched against `runtimeEnvironmentId`.
 * @returns The row returned by `findFirst`, or `null` when there is none.
 */
async function findRunInPg(
  client: TaskRunReader,
  friendlyId: string,
  environmentId: string,
): Promise<TaskRun | null> {
  const where = { friendlyId, runtimeEnvironmentId: environmentId };
  return client.taskRun.findFirst({ where });
}
150+
151+
/**
 * `findRunInPg` with an upper bound on how long the caller waits.
 *
 * One slow PG query shouldn't burn the whole safety-net budget, so the
 * lookup is raced against a timer. If the timer wins the poll is reported
 * as a miss (`null`) and the caller is expected to try again. Note that the
 * race only stops *waiting*: nothing here cancels the underlying query.
 *
 * @param client Reader exposing `taskRun.findFirst`.
 * @param friendlyId Value matched against `friendlyId`.
 * @param environmentId Value matched against `runtimeEnvironmentId`.
 * @param timeoutMs Maximum time to wait for the lookup, in milliseconds.
 * @returns The row, or `null` on a miss or when the timer fires first.
 */
async function findRunInPgWithTimeout(
  client: TaskRunReader,
  friendlyId: string,
  environmentId: string,
  timeoutMs: number,
): Promise<TaskRun | null> {
  // Unique sentinel so a timeout can never be confused with a query result.
  const timedOut = Symbol("pg-timeout");
  let timer: ReturnType<typeof setTimeout> | undefined;
  const expiry = new Promise<typeof timedOut>((resolve) => {
    timer = setTimeout(() => resolve(timedOut), timeoutMs);
  });

  try {
    const lookup = findRunInPg(client, friendlyId, environmentId);
    const settled = await Promise.race([lookup, expiry]);
    return settled === timedOut ? null : settled;
  } finally {
    // Always release the timer, whether the lookup won, lost or rejected.
    if (timer) clearTimeout(timer);
  }
}
176+
177+
/**
 * Timer-backed sleep used when no `sleep` override is injected.
 *
 * @param ms Delay in milliseconds.
 * @returns A promise that resolves once the timer fires.
 */
function defaultSleep(ms: number): Promise<void> {
  return new Promise<void>((wake) => {
    setTimeout(wake, ms);
  });
}
Lines changed: 188 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,188 @@
1+
import { describe, expect, it, vi } from "vitest";
2+
3+
// Replace the real DB module so importing the helper never opens a
// connection. Both clients expose a `findFirst` that always misses; the
// tests below inject their own stubs instead of relying on these.
vi.mock("~/db.server", () => {
  const alwaysMisses = () => ({ taskRun: { findFirst: vi.fn(async () => null) } });
  return {
    prisma: alwaysMisses(),
    $replica: alwaysMisses(),
  };
});
7+
8+
import { mutateWithFallback } from "~/v3/mollifier/mutateWithFallback.server";
9+
import type { MollifierBuffer, MutateSnapshotResult } from "@trigger.dev/redis-worker";
10+
import type { TaskRun } from "@trigger.dev/database";
11+
12+
// Vitest mock handle standing in for `taskRun.findFirst`.
type FindFirst = ReturnType<typeof vi.fn>;

// Smallest client shape the helper touches: a single `findFirst` method.
type PrismaStub = {
  taskRun: {
    findFirst: FindFirst;
  };
};
14+
15+
// Builds a client stub whose `findFirst` resolves to each entry of `rows`
// in order, one per call, and to `null` for every call after that.
function fakePrisma(rows: Array<TaskRun | null>): PrismaStub {
  const findFirst = vi.fn();
  rows.forEach((row) => {
    findFirst.mockResolvedValueOnce(row);
  });
  findFirst.mockResolvedValue(null);
  return { taskRun: { findFirst } };
}
21+
22+
// Builds a buffer stub whose `mutateSnapshot` always resolves to `result`.
// Only that one method exists on the stub, hence the double cast.
function bufferReturning(result: MutateSnapshotResult): MollifierBuffer {
  const mutateSnapshot = vi.fn(async () => result);
  const stub = { mutateSnapshot };
  return stub as unknown as MollifierBuffer;
}
27+
28+
// Minimal TaskRun fixture: only the fields these tests look at are set;
// `overrides` wins over the defaults. The cast papers over the missing
// columns.
const fakeRun = (overrides: Partial<TaskRun> = {}): TaskRun => {
  const defaults = {
    id: "pg_id",
    friendlyId: "run_1",
    runtimeEnvironmentId: "env_a",
  };
  return { ...defaults, ...overrides } as TaskRun;
};
35+
36+
// Fields shared by every test case; each case spreads this and adds its own
// callbacks, stubs and wait knobs.
const baseInput = {
  runId: "run_1",
  environmentId: "env_a",
  organizationId: "org_1",
  bufferPatch: {
    // `as const` keeps the literal type so the patch matches the union tag.
    type: "append_tags" as const,
    tags: ["x"],
  },
};
42+
43+
describe("mutateWithFallback", () => {
  // The stubs only implement `taskRun.findFirst`, so they are cast to the
  // real client types in one place instead of at every call site.
  const asReplica = (stub: PrismaStub) =>
    stub as unknown as typeof import("~/db.server").$replica;
  const asWriter = (stub: PrismaStub) =>
    stub as unknown as typeof import("~/db.server").prisma;

  // Deterministic clock: time only moves when the helper sleeps, and each
  // sleep advances it by 20 ms. `afterSleep` runs right after the advance.
  function fakeClock(afterSleep?: () => void) {
    let nowValue = 0;
    return {
      now: () => nowValue,
      sleep: async () => {
        nowValue += 20;
        afterSleep?.();
      },
    };
  }

  it("hits replica → calls pgMutation, returns pg outcome", async () => {
    const row = fakeRun();
    const pgMutation = vi.fn(async () => "pg-response");
    const synthesisedResponse = vi.fn(() => "snapshot-response");

    const result = await mutateWithFallback({
      ...baseInput,
      pgMutation,
      synthesisedResponse,
      prismaReplica: asReplica(fakePrisma([row])),
      prismaWriter: asWriter(fakePrisma([])),
      getBuffer: () => bufferReturning("applied_to_snapshot"),
    });

    expect(result).toEqual({ kind: "pg", response: "pg-response" });
    expect(pgMutation).toHaveBeenCalledWith(row);
    expect(synthesisedResponse).not.toHaveBeenCalled();
  });

  it("replica miss + buffer applied_to_snapshot → synthesisedResponse", async () => {
    const pgMutation = vi.fn(async () => "pg");

    const result = await mutateWithFallback({
      ...baseInput,
      pgMutation,
      synthesisedResponse: () => "snap",
      prismaReplica: asReplica(fakePrisma([null])),
      prismaWriter: asWriter(fakePrisma([])),
      getBuffer: () => bufferReturning("applied_to_snapshot"),
    });

    expect(result).toEqual({ kind: "snapshot", response: "snap" });
    expect(pgMutation).not.toHaveBeenCalled();
  });

  it("replica miss + buffer not_found + writer miss → not_found", async () => {
    const result = await mutateWithFallback({
      ...baseInput,
      pgMutation: async () => "pg",
      synthesisedResponse: () => "snap",
      prismaReplica: asReplica(fakePrisma([null])),
      prismaWriter: asWriter(fakePrisma([null])),
      getBuffer: () => bufferReturning("not_found"),
    });

    expect(result).toEqual({ kind: "not_found" });
  });

  it("replica miss + buffer not_found + writer hit → pgMutation (replica-lag recovery)", async () => {
    const row = fakeRun({ friendlyId: "run_1" });
    const pgMutation = vi.fn(async () => "pg-recovered");

    const result = await mutateWithFallback({
      ...baseInput,
      pgMutation,
      synthesisedResponse: () => "snap",
      prismaReplica: asReplica(fakePrisma([null])),
      prismaWriter: asWriter(fakePrisma([row])),
      getBuffer: () => bufferReturning("not_found"),
    });

    expect(result).toEqual({ kind: "pg", response: "pg-recovered" });
    expect(pgMutation).toHaveBeenCalledWith(row);
  });

  it("replica miss + buffer busy + writer resolves mid-wait → pgMutation", async () => {
    const row = fakeRun();
    const pgMutation = vi.fn(async () => "pg-after-wait");
    // Replica misses; writer misses twice, then hits.
    const writer = fakePrisma([null, null, row]);
    const clock = fakeClock();

    const result = await mutateWithFallback({
      ...baseInput,
      pgMutation,
      synthesisedResponse: () => "snap",
      prismaReplica: asReplica(fakePrisma([null])),
      prismaWriter: asWriter(writer),
      getBuffer: () => bufferReturning("busy"),
      sleep: clock.sleep,
      now: clock.now,
      safetyNetMs: 2000,
      pollStepMs: 20,
      pgTimeoutMs: 50,
    });

    expect(result).toEqual({ kind: "pg", response: "pg-after-wait" });
    expect(pgMutation).toHaveBeenCalledWith(row);
    // Writer should have been polled 3 times before the hit.
    expect(writer.taskRun.findFirst).toHaveBeenCalledTimes(3);
  });

  it("replica miss + buffer busy + drainer never resolves → timed_out", async () => {
    const clock = fakeClock();

    const result = await mutateWithFallback({
      ...baseInput,
      pgMutation: async () => "pg",
      synthesisedResponse: () => "snap",
      prismaReplica: asReplica(fakePrisma([null])),
      prismaWriter: asWriter(fakePrisma([null, null, null, null, null])),
      getBuffer: () => bufferReturning("busy"),
      sleep: clock.sleep,
      now: clock.now,
      safetyNetMs: 60,
      pollStepMs: 20,
      pgTimeoutMs: 5,
    });

    expect(result).toEqual({ kind: "timed_out" });
  });

  it("replica miss + buffer busy + writer poll hangs → poll bounded by pgTimeoutMs, timed_out", async () => {
    // `findFirst` never settles, so only the per-poll timer can end a poll.
    const writer = fakePrisma([]);
    writer.taskRun.findFirst.mockImplementation(() => new Promise(() => {}));
    const pgMutation = vi.fn(async () => "pg");
    const clock = fakeClock();

    const result = await mutateWithFallback({
      ...baseInput,
      pgMutation,
      synthesisedResponse: () => "snap",
      prismaReplica: asReplica(fakePrisma([null])),
      prismaWriter: asWriter(writer),
      getBuffer: () => bufferReturning("busy"),
      sleep: clock.sleep,
      now: clock.now,
      safetyNetMs: 40,
      pollStepMs: 20,
      pgTimeoutMs: 1,
    });

    expect(result).toEqual({ kind: "timed_out" });
    expect(pgMutation).not.toHaveBeenCalled();
    // Polls at t=0 and t=20; the sleep after the second reaches the deadline.
    expect(writer.taskRun.findFirst).toHaveBeenCalledTimes(2);
  });

  it("abort signal during wait → timed_out without further polls", async () => {
    const writer = fakePrisma([null, null, null]);
    const controller = new AbortController();
    // The first sleep advances the clock and then aborts the request.
    const clock = fakeClock(() => controller.abort());

    const result = await mutateWithFallback({
      ...baseInput,
      pgMutation: async () => "pg",
      synthesisedResponse: () => "snap",
      prismaReplica: asReplica(fakePrisma([null])),
      prismaWriter: asWriter(writer),
      getBuffer: () => bufferReturning("busy"),
      sleep: clock.sleep,
      now: clock.now,
      safetyNetMs: 2000,
      pollStepMs: 20,
      pgTimeoutMs: 5,
      abortSignal: controller.signal,
    });

    expect(result).toEqual({ kind: "timed_out" });
    // One poll happened before the sleep+abort.
    expect(writer.taskRun.findFirst).toHaveBeenCalledTimes(1);
  });

  it("buffer is null (mollifier disabled) → not_found after replica miss", async () => {
    const result = await mutateWithFallback({
      ...baseInput,
      pgMutation: async () => "pg",
      synthesisedResponse: () => "snap",
      prismaReplica: asReplica(fakePrisma([null])),
      prismaWriter: asWriter(fakePrisma([])),
      getBuffer: () => null,
    });

    expect(result).toEqual({ kind: "not_found" });
  });
});

packages/redis-worker/src/mollifier/index.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,9 @@
1-
export { MollifierBuffer, type MollifierBufferOptions } from "./buffer.js";
1+
export {
2+
MollifierBuffer,
3+
type MollifierBufferOptions,
4+
type SnapshotPatch,
5+
type MutateSnapshotResult,
6+
} from "./buffer.js";
27
export {
38
MollifierDrainer,
49
type MollifierDrainerOptions,

0 commit comments

Comments
 (0)