Skip to content

Commit d4b55c1

Browse files
d-cs and claude committed
feat(webapp): write SYSTEM_FAILURE PG row when drainer hits a non-retryable error
Previously, a non-retryable engine.trigger failure during drain left the buffer entry as `status: "FAILED"` in Redis with no PG row. The customer saw the run in their SDK / dashboard listing for ~10 min (buffer TTL), then it vanished entirely — no audit trail of the failure. Billing was unaffected (no attempts ever ran) but observability was zero.

Reuse the engine's existing `createFailedTaskRun` helper (the same one batch-trigger calls when an item fails to start) — it writes a terminal SYSTEM_FAILURE TaskRun row with the engine.trigger error stored on `error`, no attempts, P2002-idempotent on the unique constraint.

Drainer handler classifies the failure:
- Retryable PG error → rethrow so MollifierDrainer.drainOne requeues
- Non-retryable → createFailedTaskRun, swallow original error so the buffer entry is ack'd (PG now has the audit row)
- createFailedTaskRun also fails (PG truly unreachable) → rethrow original so drainer falls through to its existing buffer.fail terminal-marker path
- Snapshot too malformed to construct the environment block → rethrow (defensive — drainer falls through to buffer.fail)

Tests cover each path.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 3453618 commit d4b55c1

2 files changed

Lines changed: 168 additions & 5 deletions

File tree

apps/webapp/app/v3/mollifier/mollifierDrainerHandler.server.ts

Lines changed: 71 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,77 @@ export function createDrainerHandler(deps: {
8686
span.setAttribute("mollifier.run_friendly_id", input.runId);
8787
span.setAttribute("taskRunId", input.runId);
8888

89-
await deps.engine.trigger(input.payload as any, deps.prisma);
89+
try {
90+
await deps.engine.trigger(input.payload as any, deps.prisma);
91+
} catch (err) {
92+
// The retryable-PG class re-throws so the drainer's outer
93+
// worker loop can `buffer.requeue` (handled in
94+
// `MollifierDrainer.drainOne`). For non-retryable failures we
95+
// write a terminal SYSTEM_FAILURE row to PG via the engine's
96+
// existing `createFailedTaskRun` (used by batch-trigger for
97+
// the same purpose) so the customer sees the run in their
98+
// dashboard / SDK instead of silently losing it when the
99+
// buffer entry TTLs out. If THAT insert also fails (PG truly
100+
// unreachable), rethrow so the drainer's outer catch falls
101+
// through to its existing `buffer.fail` terminal-marker path.
102+
if (isRetryablePgError(err)) {
103+
throw err;
104+
}
105+
const reason = err instanceof Error ? err.message : String(err);
106+
span.setAttribute("mollifier.terminal_failure_reason", reason);
107+
const snapshot = input.payload as Record<string, unknown>;
108+
const env = snapshot.environment as
109+
| {
110+
id: string;
111+
type: any;
112+
project: { id: string };
113+
organization: { id: string };
114+
}
115+
| undefined;
116+
if (!env) {
117+
// Snapshot too malformed to even construct a TaskRun row.
118+
// Drainer's outer catch will buffer.fail this entry.
119+
throw err;
120+
}
121+
try {
122+
await deps.engine.createFailedTaskRun({
123+
friendlyId: input.runId,
124+
environment: env,
125+
taskIdentifier: String(snapshot.taskIdentifier ?? ""),
126+
payload: typeof snapshot.payload === "string" ? snapshot.payload : undefined,
127+
payloadType:
128+
typeof snapshot.payloadType === "string" ? snapshot.payloadType : undefined,
129+
error: {
130+
type: "STRING_ERROR",
131+
raw: `Mollifier drainer terminal failure: ${reason}`,
132+
},
133+
parentTaskRunId:
134+
typeof snapshot.parentTaskRunId === "string"
135+
? snapshot.parentTaskRunId
136+
: undefined,
137+
rootTaskRunId:
138+
typeof snapshot.rootTaskRunId === "string"
139+
? snapshot.rootTaskRunId
140+
: undefined,
141+
depth: typeof snapshot.depth === "number" ? snapshot.depth : 0,
142+
resumeParentOnCompletion: snapshot.resumeParentOnCompletion === true,
143+
traceId: typeof snapshot.traceId === "string" ? snapshot.traceId : undefined,
144+
spanId: typeof snapshot.spanId === "string" ? snapshot.spanId : undefined,
145+
taskEventStore:
146+
typeof snapshot.taskEventStore === "string"
147+
? snapshot.taskEventStore
148+
: undefined,
149+
queue: typeof snapshot.queue === "string" ? snapshot.queue : undefined,
150+
lockedQueueId:
151+
typeof snapshot.lockedQueueId === "string" ? snapshot.lockedQueueId : undefined,
152+
});
153+
} catch (writeErr) {
154+
// Class A — PG itself is failing. Rethrow the original
155+
// error so the drainer falls back to buffer.fail. Include
156+
// the write error in the log line at the drainer layer.
157+
throw err;
158+
}
159+
}
90160
});
91161
});
92162
};

