Skip to content

Commit 8dc878e

Browse files
d-csclaude
andcommitted
feat(redis-worker,webapp): drop mollifier entry TTL — drainer is the recovery mechanism
Buffer entries used to EXPIRE after entryTtlSeconds (600s dev / 1h prod). Once that window elapsed without the drainer ack'ing, the entry just vanished — no PG row, no log, no customer signal. The stale-entry sweep was added in the previous commit so ops gets paged on dwell-too-long; with that signal in place, the TTL itself is now the cause of the failure mode it was meant to mitigate. Remove it. Buffer entries persist until the drainer ACKs (with the existing 30s post-materialise grace TTL) or FAILs them. Idempotency lookup keys also lose their TTL — keeping them paired to the entry hash prevents the dedup-drift bug where a TTL'd lookup would let the same idempotency key spawn a second buffered run while the first still existed. `failMollifierEntry` now DELs the entry hash + lookup because the SYSTEM_FAILURE PG row written by the drainer is the canonical record; the buffer entry is no longer load-bearing. Knock-on changes: - `MollifierBufferOptions`: `entryTtlSeconds` removed (no consumers outside this repo). - `TRIGGER_MOLLIFIER_ENTRY_TTL_S`: removed from env.server.ts and the example .env. The stale-sweep threshold now has its own explicit default (5min) instead of "half of TTL". - `MollifierBuffer.getEntryTtlSeconds`: retained — it returns the Redis-side TTL, which is now -1 in steady state and ~30s after ack. Used by the ack-grace-TTL test. - Existing tests updated: TTL-related cases inverted to assert no TTL; FAILED-state cases inverted to assert teardown; runId-reuse-after- fail now succeeds (slot is reclaimable). Operational alert: Redis memory pressure if the drainer is offline. That's the same failure mode as Redis OOM in any other context, with existing infra-level alerts. The mollifier.stale_entries.current gauge fires first; ops should be on it long before memory becomes a problem. See _ops/mollifier-ops.md. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 449ded0 commit 8dc878e

13 files changed

Lines changed: 174 additions & 165 deletions
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@trigger.dev/redis-worker": minor
3+
---
4+
5+
`MollifierBuffer`: remove the `entryTtlSeconds` constructor option and stop applying any TTL to buffer entry hashes or idempotency-lookup keys. Buffer entries now persist until the drainer ACKs (with a 30s post-materialise grace TTL) or FAILs them. The previous design auto-evicted entries after the TTL, which silently lost runs when the drainer was offline or falling behind — no PG row, no log, no customer signal. With the TTL gone, the drainer is the only mechanism that removes entries; operators alert on Redis memory pressure (separate, existing concern) and on the `mollifier.stale_entries.current` gauge (5min default threshold) instead. `fail` now also DELs the entry hash plus its idempotency lookup, because the SYSTEM_FAILURE PG row written by the drainer is the canonical record of the failure and the buffer entry is no longer load-bearing.
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
area: webapp
3+
type: improvement
4+
---
5+
6+
Drop `TRIGGER_MOLLIFIER_ENTRY_TTL_S` and the `entryTtlSeconds` option on `MollifierBuffer`. Buffer entries no longer auto-expire — the drainer is the only mechanism that removes them, which prevents silent run loss when the drainer is offline or falling behind. Default for `TRIGGER_MOLLIFIER_STALE_SWEEP_THRESHOLD_MS` is now an explicit 5 minutes (used to be half of the old entry TTL); set it directly if you want a different alerting horizon. See `_ops/mollifier-ops.md` for the new recovery flow.

_ops/mollifier-ops.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ Defaults are tuned for production; tune below for incident response.
9898
| `TRIGGER_MOLLIFIER_DRAIN_MAX_ATTEMPTS` | `3` | Retries before terminal failure → `SYSTEM_FAILURE` PG row |
9999
| `TRIGGER_MOLLIFIER_STALE_SWEEP_ENABLED` | inherits | Run the alerting sweep |
100100
| `TRIGGER_MOLLIFIER_STALE_SWEEP_INTERVAL_MS` | `300_000` | Sweep cadence |
101-
| `TRIGGER_MOLLIFIER_STALE_SWEEP_THRESHOLD_MS` | (unset) | Dwell threshold. Defaults to half of `entryTtlSeconds` when unset |
101+
| `TRIGGER_MOLLIFIER_STALE_SWEEP_THRESHOLD_MS` | `300_000` | Dwell threshold above which an entry is flagged stale (matches the sweep interval — "anything still here when we check") |
102102

