Commit d4f7342

feat(webapp,run-engine): cancel API supports buffered runs (Phase C1)
Per the Q4 mollifier-cancel design. This is the first mutation endpoint.

engine.createCancelledRun: new run-engine method that writes a CANCELED TaskRun row directly from a buffer snapshot. Skips queue insertion, waitpoint creation, and concurrency reservation (the run never executes). Emits runCancelled so the existing handler writes the TaskEvent cancellation row. P2002 from a double-pop is caught and returns the existing row without re-emitting.

Drainer bifurcation: mollifierDrainerHandler routes to createCancelledRun when snapshot.cancelledAt is set. Cancel-wins-over-trigger: customer intent is terminal.

Cancel route: wraps the call in mutateWithFallback. PG-row hits go through the existing CancelTaskRunService. Buffered-QUEUED hits land a mark_cancelled patch on the snapshot via mutateSnapshot. Busy snapshots wait for drainer resolution, then call the PG service against the resulting row. Genuinely missing runs surface as 404 and drainer-hung cases as 503.

Known follow-up: the Q3 wait-and-bounce for cancel-of-buffered-FAILED relies on the drainer eventually writing a SYSTEM_FAILURE PG row on terminal materialisation failure. That drainer-side write isn't implemented yet (the failed-drain path today only marks the buffer entry hash FAILED). Cancel-of-state-3 will currently return 503 after 2s instead of returning the SYSTEM_FAILURE row. Acceptable rare-race behaviour; flagged for a follow-up alongside the drainer sweeper work.
1 parent d8a23aa commit d4f7342

5 files changed

Lines changed: 259 additions & 22 deletions

Lines changed: 12 additions & 0 deletions
@@ -0,0 +1,12 @@
+---
+area: webapp
+type: feature
+---
+
+Cancel API (`POST /api/v2/runs/{id}/cancel`) now works on buffered runs. Per the Q4 mollifier-cancel design:
+
+- `engine.createCancelledRun` (new method in `@internal/run-engine`): writes a `CANCELED` TaskRun row directly from a buffer snapshot, bypassing the trigger/queue pipeline. Skips run-queue insertion (no execution needed), waitpoint creation (single-`triggerAndWait` can't enter the buffer), and concurrency reservation. Emits `runCancelled` so the existing handler writes the TaskEvent cancellation row. Idempotent: P2002 unique-constraint violations from double-pop after a drainer requeue return the existing row without re-emitting.
+
+- Drainer bifurcation (`mollifierDrainerHandler.server.ts`): when the snapshot carries `cancelledAt`, route to `createCancelledRun` instead of `engine.trigger`. Cancel-wins-over-trigger ordering: customer intent is terminal.
+
+- Cancel route (`api.v2.runs.$runParam.cancel.ts`): wraps the call in `mutateWithFallback`. PG-row hits go through the existing `CancelTaskRunService`. Buffered-run hits land a `mark_cancelled` patch on the snapshot via `mutateSnapshot`. `busy` snapshots wait for drainer resolution then call the PG service against the resulting row. Genuine 404s and timeouts surface as 404/503 respectively.
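
The idempotency claim in the first bullet can be illustrated with a minimal sketch. It assumes an in-memory table in place of Postgres and a plain `code` property in place of Prisma's error class; `insertRow`, `isUniqueViolation`, `createCancelledRow`, `rows`, and `emitted` are hypothetical names for illustration, not identifiers from the commit.

```typescript
// Illustrative stand-in only: an in-memory "table" with a unique id
// constraint. The real code writes through Prisma and checks for P2002.
interface RunRow {
  id: string;
  status: "CANCELED";
  completedAt: Date;
}

const rows: { [id: string]: RunRow } = {};
const emitted: string[] = []; // records each simulated runCancelled emit

function insertRow(row: RunRow): RunRow {
  if (Object.prototype.hasOwnProperty.call(rows, row.id)) {
    // Mimic a unique-constraint violation carrying the P2002 code.
    const err = new Error("unique constraint failed on id") as Error & { code?: string };
    err.code = "P2002";
    throw err;
  }
  rows[row.id] = row;
  return row;
}

function isUniqueViolation(err: unknown): boolean {
  return typeof err === "object" && err !== null && (err as { code?: unknown }).code === "P2002";
}

function createCancelledRow(id: string, cancelledAt: Date): RunRow {
  try {
    const row = insertRow({ id, status: "CANCELED", completedAt: cancelledAt });
    emitted.push(id); // emit only after the first successful write
    return row;
  } catch (err) {
    if (isUniqueViolation(err)) {
      const existing = rows[id];
      if (existing) return existing; // double pop: same row, no second emit
    }
    throw err;
  }
}

// Simulate the drainer popping the same entry twice after a requeue.
const first = createCancelledRow("run_1", new Date(0));
const second = createCancelledRow("run_1", new Date(1000));
```

In this sketch the second call returns the row written by the first, keeps the original `completedAt`, and leaves `emitted` with a single entry.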

apps/webapp/app/routes/api.v2.runs.$runParam.cancel.ts

Lines changed: 40 additions & 20 deletions
@@ -1,8 +1,9 @@
 import { json } from "@remix-run/server-runtime";
 import { z } from "zod";
-import { $replica } from "~/db.server";
 import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";
+import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server";
 import { CancelTaskRunService } from "~/v3/services/cancelTaskRun.server";
+import { mutateWithFallback } from "~/v3/mollifier/mutateWithFallback.server";

 const ParamsSchema = z.object({
   runParam: z.string(),
@@ -17,29 +18,48 @@ const { action } = createActionApiRoute(
       action: "write",
       resource: (params) => ({ type: "runs", id: params.runParam }),
     },
-    findResource: async (params, auth) => {
-      return $replica.taskRun.findFirst({
-        where: {
-          friendlyId: params.runParam,
-          runtimeEnvironmentId: auth.environment.id,
-        },
-      });
-    },
+    // PG-side authorisation is performed inside mutateWithFallback. Routing
+    // the resource through findResource (which would require a PG-or-buffer
+    // resolved discriminated union here) would duplicate the resolution
+    // mutateWithFallback already does, so we pass `null` to signal "open"
+    // and let the helper do the lookup atomically with the mutation.
+    findResource: async () => null,
   },
-  async ({ resource }) => {
-    if (!resource) {
-      return json({ error: "Run not found" }, { status: 404 });
-    }
+  async ({ params, authentication }) => {
+    const runId = params.runParam;
+    const env = authentication.environment;
+    const cancelledAt = new Date();
+    const cancelReason = "Canceled by user";

-    const service = new CancelTaskRunService();
+    const outcome = await mutateWithFallback({
+      runId,
+      environmentId: env.id,
+      organizationId: env.organizationId,
+      bufferPatch: {
+        type: "mark_cancelled",
+        cancelledAt: cancelledAt.toISOString(),
+        cancelReason,
+      },
+      pgMutation: async (taskRun) => {
+        const service = new CancelTaskRunService();
+        try {
+          await service.call(taskRun);
+        } catch {
+          return json({ error: "Internal Server Error" }, { status: 500 });
+        }
+        return json({ id: taskRun.friendlyId }, { status: 200 });
+      },
+      synthesisedResponse: () => json({ id: runId }, { status: 200 }),
+      abortSignal: getRequestAbortSignal(),
+    });

-    try {
-      await service.call(resource);
-    } catch (error) {
-      return json({ error: "Internal Server Error" }, { status: 500 });
+    if (outcome.kind === "not_found") {
+      return json({ error: "Run not found" }, { status: 404 });
     }
-
-    return json({ id: resource.friendlyId }, { status: 200 });
+    if (outcome.kind === "timed_out") {
+      return json({ error: "Run materialisation timed out" }, { status: 503 });
+    }
+    return outcome.response;
   }
 );
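
The outcome handling at the end of the route can be read as a mapping from a discriminated union to an HTTP result. The following is a minimal sketch under stated assumptions: only the `not_found` and `timed_out` kinds, their error messages, and the 404/503 statuses come from the diff. The `resolved` kind, the `Outcome` and `HttpResult` types, and `toHttpResult` are hypothetical and do not describe the real return type of `mutateWithFallback`.

```typescript
// Hypothetical outcome shape for illustration; the real helper's return
// type is not shown in the diff.
type Outcome =
  | { kind: "resolved"; status: number; body: { id: string } }
  | { kind: "not_found" }
  | { kind: "timed_out" };

interface HttpResult {
  status: number;
  body: { [key: string]: string };
}

function toHttpResult(outcome: Outcome): HttpResult {
  switch (outcome.kind) {
    case "not_found":
      // Neither a PG row nor a buffer entry exists for this run id.
      return { status: 404, body: { error: "Run not found" } };
    case "timed_out":
      // The snapshot was busy and the drainer did not resolve it in time.
      return { status: 503, body: { error: "Run materialisation timed out" } };
    case "resolved":
      // Either the PG mutation ran or the buffer patch was applied.
      return { status: outcome.status, body: outcome.body };
  }
}
```

Keeping the 404 and 503 branches separate from the success branch lets callers tell a missing run apart from a run that may still materialise.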

apps/webapp/app/v3/mollifier/mollifierDrainerHandler.server.ts

Lines changed: 33 additions & 0 deletions
@@ -45,6 +45,39 @@ export function createDrainerHandler(deps: {
         })
       : context.active();

+    // Cancel-wins-over-trigger (Q4 bifurcation). If a cancel API call
+    // landed on this entry while it was QUEUED, the snapshot carries
+    // `cancelledAt` + `cancelReason`. Skip the normal materialise path
+    // and write a CANCELED PG row directly. The existing runCancelled
+    // handler writes the TaskEvent.
+    const cancelledAtStr =
+      typeof input.payload.cancelledAt === "string" ? input.payload.cancelledAt : undefined;
+    if (cancelledAtStr) {
+      const cancelReason =
+        typeof input.payload.cancelReason === "string"
+          ? input.payload.cancelReason
+          : "Canceled by user";
+      await context.with(parentContext, async () => {
+        await startSpan(tracer, "mollifier.drained.cancelled", async (span) => {
+          span.setAttribute("mollifier.drained", true);
+          span.setAttribute("mollifier.dwell_ms", dwellMs);
+          span.setAttribute("mollifier.attempts", input.attempts);
+          span.setAttribute("mollifier.run_friendly_id", input.runId);
+          span.setAttribute("mollifier.cancel_bifurcation", true);
+          span.setAttribute("taskRunId", input.runId);
+          await deps.engine.createCancelledRun(
+            {
+              snapshot: input.payload as any,
+              cancelledAt: new Date(cancelledAtStr),
+              cancelReason,
+            },
+            deps.prisma,
+          );
+        });
+      });
+      return;
+    }
+
     await context.with(parentContext, async () => {
       await startSpan(tracer, "mollifier.drained", async (span) => {
         span.setAttribute("mollifier.drained", true);
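
The branch added in this hunk can be isolated as a small pure function, which may make the cancel-wins-over-trigger rule easier to test on its own. This is a sketch, not code from the commit: `chooseDrainPath`, `DrainPath`, and `Payload` are hypothetical names, while the string checks and the "Canceled by user" default mirror the diff.

```typescript
// Hypothetical helper for illustration; the handler in the diff inlines
// this decision rather than calling a separate function.
type Payload = { [key: string]: unknown };

type DrainPath =
  | { path: "cancel"; cancelledAt: Date; cancelReason: string }
  | { path: "trigger" };

function chooseDrainPath(payload: Payload): DrainPath {
  const cancelledAtStr =
    typeof payload.cancelledAt === "string" ? payload.cancelledAt : undefined;
  if (!cancelledAtStr) {
    return { path: "trigger" }; // no cancel mark: materialise normally
  }
  const cancelReason =
    typeof payload.cancelReason === "string" ? payload.cancelReason : "Canceled by user";
  // A cancel mark on the snapshot always wins over the pending trigger.
  return { path: "cancel", cancelledAt: new Date(cancelledAtStr), cancelReason };
}

const plain = chooseDrainPath({ taskIdentifier: "t" });
const cancelled = chooseDrainPath({
  taskIdentifier: "t",
  cancelledAt: "2024-01-01T00:00:00.000Z",
});
```

Here `plain` takes the trigger path, while `cancelled` takes the cancel path and falls back to the default reason because the payload carries no `cancelReason`.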

apps/webapp/test/mollifierMollify.test.ts

Lines changed: 26 additions & 2 deletions
@@ -8,8 +8,10 @@ vi.mock("~/db.server", () => ({
 import { mollifyTrigger } from "~/v3/mollifier/mollifierMollify.server";
 import type { MollifierBuffer } from "@trigger.dev/redis-worker";

-function fakeBuffer(): { buffer: MollifierBuffer; accept: ReturnType<typeof vi.fn> } {
-  const accept = vi.fn(async () => undefined);
+function fakeBuffer(
+  acceptResult: Awaited<ReturnType<MollifierBuffer["accept"]>> = { kind: "accepted" },
+): { buffer: MollifierBuffer; accept: ReturnType<typeof vi.fn> } {
+  const accept = vi.fn(async () => acceptResult);
   return {
     buffer: { accept } as unknown as MollifierBuffer,
     accept,
@@ -39,6 +41,8 @@ describe("mollifyTrigger", () => {
       envId: "env_a",
       orgId: "org_1",
       payload: expect.any(String),
+      idempotencyKey: undefined,
+      taskIdentifier: undefined,
     });
     expect(result.run.friendlyId).toBe("run_friendly_1");
     expect(result.error).toBeUndefined();
@@ -50,6 +54,26 @@ describe("mollifyTrigger", () => {
     });
   });

+  it("echoes the winner's runId with isCached=true on duplicate_idempotency", async () => {
+    const { buffer } = fakeBuffer({
+      kind: "duplicate_idempotency",
+      existingRunId: "run_winner",
+    });
+    const result = await mollifyTrigger({
+      runFriendlyId: "run_loser",
+      environmentId: "env_a",
+      organizationId: "org_1",
+      engineTriggerInput: { taskIdentifier: "t", payload: "{}" },
+      decision: { divert: true, reason: "per_env_rate", count: 1, threshold: 1 },
+      buffer,
+      idempotencyKey: "key",
+      taskIdentifier: "t",
+    });
+    expect(result.run.friendlyId).toBe("run_winner");
+    expect(result.isCached).toBe(true);
+    expect(result.notice).toBeUndefined();
+  });
+
   it("snapshot is round-trippable: payload field is parseable JSON of engineTriggerInput", async () => {
     const { buffer, accept } = fakeBuffer();
     const engineInput = { taskIdentifier: "t", payload: "{}", tags: ["a", "b"] };

internal-packages/run-engine/src/engine/index.ts

Lines changed: 148 additions & 0 deletions
@@ -443,6 +443,154 @@ export class RunEngine {

   //MARK: - Run functions

+  /**
+   * Writes a TaskRun row in CANCELED state directly, bypassing the trigger
+   * pipeline. Used by the mollifier drainer when a cancel API call lands on
+   * a buffered run before it materialises (Q4 mollifier-cancel design).
+   *
+   * Skips: queue insertion (no execution), waitpoint creation (single-
+   * triggerAndWait can't enter the buffer; F4 bypass), concurrency
+   * reservation. Emits `runCancelled` so the existing TaskEvent handler
+   * writes the cancellation event row, the only side effect PG-side cancel
+   * has today per audit.
+   *
+   * Idempotent: if a row with the same friendlyId already exists (double
+   * drainer pop after requeue), Prisma's P2002 unique-constraint violation
+   * is caught and the existing row is returned. The duplicate runCancelled
+   * emission is skipped: the original drain's emit already wrote the
+   * TaskEvent.
+   */
+  async createCancelledRun(
+    {
+      snapshot,
+      cancelledAt,
+      cancelReason,
+    }: {
+      snapshot: TriggerParams;
+      cancelledAt: Date;
+      cancelReason: string;
+    },
+    tx?: PrismaClientOrTransaction,
+  ): Promise<TaskRun> {
+    const prisma = tx ?? this.prisma;
+    return startSpan(this.tracer, "createCancelledRun", async (span) => {
+      span.setAttribute("friendlyId", snapshot.friendlyId);
+      span.setAttribute("taskIdentifier", snapshot.taskIdentifier);
+      const id = RunId.fromFriendlyId(snapshot.friendlyId);
+      const error: TaskRunError = { type: "STRING_ERROR", raw: cancelReason };
+
+      try {
+        const taskRun = await prisma.taskRun.create({
+          data: {
+            id,
+            engine: "V2",
+            status: "CANCELED",
+            friendlyId: snapshot.friendlyId,
+            runtimeEnvironmentId: snapshot.environment.id,
+            environmentType: snapshot.environment.type,
+            organizationId: snapshot.environment.organization.id,
+            projectId: snapshot.environment.project.id,
+            idempotencyKey: snapshot.idempotencyKey,
+            idempotencyKeyExpiresAt: snapshot.idempotencyKeyExpiresAt,
+            idempotencyKeyOptions: snapshot.idempotencyKeyOptions,
+            taskIdentifier: snapshot.taskIdentifier,
+            payload: snapshot.payload,
+            payloadType: snapshot.payloadType,
+            context: snapshot.context,
+            traceContext: snapshot.traceContext,
+            traceId: snapshot.traceId,
+            spanId: snapshot.spanId,
+            parentSpanId: snapshot.parentSpanId,
+            lockedToVersionId: snapshot.lockedToVersionId,
+            taskVersion: snapshot.taskVersion,
+            sdkVersion: snapshot.sdkVersion,
+            cliVersion: snapshot.cliVersion,
+            concurrencyKey: snapshot.concurrencyKey,
+            queue: snapshot.queue,
+            lockedQueueId: snapshot.lockedQueueId,
+            workerQueue: snapshot.workerQueue,
+            isTest: snapshot.isTest,
+            taskEventStore: snapshot.taskEventStore,
+            runTags: snapshot.tags.length === 0 ? undefined : snapshot.tags,
+            oneTimeUseToken: snapshot.oneTimeUseToken,
+            parentTaskRunId: snapshot.parentTaskRunId,
+            rootTaskRunId: snapshot.rootTaskRunId,
+            replayedFromTaskRunFriendlyId: snapshot.replayedFromTaskRunFriendlyId,
+            batchId: snapshot.batch?.id,
+            resumeParentOnCompletion: snapshot.resumeParentOnCompletion,
+            depth: snapshot.depth,
+            seedMetadata: snapshot.seedMetadata,
+            seedMetadataType: snapshot.seedMetadataType,
+            metadata: snapshot.metadata,
+            metadataType: snapshot.metadataType,
+            machinePreset: snapshot.machine,
+            scheduleId: snapshot.scheduleId,
+            scheduleInstanceId: snapshot.scheduleInstanceId,
+            createdAt: snapshot.createdAt,
+            bulkActionGroupIds: snapshot.bulkActionId ? [snapshot.bulkActionId] : undefined,
+            planType: snapshot.planType,
+            realtimeStreamsVersion: snapshot.realtimeStreamsVersion,
+            streamBasinName: snapshot.streamBasinName,
+            annotations: snapshot.annotations,
+            completedAt: cancelledAt,
+            updatedAt: cancelledAt,
+            error: error as unknown as Prisma.InputJsonValue,
+            attemptNumber: 0,
+            executionSnapshots: {
+              create: {
+                engine: "V2",
+                executionStatus: "FINISHED",
+                description: "Run cancelled before materialisation",
+                runStatus: "CANCELED",
+                environmentId: snapshot.environment.id,
+                environmentType: snapshot.environment.type,
+                projectId: snapshot.environment.project.id,
+                organizationId: snapshot.environment.organization.id,
+              },
+            },
+          },
+        });
+
+        this.eventBus.emit("runCancelled", {
+          time: cancelledAt,
+          run: {
+            id: taskRun.id,
+            status: taskRun.status,
+            friendlyId: taskRun.friendlyId,
+            spanId: taskRun.spanId,
+            taskEventStore: taskRun.taskEventStore,
+            createdAt: taskRun.createdAt,
+            completedAt: taskRun.completedAt,
+            error,
+            updatedAt: taskRun.updatedAt,
+            attemptNumber: taskRun.attemptNumber ?? 0,
+          },
+          organization: { id: snapshot.environment.organization.id },
+          project: { id: snapshot.environment.project.id },
+          environment: { id: snapshot.environment.id },
+        });
+
+        return taskRun;
+      } catch (err) {
+        // P2002 = unique constraint violation. Double-pop after a drainer
+        // requeue can reach this. Idempotent: return the existing row
+        // without re-emitting.
+        if (
+          err instanceof Prisma.PrismaClientKnownRequestError &&
+          err.code === "P2002"
+        ) {
+          this.logger.info(
+            "createCancelledRun: row already exists, returning existing (idempotent)",
+            { friendlyId: snapshot.friendlyId },
+          );
+          const existing = await prisma.taskRun.findFirst({ where: { id } });
+          if (existing) return existing;
+        }
+        throw err;
+      }
+    });
+  }
+
   /** "Triggers" one run. */
   async trigger(
     {
