Skip to content

Commit 39e3bab

Browse files
committed
feat(webapp): dashboard cancel/replay/idempotencyKey-reset handle buffered runs (Phase D)
Parallels Phase C's API-side work for the three dashboard mutation routes. D1 cancel — PG miss → buffer.mutateSnapshot('mark_cancelled'). Org- membership verified against the buffered run's orgId (dashboard URL doesn't carry an envId so the API-side env-scoped auth doesn't apply). busy returns a "retry in a moment" message. D2 replay — PG miss → findRunByIdWithMollifierFallback; B4-extended SyntheticRun cast to TaskRun and fed to ReplayTaskRunService. Project/env slugs for the redirect path looked up from the entry's envId. D3 idempotencyKey reset — PG miss → buffer.getEntry + readFallback to read snapshot's idempotencyKey + taskIdentifier; org-membership verified against entry orgId; existing ResetIdempotencyKeyService (extended in B6b to clear both stores) handles the actual reset.
1 parent 63b0a35 commit 39e3bab

4 files changed

Lines changed: 145 additions & 13 deletions
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
---
2+
area: webapp
3+
type: feature
4+
---
5+
6+
Dashboard mutation routes handle buffered runs (Phase D — parallels Phase C's API-side work).
7+
8+
- `POST /resources/taskruns/{runParam}/cancel`: PG miss falls through to `buffer.mutateSnapshot('mark_cancelled')`. Org-membership is verified against the buffered run's `orgId` (the dashboard URL doesn't carry an envId so the API-side env-scoped auth doesn't apply). `busy` returns a "retry in a moment" message.
9+
- `POST /resources/taskruns/{runParam}/replay`: PG miss falls through to `findRunByIdWithMollifierFallback`; the B4-extended `SyntheticRun` is cast to `TaskRun` and fed to `ReplayTaskRunService`. Project/env slugs needed for the success-redirect are looked up from the entry's `envId`.
10+
- `POST /resources/orgs/.../runs/{runParam}/idempotencyKey/reset`: PG miss falls through to buffer; reads `idempotencyKey` + `taskIdentifier` from the snapshot; org-membership verified against the entry's `orgId`. The existing `ResetIdempotencyKeyService` (extended in B6b to clear both stores) handles the actual reset.

apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.idempotencyKey.reset.tsx

Lines changed: 46 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ import { logger } from "~/services/logger.server";
55
import { requireUserId } from "~/services/session.server";
66
import { ResetIdempotencyKeyService } from "~/v3/services/resetIdempotencyKey.server";
77
import { v3RunParamsSchema } from "~/utils/pathBuilder";
8+
import { getMollifierBuffer } from "~/v3/mollifier/mollifierBuffer.server";
9+
import { findRunByIdWithMollifierFallback } from "~/v3/mollifier/readFallback.server";
810

911
export const action: ActionFunction = async ({ request, params }) => {
1012
const userId = await requireUserId(request);
@@ -37,17 +39,53 @@ export const action: ActionFunction = async ({ request, params }) => {
3739
},
3840
});
3941

