Commit 97018b1

d-cs and claude committed
fix(run-engine): emit runFailed from createFailedTaskRun
The mollifier drainer's terminal-failure path (Phase 4G) and the batch-trigger's "queue size limit exceeded" path both call createFailedTaskRun to write a SYSTEM_FAILURE PG row for runs that never actually executed. Neither path emitted runFailed afterwards, so the runEngineHandlers' `runFailed` listener never fired. As a result, PerformTaskRunAlertsService never enqueued an alert delivery job, and customers' configured TASK_RUN alert channels missed the failure entirely. The row was visible in the dashboard list but silent for alerting purposes.

Emit runFailed from createFailedTaskRun with `attemptNumber: 0` as the marker that the run never executed (distinguishes synthesised terminal failures from runs that exhausted their retries). PerformTaskRunAlertsService doesn't filter on attemptNumber or status, so the existing pipeline picks the event up without further changes. DeliverAlertService dispatches via the channel type (email/webhook/etc.) the same way it does for any other terminal failure.

Test: a containerTest subscribes to runFailed before calling createFailedTaskRun and asserts that exactly one event fires with the expected payload shape. The existing batchTrigger tests still pass (they didn't assert the negative).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 8dc878e commit 97018b1

3 files changed

Lines changed: 155 additions & 0 deletions

Lines changed: 6 additions & 0 deletions
@@ -0,0 +1,6 @@
+---
+area: webapp
+type: fix
+---
+
+`engine.createFailedTaskRun` now emits the `runFailed` event so the alert pipeline picks up the SYSTEM_FAILURE row and the event-store handler writes the completion event into the trace. Affects the mollifier drainer's terminal-failure path (introduced in Phase 4G) and the batch-trigger's "queue size limit exceeded" path. Previously these terminal failures landed in PG silently: visible in the dashboard list but never reaching customers' configured TASK_RUN alert channels. The event payload carries `attemptNumber: 0` as the marker that the run never executed (synthesised terminal failure, not exhausted retries).
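
The `attemptNumber: 0` marker described above could be consumed along the following lines. This is a minimal sketch rather than code from the repository: the `RunFailedEvent` type is a trimmed, hypothetical version of the payload, and `classifyRunFailure`, the `FailureKind` labels, and the sample status of the retried run are all invented for illustration.

```typescript
// Hypothetical, trimmed-down shape of the `runFailed` payload.
// Only the fields this sketch needs are modelled.
type RunFailedEvent = {
  run: { id: string; status: string; attemptNumber: number };
};

type FailureKind = "never-executed" | "failed-after-attempts";

// Assumption: attemptNumber 0 is only emitted for synthesised terminal
// failures, as the changeset states; any other value means the run was
// attempted at least once.
function classifyRunFailure(event: RunFailedEvent): FailureKind {
  return event.run.attemptNumber === 0 ? "never-executed" : "failed-after-attempts";
}

// Sample payloads (illustrative values only).
const synthesised: RunFailedEvent = {
  run: { id: "run_a", status: "SYSTEM_FAILURE", attemptNumber: 0 },
};
const exhausted: RunFailedEvent = {
  run: { id: "run_b", status: "COMPLETED_WITH_ERRORS", attemptNumber: 3 },
};

console.log(classifyRunFailure(synthesised));
console.log(classifyRunFailure(exhausted));
```

A consumer such as an alert formatter could use a check like this to word the two cases differently, although the commit itself leaves downstream handling unchanged.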

internal-packages/run-engine/src/engine/index.ts

Lines changed: 38 additions & 0 deletions
@@ -1132,6 +1132,44 @@ export class RunEngine {
           });
         }
 
+        // Emit `runFailed` so the alert pipeline picks up the
+        // SYSTEM_FAILURE row and the event-store handler writes the
+        // completion event into the trace. Without this the mollifier
+        // drainer's terminal failures (and batch-trigger's
+        // exceed-limit failures) land in PG silently: visible in the
+        // dashboard list but never reaching customers' configured
+        // ERROR alert channels.
+        this.eventBus.emit("runFailed", {
+          time: taskRun.completedAt ?? new Date(),
+          run: {
+            id: taskRun.id,
+            status: taskRun.status,
+            spanId: taskRun.spanId,
+            error,
+            taskEventStore: taskRun.taskEventStore,
+            createdAt: taskRun.createdAt,
+            completedAt: taskRun.completedAt,
+            updatedAt: taskRun.updatedAt,
+            // This row never attempted execution; it's a synthesised
+            // terminal failure. The alert payload's `attemptNumber=0`
+            // is the signal downstream consumers can use to
+            // distinguish a never-ran failure from a run that
+            // exhausted its retries.
+            attemptNumber: 0,
+            usageDurationMs: 0,
+            costInCents: 0,
+          },
+          organization: {
+            id: environment.organization.id,
+          },
+          project: {
+            id: environment.project.id,
+          },
+          environment: {
+            id: environment.id,
+          },
+        });
+
         return taskRun;
       },
       {
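
The hunk above follows a "persist the row, then emit the event" pattern. The sketch below reproduces that pattern in isolation to show why a write without an emit stays silent. Everything in it is a stand-in assumed for illustration: `RunFailedBus` is not the engine's event bus, the `rows` array replaces Postgres, and the `emitEvent` flag exists only to contrast the behaviour before and after the fix.

```typescript
// Hypothetical, trimmed-down event payload.
type RunFailedEvent = {
  time: Date;
  run: { id: string; status: string; attemptNumber: number };
};
type Listener = (event: RunFailedEvent) => void;

// Minimal single-event bus standing in for the engine's event bus.
class RunFailedBus {
  private listeners: Listener[] = [];
  on(listener: Listener): void {
    this.listeners.push(listener);
  }
  emit(event: RunFailedEvent): void {
    for (const listener of this.listeners) listener(event);
  }
}

type FailedRunRow = { id: string; status: "SYSTEM_FAILURE"; completedAt: Date | null };

const rows: FailedRunRow[] = []; // in-memory stand-in for the PG table
const bus = new RunFailedBus();

// Writes the terminal-failure row and, when emitEvent is true, announces it.
function createFailedRun(id: string, emitEvent: boolean): FailedRunRow {
  const row: FailedRunRow = { id, status: "SYSTEM_FAILURE", completedAt: new Date() };
  rows.push(row);
  if (emitEvent) {
    bus.emit({
      time: row.completedAt ?? new Date(),
      // attemptNumber 0 marks a run that never executed.
      run: { id: row.id, status: row.status, attemptNumber: 0 },
    });
  }
  return row;
}

// Stand-in for the alert pipeline: it only learns about runs via the bus.
const alerted: string[] = [];
bus.on((event) => alerted.push(event.run.id));

createFailedRun("run_silent", false); // row only, as before the fix
createFailedRun("run_alerted", true); // row plus event, as after the fix

console.log(rows.length, alerted);
```

Both rows exist in the store, but only the second reaches the listener, which mirrors the "visible in the dashboard list but silent for alerting" symptom the commit describes.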
Lines changed: 111 additions & 0 deletions
@@ -0,0 +1,111 @@
+import { containerTest } from "@internal/testcontainers";
+import { trace } from "@internal/tracing";
+import { generateFriendlyId } from "@trigger.dev/core/v3/isomorphic";
+import { expect } from "vitest";
+import { RunEngine } from "../index.js";
+import { EventBusEventArgs } from "../eventBus.js";
+import { setupAuthenticatedEnvironment } from "./setup.js";
+
+vi.setConfig({ testTimeout: 60_000 });
+
+describe("RunEngine.createFailedTaskRun", () => {
+  containerTest("emits runFailed so the alert pipeline wakes up", async ({ prisma, redisOptions }) => {
+    // The mollifier drainer (and batch-trigger over-limit path) call
+    // createFailedTaskRun to write a terminal SYSTEM_FAILURE PG row
+    // for runs that never actually executed. Without an explicit
+    // runFailed emit, the row lands silently: the
+    // runEngineHandlers' `runFailed` listener (which enqueues
+    // PerformTaskRunAlertsService) never fires, so customers'
+    // configured TASK_RUN alert channels miss the failure entirely.
+    //
+    // Regression intent: if the emit is removed or moved out of
+    // createFailedTaskRun's success path, this test fails. The
+    // shape assertions pin the fields the alert delivery service
+    // reads from the event payload (run.id, run.status, error,
+    // attemptNumber=0 as the never-ran-marker).
+    const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION");
+
+    const engine = new RunEngine({
+      prisma,
+      worker: {
+        redis: redisOptions,
+        workers: 1,
+        tasksPerWorker: 10,
+        pollIntervalMs: 100,
+      },
+      queue: {
+        redis: redisOptions,
+        masterQueueConsumersDisabled: true,
+        processWorkerQueueDebounceMs: 50,
+      },
+      runLock: {
+        redis: redisOptions,
+      },
+      machines: {
+        defaultMachine: "small-1x",
+        machines: {
+          "small-1x": {
+            name: "small-1x" as const,
+            cpu: 0.5,
+            memory: 0.5,
+            centsPerMs: 0.0001,
+          },
+        },
+        baseCostInCents: 0.0005,
+      },
+      tracer: trace.getTracer("test", "0.0.0"),
+    });
+
+    try {
+      const failedEvents: EventBusEventArgs<"runFailed">[0][] = [];
+      engine.eventBus.on("runFailed", (event) => {
+        failedEvents.push(event);
+      });
+
+      const friendlyId = generateFriendlyId("run");
+      const taskIdentifier = "drainer-terminal-test";
+
+      const failed = await engine.createFailedTaskRun({
+        friendlyId,
+        environment: {
+          id: authenticatedEnvironment.id,
+          type: authenticatedEnvironment.type,
+          project: { id: authenticatedEnvironment.project.id },
+          organization: { id: authenticatedEnvironment.organization.id },
+        },
+        taskIdentifier,
+        payload: "{}",
+        payloadType: "application/json",
+        error: {
+          type: "STRING_ERROR",
+          raw: "Mollifier drainer terminal failure: synthetic engine.trigger panic",
+        },
+        traceId: "0123456789abcdef0123456789abcdef",
+        spanId: "fedcba9876543210",
+      });
+
+      expect(failed.status).toBe("SYSTEM_FAILURE");
+
+      expect(failedEvents).toHaveLength(1);
+      const event = failedEvents[0];
+      expect(event.run.id).toBe(failed.id);
+      expect(event.run.status).toBe("SYSTEM_FAILURE");
+      expect(event.run.spanId).toBe("fedcba9876543210");
+      // attemptNumber=0 is the marker that the run never executed;
+      // it's a synthesised terminal failure, not an exhausted-retries
+      // failure. Downstream consumers can use this to distinguish.
+      expect(event.run.attemptNumber).toBe(0);
+      expect(event.run.usageDurationMs).toBe(0);
+      expect(event.run.costInCents).toBe(0);
+      expect(event.run.error).toEqual({
+        type: "STRING_ERROR",
+        raw: "Mollifier drainer terminal failure: synthetic engine.trigger panic",
+      });
+      expect(event.organization.id).toBe(authenticatedEnvironment.organization.id);
+      expect(event.project.id).toBe(authenticatedEnvironment.project.id);
+      expect(event.environment.id).toBe(authenticatedEnvironment.id);
+    } finally {
+      await engine.quit();
+    }
+  });
+});
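
The test above subscribes before calling the method under test and then asserts on the collected events. That subscribe-first step can be factored into a small helper, sketched below under stated assumptions: `Subscribable`, `collectEvents`, and `InMemorySource` are hypothetical names that do not exist in the repository, and the real event bus may expose a different `on`/`off` signature.

```typescript
// Hypothetical minimal interface: a source of a single event type.
interface Subscribable<T> {
  on(listener: (event: T) => void): void;
  off(listener: (event: T) => void): void;
}

// Starts recording immediately and returns the live array plus a stop
// function, so a test can subscribe before triggering the code under test.
function collectEvents<T>(source: Subscribable<T>): { events: T[]; stop: () => void } {
  const events: T[] = [];
  const listener = (event: T): void => {
    events.push(event);
  };
  source.on(listener);
  return { events, stop: () => source.off(listener) };
}

// Tiny in-memory source used only to exercise the helper.
class InMemorySource<T> implements Subscribable<T> {
  private listeners: Array<(event: T) => void> = [];
  on(listener: (event: T) => void): void {
    this.listeners.push(listener);
  }
  off(listener: (event: T) => void): void {
    this.listeners = this.listeners.filter((l) => l !== listener);
  }
  emit(event: T): void {
    this.listeners.forEach((l) => l(event));
  }
}

const source = new InMemorySource<{ runId: string }>();
const collected = collectEvents(source);

source.emit({ runId: "run_1" }); // recorded
collected.stop();
source.emit({ runId: "run_2" }); // not recorded: the collector has stopped

console.log(collected.events.length);
```

Returning a `stop` function keeps the listener from leaking into later tests that share the same source. The committed test avoids that concern differently, by creating a fresh engine and calling `engine.quit()` in `finally`.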
