Skip to content

Commit 7fce40e

Browse files
committed
fix(supervisor): wide-event review fixes + noisy-routes flag + socket lifecycle
1 parent fa331bb commit 7fce40e

7 files changed

Lines changed: 56 additions & 38 deletions

File tree

apps/supervisor/src/env.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -261,6 +261,10 @@ const Env = z
261261
// line per natural unit of work (dequeue iteration, HTTP request, socket
262262
// lifecycle). High-QPS hotpath, so the kill switch must be honoured.
263263
TRIGGER_WIDE_EVENTS_ENABLED: BoolEnv.default(false),
264+
// When true, also emit wide events for high-frequency HTTP routes
265+
// (heartbeat, snapshots-since, logs/debug). Off in prod to keep event
266+
// volume manageable; on in test environments for full-fidelity debugging.
267+
TRIGGER_WIDE_EVENTS_NOISY_ROUTES: BoolEnv.default(false),
264268
})
265269
.superRefine((data, ctx) => {
266270
if (data.COMPUTE_SNAPSHOTS_ENABLED && !data.TRIGGER_METADATA_URL) {

apps/supervisor/src/index.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ class ManagedSupervisor {
6363
env: { nodeId: env.TRIGGER_WORKER_INSTANCE_NAME },
6464
enabled: env.TRIGGER_WIDE_EVENTS_ENABLED,
6565
};
66+
private readonly wideEventsNoisyRoutes = env.TRIGGER_WIDE_EVENTS_NOISY_ROUTES;
6667

6768
constructor() {
6869
const {
@@ -260,7 +261,7 @@ class ManagedSupervisor {
260261
...this.wideEventOpts,
261262
traceparent,
262263
setup: (state) => {
263-
setMeta(state, "run_id", message.run.id);
264+
setMeta(state, "run_id", message.run.friendlyId);
264265
setMeta(state, "env_id", message.environment.id);
265266
setMeta(state, "org_id", message.organization.id);
266267
setMeta(state, "project_id", message.project.id);
@@ -472,6 +473,7 @@ class ManagedSupervisor {
472473
computeManager: this.computeManager,
473474
tracing: this.tracing,
474475
wideEventOpts: this.wideEventOpts,
476+
wideEventsNoisyRoutes: this.wideEventsNoisyRoutes,
475477
});
476478

477479
this.workloadServer.on("runConnected", this.onRunConnected.bind(this));

apps/supervisor/src/wideEvents/emit.test.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@ describe("emit", () => {
4343
expect(out).not.toHaveProperty("trace_id");
4444
expect(out).not.toHaveProperty("version");
4545
expect(out).not.toHaveProperty("commit_sha");
46-
expect(out).not.toHaveProperty("instance_id");
4746
expect(out).not.toHaveProperty("error.code");
4847
});
4948

apps/supervisor/src/wideEvents/emit.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ export function emit(state: State): void {
2626
appendIfSet(out, "commit_sha", state.commitSha);
2727
appendIfSet(out, "region", state.region);
2828
appendIfSet(out, "node_id", state.nodeId);
29-
appendIfSet(out, "instance_id", state.instanceId);
3029

3130
out.ok = state.ok;
3231
if (state.statusCode !== 0) out.status = state.statusCode;

apps/supervisor/src/wideEvents/middleware.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,10 +57,10 @@ export async function runWideEvent<T>(
5757
});
5858
if (opts.route) state.extras.route = opts.route;
5959
if (opts.method) state.extras.method = opts.method;
60-
if (opts.setup) opts.setup(state);
6160

6261
const start = performance.now();
6362
try {
63+
if (opts.setup) opts.setup(state);
6464
const result = await wideEventStorage.run(state, () => Promise.resolve(fn()));
6565
state.durationMs = Math.round(performance.now() - start);
6666
if (finalize) finalize(state);

apps/supervisor/src/wideEvents/state.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ export type State = {
2121
commitSha?: string;
2222
region?: string;
2323
nodeId?: string;
24-
instanceId?: string;
2524

2625
// Caller-attached opaque metadata, flattened to `meta.<key>` on emit.
2726
meta: Record<string, string>;

apps/supervisor/src/workloadServer/index.ts

Lines changed: 48 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,8 @@ type WorkloadServerOptions = {
8888
computeManager?: ComputeWorkloadManager;
8989
tracing?: OtlpTraceService;
9090
wideEventOpts: WideEventOptions;
91+
/** When true, high-frequency HTTP routes also emit wide events. */
92+
wideEventsNoisyRoutes: boolean;
9193
};
9294

9395
export class WorkloadServer extends EventEmitter<WorkloadServerEvents> {
@@ -96,6 +98,7 @@ export class WorkloadServer extends EventEmitter<WorkloadServerEvents> {
9698

9799
private readonly logger = new SimpleStructuredLogger("workload-server");
98100
private readonly wideEventOpts: WideEventOptions;
101+
private readonly wideEventsNoisyRoutes: boolean;
99102

100103
private readonly httpServer: HttpServer;
101104
private readonly websocketServer: Namespace<
@@ -126,6 +129,7 @@ export class WorkloadServer extends EventEmitter<WorkloadServerEvents> {
126129
this.workerClient = opts.workerClient;
127130
this.checkpointClient = opts.checkpointClient;
128131
this.wideEventOpts = opts.wideEventOpts;
132+
this.wideEventsNoisyRoutes = opts.wideEventsNoisyRoutes;
129133

130134
if (opts.computeManager?.snapshotsEnabled) {
131135
this.snapshotService = new ComputeSnapshotService({
@@ -183,16 +187,25 @@ export class WorkloadServer extends EventEmitter<WorkloadServerEvents> {
183187
* `traceparent` and `x-request-id` from `req.headers`, attaches `run_id` /
184188
* `snapshot_id` / `deployment_id` meta from `params` when present, and
185189
* captures the response status from `res.statusCode` after `fn` returns.
190+
*
191+
* Pass `highFrequency: true` for noisy routes (heartbeat, polling). Those
192+
* still go through the wrapper but only emit when
193+
* `TRIGGER_WIDE_EVENTS_NOISY_ROUTES` is on, so prod can keep them dark
194+
* while test envs capture full-fidelity traffic for debugging.
186195
*/
187196
private wideRoute<T>(
188197
ctx: { req: IncomingMessage; res: ServerResponse; params?: unknown },
189198
route: string,
190199
method: string,
191-
fn: () => Promise<T> | T
200+
fn: () => Promise<T> | T,
201+
routeOpts: { highFrequency?: boolean } = {}
192202
): Promise<T> {
203+
const enabled =
204+
this.wideEventOpts.enabled && (!routeOpts.highFrequency || this.wideEventsNoisyRoutes);
193205
return runWideEvent(
194206
{
195207
...this.wideEventOpts,
208+
enabled,
196209
route,
197210
method,
198211
traceparent: this.headerValueFromRequest(ctx.req, "traceparent"),
@@ -324,7 +337,8 @@ export class WorkloadServer extends EventEmitter<WorkloadServerEvents> {
324337
reply.json({
325338
ok: true,
326339
} satisfies WorkloadHeartbeatResponseBody);
327-
}
340+
},
341+
{ highFrequency: true }
328342
),
329343
}
330344
)
@@ -494,7 +508,8 @@ export class WorkloadServer extends EventEmitter<WorkloadServerEvents> {
494508
reply.json({
495509
snapshots: sinceSnapshotResponse.data.snapshots.map(legacifyCheckpointType),
496510
} satisfies WorkloadRunSnapshotsSinceResponseBody);
497-
}
511+
},
512+
{ highFrequency: true }
498513
),
499514
}
500515
)
@@ -550,7 +565,8 @@ export class WorkloadServer extends EventEmitter<WorkloadServerEvents> {
550565
body,
551566
this.runnerIdFromRequest(req)
552567
);
553-
}
568+
},
569+
{ highFrequency: true }
554570
),
555571
});
556572
} else {
@@ -563,7 +579,8 @@ export class WorkloadServer extends EventEmitter<WorkloadServerEvents> {
563579
"POST",
564580
async () => {
565581
ctx.reply.empty(204);
566-
}
582+
},
583+
{ highFrequency: true }
567584
),
568585
});
569586
}
@@ -654,6 +671,26 @@ export class WorkloadServer extends EventEmitter<WorkloadServerEvents> {
654671
};
655672
};
656673

