Skip to content

Commit f2ff1a9

Browse files
committed
test(run-engine): integration tests for engine.createCancelledRun (Phase F3)
Three containerTest cases covering the novel C1 piece — the rest of the Phase C work has unit-test coverage already. 1. Writes CANCELED PG row with snapshot fields, completedAt set to cancelledAt, error.raw set to cancelReason, runTags / taskIdentifier / payload preserved. 2. Emits runCancelled with full payload (id, friendlyId, status, error, organization / project / environment ids). 3. Idempotent on double-pop: second call after the first returns the existing row id (P2002 caught) and does not re-emit the event. Real PG + Redis testcontainers. ~15s total.
1 parent a871022 commit f2ff1a9

1 file changed

Lines changed: 186 additions & 0 deletions

File tree

Lines changed: 186 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,186 @@
1+
import { containerTest } from "@internal/testcontainers";
2+
import { trace } from "@internal/tracing";
3+
import { RunId } from "@trigger.dev/core/v3/isomorphic";
4+
5+
function freshRunId() {
6+
return RunId.generate().friendlyId;
7+
}
8+
import { expect } from "vitest";
9+
import { RunEngine } from "../index.js";
10+
import type { EventBusEventArgs } from "../eventBus.js";
11+
import { setupAuthenticatedEnvironment } from "./setup.js";
12+
13+
vi.setConfig({ testTimeout: 60_000 });
14+
15+
function baseEngineOptions(redisOptions: Parameters<typeof RunEngine>[0]["queue"]["redis"]) {
16+
return {
17+
worker: {
18+
redis: redisOptions,
19+
workers: 1,
20+
tasksPerWorker: 10,
21+
pollIntervalMs: 100,
22+
},
23+
queue: {
24+
redis: redisOptions,
25+
masterQueueConsumersDisabled: true,
26+
processWorkerQueueDebounceMs: 50,
27+
},
28+
runLock: {
29+
redis: redisOptions,
30+
},
31+
machines: {
32+
defaultMachine: "small-1x" as const,
33+
machines: {
34+
"small-1x": {
35+
name: "small-1x" as const,
36+
cpu: 0.5,
37+
memory: 0.5,
38+
centsPerMs: 0.0001,
39+
},
40+
},
41+
baseCostInCents: 0.0001,
42+
},
43+
tracer: trace.getTracer("test", "0.0.0"),
44+
};
45+
}
46+
47+
// Phase C1 / Q4 design — engine.createCancelledRun writes a CANCELED
48+
// TaskRun row directly from a buffer snapshot. Verifies the bypass-
49+
// queue / bypass-waitpoint / emit-runCancelled contract.
50+
describe("RunEngine.createCancelledRun", () => {
51+
containerTest(
52+
"writes CANCELED PG row with snapshot fields, completedAt, error",
53+
async ({ prisma, redisOptions }) => {
54+
const env = await setupAuthenticatedEnvironment(prisma, "PRODUCTION");
55+
const engine = new RunEngine({ prisma, ...baseEngineOptions(redisOptions) });
56+
try {
57+
const friendlyId = freshRunId();
58+
const cancelledAt = new Date("2026-05-20T12:00:00.000Z");
59+
const cancelReason = "Canceled by user";
60+
61+
const result = await engine.createCancelledRun({
62+
snapshot: {
63+
friendlyId,
64+
environment: env,
65+
taskIdentifier: "test-task",
66+
payload: '{"hello":"world"}',
67+
payloadType: "application/json",
68+
context: {},
69+
traceContext: {},
70+
traceId: "0000000000000000aaaa000000000000",
71+
spanId: "bbbb000000000000",
72+
queue: "task/test-task",
73+
isTest: false,
74+
tags: ["test-tag"],
75+
},
76+
cancelledAt,
77+
cancelReason,
78+
});
79+
80+
expect(result.status).toBe("CANCELED");
81+
expect(result.friendlyId).toBe(friendlyId);
82+
expect(result.id).toBe(RunId.fromFriendlyId(friendlyId));
83+
expect(result.completedAt?.toISOString()).toBe(cancelledAt.toISOString());
84+
expect(result.taskIdentifier).toBe("test-task");
85+
expect(result.runTags).toEqual(["test-tag"]);
86+
expect(result.payload).toBe('{"hello":"world"}');
87+
const err = result.error as { type?: string; raw?: string };
88+
expect(err.type).toBe("STRING_ERROR");
89+
expect(err.raw).toBe(cancelReason);
90+
91+
// Verify the PG row is canonical (findFirst returns the row).
92+
const stored = await prisma.taskRun.findFirst({
93+
where: { friendlyId },
94+
});
95+
expect(stored).not.toBeNull();
96+
expect(stored!.status).toBe("CANCELED");
97+
} finally {
98+
await engine.quit();
99+
}
100+
},
101+
);
102+
103+
containerTest(
104+
"emits runCancelled with correct payload",
105+
async ({ prisma, redisOptions }) => {
106+
const env = await setupAuthenticatedEnvironment(prisma, "PRODUCTION");
107+
const engine = new RunEngine({ prisma, ...baseEngineOptions(redisOptions) });
108+
const captured: EventBusEventArgs<"runCancelled">[0][] = [];
109+
engine.eventBus.on("runCancelled", (event) => {
110+
captured.push(event);
111+
});
112+
113+
try {
114+
const cancelledAt = new Date();
115+
const cancelReason = "Test cancel";
116+
const friendlyId = freshRunId();
117+
await engine.createCancelledRun({
118+
snapshot: {
119+
friendlyId,
120+
environment: env,
121+
taskIdentifier: "test-task",
122+
payload: "{}",
123+
payloadType: "application/json",
124+
context: {},
125+
traceContext: {},
126+
traceId: "0000000000000000cccc000000000000",
127+
spanId: "dddd000000000000",
128+
queue: "task/test-task",
129+
isTest: false,
130+
tags: [],
131+
},
132+
cancelledAt,
133+
cancelReason,
134+
});
135+
136+
expect(captured).toHaveLength(1);
137+
expect(captured[0]!.run.status).toBe("CANCELED");
138+
expect(captured[0]!.run.friendlyId).toBe(friendlyId);
139+
expect(captured[0]!.run.error).toEqual({ type: "STRING_ERROR", raw: cancelReason });
140+
expect(captured[0]!.organization.id).toBe(env.organization.id);
141+
} finally {
142+
await engine.quit();
143+
}
144+
},
145+
);
146+
147+
containerTest(
148+
"idempotent on double-pop: second call returns existing row without re-emitting",
149+
async ({ prisma, redisOptions }) => {
150+
const env = await setupAuthenticatedEnvironment(prisma, "PRODUCTION");
151+
const engine = new RunEngine({ prisma, ...baseEngineOptions(redisOptions) });
152+
const captured: EventBusEventArgs<"runCancelled">[0][] = [];
153+
engine.eventBus.on("runCancelled", (event) => {
154+
captured.push(event);
155+
});
156+
157+
try {
158+
const snapshot = {
159+
friendlyId: freshRunId(),
160+
environment: env,
161+
taskIdentifier: "test-task",
162+
payload: "{}",
163+
payloadType: "application/json",
164+
context: {},
165+
traceContext: {},
166+
traceId: "0000000000000000eeee000000000000",
167+
spanId: "ffff000000000000",
168+
queue: "task/test-task",
169+
isTest: false,
170+
tags: [],
171+
};
172+
const cancelledAt = new Date();
173+
const cancelReason = "Test idempotent";
174+
175+
const first = await engine.createCancelledRun({ snapshot, cancelledAt, cancelReason });
176+
const second = await engine.createCancelledRun({ snapshot, cancelledAt, cancelReason });
177+
178+
expect(second.id).toBe(first.id);
179+
// Only the first call's emit fired; the P2002 path skips re-emission.
180+
expect(captured).toHaveLength(1);
181+
} finally {
182+
await engine.quit();
183+
}
184+
},
185+
);
186+
});

0 commit comments

Comments
 (0)