Skip to content

Commit 14253dc

Browse files
d-csclaude
andcommitted
feat(webapp): mollifier stale-entry sweep + OTel signal
Without an external signal that the drainer is falling behind, a stuck or offline drainer drives the buffer toward the entry-hash TTL line and runs vanish silently — no PG row, no log, no dashboard indication. Add a periodic read-only sweep over the buffer's queue ZSETs that emits a `mollifier.stale_entries` OTel counter and a structured `mollifier.stale_entry` warning log for each entry whose dwell exceeds the configured threshold. Independent of the drainer (its own gate + `TRIGGER_MOLLIFIER_STALE_SWEEP_ENABLED`) so an entirely offline drainer is exactly when the sweep is most useful. Defaults: interval 5min, threshold half of `entryTtlSeconds`, hard cap of 1000 entries per env per pass. Sweep is strictly read-only — does not remove or salvage entries. The retention-policy question (drop the entry TTL entirely vs raise it vs pre-TTL salvage) is intentionally deferred to a separate change; this commit gets the signal in place first. Tested with a real `MollifierBuffer` (testcontainers): stale entries flagged, fresh entries left alone, multi-org scan walks every queue. Manually verified end-to-end: with a 10s interval + 2s threshold, each tick logs the buffered run with growing dwellMs as expected. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 69e8535 commit 14253dc

7 files changed

Lines changed: 407 additions & 0 deletions

File tree

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
area: webapp
3+
type: feature
4+
---
5+
6+
Periodic mollifier stale-entry sweep. Scans the buffer's queue ZSETs every `TRIGGER_MOLLIFIER_STALE_SWEEP_INTERVAL_MS` (default 5min); entries whose dwell exceeds `TRIGGER_MOLLIFIER_STALE_SWEEP_THRESHOLD_MS` (default half of `entryTtlSeconds`) emit a `mollifier.stale_entries` OTel counter tick plus a structured `mollifier.stale_entry` warning log. Read-only — the sweep does not remove or salvage entries; that decision is deferred to a separate retention-policy change. Gives ops a paging signal when the drainer is offline or falling behind before TTL-induced silent loss kicks in.

