Skip to content

Commit 3534f13

Browse files
committed
fix(webapp): tags route handles buffered runs (Phase C2)
Closes the live 500 the parity script flagged. The previous route did prisma.taskRun.update after a findFirst that could miss; on buffered runs (no PG row yet) the update raised RecordNotFound and surfaced as a 500. Switches to mutateWithFallback. PG hits go through the existing select-dedupe-validate-update flow with MAX_TAGS_PER_RUN enforcement. Buffered-QUEUED hits apply append_tags via Lua (atomic dedup against existing snapshot tags). busy snapshots wait for drainer resolution then update PG. 404 / 503 surface for missing / hung cases. The MAX_TAGS_PER_RUN cap is skipped on the buffered side — the drainer's engine.trigger doesn't enforce it either, matching the pre-buffer trigger path. Pushing the cap into the snapshot-mutate Lua is a possible follow-up.
1 parent d4f7342 commit 3534f13

2 files changed

Lines changed: 61 additions & 43 deletions

File tree

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
---
2+
area: webapp
3+
type: fix
4+
---
5+
6+
`POST /api/v1/runs/{id}/tags` now handles buffered runs. Previously the route did `prisma.taskRun.update` after a `findFirst` that could miss; on buffered runs (no PG row yet) the update raised `RecordNotFound` and the route leaked as a 500 — the live drift the parity script flagged.
7+
8+
Switches the route to `mutateWithFallback` per the Q3 design. PG hits go through the existing select-dedupe-update flow with `MAX_TAGS_PER_RUN` enforcement. Buffered-QUEUED hits apply the `append_tags` patch on the snapshot (Lua-atomic dedup against existing tags). `busy` snapshots wait for drainer resolution then update PG normally. Genuine 404 / 503 surface as 404 / 503.
9+
10+
The `MAX_TAGS_PER_RUN` enforcement is skipped on the buffered side — the drainer's `engine.trigger` doesn't enforce it either, so behaviour matches the pre-buffer trigger path. Pushing the cap into the snapshot-mutate Lua is a possible follow-up.

apps/webapp/app/routes/api.v1.runs.$runId.tags.ts

Lines changed: 51 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -4,19 +4,19 @@ import { z } from "zod";
44
import { prisma } from "~/db.server";
55
import { MAX_TAGS_PER_RUN } from "~/models/taskRunTag.server";
66
import { authenticateApiRequest } from "~/services/apiAuth.server";
7+
import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server";
78
import { logger } from "~/services/logger.server";
9+
import { mutateWithFallback } from "~/v3/mollifier/mutateWithFallback.server";
810

911
const ParamsSchema = z.object({
1012
runId: z.string(),
1113
});
1214

