Skip to content

Commit 0c7c07d

Browse files
committed
feat(redis-worker): mollifier buffer idempotency-key dedup (Phase B6a)
Buffer-side counterpart to PG's idempotency-key uniqueness, per the Q5 mollifier-idempotency design. Three changes to the buffer's atomic Lua surface plus two new high-level methods. acceptMollifierEntry: when the caller passes idempotencyKey + task- Identifier, SETNX a `mollifier:idempotency:{env}:{task}:{key}` lookup pointing at the runId. Second accepts for the same tuple return the existing winner's runId so the loser's response can echo it as a cached hit. accept's return shape changes from boolean to a discrim- inated AcceptResult (accepted / duplicate_run_id / duplicate_idemp- otency). Existing four callers that ignored the boolean continue to work; one assertion was updated for the new shape in tests. ackMollifierEntry: DELs the idempotency lookup atomically with marking the entry materialised. PG becomes canonical post-mater- ialisation; the lookup TTL is the safety net if the DEL is missed. New lookupIdempotency: resolves a buffered run by (env, task, key) tuple. Used by IdempotencyKeyConcern in B6b. Self-heals stale lookups that point at expired entries. New resetIdempotency: atomic Lua that nulls idempotencyKey + idempotencyKeyExpiresAt on the snapshot payload, clears the denormalised hash pointer, and DELs the lookup. Used by ResetIdempotencyKeyService in B6b alongside the PG-side updateMany. BufferEntrySchema gains an optional idempotencyLookupKey string field (empty when no idempotency key was bound) so the ack Lua can DEL the lookup without reading the payload JSON. 8 new tests cover: lookup write+TTL, duplicate_idempotency return, lookupIdempotency hit/miss/self-heal, ack-DELs-lookup, reset clears both stores, reset null when nothing bound.
1 parent 9c08f2f commit 0c7c07d

4 files changed

Lines changed: 465 additions & 15 deletions

File tree

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
---
2+
"@trigger.dev/redis-worker": patch
3+
---
4+
5+
Add buffer-side idempotency-key dedup to `MollifierBuffer` per the Q5 mollifier-idempotency design. The `acceptMollifierEntry` Lua now SETNX-writes a `mollifier:idempotency:{envId}:{taskIdentifier}:{idempotencyKey}` lookup when the caller passes both an `idempotencyKey` and a `taskIdentifier`. Second accepts for the same tuple return `{ kind: "duplicate_idempotency", existingRunId }` so the loser can echo the winner's runId as a cached hit. `accept`'s return shape changes from `boolean` to a discriminated `AcceptResult` (`accepted` / `duplicate_run_id` / `duplicate_idempotency`).
6+
7+
New methods: `lookupIdempotency` (with stale-lookup self-heal) and `resetIdempotency` (atomic Lua that nulls `idempotencyKey` + `idempotencyKeyExpiresAt` on the snapshot payload, clears the denormalised hash pointer, and DELs the lookup). The drainer ack Lua now DELs the lookup atomically with marking the entry materialised — PG is canonical for the key post-materialisation.
8+
9+
`BufferEntrySchema` gains an optional `idempotencyLookupKey` field (the denormalised Redis lookup key string stored on the entry hash so the ack Lua can DEL it without reading the payload JSON).

packages/redis-worker/src/mollifier/buffer.test.ts

Lines changed: 302 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -857,8 +857,8 @@ describe("MollifierBuffer.accept idempotency", () => {
857857
payload: serialiseSnapshot({ first: false }),
858858
});
859859

860-
expect(first).toBe(true);
861-
expect(second).toBe(false);
860+
expect(first).toEqual({ kind: "accepted" });
861+
expect(second).toEqual({ kind: "duplicate_run_id" });
862862

