Skip to content

Commit 51b471c

Browse files
committed
feat(webapp): wire mollifier idempotency into trigger hot path (Phase B6b)
Three integration points that connect B6a's buffer-side primitives to the customer-facing flow per Q5: - IdempotencyKeyConcern.handleTriggerRequest falls through to buffer.lookupIdempotency after a PG miss. Buffered hits return isCached:true with a synthesised TaskRun via the existing findRunByIdWithMollifierFallback. Skipped when resumeParentOnCompletion is set: waitpoint blocking requires a PG row that doesn't exist yet; the follow-up accept SETNX still dedupes the trigger itself. Buffer outages fail open to "no cache hit" so the trigger hot path is never wedged by a transient Redis issue. - mollifyTrigger passes idempotencyKey + taskIdentifier through to buffer.accept. The SETNX race loser receives duplicate_idempotency with the winner's runId; the API response echoes it with isCached:true, matching PG-side cache-hit shape. - ResetIdempotencyKeyService calls buffer.resetIdempotency alongside the existing PG updateMany. 404 only fires when both stores report nothing bound. Buffer outage during reset is logged and treated as a miss; PG-side reset still works.
1 parent 0c7c07d commit 51b471c

5 files changed

Lines changed: 136 additions & 6 deletions

File tree

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
---
2+
area: webapp
3+
type: improvement
4+
---
5+
6+
Wire the mollifier buffer's idempotency surface into the trigger hot path per Q5. Three connected changes:
7+
8+
- `IdempotencyKeyConcern.handleTriggerRequest` now falls through to `buffer.lookupIdempotency` after a PG miss. A buffered cache hit synthesises a TaskRun via the existing `findRunByIdWithMollifierFallback` and returns `{ isCached: true, run }`. Skipped when `resumeParentOnCompletion` is set: blocking a parent on a buffered child via waitpoint requires a PG row that doesn't exist yet, and the follow-up accept's SETNX still catches the duplicate trigger itself. Buffer outages fail open to "no cache hit" so the trigger hot path can't be wedged by a transient Redis issue.
9+
10+
- `mollifyTrigger` passes `idempotencyKey` + `taskIdentifier` through to `MollifierBuffer.accept`. When the buffer's SETNX races with another concurrent buffered trigger using the same key, the race loser receives `{ kind: "duplicate_idempotency", existingRunId }` and the API response echoes the winner's runId with `isCached: true`, matching PG-side cache-hit shape.
11+
12+
- `ResetIdempotencyKeyService` calls `buffer.resetIdempotency` alongside the existing PG `updateMany`. The 404 only fires when both stores report nothing was bound. A buffer outage during reset is logged and treated as a miss — the PG side still works.

apps/webapp/app/runEngine/concerns/idempotencyKeys.server.ts

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import { logger } from "~/services/logger.server";
44
import { resolveIdempotencyKeyTTL } from "~/utils/idempotencyKeys.server";
55
import type { RunEngine } from "~/v3/runEngine.server";
66
import { shouldIdempotencyKeyBeCleared } from "~/v3/taskStatus";
7+
import { getMollifierBuffer } from "~/v3/mollifier/mollifierBuffer.server";
8+
import { findRunByIdWithMollifierFallback } from "~/v3/mollifier/readFallback.server";
79
import type { TraceEventConcern, TriggerTaskRequest } from "../types";
810

911
export type IdempotencyKeyConcernResult =
@@ -17,6 +19,47 @@ export class IdempotencyKeyConcern {
1719
private readonly traceEventConcern: TraceEventConcern
1820
) {}
1921