674+
const emitSocketLifecycle = (
675+
event: "run_connected" | "run_disconnected",
676+
friendlyId: string,
677+
disconnectReason?: string
678+
) => {
679+
emitOneShot({
680+
...this.wideEventOpts,
681+
populate: (state) => {
682+
state.extras.event = event;
683+
setMeta(state, "run_id", friendlyId);
684+
if (socket.data.deploymentId) {
685+
setMeta(state, "deployment_id", socket.data.deploymentId);
686+
}
687+
if (socket.data.runnerId) setMeta(state, "runner_id", socket.data.runnerId);
688+
state.extras.socket_id = socket.id;
689+
if (disconnectReason) state.extras.disconnect_reason = disconnectReason;
690+
},
691+
});
692+
};
693+
657694
const runConnected = (friendlyId: string) => {
658695
socketLogger.debug("runConnected", { ...getSocketMetadata() });
659696

@@ -664,20 +701,22 @@ export class WorkloadServer extends EventEmitter<WorkloadServerEvents> {
664701
newRunId: friendlyId,
665702
oldRunId: socket.data.runFriendlyId,
666703
});
667-
runDisconnected(socket.data.runFriendlyId);
704+
runDisconnected(socket.data.runFriendlyId, "socket_run_replaced");
668705
}
669706