103103
## Failure modes & recovery
104104

apps/webapp/app/env.server.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1093,7 +1093,6 @@ const EnvironmentSchema = z
10931093
TRIGGER_MOLLIFIER_TRIP_THRESHOLD: z.coerce.number().int().nonnegative().default(100),
10941094
TRIGGER_MOLLIFIER_HOLD_MS: z.coerce.number().int().positive().default(500),
10951095
TRIGGER_MOLLIFIER_DRAIN_CONCURRENCY: z.coerce.number().int().positive().default(50),
1096-
TRIGGER_MOLLIFIER_ENTRY_TTL_S: z.coerce.number().int().positive().default(600),
10971096
TRIGGER_MOLLIFIER_DRAIN_MAX_ATTEMPTS: z.coerce.number().int().positive().default(3),
10981097
TRIGGER_MOLLIFIER_DRAIN_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().positive().default(30_000),
10991098
TRIGGER_MOLLIFIER_DRAIN_MAX_ORGS_PER_TICK: z.coerce.number().int().positive().default(500),
@@ -1102,7 +1101,9 @@ const EnvironmentSchema = z
11021101
// dwell exceeds the stale threshold. Independent of the drainer —
11031102
// its job is exactly to make a stuck/offline drainer visible to
11041103
// ops. Defaults: enabled when the mollifier is enabled, run every
1105-
// 5 minutes, flag entries with dwell > half of entryTtlSeconds.
1104+
// 5 minutes, alert on anything that's been dwelling for 5+ minutes
1105+
// (matches the sweep interval — "anything still here when we
1106+
// check" is the simplest threshold that converges).
11061107
TRIGGER_MOLLIFIER_STALE_SWEEP_ENABLED: z
11071108
.string()
11081109
.default(process.env.TRIGGER_MOLLIFIER_ENABLED ?? "0"),
@@ -1115,7 +1116,7 @@ const EnvironmentSchema = z
11151116
.number()
11161117
.int()
11171118
.positive()
1118-
.optional(),
1119+
.default(5 * 60_000),
11191120

11201121
BATCH_TRIGGER_PROCESS_JOB_VISIBILITY_TIMEOUT_MS: z.coerce
11211122
.number()

apps/webapp/app/v3/mollifier/mollifierBuffer.server.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ function initializeMollifierBuffer(): MollifierBuffer {
2222
enableAutoPipelining: true,
2323
...(env.TRIGGER_MOLLIFIER_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
2424
},
25-
entryTtlSeconds: env.TRIGGER_MOLLIFIER_ENTRY_TTL_S,
2625
});
2726
}
2827