22+
// Q5 buffer-side dedup. Resolves an idempotency key against the
23+
// mollifier buffer when PG missed. Returns a SyntheticRun cast to
24+
// TaskRun so the route handler (which only reads run.id / run.friendlyId)
25+
// can echo the buffered run's friendlyId as a cached hit. Returns null
26+
// for any failure or miss — buffer outages must not 500 the trigger
27+
// hot path; we fail open to "no cache hit" and let the request through.
28+
private async findBufferedRunWithIdempotency(
29+
environmentId: string,
30+
organizationId: string,
31+
taskIdentifier: string,
32+
idempotencyKey: string,
33+
): Promise<TaskRun | null> {
34+
const buffer = getMollifierBuffer();
35+
if (!buffer) return null;
36+
37+
let bufferedRunId: string | null;
38+
try {
39+
bufferedRunId = await buffer.lookupIdempotency({
40+
envId: environmentId,
41+
taskIdentifier,
42+
idempotencyKey,
43+
});
44+
} catch (err) {
45+
logger.error("IdempotencyKeyConcern: buffer lookupIdempotency failed", {
46+
environmentId,
47+
taskIdentifier,
48+
err: err instanceof Error ? err.message : String(err),
49+
});
50+
return null;
51+
}
52+
if (!bufferedRunId) return null;
53+
54+
const synthetic = await findRunByIdWithMollifierFallback({
55+
runId: bufferedRunId,
56+
environmentId,
57+
organizationId,
58+
});
59+
if (!synthetic) return null;
60+
return synthetic as unknown as TaskRun;
61+
}
62+
2063
async handleTriggerRequest(
2164
request: TriggerTaskRequest,
2265
parentStore: string | undefined
@@ -44,6 +87,25 @@ export class IdempotencyKeyConcern {
4487
})
4588
: undefined;
4689