670707
this.runSockets.set(friendlyId, socket);
671708
this.emit("runConnected", { run: { friendlyId } });
672709
socket.data.runFriendlyId = friendlyId;
710+
emitSocketLifecycle("run_connected", friendlyId);
673711
};
674712

675-
const runDisconnected = (friendlyId: string) => {
713+
const runDisconnected = (friendlyId: string, reason: string) => {
676714
socketLogger.debug("runDisconnected", { ...getSocketMetadata() });
677715

678716
this.runSockets.delete(friendlyId);
679717
this.emit("runDisconnected", { run: { friendlyId } });
680718
socket.data.runFriendlyId = undefined;
719+
emitSocketLifecycle("run_disconnected", friendlyId, reason);
681720
};
682721

683722
socketLogger.debug("wsServer socket connected", { ...getSocketMetadata() });
@@ -695,7 +734,7 @@ export class WorkloadServer extends EventEmitter<WorkloadServerEvents> {
695734
});
696735

697736
if (socket.data.runFriendlyId) {
698-
runDisconnected(socket.data.runFriendlyId);
737+
runDisconnected(socket.data.runFriendlyId, `socket_disconnecting:${reason}`);
699738
}
700739
});
701740

@@ -725,18 +764,6 @@ export class WorkloadServer extends EventEmitter<WorkloadServerEvents> {
725764

726765
try {
727766
runConnected(message.run.friendlyId);
728-
emitOneShot({
729-
...this.wideEventOpts,
730-
populate: (state) => {
731-
state.extras.event = "run:start";
732-
setMeta(state, "run_id", message.run.friendlyId);
733-
if (socket.data.deploymentId) {
734-
setMeta(state, "deployment_id", socket.data.deploymentId);
735-
}
736-
if (socket.data.runnerId) setMeta(state, "runner_id", socket.data.runnerId);
737-
state.extras.socket_id = socket.id;
738-
},
739-
});
740767
} catch (error) {
741768
log.error("run:start error", { error });
742769
}
@@ -752,22 +779,10 @@ export class WorkloadServer extends EventEmitter<WorkloadServerEvents> {
752779
log.debug("Handling run:stop");
753780

754781
try {
755-
runDisconnected(message.run.friendlyId);
782+
runDisconnected(message.run.friendlyId, "run_stop_message");
756783
// Don't delete trace context here - run:stop fires after each snapshot/shutdown
757784
// but the run may be restored on a new VM and snapshot again. Trace context is
758785
// re-populated on dequeue, and entries are small (4 strings per run).
759-
emitOneShot({
760-
...this.wideEventOpts,
761-
populate: (state) => {
762-
state.extras.event = "run:stop";
763-
setMeta(state, "run_id", message.run.friendlyId);
764-
if (socket.data.deploymentId) {
765-
setMeta(state, "deployment_id", socket.data.deploymentId);
766-
}
767-
if (socket.data.runnerId) setMeta(state, "runner_id", socket.data.runnerId);
768-
state.extras.socket_id = socket.id;
769-
},
770-
});
771786
} catch (error) {
772787
log.error("run:stop error", { error });
773788
}

0 commit comments

Comments
 (0)