Commit d5c1e22

feat(webapp,redis-worker): metadata PUT handles buffered runs (Phase C3)
Closes the last API-parity gap in the master plan.

redis-worker side:
- New casSetMetadata Lua command with optimistic lock on a metadataVersion entry-hash field. Returns applied / version_conflict / not_found / busy. Mirrors the PG-side UpdateMetadataService's CAS loop so concurrent metadata.increment / metadata.set / metadata.append calls against a buffered run never lose deltas.
- accept Lua initialises metadataVersion=0; BufferEntrySchema gains the field.

webapp side:
- applyMetadataMutationToBufferedRun helper does the read-apply-CAS-retry loop in JS, reusing the existing @trigger.dev/core applyMetadataOperations function (no Lua re-implementation of the 6 operation types).
- metadata PUT route does PG-first via the existing service (which owns the full request shape: parent/root ops, batching, validation), then falls through to the buffer helper on PG miss. busy and version_exhausted return 503 with retry hint; not_found returns 404.
- Parent/root operations on a buffered target are fanned out to the snapshot's parentTaskRunId via the existing service. If the parent is also buffered the helper recurses. Best-effort — parent/root ingestion failures do not surface to the caller.

Tests: 3 new redis-worker tests covering CAS apply / version conflict / not_found-busy paths. All 71 redis-worker mollifier + 68 webapp mollifier tests green.
1 parent 6d04414 commit d5c1e22

8 files changed

Lines changed: 423 additions & 14 deletions

Lines changed: 5 additions & 0 deletions
@@ -0,0 +1,5 @@
+---
+"@trigger.dev/redis-worker": patch
+---
+
+Add `MollifierBuffer.casSetMetadata` — optimistic-lock metadata write for buffered runs. Adds a `metadataVersion` field to the entry hash; the Lua refuses the write if the expected version has moved, returning `{ kind: "version_conflict", currentVersion }` so the caller can retry. Mirrors the PG-side `UpdateMetadataService` retry-on-conflict pattern, so concurrent `metadata.increment` / `metadata.append` / `metadata.set` calls against a buffered run never lose deltas.
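
The compare-and-set contract described in this changeset can be illustrated with a small in-memory model. This is only a sketch: the real command runs as Lua inside Redis, and the `SketchEntry` shape, the `casSetMetadataSketch` name, and the order in which the `busy` and version checks run are assumptions made for illustration. Only the four result kinds and the "version starts at 0, bumps by 1 on success" behaviour come from the commit and its tests.

```typescript
// Hypothetical in-memory stand-in for the Redis entry hash (not the real schema).
type SketchEntry = {
  status: "QUEUED" | "DRAINING";
  metadataVersion: number;
  metadata: string;
};

type CasResult =
  | { kind: "applied"; newVersion: number }
  | { kind: "version_conflict"; currentVersion: number }
  | { kind: "not_found" }
  | { kind: "busy" };

// Write `newMetadata` only if the caller's expected version is still current.
function casSetMetadataSketch(
  entries: Map<string, SketchEntry>,
  runId: string,
  expectedVersion: number,
  newMetadata: string
): CasResult {
  const entry = entries.get(runId);
  if (!entry) return { kind: "not_found" };
  // Assumption: entries that have left the queue refuse writes outright.
  if (entry.status !== "QUEUED") return { kind: "busy" };
  if (entry.metadataVersion !== expectedVersion) {
    // Someone else wrote first; report the version the caller should re-read.
    return { kind: "version_conflict", currentVersion: entry.metadataVersion };
  }
  entry.metadata = newMetadata;
  entry.metadataVersion += 1;
  return { kind: "applied", newVersion: entry.metadataVersion };
}

const entries = new Map<string, SketchEntry>();
entries.set("run_1", { status: "QUEUED", metadataVersion: 0, metadata: '{"v":1}' });

const first = casSetMetadataSketch(entries, "run_1", 0, '{"v":2}'); // applied
const stale = casSetMetadataSketch(entries, "run_1", 0, '{"v":3}'); // version_conflict
const missing = casSetMetadataSketch(entries, "absent", 0, "{}"); // not_found
```

The second call reuses the stale version `0`, so it is rejected and the stored metadata stays at the first writer's value.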
Lines changed: 14 additions & 0 deletions
@@ -0,0 +1,14 @@
+---
+area: webapp
+type: feature
+---
+
+`PUT /api/v1/runs/{id}/metadata` now handles buffered runs (Phase C3). Closes the last endpoint in the mollifier API-parity master plan.
+
+PG remains canonical when the row exists — `UpdateMetadataService.call` owns the full request shape including parent/root operations, the metadataVersion CAS loop, batching, and validation. The route falls through to the buffer only when the existing service returns `undefined` (no PG row).
+
+Buffer path uses a new `applyMetadataMutationToBufferedRun` helper that mirrors the PG service's optimistic-lock pattern: read the snapshot, apply the body's `metadata` replace + `operations` deltas in JS via the existing `applyMetadataOperations` from `@trigger.dev/core`, CAS-write back via `buffer.casSetMetadata`, retry on `version_conflict` up to 3 times. Concurrent `metadata.increment` / `metadata.set` / `metadata.append` calls against the same buffered run never lose deltas.
+
+`busy` (entry is DRAINING or already materialised) and `version_exhausted` (pathological contention) return 503 with a retry hint. `not_found` returns 404.
+
+`parentOperations` and `rootOperations` on a buffered target run are fanned out to the snapshot's `parentTaskRunId` via the existing service (parent is typically PG-materialised by the time the child enters the buffer). If the parent is also buffered, the helper recurses through the same CAS path. Best-effort — parent/root ingestion failures do not surface to the caller.
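
To see why the read, apply, CAS-write, retry loop does not lose deltas, it may help to walk through a synchronous toy version. Everything here is hypothetical scaffolding: `Versioned`, `incrementWithCas`, and the `beforeWrite` hook (which injects a rival writer between the read and the write) are not part of the commit, and the real helper is asynchronous and talks to Redis.

```typescript
type Versioned = { version: number; value: Record<string, number> };

type LoopOutcome =
  | { kind: "applied"; value: Record<string, number>; attempts: number }
  | { kind: "version_exhausted" };

// Increment `key` by `delta`, retrying when the version moved under us.
function incrementWithCas(
  store: Versioned,
  key: string,
  delta: number,
  maxRetries: number,
  beforeWrite?: (attempt: number) => void
): LoopOutcome {
  for (let attempt = 0; attempt <= maxRetries; attempt++) {
    // 1. Read the current value and remember the version we observed.
    const observedVersion = store.version;
    const next = { ...store.value, [key]: (store.value[key] ?? 0) + delta };

    // Demo-only hook: lets a competing writer slip in before our write.
    beforeWrite?.(attempt);

    // 2. CAS: write only if nobody bumped the version since the read.
    if (store.version === observedVersion) {
      store.value = next;
      store.version += 1;
      return { kind: "applied", value: next, attempts: attempt + 1 };
    }
    // 3. Conflict: loop, re-read, and recompute from the fresh value.
  }
  return { kind: "version_exhausted" };
}

const store: Versioned = { version: 0, value: { count: 0 } };

// A rival adds 10 to `count` during our first attempt only.
const outcome = incrementWithCas(store, "count", 1, 3, (attempt) => {
  if (attempt === 0) {
    store.value = { ...store.value, count: (store.value.count ?? 0) + 10 };
    store.version += 1;
  }
});
```

The first attempt computes `count: 1` from stale data and is rejected; the second attempt re-reads `count: 10` and writes `count: 11`, so both increments survive. A plain read-then-write without the version check would have stored `count: 1` and dropped the rival's delta.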

apps/webapp/app/routes/api.v1.runs.$runId.metadata.ts

Lines changed: 97 additions & 13 deletions
@@ -1,13 +1,15 @@
 import type { LoaderFunctionArgs } from "@remix-run/server-runtime";
 import { json } from "@remix-run/server-runtime";
 import { tryCatch } from "@trigger.dev/core/utils";
+import type { RunMetadataChangeOperation } from "@trigger.dev/core/v3/schemas";
 import { UpdateMetadataRequestBody } from "@trigger.dev/core/v3";
 import { z } from "zod";
 import { $replica } from "~/db.server";
 import { authenticateApiRequest } from "~/services/apiAuth.server";
 import { updateMetadataService } from "~/services/metadata/updateMetadataInstance.server";
 import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";
 import { ServiceValidationError } from "~/v3/services/common.server";
+import { applyMetadataMutationToBufferedRun } from "~/v3/mollifier/applyMetadataMutation.server";
 import { findRunByIdWithMollifierFallback } from "~/v3/mollifier/readFallback.server";

 const ParamsSchema = z.object({
@@ -45,10 +47,6 @@ export async function loader({ request, params }: LoaderFunctionArgs) {
     organizationId: env.organizationId,
   });
   if (buffered) {
-    // Buffered snapshot stores metadata as the original packet shape
-    // (could be a string for application/json payloads). Pass through
-    // without re-encoding — the consumer expects the same shape PG would
-    // return.
     return json(
       {
         metadata: buffered.metadata ?? null,
@@ -61,6 +59,43 @@ export async function loader({ request, params }: LoaderFunctionArgs) {
   return json({ error: "Run not found" }, { status: 404 });
 }

+// Route parent/root operations to the existing PG service by directly
+// invoking it against the parent/root runId. The service ingests via
+// its batching worker, which targets PG by id. If the parent/root is
+// itself buffered we recurse through our buffered-mutation helper.
+// `_ingestion_only` flag: a synthetic body that has the operations
+// promoted to top-level `operations` so the service applies them to
+// `targetRunId` directly.
+async function routeOperationsToRun(
+  targetRunId: string | undefined,
+  operations: RunMetadataChangeOperation[] | undefined,
+  env: { id: string; organizationId: string }
+): Promise<void> {
+  if (!targetRunId || !operations || operations.length === 0) return;
+
+  // Try PG first via the existing service (this is how parent/root
+  // operations have always landed; preserve that).
+  const [error] = await tryCatch(
+    updateMetadataService.call(
+      targetRunId,
+      { operations },
+      { id: env.id, organizationId: env.organizationId } as unknown as Parameters<
+        typeof updateMetadataService.call
+      >[2]
+    )
+  );
+  if (!error) return;
+
+  // PG service threw — could be "Cannot update metadata for a completed
+  // run" or similar. If the target is buffered, route operations to its
+  // snapshot too. Best-effort; do not surface this failure to the
+  // caller — the parent/root ops are auxiliary.
+  await applyMetadataMutationToBufferedRun({
+    runId: targetRunId,
+    body: { operations },
+  });
+}
+
 const { action } = createActionApiRoute(
   {
     params: ParamsSchema,
@@ -69,23 +104,72 @@ const { action } = createActionApiRoute(
     method: "PUT",
   },
   async ({ authentication, body, params }) => {
-    const [error, result] = await tryCatch(
-      updateMetadataService.call(params.runId, body, authentication.environment)
-    );
+    const env = authentication.environment;
+    const runId = params.runId;

-    if (error) {
-      if (error instanceof ServiceValidationError) {
-        return json({ error: error.message }, { status: error.status ?? 422 });
+    // PG-canonical path. If the run is in PG, the existing service
+    // owns the full request shape including parent/root operations,
+    // metadataVersion CAS, batching, validation — none of which the
+    // buffer side needs to reimplement.
+    const [pgError, pgResult] = await tryCatch(
+      updateMetadataService.call(runId, body, env)
+    );
+    if (pgError) {
+      if (pgError instanceof ServiceValidationError) {
+        return json({ error: pgError.message }, { status: pgError.status ?? 422 });
       }
-
       return json({ error: "Internal Server Error" }, { status: 500 });
     }
+    if (pgResult) {
+      return json(pgResult, { status: 200 });
+    }

-    if (!result) {
+    // PG miss. Target run is either buffered or genuinely absent.
+    const bufferOutcome = await applyMetadataMutationToBufferedRun({
+      runId,
+      body: { metadata: body.metadata, operations: body.operations },
+    });
+
+    if (bufferOutcome.kind === "not_found") {
       return json({ error: "Task Run not found" }, { status: 404 });
     }
+    if (bufferOutcome.kind === "busy") {
+      // Entry is materialising. Best path is to retry the PG call —
+      // the row may be visible now. We don't waste a roundtrip in
+      // the happy path, but a 503 here would be customer-visible
+      // breakage for legitimately-burst workloads. Hand back 503 with
+      // a retry hint; SDK retry policy converges.
+      return json({ error: "Run materialising, retry shortly" }, { status: 503 });
+    }
+    if (bufferOutcome.kind === "version_exhausted") {
+      // Pathological contention — many concurrent metadata writers on
+      // the same buffered runId. Surface as 503 rather than silently
+      // dropping the request.
+      return json({ error: "Metadata write contention; retry shortly" }, { status: 503 });
+    }
+
+    // Buffered metadata mutation succeeded. Fan parent/root operations
+    // out to their respective runs (parent/root are typically PG-
+    // materialised by the time the child is buffered, so the existing
+    // service handles them; if they're also buffered, the helper
+    // recurses through the buffered mutation path).
+    const bufferedEntry = await findRunByIdWithMollifierFallback({
+      runId,
+      environmentId: env.id,
+      organizationId: env.organizationId,
+    });
+    if (bufferedEntry) {
+      await Promise.all([
+        routeOperationsToRun(bufferedEntry.parentTaskRunId, body.parentOperations, env),
+        // The snapshot doesn't carry rootTaskRunId; fall back to parent
+        // as a rough proxy (matches the existing service's nil-coalesce
+        // behaviour where rootTaskRun defaults to the parent). Phase D
+        // / future work could thread rootTaskRunId through the snapshot.
+        routeOperationsToRun(bufferedEntry.parentTaskRunId, body.rootOperations, env),
+      ]);
+    }

-    return json(result, { status: 200 });
+    return json({ metadata: bufferOutcome.newMetadata }, { status: 200 });
   }
 );
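
The buffer-path branching in the action above amounts to a mapping from outcome kind to HTTP response. A compact way to picture it, and to have the compiler flag a forgotten case, is an exhaustive `switch`. This is a sketch rather than the route's code: `SketchResponse` and `toResponse` are hypothetical names, and only the status codes and error strings are taken from the diff.

```typescript
type BufferOutcome =
  | { kind: "applied"; newMetadata: Record<string, unknown> }
  | { kind: "not_found" }
  | { kind: "busy" }
  | { kind: "version_exhausted" };

// Hypothetical plain-object response, standing in for the framework's json() helper.
type SketchResponse = { status: number; body: Record<string, unknown> };

function toResponse(outcome: BufferOutcome): SketchResponse {
  switch (outcome.kind) {
    case "applied":
      return { status: 200, body: { metadata: outcome.newMetadata } };
    case "not_found":
      return { status: 404, body: { error: "Task Run not found" } };
    case "busy":
      // Entry is draining or materialised; the caller is expected to retry.
      return { status: 503, body: { error: "Run materialising, retry shortly" } };
    case "version_exhausted":
      // Too many concurrent writers; also retryable.
      return { status: 503, body: { error: "Metadata write contention; retry shortly" } };
    default: {
      // Compile-time exhaustiveness check: adding a new kind breaks this line.
      const unreachable: never = outcome;
      throw new Error(`Unhandled outcome: ${JSON.stringify(unreachable)}`);
    }
  }
}

const ok = toResponse({ kind: "applied", newMetadata: { count: 1 } });
const busy = toResponse({ kind: "busy" });
```

Both retryable outcomes share status 503 and differ only in the message, which matches the route's behaviour of leaving convergence to the client's retry policy.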

Lines changed: 90 additions & 0 deletions
@@ -0,0 +1,90 @@
+import { applyMetadataOperations } from "@trigger.dev/core/v3";
+import type { FlushedRunMetadata } from "@trigger.dev/core/v3/schemas";
+import type { MollifierBuffer } from "@trigger.dev/redis-worker";
+import { logger } from "~/services/logger.server";
+import { getMollifierBuffer } from "./mollifierBuffer.server";
+
+export type ApplyMetadataMutationOutcome =
+  | { kind: "applied"; newMetadata: Record<string, unknown> }
+  | { kind: "not_found" }
+  | { kind: "busy" }
+  | { kind: "version_exhausted" };
+
+// Apply a metadata PUT (body.metadata replace AND/OR body.operations
+// deltas) to a buffered run's snapshot. Mirrors the PG-side
+// `UpdateMetadataService.#updateRunMetadataWithOperations` retry loop:
+// read snapshot → apply operations in JS → CAS-write back with the
+// observed `metadataVersion`. Retries on conflict; bounded by
+// `maxRetries`. The Lua CAS is the atomicity primitive — concurrent
+// callers never lose an increment / append / set.
+export async function applyMetadataMutationToBufferedRun(input: {
+  runId: string;
+  body: Pick<FlushedRunMetadata, "metadata" | "operations">;
+  buffer?: MollifierBuffer | null;
+  maxRetries?: number;
+}): Promise<ApplyMetadataMutationOutcome> {
+  const buffer = input.buffer ?? getMollifierBuffer();
+  if (!buffer) return { kind: "not_found" };
+
+  const maxRetries = input.maxRetries ?? 3;
+  for (let attempt = 0; attempt <= maxRetries; attempt++) {
+    const entry = await buffer.getEntry(input.runId);
+    if (!entry) return { kind: "not_found" };
+    if (entry.status !== "QUEUED" || entry.materialised) {
+      return { kind: "busy" };
+    }
+
+    const snapshot = JSON.parse(entry.payload) as Record<string, unknown>;
+    const currentMetadataType =
+      typeof snapshot.metadataType === "string" ? snapshot.metadataType : "application/json";
+
+    // Starting point: either the body's replace metadata, or whatever's
+    // already on the snapshot. PG-side service uses the same precedence
+    // (replace overrides existing, operations apply on top).
+    let metadataObject: Record<string, unknown>;
+    if (input.body.metadata !== undefined) {
+      metadataObject = input.body.metadata as Record<string, unknown>;
+    } else if (typeof snapshot.metadata === "string") {
+      try {
+        metadataObject = JSON.parse(snapshot.metadata) as Record<string, unknown>;
+      } catch {
+        metadataObject = {};
+      }
+    } else {
+      metadataObject = {};
+    }
+
+    if (input.body.operations?.length) {
+      const result = applyMetadataOperations(metadataObject, input.body.operations);
+      metadataObject = result.newMetadata;
+    }
+
+    const newMetadataStr = JSON.stringify(metadataObject);
+    const cas = await buffer.casSetMetadata({
+      runId: input.runId,
+      expectedVersion: entry.metadataVersion,
+      newMetadata: newMetadataStr,
+      newMetadataType: currentMetadataType,
+    });
+
+    if (cas.kind === "applied") {
+      return { kind: "applied", newMetadata: metadataObject };
+    }
+    if (cas.kind === "not_found") return { kind: "not_found" };
+    if (cas.kind === "busy") return { kind: "busy" };
+    // version_conflict — another caller wrote between our read + CAS.
+    // Loop to re-read and retry.
+    logger.debug("applyMetadataMutationToBufferedRun: version_conflict, retrying", {
+      runId: input.runId,
+      attempt,
+      observedVersion: entry.metadataVersion,
+      currentVersion: cas.currentVersion,
+    });
+  }
+
+  logger.warn("applyMetadataMutationToBufferedRun: retries exhausted", {
+    runId: input.runId,
+    maxRetries,
+  });
+  return { kind: "version_exhausted" };
+}
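
The precedence rule in the helper (a `metadata` replace in the body wins over the snapshot, then `operations` apply on top, and unparseable snapshot metadata falls back to `{}`) can be exercised in isolation. The sketch below assumes simplified operation shapes: `SketchOp` and `applySketchOps` are hypothetical stand-ins covering three operation types, not the real `applyMetadataOperations` from `@trigger.dev/core`, whose exact semantics are not shown in this commit.

```typescript
// Hypothetical, simplified operation shapes for illustration only.
type SketchOp =
  | { type: "set"; key: string; value: unknown }
  | { type: "increment"; key: string; value: number }
  | { type: "append"; key: string; value: unknown };

// Stand-in applier: returns a new object, never mutates `base`.
function applySketchOps(
  base: Record<string, unknown>,
  ops: SketchOp[]
): Record<string, unknown> {
  const next: Record<string, unknown> = { ...base };
  for (const op of ops) {
    const current = next[op.key];
    if (op.type === "set") {
      next[op.key] = op.value;
    } else if (op.type === "increment") {
      next[op.key] = (typeof current === "number" ? current : 0) + op.value;
    } else {
      next[op.key] = Array.isArray(current) ? [...current, op.value] : [op.value];
    }
  }
  return next;
}

// Pick the starting object, then layer operations on top of it.
function resolveMetadata(
  snapshotMetadata: unknown,
  body: { metadata?: Record<string, unknown>; operations?: SketchOp[] }
): Record<string, unknown> {
  let start: Record<string, unknown> = {};
  if (body.metadata !== undefined) {
    start = body.metadata; // replace overrides whatever the snapshot holds
  } else if (typeof snapshotMetadata === "string") {
    try {
      start = JSON.parse(snapshotMetadata) as Record<string, unknown>;
    } catch {
      start = {}; // corrupt snapshot metadata is treated as empty
    }
  }
  return body.operations?.length ? applySketchOps(start, body.operations) : start;
}

const fromSnapshot = resolveMetadata('{"count":2}', {
  operations: [{ type: "increment", key: "count", value: 3 }],
});
const replaced = resolveMetadata('{"count":2}', {
  metadata: { label: "x" },
  operations: [{ type: "append", key: "tags", value: "a" }],
});
const corrupt = resolveMetadata("not json", {});
```

In the second call the snapshot's `count` disappears because the body's `metadata` replaces the starting point before the `append` runs.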

packages/redis-worker/src/mollifier/buffer.test.ts

Lines changed: 123 additions & 0 deletions
@@ -1374,6 +1374,129 @@ describe("MollifierBuffer idempotency lookup", () => {
   );
 });

+describe("MollifierBuffer.casSetMetadata", () => {
+  redisTest(
+    "applies when expectedVersion matches; increments version; updates payload",
+    { timeout: 20_000 },
+    async ({ redisContainer }) => {
+      const buffer = new MollifierBuffer({
+        redisOptions: {
+          host: redisContainer.getHost(),
+          port: redisContainer.getPort(),
+          password: redisContainer.getPassword(),
+        },
+        entryTtlSeconds: 600,
+        logger: new Logger("test", "log"),
+      });
+      try {
+        await buffer.accept({
+          runId: "cas1",
+          envId: "env_c",
+          orgId: "org_1",
+          payload: serialiseSnapshot({ metadata: '{"v":1}', metadataType: "application/json" }),
+        });
+        const result = await buffer.casSetMetadata({
+          runId: "cas1",
+          expectedVersion: 0,
+          newMetadata: '{"v":2}',
+          newMetadataType: "application/json",
+        });
+        expect(result).toEqual({ kind: "applied", newVersion: 1 });
+
+        const entry = await buffer.getEntry("cas1");
+        expect(entry!.metadataVersion).toBe(1);
+        const payload = JSON.parse(entry!.payload) as { metadata: string };
+        expect(payload.metadata).toBe('{"v":2}');
+      } finally {
+        await buffer.close();
+      }
+    },
+  );
+
+  redisTest(
+    "returns version_conflict when expectedVersion is stale",
+    { timeout: 20_000 },
+    async ({ redisContainer }) => {
+      const buffer = new MollifierBuffer({
+        redisOptions: {
+          host: redisContainer.getHost(),
+          port: redisContainer.getPort(),
+          password: redisContainer.getPassword(),
+        },
+        entryTtlSeconds: 600,
+        logger: new Logger("test", "log"),
+      });
+      try {
+        await buffer.accept({
+          runId: "cas2",
+          envId: "env_c",
+          orgId: "org_1",
+          payload: serialiseSnapshot({}),
+        });
+        await buffer.casSetMetadata({
+          runId: "cas2",
+          expectedVersion: 0,
+          newMetadata: '{"a":1}',
+          newMetadataType: "application/json",
+        });
+
+        // Second write with stale expectedVersion = 0 must conflict.
+        const result = await buffer.casSetMetadata({
+          runId: "cas2",
+          expectedVersion: 0,
+          newMetadata: '{"a":2}',
+          newMetadataType: "application/json",
+        });
+        expect(result).toEqual({ kind: "version_conflict", currentVersion: 1 });
+      } finally {
+        await buffer.close();
+      }
+    },
+  );
+
+  redisTest(
+    "returns not_found / busy on missing or terminal entries",
+    { timeout: 20_000 },
+    async ({ redisContainer }) => {
+      const buffer = new MollifierBuffer({
+        redisOptions: {
+          host: redisContainer.getHost(),
+          port: redisContainer.getPort(),
+          password: redisContainer.getPassword(),
+        },
+        entryTtlSeconds: 600,
+        logger: new Logger("test", "log"),
+      });
+      try {
+        const nf = await buffer.casSetMetadata({
+          runId: "absent",
+          expectedVersion: 0,
+          newMetadata: "{}",
+          newMetadataType: "application/json",
+        });
+        expect(nf).toEqual({ kind: "not_found" });
+
+        await buffer.accept({
+          runId: "cas3",
+          envId: "env_c",
+          orgId: "org_1",
+          payload: serialiseSnapshot({}),
+        });
+        await buffer.pop("env_c");
+        const busy = await buffer.casSetMetadata({
+          runId: "cas3",
+          expectedVersion: 0,
+          newMetadata: "{}",
+          newMetadataType: "application/json",
+        });
+        expect(busy).toEqual({ kind: "busy" });
+      } finally {
+        await buffer.close();
+      }
+    },
+  );
+});
+
 describe("MollifierBuffer.mutateSnapshot", () => {
   redisTest(
     "returns not_found when no entry exists for the runId",