90+
// Buffer fallback per Q5 mollifier-idempotency design. PG missed —
91+
// the same key may belong to a buffered run that hasn't materialised
92+
// yet. Skipped when `resumeParentOnCompletion` is set: blocking a
93+
// parent on a buffered child via waitpoint requires a PG row that
94+
// doesn't exist yet. The follow-up accept's SETNX in mollifyTrigger
95+
// still dedupes the trigger itself; the waitpoint just doesn't fire
96+
// for this rare race window.
97+
if (!existingRun && idempotencyKey && !request.body.options?.resumeParentOnCompletion) {
98+
const buffered = await this.findBufferedRunWithIdempotency(
99+
request.environment.id,
100+
request.environment.organizationId,
101+
request.taskId,
102+
idempotencyKey,
103+
);
104+
if (buffered) {
105+
return { isCached: true, run: buffered };
106+
}
107+
}
108+
47109
if (existingRun) {
48110
// The idempotency key has expired
49111
if (existingRun.idempotencyKeyExpiresAt && existingRun.idempotencyKeyExpiresAt < new Date()) {

apps/webapp/app/runEngine/services/triggerTask.server.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -445,6 +445,10 @@ export class RunEngineTriggerTaskService {
445445
engineTriggerInput,
446446
decision: mollifierOutcome.decision,
447447
buffer: mollifierBuffer,
448+
// Idempotency-key triple wires the buffer's SETNX into
449+
// the trigger-time dedup symmetric with PG (Q5).
450+
idempotencyKey,
451+
taskIdentifier: taskId,
448452
});
449453

450454
logger.info("mollifier.buffered", {

apps/webapp/app/v3/mollifier/mollifierMollify.server.ts

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,12 @@ export type MollifyNotice = {
1111
export type MollifySyntheticResult = {
1212
run: { friendlyId: string };
1313
error: undefined;
14-
isCached: false;
15-
notice: MollifyNotice;
14+
// The race-loser path (Q5): if accept's SETNX hit an existing
15+
// buffered run with the same (env, task, idempotencyKey), the
16+
// response echoes the winner's runId with isCached=true. The
17+
// mollifier-queued notice is only attached for the happy accept.
18+
isCached: boolean;
19+
notice?: MollifyNotice;
1620
};
1721

1822
const NOTICE: MollifyNotice = {
@@ -29,14 +33,35 @@ export async function mollifyTrigger(args: {
2933
engineTriggerInput: MollifierSnapshot;
3034
decision: Extract<TripDecision, { divert: true }>;
3135
buffer: MollifierBuffer;
36+
// Optional idempotency context. When both are passed, accept SETNXes
37+
// the lookup so the buffered window participates in trigger-time
38+
// dedup symmetrically with PG (Q5).
39+
idempotencyKey?: string;
40+
taskIdentifier?: string;
3241
}): Promise<MollifySyntheticResult> {
33-
await args.buffer.accept({
42+
const result = await args.buffer.accept({
3443
runId: args.runFriendlyId,
3544
envId: args.environmentId,
3645
orgId: args.organizationId,
3746
payload: serialiseMollifierSnapshot(args.engineTriggerInput),
47+
idempotencyKey: args.idempotencyKey,
48+
taskIdentifier: args.taskIdentifier,
3849
});
3950

51+
if (result.kind === "duplicate_idempotency") {
52+
// Race loser. Echo the winner's runId so the SDK's response shape
53+
// matches PG-side idempotency cache hits.
54+
return {
55+
run: { friendlyId: result.existingRunId },
56+
error: undefined,
57+
isCached: true,
58+
};
59+
}
60+
61+
// Both "accepted" and "duplicate_run_id" produce the same customer-
62+
// visible response: a buffered-trigger acknowledgement. The duplicate
63+
// runId case is unreachable in practice (runIds are server-generated
64+
// and unique) but is silently idempotent at the buffer layer either way.
4065
return {
4166
run: { friendlyId: args.runFriendlyId },
4267
error: undefined,

apps/webapp/app/v3/services/resetIdempotencyKey.server.ts

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,15 @@
11
import type { AuthenticatedEnvironment } from "~/services/apiAuth.server";
22
import { BaseService, ServiceValidationError } from "./baseService.server";
33
import { logger } from "~/services/logger.server";
4+
import { getMollifierBuffer } from "~/v3/mollifier/mollifierBuffer.server";
45

56
export class ResetIdempotencyKeyService extends BaseService {
67
public async call(
78
idempotencyKey: string,
89
taskIdentifier: string,
910
authenticatedEnv: AuthenticatedEnvironment
1011
): Promise<{ id: string }> {
11-
const { count } = await this._prisma.taskRun.updateMany({
12+
const { count: pgCount } = await this._prisma.taskRun.updateMany({
1213
where: {
1314
idempotencyKey,
1415
taskIdentifier,
@@ -20,15 +21,41 @@ export class ResetIdempotencyKeyService extends BaseService {
2021
},
2122
});
2223

23-
if (count === 0) {
24+
// Buffer-side reset (Q5): the key may belong to a buffered run that
25+
// hasn't materialised yet. The PG updateMany above can't see it.
26+
// resetIdempotency clears both the snapshot fields and the Redis
27+
// lookup atomically. Returns null when nothing was bound there.
28+
const buffer = getMollifierBuffer();
29+
const bufferResult = buffer
30+
? await buffer
31+
.resetIdempotency({
32+
envId: authenticatedEnv.id,
33+
taskIdentifier,
34+
idempotencyKey,
35+
})
36+
.catch((err) => {
37+
// Buffer outage shouldn't 500 the reset endpoint if PG
38+
// already cleared something. Log and treat as a miss.
39+
logger.error("ResetIdempotencyKeyService: buffer reset failed", {
40+
idempotencyKey,
41+
taskIdentifier,
42+
err: err instanceof Error ? err.message : String(err),
43+
});
44+
return { clearedRunId: null };
45+
})
46+
: { clearedRunId: null };
47+
48+
const totalCount = pgCount + (bufferResult.clearedRunId ? 1 : 0);
49+
50+
if (totalCount === 0) {
2451
throw new ServiceValidationError(
2552
`No runs found with idempotency key: ${idempotencyKey} and task: ${taskIdentifier}`,
2653
404
2754
);
2855
}
2956

3057
logger.info(
31-
`Reset idempotency key: ${idempotencyKey} for task: ${taskIdentifier} in env: ${authenticatedEnv.id}, affected ${count} run(s)`
58+
`Reset idempotency key: ${idempotencyKey} for task: ${taskIdentifier} in env: ${authenticatedEnv.id}, affected ${totalCount} run(s) (pg=${pgCount}, buffered=${bufferResult.clearedRunId ? 1 : 0})`
3259
);
3360

3461
return { id: idempotencyKey };

0 commit comments

Comments
 (0)