apps/webapp/app/entry.server.tsx

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import { renderToPipeableStream } from "react-dom/server";
77
import { PassThrough } from "stream";
88
import * as Worker from "~/services/worker.server";
99
import { initMollifierDrainerWorker } from "~/v3/mollifierDrainerWorker.server";
10+
import { initMollifierStaleSweepWorker } from "~/v3/mollifierStaleSweepWorker.server";
1011
import { bootstrap } from "./bootstrap";
1112
import { LocaleContextProvider } from "./components/primitives/LocaleProvider";
1213
import {
@@ -249,6 +250,7 @@ Worker.init().catch((error) => {
249250
});
250251

251252
initMollifierDrainerWorker();
253+
initMollifierStaleSweepWorker();
252254

253255
bootstrap().catch((error) => {
254256
logError(error);

apps/webapp/app/env.server.ts

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1098,6 +1098,25 @@ const EnvironmentSchema = z
10981098
TRIGGER_MOLLIFIER_DRAIN_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().positive().default(30_000),
10991099
TRIGGER_MOLLIFIER_DRAIN_MAX_ORGS_PER_TICK: z.coerce.number().int().positive().default(500),
11001100

1101+
// Periodic sweep that scans buffer queue ZSETs for entries whose
1102+
// dwell exceeds the stale threshold. Independent of the drainer —
1103+
// its job is exactly to make a stuck/offline drainer visible to
1104+
// ops. Defaults: enabled when the mollifier is enabled, run every
1105+
// 5 minutes, flag entries with dwell > half of entryTtlSeconds.
1106+
TRIGGER_MOLLIFIER_STALE_SWEEP_ENABLED: z
1107+
.string()
1108+
.default(process.env.TRIGGER_MOLLIFIER_ENABLED ?? "0"),
1109+
TRIGGER_MOLLIFIER_STALE_SWEEP_INTERVAL_MS: z.coerce
1110+
.number()
1111+
.int()
1112+
.positive()
1113+
.default(5 * 60_000),
1114+
TRIGGER_MOLLIFIER_STALE_SWEEP_THRESHOLD_MS: z.coerce
1115+
.number()
1116+
.int()
1117+
.positive()
1118+
.optional(),
1119+
11011120
BATCH_TRIGGER_PROCESS_JOB_VISIBILITY_TIMEOUT_MS: z.coerce
11021121
.number()
11031122
.int()
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
import type { MollifierBuffer } from "@trigger.dev/redis-worker";
2+
import { logger as defaultLogger } from "~/services/logger.server";
3+
import { getMollifierBuffer } from "./mollifierBuffer.server";
4+
import { recordStaleEntry as defaultRecordStaleEntry } from "./mollifierTelemetry.server";
5+
6+
// One pass of the sweep scans every env's queue ZSET. The per-env page
7+
// is bounded so a single pathological env can't make the sweep run
8+
// unboundedly long.
9+
const DEFAULT_MAX_ENTRIES_PER_ENV = 1000;
10+
11+
export type StaleSweepConfig = {
12+
// Entries whose dwell exceeds this threshold are flagged stale. Set
13+
// it well below `entryTtlSeconds * 1000` so ops have lead time before
14+
// TTL-induced silent loss; the default (half of entryTtlSeconds)
15+
// matches the cadence in the plan doc.
16+
staleThresholdMs: number;
17+
maxEntriesPerEnv?: number;
18+
};
19+
20+
export type StaleSweepDeps = {
21+
getBuffer?: () => MollifierBuffer | null;
22+
recordStaleEntry?: (envId: string) => void;
23+
logger?: { warn: (message: string, fields: Record<string, unknown>) => void };
24+
now?: () => number;
25+
};
26+
27+
export type StaleSweepResult = {
28+
orgsScanned: number;
29+
envsScanned: number;
30+
entriesScanned: number;
31+
staleCount: number;
32+
};
33+
34+
// Walks orgs → envs → entries, emitting an OTel counter tick and a
35+
// structured warning log for each buffer entry whose dwell exceeds the
36+
// stale threshold. Read-only: the sweep does NOT remove or salvage
37+
// entries; that decision is deferred to a separate retention-policy
38+
// change. The signal here exists so ops sees the drainer falling
39+
// behind well before TTL-induced loss kicks in.
40+
export async function runStaleSweepOnce(
41+
config: StaleSweepConfig,
42+
deps: StaleSweepDeps = {},
43+
): Promise<StaleSweepResult> {
44+
const getBuffer = deps.getBuffer ?? getMollifierBuffer;
45+
const recordStale = deps.recordStaleEntry ?? defaultRecordStaleEntry;
46+
const log = deps.logger ?? defaultLogger;
47+
const now = (deps.now ?? Date.now)();
48+
const maxEntries = config.maxEntriesPerEnv ?? DEFAULT_MAX_ENTRIES_PER_ENV;
49+
50+
const buffer = getBuffer();
51+
if (!buffer) {
52+
return { orgsScanned: 0, envsScanned: 0, entriesScanned: 0, staleCount: 0 };
53+
}
54+
55+
const orgs = await buffer.listOrgs();
56+
let envsScanned = 0;
57+
let entriesScanned = 0;
58+
let staleCount = 0;
59+
60+
for (const orgId of orgs) {
61+
const envs = await buffer.listEnvsForOrg(orgId);
62+
for (const envId of envs) {
63+
envsScanned += 1;
64+
const entries = await buffer.listEntriesForEnv(envId, maxEntries);
65+
for (const entry of entries) {
66+
entriesScanned += 1;
67+
const dwellMs = now - entry.createdAt.getTime();
68+
if (dwellMs > config.staleThresholdMs) {
69+
recordStale(envId);
70+
log.warn("mollifier.stale_entry", {
71+
runId: entry.runId,
72+
envId,
73+
orgId,
74+
dwellMs,
75+
staleThresholdMs: config.staleThresholdMs,
76+
});
77+
staleCount += 1;
78+
}
79+
}
80+
}
81+
}
82+
83+
return { orgsScanned: orgs.length, envsScanned, entriesScanned, staleCount };
84+
}
85+
86+
export type StaleSweepIntervalHandle = {
87+
stop: () => void;
88+
};
89+
90+
// Production wrapper: schedule `runStaleSweepOnce` on a fixed interval.
91+
// One pass at a time — if a sweep is still running when the timer fires
92+
// the next tick is skipped (a backed-up Redis would otherwise queue
93+
// overlapping sweeps that all log the same stale entries).
94+
export function startStaleSweepInterval(
95+
config: StaleSweepConfig & { intervalMs: number },
96+
deps: StaleSweepDeps = {},
97+
): StaleSweepIntervalHandle {
98+
let stopped = false;
99+
let inFlight = false;
100+
101+
const tick = async () => {
102+
if (stopped || inFlight) return;
103+
inFlight = true;
104+
try {
105+
await runStaleSweepOnce(config, deps);
106+
} catch (err) {
107+
const log = deps.logger ?? defaultLogger;
108+
log.warn("mollifier.stale_sweep.failed", {
109+
err: err instanceof Error ? err.message : String(err),
110+
});
111+
} finally {
112+
inFlight = false;
113+
}
114+
};
115+
116+
const timer = setInterval(() => {
117+
void tick();
118+
}, config.intervalMs);
119+
120+
return {
121+
stop: () => {
122+
stopped = true;
123+
clearInterval(timer);
124+
},
125+
};
126+
}

apps/webapp/app/v3/mollifier/mollifierTelemetry.server.ts

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,23 @@ export function recordRealtimeBufferedSubscription(envId: string): void {
3333
realtimeBufferedSubscriptionsCounter.add(1, { envId });
3434
}
3535

36+
// Counts buffer entries that have been waiting in the queue ZSET longer
37+
// than the configured stale threshold (typically half of entryTtlSeconds).
38+
// Climbing in lockstep with the queue depth means the drainer is offline
39+
// or falling behind — alerting hooks into this counter give ops a paging
40+
// signal before TTL-induced silent loss kicks in.
41+
export const staleEntriesCounter = meter.createCounter(
42+
"mollifier.stale_entries",
43+
{
44+
description:
45+
"Mollifier buffer entries whose dwell exceeds the stale threshold (per sweep pass)",
46+
},
47+
);
48+
49+
export function recordStaleEntry(envId: string): void {
50+
staleEntriesCounter.add(1, { envId });
51+
}
52+
3653
// Electric SQL's shape-stream protocol adds a `handle=` query param on
3754
// every reconnect after the initial GET. Gating the realtime-buffered
3855
// log/counter on its absence keeps the signal at one tick per
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
import { env } from "~/env.server";
2+
import { logger } from "~/services/logger.server";
3+
import { signalsEmitter } from "~/services/signals.server";
4+
import {
5+
startStaleSweepInterval,
6+
type StaleSweepIntervalHandle,
7+
} from "./mollifier/mollifierStaleSweep.server";
8+
9+
declare global {
10+
// eslint-disable-next-line no-var
11+
var __mollifierStaleSweepRegistered__: boolean | undefined;
12+
// eslint-disable-next-line no-var
13+
var __mollifierStaleSweepHandle__: StaleSweepIntervalHandle | undefined;
14+
}
15+
16+
/**
17+
* Bootstraps the mollifier stale-entry sweep.
18+
*
19+
* Independent of the drainer — its purpose is to alert when entries are
20+
* piling up despite the drainer being supposedly healthy, so it runs
21+
* any time the mollifier itself is enabled (gated separately from
22+
* `TRIGGER_MOLLIFIER_DRAINER_ENABLED`). The sweep is read-only: it
23+
* counts and logs stale entries but does not remove or salvage them.
24+
*
25+
* The Remix dev server re-evaluates `entry.server.tsx` on every change,
26+
* so the registration guard + handle cache make the bootstrap
27+
* idempotent across hot reloads.
28+
*/
29+
export function initMollifierStaleSweepWorker(): void {
30+
if (env.TRIGGER_MOLLIFIER_STALE_SWEEP_ENABLED !== "1") return;
31+
if (global.__mollifierStaleSweepRegistered__) return;
32+
33+
// Default the threshold to half of `entryTtlSeconds`, mirroring the
34+
// plan doc's cadence. Operators wanting an earlier or later signal
35+
// can set it explicitly.
36+
const staleThresholdMs =
37+
env.TRIGGER_MOLLIFIER_STALE_SWEEP_THRESHOLD_MS ??
38+
Math.floor(env.TRIGGER_MOLLIFIER_ENTRY_TTL_S * 1000 * 0.5);
39+
40+
logger.debug("Initializing mollifier stale-entry sweep", {
41+
intervalMs: env.TRIGGER_MOLLIFIER_STALE_SWEEP_INTERVAL_MS,
42+
staleThresholdMs,
43+
});
44+
45+
const handle = startStaleSweepInterval({
46+
intervalMs: env.TRIGGER_MOLLIFIER_STALE_SWEEP_INTERVAL_MS,
47+
staleThresholdMs,
48+
});
49+
50+
signalsEmitter.on("SIGTERM", handle.stop);
51+
signalsEmitter.on("SIGINT", handle.stop);
52+
global.__mollifierStaleSweepRegistered__ = true;
53+
global.__mollifierStaleSweepHandle__ = handle;
54+
}

0 commit comments

Comments
 (0)