Skip to content

Commit af85cda

Browse files
d-csclaude
andcommitted
feat(webapp): alertable gauge for mollifier stale-entry signal
The mollifier.stale_entries counter from the previous commit reflects sweep-tick events, not stable state. A single stuck entry observed across N ticks contributes N events, so a rate() query is proportional to (stuck-entry-count × scan-frequency), not "how many entries are stale right now". Useful for historical views but the wrong shape for ops alerts. Add a companion observable gauge `mollifier.stale_entries.current` with `{envId}` attribute. The sweep emits a per-env snapshot on each pass (including zero counts for envs whose stale entries cleared), and an OTel batch-observable callback exposes the latest snapshot to the metric exporter on every scrape. Recommended alert: mollifier_stale_entries_current{envId=...} > 0 for 5m The snapshot replaces (not merges) so an env that paged on a previous sweep clears when the drainer catches up, instead of staying latched at the last stale count. Test seam captures the snapshot to verify per-env counts and the clear-on-drain behaviour. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 14253dc commit af85cda

3 files changed

Lines changed: 111 additions & 7 deletions

File tree

apps/webapp/app/v3/mollifier/mollifierStaleSweep.server.ts

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
import type { MollifierBuffer } from "@trigger.dev/redis-worker";
22
import { logger as defaultLogger } from "~/services/logger.server";
33
import { getMollifierBuffer } from "./mollifierBuffer.server";
4-
import { recordStaleEntry as defaultRecordStaleEntry } from "./mollifierTelemetry.server";
4+
import {
5+
recordStaleEntry as defaultRecordStaleEntry,
6+
reportStaleEntrySnapshot as defaultReportStaleEntrySnapshot,
7+
} from "./mollifierTelemetry.server";
58

69
// One pass of the sweep scans every env's queue ZSET. The per-env page
710
// is bounded so a single pathological env can't make the sweep run
@@ -20,6 +23,7 @@ export type StaleSweepConfig = {
2023
export type StaleSweepDeps = {
2124
getBuffer?: () => MollifierBuffer | null;
2225
recordStaleEntry?: (envId: string) => void;
26+
reportStaleEntrySnapshot?: (snapshot: Map<string, number>) => void;
2327
logger?: { warn: (message: string, fields: Record<string, unknown>) => void };
2428
now?: () => number;
2529
};
@@ -43,24 +47,36 @@ export async function runStaleSweepOnce(
4347
): Promise<StaleSweepResult> {
4448
const getBuffer = deps.getBuffer ?? getMollifierBuffer;
4549
const recordStale = deps.recordStaleEntry ?? defaultRecordStaleEntry;
50+
const reportSnapshot =
51+
deps.reportStaleEntrySnapshot ?? defaultReportStaleEntrySnapshot;
4652
const log = deps.logger ?? defaultLogger;
4753
const now = (deps.now ?? Date.now)();
4854
const maxEntries = config.maxEntriesPerEnv ?? DEFAULT_MAX_ENTRIES_PER_ENV;
4955

5056
const buffer = getBuffer();
5157
if (!buffer) {
58+
// Replace any previous snapshot with empty so a previously-paging
59+
// env doesn't stay latched if mollifier is turned off mid-flight.
60+
reportSnapshot(new Map());
5261
return { orgsScanned: 0, envsScanned: 0, entriesScanned: 0, staleCount: 0 };
5362
}
5463

5564
const orgs = await buffer.listOrgs();
5665
let envsScanned = 0;
5766
let entriesScanned = 0;
5867
let staleCount = 0;
68+
// Tracks the stale count per env this pass. Includes zero counts for
69+
// envs that have entries but none stale — that's what lets the gauge
70+
// drop back to 0 when the drainer catches up. Envs absent from this
71+
// map are also absent from the new snapshot, clearing any latched
72+
// alerts on envs that have fully drained.
73+
const perEnvStale = new Map<string, number>();
5974

6075
for (const orgId of orgs) {
6176
const envs = await buffer.listEnvsForOrg(orgId);
6277
for (const envId of envs) {
6378
envsScanned += 1;
79+
let envStale = 0;
6480
const entries = await buffer.listEntriesForEnv(envId, maxEntries);
6581
for (const entry of entries) {
6682
entriesScanned += 1;
@@ -74,12 +90,16 @@ export async function runStaleSweepOnce(
7490
dwellMs,
7591
staleThresholdMs: config.staleThresholdMs,
7692
});
77-
staleCount += 1;
93+
envStale += 1;
7894
}
7995
}
96+
perEnvStale.set(envId, envStale);
97+
staleCount += envStale;
8098
}
8199
}
82100