1315
export async function action({ request, params }: ActionFunctionArgs) {
14-
// Ensure this is a POST request
1516
if (request.method.toUpperCase() !== "POST") {
1617
return { status: 405, body: "Method Not Allowed" };
1718
}
1819

19-
// Authenticate the request
2020
const authenticationResult = await authenticateApiRequest(request);
2121
if (!authenticationResult) {
2222
return json({ error: "Invalid or Missing API Key" }, { status: 401 });
@@ -32,59 +32,67 @@ export async function action({ request, params }: ActionFunctionArgs) {
3232

3333
try {
3434
const anyBody = await request.json();
35-
3635
const body = AddTagsRequestBody.safeParse(anyBody);
3736
if (!body.success) {
3837
return json({ error: "Invalid request body", issues: body.error.issues }, { status: 400 });
3938
}
40-
41-
const run = await prisma.taskRun.findFirst({
42-
where: {
43-
friendlyId: parsedParams.data.runId,
44-
runtimeEnvironmentId: authenticationResult.environment.id,
45-
},
46-
select: {
47-
runTags: true,
48-
},
49-
});
50-
51-
const existingTags = run?.runTags ?? [];
52-
53-
//remove duplicate tags from the new tags
5439
const bodyTags = typeof body.data.tags === "string" ? [body.data.tags] : body.data.tags;
55-
const newTags = bodyTags.filter((tag) => {
56-
if (tag.trim().length === 0) return false;
57-
return !existingTags.includes(tag);
58-
});
59-
60-
if (existingTags.length + newTags.length > MAX_TAGS_PER_RUN) {
61-
return json(
62-
{
63-
error: `Runs can only have ${MAX_TAGS_PER_RUN} tags, you're trying to set ${
64-
existingTags.length + newTags.length
65-
}. These tags have not been set: ${newTags.map((t) => `'${t}'`).join(", ")}.`,
66-
},
67-
{ status: 422 }
68-
);
69-
}
40+
const nonEmptyTags = bodyTags.filter((t) => t.trim().length > 0);
7041

71-
if (newTags.length === 0) {
42+
if (nonEmptyTags.length === 0) {
7243
return json({ message: "No new tags to add" }, { status: 200 });
7344
}
7445

75-
await prisma.taskRun.update({
76-
where: {
77-
friendlyId: parsedParams.data.runId,
78-
runtimeEnvironmentId: authenticationResult.environment.id,
79-
},
80-
data: {
81-
runTags: {
82-
push: newTags,
83-
},
46+
const env = authenticationResult.environment;
47+
const outcome = await mutateWithFallback({
48+
runId: parsedParams.data.runId,
49+
environmentId: env.id,
50+
organizationId: env.organizationId,
51+
bufferPatch: { type: "append_tags", tags: nonEmptyTags },
52+
pgMutation: async (taskRun) => {
53+
const existing = taskRun.runTags ?? [];
54+
const newTags = nonEmptyTags.filter((t) => !existing.includes(t));
55+
56+
if (existing.length + newTags.length > MAX_TAGS_PER_RUN) {
57+
return json(
58+
{
59+
error: `Runs can only have ${MAX_TAGS_PER_RUN} tags, you're trying to set ${
60+
existing.length + newTags.length
61+
}. These tags have not been set: ${newTags.map((t) => `'${t}'`).join(", ")}.`,
62+
},
63+
{ status: 422 }
64+
);
65+
}
66+
if (newTags.length === 0) {
67+
return json({ message: "No new tags to add" }, { status: 200 });
68+
}
69+
await prisma.taskRun.update({
70+
where: {
71+
id: taskRun.id,
72+
runtimeEnvironmentId: env.id,
73+
},
74+
data: { runTags: { push: newTags } },
75+
});
76+
return json({ message: `Successfully set ${newTags.length} new tags.` }, { status: 200 });
8477
},
78+
// Buffer-applied patch path. The mutateSnapshot Lua deduplicates
79+
// against existing snapshot tags atomically. MAX_TAGS_PER_RUN
80+
// enforcement is skipped on the buffered side — the drainer's
81+
// engine.trigger writes the PG row without enforcement either,
82+
// matching today's pre-buffer trigger semantics. A future
83+
// refinement could push the limit check into the Lua.
84+
synthesisedResponse: () =>
85+
json({ message: `Successfully set ${nonEmptyTags.length} new tags.` }, { status: 200 }),
86+
abortSignal: getRequestAbortSignal(),
8587
});
8688

87-
return json({ message: `Successfully set ${newTags.length} new tags.` }, { status: 200 });
89+
if (outcome.kind === "not_found") {
90+
return json({ error: "Run not found" }, { status: 404 });
91+
}
92+
if (outcome.kind === "timed_out") {
93+
return json({ error: "Run materialisation timed out" }, { status: 503 });
94+
}
95+
return outcome.response;
8896
} catch (error) {
8997
logger.error("Failed to add run tags", { error });
9098
return json({ error: "Something went wrong, please try again." }, { status: 500 });

0 commit comments

Comments
 (0)