Skip to content

Commit 0183e43

Browse files
committed
feat(webapp): reschedule + replay APIs handle buffered runs (Phase C4 + C5)
Reschedule (C4): switches to mutateWithFallback. PG hits go through the existing RescheduleTaskRunService (which enforces status === "DELAYED"). Buffered hits land a set_delay patch on the snapshot; the drainer materialises the PG row with the new delayUntil. Synth- esised response returns { id, delayUntil }. Replay (C5): adds a read-fallback after the PG miss. The B4-extended SyntheticRun carries every field ReplayTaskRunService reads from a TaskRun, so the buffered case casts through and uses the existing service unchanged. Replay creates a fresh trigger that itself re-enters the mollifier gate — no special surge handling needed beyond what the gate already does. Also tightens the PG lookup to findFirst with runtimeEnvironmentId scoping (was findUnique on friendlyId only).
1 parent 3534f13 commit 0183e43

3 files changed

Lines changed: 96 additions & 45 deletions

File tree

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
---
2+
area: webapp
3+
type: feature
4+
---
5+
6+
Reschedule and replay APIs now handle buffered runs.
7+
8+
`POST /api/v1/runs/{id}/reschedule` switches to `mutateWithFallback`. PG hits go through the existing `RescheduleTaskRunService` (which enforces `status === "DELAYED"`). Buffered-QUEUED hits land a `set_delay` patch on the snapshot; the drainer materialises the PG row with the new `delayUntil`. `busy` snapshots wait for drainer resolution then route through PG. Synthesised response returns `{ id, delayUntil }` for the SDK to confirm.
9+
10+
`POST /api/v1/runs/{id}/replay` adds a read-fallback after the PG miss: when the original run is still in the buffer, the synthesised TaskRun (extended in Phase B4 with all `ReplayTaskRunService`-relevant fields) is passed straight to the existing replay service. Replay creates a fresh trigger that itself re-enters the mollifier gate — no special surge handling needed. Also tightens the PG lookup to `findFirst` with `runtimeEnvironmentId` scoping; the prior `findUnique` left auth boundary checks to the upper layer.

apps/webapp/app/routes/api.v1.runs.$runParam.replay.ts

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
import type { ActionFunctionArgs } from "@remix-run/server-runtime";
22
import { json } from "@remix-run/server-runtime";
3+
import type { TaskRun } from "@trigger.dev/database";
34
import { z } from "zod";
45
import { prisma } from "~/db.server";
56
import { authenticateApiRequest } from "~/services/apiAuth.server";
67
import { logger } from "~/services/logger.server";
78
import { ReplayTaskRunService } from "~/v3/services/replayTaskRun.server";
9+
import { findRunByIdWithMollifierFallback } from "~/v3/mollifier/readFallback.server";
810
import { sanitizeTriggerSource } from "~/utils/triggerSource";
911