863863
// First payload preserved; second was a no-op.
864864
const stored = await buffer.getEntry("run_dup");
@@ -899,7 +899,7 @@ describe("MollifierBuffer.accept idempotency", () => {
899899
expect(stored!.status).toBe("DRAINING");
900900

901901
const dup = await buffer.accept({ runId: "run_dr", envId: "env_a", orgId: "org_1", payload: "{}" });
902-
expect(dup).toBe(false);
902+
expect(dup).toEqual({ kind: "duplicate_run_id" });
903903

904904
const afterDup = await buffer.getEntry("run_dr");
905905
expect(afterDup!.status).toBe("DRAINING"); // unchanged
@@ -931,7 +931,7 @@ describe("MollifierBuffer.accept idempotency", () => {
931931
expect(stored!.status).toBe("FAILED");
932932

933933
const dup = await buffer.accept({ runId: "run_fl", envId: "env_a", orgId: "org_1", payload: "{}" });
934-
expect(dup).toBe(false);
934+
expect(dup).toEqual({ kind: "duplicate_run_id" });
935935

936936
const afterDup = await buffer.getEntry("run_fl");
937937
expect(afterDup!.status).toBe("FAILED"); // unchanged
@@ -979,8 +979,8 @@ describe("MollifierBuffer.accept idempotency", () => {
979979
payload: "{}",
980980
});
981981

982-
expect(first).toBe(true);
983-
expect(reAccept).toBe(false);
982+
expect(first).toEqual({ kind: "accepted" });
983+
expect(reAccept).toEqual({ kind: "duplicate_run_id" });
984984

985985
const stored = await buffer.getEntry("run_x");
986986
expect(stored!.materialised).toBe(true);
@@ -1078,6 +1078,302 @@ describe("MollifierBuffer envs set lifecycle", () => {
10781078
);
10791079
});
10801080

1081+
describe("MollifierBuffer idempotency lookup", () => {
1082+
redisTest(
1083+
"accept with idempotencyKey + taskIdentifier writes the lookup with matching TTL",
1084+
{ timeout: 20_000 },
1085+
async ({ redisContainer }) => {
1086+
const buffer = new MollifierBuffer({
1087+
redisOptions: {
1088+
host: redisContainer.getHost(),
1089+
port: redisContainer.getPort(),
1090+
password: redisContainer.getPassword(),
1091+
},
1092+
entryTtlSeconds: 600,
1093+
logger: new Logger("test", "log"),
1094+
});
1095+
try {
1096+
const result = await buffer.accept({
1097+
runId: "ri1",
1098+
envId: "env_i",
1099+
orgId: "org_1",
1100+
payload: "{}",
1101+
idempotencyKey: "ikey-1",
1102+
taskIdentifier: "my-task",
1103+
});
1104+
expect(result).toEqual({ kind: "accepted" });
1105+
1106+
const lookupKey = "mollifier:idempotency:env_i:my-task:ikey-1";
1107+
const stored = await buffer["redis"].get(lookupKey);
1108+
expect(stored).toBe("ri1");
1109+
const ttl = await buffer["redis"].ttl(lookupKey);
1110+
expect(ttl).toBeGreaterThan(0);
1111+
expect(ttl).toBeLessThanOrEqual(600);
1112+
1113+
const entry = await buffer.getEntry("ri1");
1114+
expect(entry!.idempotencyLookupKey).toBe(lookupKey);
1115+
} finally {
1116+
await buffer.close();
1117+
}
1118+
},
1119+
);
1120+
1121+
redisTest(
1122+
"second accept with same (env, task, idempotencyKey) returns duplicate_idempotency with the winner's runId",
1123+
{ timeout: 20_000 },
1124+
async ({ redisContainer }) => {
1125+
const buffer = new MollifierBuffer({
1126+
redisOptions: {
1127+
host: redisContainer.getHost(),
1128+
port: redisContainer.getPort(),
1129+
password: redisContainer.getPassword(),
1130+
},
1131+
entryTtlSeconds: 600,
1132+
logger: new Logger("test", "log"),
1133+
});
1134+
try {
1135+
const first = await buffer.accept({
1136+
runId: "ri-a",
1137+
envId: "env_i",
1138+
orgId: "org_1",
1139+
payload: "{}",
1140+
idempotencyKey: "ikey-2",
1141+
taskIdentifier: "my-task",
1142+
});
1143+
const second = await buffer.accept({
1144+
runId: "ri-b",
1145+
envId: "env_i",
1146+
orgId: "org_1",
1147+
payload: "{}",
1148+
idempotencyKey: "ikey-2",
1149+
taskIdentifier: "my-task",
1150+
});
1151+
1152+
expect(first).toEqual({ kind: "accepted" });
1153+
expect(second).toEqual({
1154+
kind: "duplicate_idempotency",
1155+
existingRunId: "ri-a",
1156+
});
1157+
1158+
// The loser's runId entry was never created.
1159+
const loserEntry = await buffer.getEntry("ri-b");
1160+
expect(loserEntry).toBeNull();
1161+
} finally {
1162+
await buffer.close();
1163+
}
1164+
},
1165+
);
1166+
1167+
redisTest(
1168+
"lookupIdempotency hits when the run is buffered",
1169+
{ timeout: 20_000 },
1170+
async ({ redisContainer }) => {
1171+
const buffer = new MollifierBuffer({
1172+
redisOptions: {
1173+
host: redisContainer.getHost(),
1174+
port: redisContainer.getPort(),
1175+
password: redisContainer.getPassword(),
1176+
},
1177+
entryTtlSeconds: 600,
1178+
logger: new Logger("test", "log"),
1179+
});
1180+
try {
1181+
await buffer.accept({
1182+
runId: "rl1",
1183+
envId: "env_i",
1184+
orgId: "org_1",
1185+
payload: "{}",
1186+
idempotencyKey: "k1",
1187+
taskIdentifier: "t",
1188+
});
1189+
const found = await buffer.lookupIdempotency({
1190+
envId: "env_i",
1191+
taskIdentifier: "t",
1192+
idempotencyKey: "k1",
1193+
});
1194+
expect(found).toBe("rl1");
1195+
} finally {
1196+
await buffer.close();
1197+
}
1198+
},
1199+
);
1200+
1201+
redisTest(
1202+
"lookupIdempotency returns null when no lookup is bound",
1203+
{ timeout: 20_000 },
1204+
async ({ redisContainer }) => {
1205+
const buffer = new MollifierBuffer({
1206+
redisOptions: {
1207+
host: redisContainer.getHost(),
1208+
port: redisContainer.getPort(),
1209+
password: redisContainer.getPassword(),
1210+
},
1211+
entryTtlSeconds: 600,
1212+
logger: new Logger("test", "log"),
1213+
});
1214+
try {
1215+
const found = await buffer.lookupIdempotency({
1216+
envId: "env_i",
1217+
taskIdentifier: "t",
1218+
idempotencyKey: "absent",
1219+
});
1220+
expect(found).toBeNull();
1221+
} finally {
1222+
await buffer.close();
1223+
}
1224+
},
1225+
);
1226+
1227+
redisTest(
1228+
"lookupIdempotency self-heals when the lookup points at an expired entry",
1229+
{ timeout: 20_000 },
1230+
async ({ redisContainer }) => {
1231+
const buffer = new MollifierBuffer({
1232+
redisOptions: {
1233+
host: redisContainer.getHost(),
1234+
port: redisContainer.getPort(),
1235+
password: redisContainer.getPassword(),
1236+
},
1237+
entryTtlSeconds: 600,
1238+
logger: new Logger("test", "log"),
1239+
});
1240+
try {
1241+
// Plant a stale lookup pointing at a non-existent entry.
1242+
const lookupKey = "mollifier:idempotency:env_i:t:stale";
1243+
await buffer["redis"].set(lookupKey, "rl-stale", "EX", 600);
1244+
expect(await buffer["redis"].get(lookupKey)).toBe("rl-stale");
1245+
1246+
const found = await buffer.lookupIdempotency({
1247+
envId: "env_i",
1248+
taskIdentifier: "t",
1249+
idempotencyKey: "stale",
1250+
});
1251+
expect(found).toBeNull();
1252+
// Self-healed.
1253+
expect(await buffer["redis"].get(lookupKey)).toBeNull();
1254+
} finally {
1255+
await buffer.close();
1256+
}
1257+
},
1258+
);
1259+
1260+
redisTest(
1261+
"ack DELs the idempotency lookup along with marking materialised",
1262+
{ timeout: 20_000 },
1263+
async ({ redisContainer }) => {
1264+
const buffer = new MollifierBuffer({
1265+
redisOptions: {
1266+
host: redisContainer.getHost(),
1267+
port: redisContainer.getPort(),
1268+
password: redisContainer.getPassword(),
1269+
},
1270+
entryTtlSeconds: 600,
1271+
logger: new Logger("test", "log"),
1272+
});
1273+
try {
1274+
await buffer.accept({
1275+
runId: "ra1",
1276+
envId: "env_i",
1277+
orgId: "org_1",
1278+
payload: "{}",
1279+
idempotencyKey: "ka",
1280+
taskIdentifier: "t",
1281+
});
1282+
await buffer.pop("env_i");
1283+
await buffer.ack("ra1");
1284+
1285+
const lookupKey = "mollifier:idempotency:env_i:t:ka";
1286+
expect(await buffer["redis"].get(lookupKey)).toBeNull();
1287+
const entry = await buffer.getEntry("ra1");
1288+
expect(entry!.materialised).toBe(true);
1289+
} finally {
1290+
await buffer.close();
1291+
}
1292+
},
1293+
);
1294+
1295+
redisTest(
1296+
"resetIdempotency clears snapshot fields + lookup; returns the runId",
1297+
{ timeout: 20_000 },
1298+
async ({ redisContainer }) => {
1299+
const buffer = new MollifierBuffer({
1300+
redisOptions: {
1301+
host: redisContainer.getHost(),
1302+
port: redisContainer.getPort(),
1303+
password: redisContainer.getPassword(),
1304+
},
1305+
entryTtlSeconds: 600,
1306+
logger: new Logger("test", "log"),
1307+
});
1308+
try {
1309+
await buffer.accept({
1310+
runId: "rr1",
1311+
envId: "env_i",
1312+
orgId: "org_1",
1313+
payload: serialiseSnapshot({
1314+
idempotencyKey: "kr",
1315+
idempotencyKeyExpiresAt: "2026-12-01T00:00:00Z",
1316+
other: "field",
1317+
}),
1318+
idempotencyKey: "kr",
1319+
taskIdentifier: "t",
1320+
});
1321+
1322+
const result = await buffer.resetIdempotency({
1323+
envId: "env_i",
1324+
taskIdentifier: "t",
1325+
idempotencyKey: "kr",
1326+
});
1327+
expect(result.clearedRunId).toBe("rr1");
1328+
1329+
// Lookup is gone.
1330+
const lookupKey = "mollifier:idempotency:env_i:t:kr";
1331+
expect(await buffer["redis"].get(lookupKey)).toBeNull();
1332+
1333+
// Snapshot's idempotency fields are nulled, other fields kept.
1334+
const entry = await buffer.getEntry("rr1");
1335+
const payload = JSON.parse(entry!.payload) as {
1336+
idempotencyKey: unknown;
1337+
idempotencyKeyExpiresAt: unknown;
1338+
other: string;
1339+
};
1340+
expect(payload.idempotencyKey).toBeNull();
1341+
expect(payload.idempotencyKeyExpiresAt).toBeNull();
1342+
expect(payload.other).toBe("field");
1343+
expect(entry!.idempotencyLookupKey).toBe("");
1344+
} finally {
1345+
await buffer.close();
1346+
}
1347+
},
1348+
);
1349+
1350+
redisTest(
1351+
"resetIdempotency returns null when nothing is bound",
1352+
{ timeout: 20_000 },
1353+
async ({ redisContainer }) => {
1354+
const buffer = new MollifierBuffer({
1355+
redisOptions: {
1356+
host: redisContainer.getHost(),
1357+
port: redisContainer.getPort(),
1358+
password: redisContainer.getPassword(),
1359+
},
1360+
entryTtlSeconds: 600,
1361+
logger: new Logger("test", "log"),
1362+
});
1363+
try {
1364+
const result = await buffer.resetIdempotency({
1365+
envId: "env_i",
1366+
taskIdentifier: "t",
1367+
idempotencyKey: "absent",
1368+
});
1369+
expect(result.clearedRunId).toBeNull();
1370+
} finally {
1371+
await buffer.close();
1372+
}
1373+
},
1374+
);
1375+
});
1376+
10811377
describe("MollifierBuffer.mutateSnapshot", () => {
10821378
redisTest(
10831379
"returns not_found when no entry exists for the runId",

0 commit comments

Comments
 (0)