apps/webapp/app/v3/mollifierStaleSweepWorker.server.ts

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -30,21 +30,14 @@ export function initMollifierStaleSweepWorker(): void {
3030
if (env.TRIGGER_MOLLIFIER_STALE_SWEEP_ENABLED !== "1") return;
3131
if (global.__mollifierStaleSweepRegistered__) return;
3232

33-
// Default the threshold to half of `entryTtlSeconds`, mirroring the
34-
// plan doc's cadence. Operators wanting an earlier or later signal
35-
// can set it explicitly.
36-
const staleThresholdMs =
37-
env.TRIGGER_MOLLIFIER_STALE_SWEEP_THRESHOLD_MS ??
38-
Math.floor(env.TRIGGER_MOLLIFIER_ENTRY_TTL_S * 1000 * 0.5);
39-
4033
logger.debug("Initializing mollifier stale-entry sweep", {
4134
intervalMs: env.TRIGGER_MOLLIFIER_STALE_SWEEP_INTERVAL_MS,
42-
staleThresholdMs,
35+
staleThresholdMs: env.TRIGGER_MOLLIFIER_STALE_SWEEP_THRESHOLD_MS,
4336
});
4437

4538
const handle = startStaleSweepInterval({
4639
intervalMs: env.TRIGGER_MOLLIFIER_STALE_SWEEP_INTERVAL_MS,
47-
staleThresholdMs,
40+
staleThresholdMs: env.TRIGGER_MOLLIFIER_STALE_SWEEP_THRESHOLD_MS,
4841
});
4942

5043
signalsEmitter.on("SIGTERM", handle.stop);

apps/webapp/test/mollifierRealtimeRunResourceBuffer.test.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ describe("realtime buffered-subscription resource resolution (testcontainers)",
3434
redisTest(
3535
"synthesises a resource whose `id` matches RunId.fromFriendlyId",
3636
async ({ redisOptions }) => {
37-
const buffer = new MollifierBuffer({ redisOptions, entryTtlSeconds: 60 });
37+
const buffer = new MollifierBuffer({ redisOptions });
3838
try {
3939
await buffer.accept({
4040
runId: SNAPSHOT_BASE.friendlyId,
@@ -78,7 +78,7 @@ describe("realtime buffered-subscription resource resolution (testcontainers)",
7878
redisTest(
7979
"returns null when neither PG nor the buffer have the entry",
8080
async ({ redisOptions }) => {
81-
const buffer = new MollifierBuffer({ redisOptions, entryTtlSeconds: 60 });
81+
const buffer = new MollifierBuffer({ redisOptions });
8282
try {
8383
const bufferedSynthetic = await findRunByIdWithMollifierFallback(
8484
{
@@ -109,7 +109,7 @@ describe("realtime buffered-subscription resource resolution (testcontainers)",
109109
redisTest(
110110
"does not fall back to buffer when PG has the row",
111111
async ({ redisOptions }) => {
112-
const buffer = new MollifierBuffer({ redisOptions, entryTtlSeconds: 60 });
112+
const buffer = new MollifierBuffer({ redisOptions });
113113
try {
114114
await buffer.accept({
115115
runId: SNAPSHOT_BASE.friendlyId,

apps/webapp/test/mollifierStaleSweep.test.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ describe("runStaleSweepOnce — testcontainers", () => {
6969
redisTest(
7070
"flags entries whose dwell exceeds the stale threshold and skips fresh ones",
7171
async ({ redisOptions }) => {
72-
const buffer = new MollifierBuffer({ redisOptions, entryTtlSeconds: 60 });
72+
const buffer = new MollifierBuffer({ redisOptions });
7373
try {
7474
// Two stale entries (one in each env) + one fresh entry. Sweep
7575
// should flag the two stale, leave the fresh one alone, record
@@ -143,7 +143,7 @@ describe("runStaleSweepOnce — testcontainers", () => {
143143
// stale, alert fired, drainer caught up. The next sweep must
144144
// report `env_a -> 0` so the gauge drops below the alert
145145
// threshold instead of staying latched at the last stale value.
146-
const buffer = new MollifierBuffer({ redisOptions, entryTtlSeconds: 60 });
146+
const buffer = new MollifierBuffer({ redisOptions });
147147
try {
148148
await buffer.accept({
149149
runId: "run_just_arrived",
@@ -171,7 +171,7 @@ describe("runStaleSweepOnce — testcontainers", () => {
171171
// `dwellMs > threshold` to `dwellMs >= threshold` would flag every
172172
// entry the first time the sweep runs after a perfectly synchronised
173173
// accept call — the dashboard would page on every burst.
174-
const buffer = new MollifierBuffer({ redisOptions, entryTtlSeconds: 60 });
174+
const buffer = new MollifierBuffer({ redisOptions });
175175
try {
176176
await buffer.accept({
177177
runId: "run_fresh_only",
@@ -200,7 +200,7 @@ describe("runStaleSweepOnce — testcontainers", () => {
200200
// must walk every org/env, not just the first one it finds. If a
201201
// future refactor collapsed listOrgs/listEnvsForOrg into a single
202202
// env-flat list this test catches a regression there.
203-
const buffer = new MollifierBuffer({ redisOptions, entryTtlSeconds: 60 });
203+
const buffer = new MollifierBuffer({ redisOptions });
204204
try {
205205
await buffer.accept({
206206
runId: "run_x",

apps/webapp/test/mollifierSyntheticRedirectInfo.test.ts

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ function fakePrisma(member: { id: string } | null) {
2323

2424
describe("findBufferedRunRedirectInfo (testcontainers)", () => {
2525
redisTest("returns slugs + spanId for a real buffer entry when user is a member", async ({ redisOptions }) => {
26-
const buffer = new MollifierBuffer({ redisOptions, entryTtlSeconds: 60 });
26+
const buffer = new MollifierBuffer({ redisOptions });
2727
try {
2828
await buffer.accept({
2929
runId: "run_real_1",
@@ -47,7 +47,7 @@ describe("findBufferedRunRedirectInfo (testcontainers)", () => {
4747
});
4848

4949
redisTest("returns null when no buffer entry exists for the runId", async ({ redisOptions }) => {
50-
const buffer = new MollifierBuffer({ redisOptions, entryTtlSeconds: 60 });
50+
const buffer = new MollifierBuffer({ redisOptions });
5151
try {
5252
const info = await findBufferedRunRedirectInfo(
5353
{ runFriendlyId: "run_missing", userId: "user_1" },
@@ -60,7 +60,7 @@ describe("findBufferedRunRedirectInfo (testcontainers)", () => {
6060
});
6161

6262
redisTest("returns null when the user is not an org member (default check enforced)", async ({ redisOptions }) => {
63-
const buffer = new MollifierBuffer({ redisOptions, entryTtlSeconds: 60 });
63+
const buffer = new MollifierBuffer({ redisOptions });
6464
try {
6565
await buffer.accept({
6666
runId: "run_real_2",
@@ -79,7 +79,7 @@ describe("findBufferedRunRedirectInfo (testcontainers)", () => {
7979
});
8080

8181
redisTest("skips the org-membership check when skipOrgMembershipCheck is set (admin path)", async ({ redisOptions }) => {
82-
const buffer = new MollifierBuffer({ redisOptions, entryTtlSeconds: 60 });
82+
const buffer = new MollifierBuffer({ redisOptions });
8383
try {
8484
await buffer.accept({
8585
runId: "run_real_3",
@@ -103,7 +103,7 @@ describe("findBufferedRunRedirectInfo (testcontainers)", () => {
103103
});
104104

105105
redisTest("returns null when snapshot is malformed JSON", async ({ redisOptions }) => {
106-
const buffer = new MollifierBuffer({ redisOptions, entryTtlSeconds: 60 });
106+
const buffer = new MollifierBuffer({ redisOptions });
107107
try {
108108
await buffer.accept({
109109
runId: "run_real_4",
@@ -122,7 +122,7 @@ describe("findBufferedRunRedirectInfo (testcontainers)", () => {
122122
});
123123

124124
redisTest("returns null when snapshot lacks org/project slugs", async ({ redisOptions }) => {
125-
const buffer = new MollifierBuffer({ redisOptions, entryTtlSeconds: 60 });
125+
const buffer = new MollifierBuffer({ redisOptions });
126126
try {
127127
await buffer.accept({
128128
runId: "run_real_5",
@@ -141,7 +141,7 @@ describe("findBufferedRunRedirectInfo (testcontainers)", () => {
141141
});
142142

143143
redisTest("returns info with undefined spanId when snapshot has no spanId", async ({ redisOptions }) => {
144-
const buffer = new MollifierBuffer({ redisOptions, entryTtlSeconds: 60 });
144+
const buffer = new MollifierBuffer({ redisOptions });
145145
try {
146146
await buffer.accept({
147147
runId: "run_real_6",

apps/webapp/test/mollifierTripEvaluator.test.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ describe("createRealTripEvaluator", () => {
1414
redisTest(
1515
"returns divert=false when the sliding window stays under threshold",
1616
async ({ redisOptions }) => {
17-
const buffer = new MollifierBuffer({ redisOptions, entryTtlSeconds: 600 });
17+
const buffer = new MollifierBuffer({ redisOptions });
1818
try {
1919
const evaluator = createRealTripEvaluator({
2020
getBuffer: () => buffer,
@@ -32,7 +32,7 @@ describe("createRealTripEvaluator", () => {
3232
redisTest(
3333
"returns divert=true with reason per_env_rate once the window trips",
3434
async ({ redisOptions }) => {
35-
const buffer = new MollifierBuffer({ redisOptions, entryTtlSeconds: 600 });
35+
const buffer = new MollifierBuffer({ redisOptions });
3636
try {
3737
// threshold=2 → the 3rd call within windowMs is the first that trips.
3838
const options = { windowMs: 5000, threshold: 2, holdMs: 5000 } as const;
@@ -73,7 +73,7 @@ describe("createRealTripEvaluator", () => {
7373
redisTest(
7474
"returns divert=false when buffer throws (fail-open)",
7575
async ({ redisOptions }) => {
76-
const buffer = new MollifierBuffer({ redisOptions, entryTtlSeconds: 600 });
76+
const buffer = new MollifierBuffer({ redisOptions });
7777
// Closing the client up front means evaluateTrip will throw on the first
7878
// Redis command — a real failure mode, not a stub.
7979
await buffer.close();

0 commit comments

Comments
 (0)