40-
if (!taskRun) {
41-
return jsonWithErrorMessage({}, request, "Run not found");
42-
}
43-
44-
if (!taskRun.idempotencyKey) {
45-
return jsonWithErrorMessage({}, request, "This run does not have an idempotency key");
42+
// Resolve run from PG or the mollifier buffer (Q5). For a buffered
43+
// run the snapshot carries the idempotencyKey + taskIdentifier; we
44+
// also need the runtimeEnvironmentId to feed ResetIdempotencyKeyService
45+
// (which clears both PG and the buffer lookup — B6b).
46+
let resolved:
47+
| { idempotencyKey: string; taskIdentifier: string; runtimeEnvironmentId: string }
48+
| null = null;
49+
if (taskRun) {
50+
if (!taskRun.idempotencyKey) {
51+
return jsonWithErrorMessage({}, request, "This run does not have an idempotency key");
52+
}
53+
resolved = {
54+
idempotencyKey: taskRun.idempotencyKey,
55+
taskIdentifier: taskRun.taskIdentifier,
56+
runtimeEnvironmentId: taskRun.runtimeEnvironmentId,
57+
};
58+
} else {
59+
const buffer = getMollifierBuffer();
60+
const entry = buffer ? await buffer.getEntry(runParam) : null;
61+
if (!entry) {
62+
return jsonWithErrorMessage({}, request, "Run not found");
63+
}
64+
const member = await prisma.orgMember.findFirst({
65+
where: { userId, organizationId: entry.orgId },
66+
select: { id: true },
67+
});
68+
if (!member) {
69+
return jsonWithErrorMessage({}, request, "Run not found");
70+
}
71+
const synthetic = await findRunByIdWithMollifierFallback({
72+
runId: runParam,
73+
environmentId: entry.envId,
74+
organizationId: entry.orgId,
75+
});
76+
if (!synthetic?.idempotencyKey || !synthetic.taskIdentifier) {
77+
return jsonWithErrorMessage({}, request, "This run does not have an idempotency key");
78+
}
79+
resolved = {
80+
idempotencyKey: synthetic.idempotencyKey,
81+
taskIdentifier: synthetic.taskIdentifier,
82+
runtimeEnvironmentId: entry.envId,
83+
};
4684
}
4785

4886
const environment = await prisma.runtimeEnvironment.findUnique({
4987
where: {
50-
id: taskRun.runtimeEnvironmentId,
88+
id: resolved.runtimeEnvironmentId,
5189
},
5290
include: {
5391
project: {
@@ -64,7 +102,7 @@ export const action: ActionFunction = async ({ request, params }) => {
64102

65103
const service = new ResetIdempotencyKeyService();
66104

67-
await service.call(taskRun.idempotencyKey, taskRun.taskIdentifier, {
105+
await service.call(resolved.idempotencyKey, resolved.taskIdentifier, {
68106
...environment,
69107
organizationId: environment.project.organizationId,
70108
organization: environment.project.organization,

apps/webapp/app/routes/resources.taskruns.$runParam.cancel.ts

Lines changed: 46 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import { redirectWithErrorMessage, redirectWithSuccessMessage } from "~/models/m
66
import { logger } from "~/services/logger.server";
77
import { requireUserId } from "~/services/session.server";
88
import { CancelTaskRunService } from "~/v3/services/cancelTaskRun.server";
9+
import { getMollifierBuffer } from "~/v3/mollifier/mollifierBuffer.server";
910

1011
export const cancelSchema = z.object({
1112
redirectUrl: z.string(),
@@ -42,15 +43,56 @@ export const action: ActionFunction = async ({ request, params }) => {
4243
},
4344
});
4445

45-
if (!taskRun) {
46+
if (taskRun) {
47+
const cancelRunService = new CancelTaskRunService();
48+
await cancelRunService.call(taskRun);
49+
return redirectWithSuccessMessage(submission.value.redirectUrl, request, `Canceled run`);
50+
}
51+
52+
// PG miss — try the mollifier buffer. The customer can hit cancel
53+
// on a buffered run from the dashboard during the burst window.
54+
// Q4 design: snapshot a `mark_cancelled` patch; the drainer's
55+
// bifurcation routes the run to `engine.createCancelledRun` on
56+
// next pop.
57+
const buffer = getMollifierBuffer();
58+
const entry = buffer ? await buffer.getEntry(runParam) : null;
59+
if (!entry) {
4660
submission.error = { runParam: ["Run not found"] };
4761
return json(submission);
4862
}
4963

50-
const cancelRunService = new CancelTaskRunService();
51-
await cancelRunService.call(taskRun);
64+
// Dashboard auth: verify the requesting user is a member of the
65+
// buffered run's org. The API path scopes by env id from the
66+
// authenticated request; the dashboard route uses org-membership
67+
// because the URL doesn't carry an envId.
68+
const member = await prisma.orgMember.findFirst({
69+
where: { userId, organizationId: entry.orgId },
70+
select: { id: true },
71+
});
72+
if (!member) {
73+
submission.error = { runParam: ["Run not found"] };
74+
return json(submission);
75+
}
5276

53-
return redirectWithSuccessMessage(submission.value.redirectUrl, request, `Canceled run`);
77+
const result = await buffer!.mutateSnapshot(runParam, {
78+
type: "mark_cancelled",
79+
cancelledAt: new Date().toISOString(),
80+
cancelReason: "Canceled by user",
81+
});
82+
if (result === "applied_to_snapshot") {
83+
return redirectWithSuccessMessage(submission.value.redirectUrl, request, `Canceled run`);
84+
}
85+
if (result === "not_found") {
86+
submission.error = { runParam: ["Run not found"] };
87+
return json(submission);
88+
}
89+
// "busy" — drainer is materialising. Customer can retry; by then the
90+
// PG row exists and the regular cancel path takes over.
91+
return redirectWithErrorMessage(
92+
submission.value.redirectUrl,
93+
request,
94+
"Run is materialising — retry in a moment"
95+
);
5496
} catch (error) {
5597
if (error instanceof Error) {
5698
logger.error("Failed to cancel run", {

apps/webapp/app/routes/resources.taskruns.$runParam.replay.ts

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@ import { requireUser } from "~/services/session.server";
1111
import { sortEnvironments } from "~/utils/environmentSort";
1212
import { v3RunSpanPath } from "~/utils/pathBuilder";
1313
import { ReplayTaskRunService } from "~/v3/services/replayTaskRun.server";
14+
import { getMollifierBuffer } from "~/v3/mollifier/mollifierBuffer.server";
15+
import { findRunByIdWithMollifierFallback } from "~/v3/mollifier/readFallback.server";
16+
import type { TaskRun } from "@trigger.dev/database";
1417
import parseDuration from "parse-duration";
1518
import { findCurrentWorkerDeployment } from "~/v3/models/workerDeployment.server";
1619
import { queueTypeFromType } from "~/presenters/v3/QueueRetrievePresenter.server";
@@ -174,7 +177,7 @@ export const action: ActionFunction = async ({ request, params }) => {
174177
}
175178

176179
try {
177-
const taskRun = await prisma.taskRun.findFirst({
180+
const pgRun = await prisma.taskRun.findFirst({
178181
where: {
179182
friendlyId: runParam,
180183
},
@@ -192,6 +195,45 @@ export const action: ActionFunction = async ({ request, params }) => {
192195
},
193196
});
194197

198+
// Mollifier read-fallback (Q2): if the original isn't in PG yet,
199+
// synthesise a TaskRun from the buffered snapshot. The B4-extended
200+
// SyntheticRun carries every field ReplayTaskRunService reads. We
201+
// also need projectSlug + orgSlug + envSlug for the redirect path,
202+
// so look those up via the snapshot's runtimeEnvironmentId.
203+
let taskRun:
204+
| (TaskRun & {
205+
project: { slug: string; organization: { slug: string } };
206+
runtimeEnvironment: { slug: string };
207+
})
208+
| null = pgRun ?? null;
209+
if (!taskRun) {
210+
const buffer = getMollifierBuffer();
211+
const entry = buffer ? await buffer.getEntry(runParam) : null;
212+
if (entry) {
213+
const synthetic = await findRunByIdWithMollifierFallback({
214+
runId: runParam,
215+
environmentId: entry.envId,
216+
organizationId: entry.orgId,
217+
});
218+
if (synthetic) {
219+
const envRow = await prisma.runtimeEnvironment.findFirst({
220+
where: { id: entry.envId },
221+
select: {
222+
slug: true,
223+
project: { select: { slug: true, organization: { select: { slug: true } } } },
224+
},
225+
});
226+
if (envRow) {
227+
taskRun = {
228+
...(synthetic as unknown as TaskRun),
229+
project: { slug: envRow.project.slug, organization: { slug: envRow.project.organization.slug } },
230+
runtimeEnvironment: { slug: envRow.slug },
231+
};
232+
}
233+
}
234+
}
235+
}
236+
195237
if (!taskRun) {
196238
return redirectWithErrorMessage(submission.value.failedRedirect, request, "Run not found");
197239
}

0 commit comments

Comments
 (0)