apps/webapp/test/mollifierDrainerHandler.test.ts

Lines changed: 97 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -90,12 +90,14 @@ describe("createDrainerHandler", () => {
9090
expect(observedTraceId).toBe(snapshotTraceId);
9191
});
9292

93-
it("propagates engine.trigger errors so MollifierDrainer can classify them", async () => {
93+
it("rethrows retryable PG errors so MollifierDrainer requeues the entry", async () => {
94+
const err = new Error("Can't reach database server");
9495
const trigger = vi.fn(async () => {
95-
throw new Error("boom");
96+
throw err;
9697
});
98+
const createFailedTaskRun = vi.fn();
9799
const handler = createDrainerHandler({
98-
engine: { trigger } as any,
100+
engine: { trigger, createFailedTaskRun } as any,
99101
prisma: {} as any,
100102
});
101103

@@ -108,6 +110,97 @@ describe("createDrainerHandler", () => {
108110
attempts: 0,
109111
createdAt: new Date(),
110112
} as any),
111-
).rejects.toThrow("boom");
113+
).rejects.toThrow("Can't reach database server");
114+
// Retryable: we do NOT write a SYSTEM_FAILURE row, the entry should
115+
// be requeued for another shot.
116+
expect(createFailedTaskRun).not.toHaveBeenCalled();
117+
});
118+
119+
const envFixture = {
120+
id: "env_a",
121+
type: "DEVELOPMENT",
122+
project: { id: "proj_1" },
123+
organization: { id: "org_1" },
124+
};
125+
126+
it("writes a SYSTEM_FAILURE PG row when engine.trigger fails non-retryably", async () => {
127+
const trigger = vi.fn(async () => {
128+
throw new Error("validation failed: payload too large");
129+
});
130+
const createFailedTaskRun = vi.fn(async () => ({
131+
id: "internal",
132+
friendlyId: "run_x",
133+
}));
134+
const handler = createDrainerHandler({
135+
engine: { trigger, createFailedTaskRun } as any,
136+
prisma: {} as any,
137+
});
138+
139+
await expect(
140+
handler({
141+
runId: "run_x",
142+
envId: "env_a",
143+
orgId: "org_1",
144+
payload: { taskIdentifier: "t", environment: envFixture },
145+
attempts: 0,
146+
createdAt: new Date(),
147+
} as any),
148+
).resolves.toBeUndefined();
149+
150+
expect(trigger).toHaveBeenCalledOnce();
151+
expect(createFailedTaskRun).toHaveBeenCalledOnce();
152+
const arg = createFailedTaskRun.mock.calls[0][0] as { error: { raw: string } };
153+
expect(arg.error.raw).toContain("validation failed");
154+
});
155+
156+
it("rethrows the original error when createFailedTaskRun also fails (PG genuinely unreachable)", async () => {
157+
const triggerErr = new Error("engine rejected the snapshot");
158+
const trigger = vi.fn(async () => {
159+
throw triggerErr;
160+
});
161+
const createFailedTaskRun = vi.fn(async () => {
162+
throw new Error("connection refused");
163+
});
164+
const handler = createDrainerHandler({
165+
engine: { trigger, createFailedTaskRun } as any,
166+
prisma: {} as any,
167+
});
168+
169+
await expect(
170+
handler({
171+
runId: "run_x",
172+
envId: "env_a",
173+
orgId: "org_1",
174+
payload: { taskIdentifier: "t", environment: envFixture },
175+
attempts: 0,
176+
createdAt: new Date(),
177+
} as any),
178+
).rejects.toThrow("engine rejected the snapshot");
179+
// Drainer's outer drainOne loop now decides retry vs buffer.fail.
180+
expect(createFailedTaskRun).toHaveBeenCalledOnce();
181+
});
182+
183+
it("rethrows the original error when the snapshot lacks an environment block", async () => {
184+
const triggerErr = new Error("engine rejected the snapshot");
185+
const trigger = vi.fn(async () => {
186+
throw triggerErr;
187+
});
188+
const createFailedTaskRun = vi.fn();
189+
const handler = createDrainerHandler({
190+
engine: { trigger, createFailedTaskRun } as any,
191+
prisma: {} as any,
192+
});
193+
194+
await expect(
195+
handler({
196+
runId: "run_x",
197+
envId: "env_a",
198+
orgId: "org_1",
199+
payload: { taskIdentifier: "t" /* no environment */ },
200+
attempts: 0,
201+
createdAt: new Date(),
202+
} as any),
203+
).rejects.toThrow("engine rejected the snapshot");
204+
expect(createFailedTaskRun).not.toHaveBeenCalled();
112205
});
113206
});

0 commit comments

Comments
 (0)