1012
const ParamsSchema = z.object({
@@ -32,12 +34,34 @@ export async function action({ request, params }: ActionFunctionArgs) {
3234
const { runParam } = parsed.data;
3335

3436
try {
35-
const taskRun = await prisma.taskRun.findUnique({
37+
const env = authenticationResult.environment;
38+
// PG-first. Replay works on any status per audit (Q2 design) — no
39+
// filter beyond friendlyId is the existing semantic; findFirst with
40+
// env scoping tightens it minimally without changing behaviour for
41+
// a correctly-authed caller.
42+
let taskRun: TaskRun | null = await prisma.taskRun.findFirst({
3643
where: {
3744
friendlyId: runParam,
45+
runtimeEnvironmentId: env.id,
3846
},
3947
});
4048

49+
if (!taskRun) {
50+
// Buffered fallback (Q2). The SyntheticRun shape was extended in
51+
// Phase B4 to carry every field ReplayTaskRunService reads from a
52+
// TaskRun. Cast through unknown — the synthesised object has the
53+
// same field surface as a real PG row from the service's
54+
// perspective.
55+
const buffered = await findRunByIdWithMollifierFallback({
56+
runId: runParam,
57+
environmentId: env.id,
58+
organizationId: env.organizationId,
59+
});
60+
if (buffered) {
61+
taskRun = buffered as unknown as TaskRun;
62+
}
63+
}
64+
4165
if (!taskRun) {
4266
return json({ error: "Run not found" }, { status: 404 });
4367
}

apps/webapp/app/routes/api.v1.runs.$runParam.reschedule.ts

Lines changed: 61 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -3,90 +3,107 @@ import { json } from "@remix-run/server-runtime";
33
import { RescheduleRunRequestBody } from "@trigger.dev/core/v3/schemas";
44
import { z } from "zod";
55
import { getApiVersion } from "~/api/versions";
6-
import { prisma } from "~/db.server";
76
import { ApiRetrieveRunPresenter } from "~/presenters/v3/ApiRetrieveRunPresenter.server";
87
import { authenticateApiRequest } from "~/services/apiAuth.server";
8+
import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server";
99
import { logger } from "~/services/logger.server";
1010
import { ServiceValidationError } from "~/v3/services/baseService.server";
1111
import { RescheduleTaskRunService } from "~/v3/services/rescheduleTaskRun.server";
12+
import { mutateWithFallback } from "~/v3/mollifier/mutateWithFallback.server";
13+
import { parseDelay } from "~/utils/delays";
1214

1315
const ParamsSchema = z.object({
1416
runParam: z.string(),
1517
});
1618

1719
export async function action({ request, params }: ActionFunctionArgs) {
18-
// Ensure this is a POST request
1920
if (request.method.toUpperCase() !== "POST") {
2021
return { status: 405, body: "Method Not Allowed" };
2122
}
2223

23-
// Authenticate the request
2424
const authenticationResult = await authenticateApiRequest(request);
25-
2625
if (!authenticationResult) {
2726
return json({ error: "Invalid or missing API Key" }, { status: 401 });
2827
}
2928

3029
const parsed = ParamsSchema.safeParse(params);
31-
3230
if (!parsed.success) {
3331
return json({ error: "Invalid or missing run ID" }, { status: 400 });
3432
}
3533

36-
const { runParam } = parsed.data;
37-
38-
const taskRun = await prisma.taskRun.findUnique({
39-
where: {
40-
friendlyId: runParam,
41-
runtimeEnvironmentId: authenticationResult.environment.id,
42-
},
43-
});
44-
45-
if (!taskRun) {
46-
return json({ error: "Run not found" }, { status: 404 });
47-
}
48-
4934
const anyBody = await request.json();
50-
5135
const body = RescheduleRunRequestBody.safeParse(anyBody);
52-
5336
if (!body.success) {
5437
return json({ error: "Invalid request body" }, { status: 400 });
5538
}
5639

57-
const service = new RescheduleTaskRunService();
40+
const env = authenticationResult.environment;
41+
// Pre-resolve the absolute Date the buffer snapshot should encode.
42+
// RescheduleTaskRunService expects this to be present on the body for
43+
// its PG-side flow; for the buffer-side patch we encode the same
44+
// wall-clock value so the drainer's engine.trigger sees the intended
45+
// delayUntil after materialisation.
46+
const delayUntil = await parseDelay(body.data.delay);
47+
if (!delayUntil) {
48+
return json({ error: "Invalid delay value" }, { status: 400 });
49+
}
5850

5951
try {
60-
const updatedRun = await service.call(taskRun, body.data);
61-
62-
if (!updatedRun) {
63-
return json({ error: "An unknown error occurred" }, { status: 500 });
64-
}
65-
66-
const run = await ApiRetrieveRunPresenter.findRun(
67-
updatedRun.friendlyId,
68-
authenticationResult.environment
69-
);
70-
71-
if (!run) {
52+
const outcome = await mutateWithFallback<Response>({
53+
runId: parsed.data.runParam,
54+
environmentId: env.id,
55+
organizationId: env.organizationId,
56+
bufferPatch: {
57+
type: "set_delay",
58+
delayUntil: delayUntil.toISOString(),
59+
},
60+
pgMutation: async (taskRun) => {
61+
const service = new RescheduleTaskRunService();
62+
const updatedRun = await service.call(taskRun, body.data);
63+
if (!updatedRun) {
64+
return json({ error: "An unknown error occurred" }, { status: 500 });
65+
}
66+
67+
const run = await ApiRetrieveRunPresenter.findRun(updatedRun.friendlyId, env);
68+
if (!run) {
69+
return json({ error: "Run not found" }, { status: 404 });
70+
}
71+
const apiVersion = getApiVersion(request);
72+
const presenter = new ApiRetrieveRunPresenter(apiVersion);
73+
const result = await presenter.call(run, env);
74+
if (!result) {
75+
return json({ error: "Run not found" }, { status: 404 });
76+
}
77+
return json(result);
78+
},
79+
// Buffered snapshot has been patched. Synthesise a minimal
80+
// retrieve-shape response — the run hasn't materialised yet, so
81+
// the presenter's full pass would synthesise mostly defaults
82+
// anyway. Returning the friendlyId + the new delay is sufficient
83+
// for SDK confirmation; subsequent retrieve calls go through the
84+
// existing presenter with read-fallback (Phase A).
85+
synthesisedResponse: () =>
86+
json(
87+
{
88+
id: parsed.data.runParam,
89+
delayUntil: delayUntil.toISOString(),
90+
},
91+
{ status: 200 }
92+
),
93+
abortSignal: getRequestAbortSignal(),
94+
});
95+
96+
if (outcome.kind === "not_found") {
7297
return json({ error: "Run not found" }, { status: 404 });
7398
}
74-
75-
const apiVersion = getApiVersion(request);
76-
77-
const presenter = new ApiRetrieveRunPresenter(apiVersion);
78-
const result = await presenter.call(run, authenticationResult.environment);
79-
80-
if (!result) {
81-
return json({ error: "Run not found" }, { status: 404 });
99+
if (outcome.kind === "timed_out") {
100+
return json({ error: "Run materialisation timed out" }, { status: 503 });
82101
}
83-
84-
return json(result);
102+
return outcome.response;
85103
} catch (error) {
86104
if (error instanceof ServiceValidationError) {
87105
return json({ error: error.message }, { status: 400 });
88106
}
89-
90107
logger.error("Failed to reschedule run", { error });
91108
return json({ error: "Something went wrong, please try again." }, { status: 500 });
92109
}

0 commit comments

Comments
 (0)