101+
reportSnapshot(perEnvStale);
102+
83103
return { orgsScanned: orgs.length, envsScanned, entriesScanned, staleCount };
84104
}
85105

apps/webapp/app/v3/mollifier/mollifierTelemetry.server.ts

Lines changed: 39 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,10 @@ export function recordRealtimeBufferedSubscription(envId: string): void {
3535

3636
// Counts buffer entries that have been waiting in the queue ZSET longer
3737
// than the configured stale threshold (typically half of entryTtlSeconds).
38-
// Climbing in lockstep with the queue depth means the drainer is offline
39-
// or falling behind — alerting hooks into this counter give ops a paging
40-
// signal before TTL-induced silent loss kicks in.
38+
// Useful for historical "stale events over time" views, but not directly
39+
// alertable on its own — a single stuck entry observed by N sweep ticks
40+
// adds N to the counter, so `rate()` over an alerting window reflects
41+
// (entries × ticks), not "entries that are stale right now".
4142
export const staleEntriesCounter = meter.createCounter(
4243
"mollifier.stale_entries",
4344
{
@@ -50,6 +51,41 @@ export function recordStaleEntry(envId: string): void {
5051
staleEntriesCounter.add(1, { envId });
5152
}
5253

54+
// Alertable signal: the count of stale entries observed by the latest
55+
// sweep, per env. The sweep snapshots the full per-env picture on each
56+
// pass (including zeros for envs that no longer have any stale entries)
57+
// so an env that was paging can clear when the drainer catches up
58+
// instead of staying latched. Recommended alert:
59+
// mollifier_stale_entries_current{envId=...} > 0 for 5m
60+
export const staleEntriesGauge = meter.createObservableGauge(
61+
"mollifier.stale_entries.current",
62+
{
63+
description:
64+
"Buffer entries whose dwell exceeds the stale threshold, as observed by the latest sweep pass",
65+
},
66+
);
67+
68+
const latestStaleSnapshot = new Map<string, number>();
69+
70+
export function reportStaleEntrySnapshot(snapshot: Map<string, number>): void {
71+
// Replace, don't merge — envs absent from the new snapshot have either
72+
// drained or no longer exist; leaving their last value cached would
73+
// keep alerts latched forever.
74+
latestStaleSnapshot.clear();
75+
for (const [envId, count] of snapshot) {
76+
latestStaleSnapshot.set(envId, count);
77+
}
78+
}
79+
80+
meter.addBatchObservableCallback(
81+
(result) => {
82+
for (const [envId, count] of latestStaleSnapshot) {
83+
result.observe(staleEntriesGauge, count, { envId });
84+
}
85+
},
86+
[staleEntriesGauge],
87+
);
88+
5389
// Electric SQL's shape-stream protocol adds a `handle=` query param on
5490
// every reconnect after the initial GET. Gating the realtime-buffered
5591
// log/counter on its absence keeps the signal at one tick per

apps/webapp/test/mollifierStaleSweep.test.ts

Lines changed: 50 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,22 @@ const SNAPSHOT = {
1515

1616
function spyDeps() {
1717
const recordedStaleEnvIds: string[] = [];
18+
const snapshots: Array<Map<string, number>> = [];
1819
const warnings: Array<{ message: string; fields: Record<string, unknown> }> = [];
1920
return {
2021
recordedStaleEnvIds,
22+
snapshots,
2123
warnings,
2224
deps: {
2325
recordStaleEntry: (envId: string) => {
2426
recordedStaleEnvIds.push(envId);
2527
},
28+
reportStaleEntrySnapshot: (snapshot: Map<string, number>) => {
29+
// Clone so post-sweep assertions see what was reported *at that
30+
// call site*, not whatever subsequent passes mutate the source
31+
// map into.
32+
snapshots.push(new Map(snapshot));
33+
},
2634
logger: {
2735
warn: (message: string, fields: Record<string, unknown>) => {
2836
warnings.push({ message, fields });
@@ -37,7 +45,7 @@ describe("runStaleSweepOnce — unit", () => {
3745
// Mirrors the prod gate: if TRIGGER_MOLLIFIER_ENABLED=0 the buffer
3846
// singleton is null and the sweep is a no-op. We don't want it to
3947
// emit a metric (or throw) just because mollifier is disabled.
40-
const { deps, recordedStaleEnvIds, warnings } = spyDeps();
48+
const { deps, recordedStaleEnvIds, warnings, snapshots } = spyDeps();
4149
const result = await runStaleSweepOnce(
4250
{ staleThresholdMs: 1000 },
4351
{ ...deps, getBuffer: () => null },
@@ -50,6 +58,10 @@ describe("runStaleSweepOnce — unit", () => {
5058
});
5159
expect(recordedStaleEnvIds).toEqual([]);
5260
expect(warnings).toEqual([]);
61+
// An empty snapshot is still reported so any previously-paging env
62+
// (from a prior sweep before mollifier was disabled) clears.
63+
expect(snapshots).toHaveLength(1);
64+
expect(snapshots[0].size).toBe(0);
5365
});
5466
});
5567

@@ -86,7 +98,7 @@ describe("runStaleSweepOnce — testcontainers", () => {
8698
// the threshold without actually waiting in real time.
8799
const futureNow = Date.now() + 5 * 60 * 1000;
88100

89-
const { deps, recordedStaleEnvIds, warnings } = spyDeps();
101+
const { deps, recordedStaleEnvIds, warnings, snapshots } = spyDeps();
90102
const result = await runStaleSweepOnce(
91103
{ staleThresholdMs: 60 * 1000 },
92104
{
@@ -110,6 +122,42 @@ describe("runStaleSweepOnce — testcontainers", () => {
110122
expect(w.fields.staleThresholdMs).toBe(60 * 1000);
111123
expect(w.fields.dwellMs).toBeGreaterThan(60 * 1000);
112124
}
125+
// Snapshot drives the alertable gauge — env_a has 2 stale
126+
// entries, env_b has 1. Both must appear so a future alert can
127+
// identify which env is paging.
128+
expect(snapshots).toHaveLength(1);
129+
expect(Object.fromEntries(snapshots[0])).toEqual({
130+
env_a: 2,
131+
env_b: 1,
132+
});
133+
} finally {
134+
await buffer.close();
135+
}
136+
},
137+
);
138+
139+
redisTest(
140+
"snapshot reports zero for envs that have entries but none stale (clears latched alerts)",
141+
async ({ redisOptions }) => {
142+
// Critical for alert behaviour: a previous sweep reported env_a
143+
// stale, alert fired, drainer caught up. The next sweep must
144+
// report `env_a -> 0` so the gauge drops below the alert
145+
// threshold instead of staying latched at the last stale value.
146+
const buffer = new MollifierBuffer({ redisOptions, entryTtlSeconds: 60 });
147+
try {
148+
await buffer.accept({
149+
runId: "run_just_arrived",
150+
envId: "env_a",
151+
orgId: "org_1",
152+
payload: JSON.stringify(SNAPSHOT),
153+
});
154+
const { deps, snapshots } = spyDeps();
155+
await runStaleSweepOnce(
156+
{ staleThresholdMs: 60 * 1000 },
157+
{ ...deps, getBuffer: () => buffer },
158+
);
159+
expect(snapshots).toHaveLength(1);
160+
expect(Object.fromEntries(snapshots[0])).toEqual({ env_a: 0 });
113161
} finally {
114162
await buffer.close();
115163
}

0 commit comments

Comments
 (0)