@@ -367,136 +367,6 @@ export class RunEngineTriggerTaskService {
367367 taskKind : taskKind ?? "STANDARD" ,
368368 } ;
369369
370- // Short-circuit before the gate when mollifier is globally off (the
371- // default for every deployment that hasn't opted in). Avoids the
372- // GateInputs allocation, the deps spread inside `evaluateGate`, and
373- // the `mollifier.decisions{outcome=pass_through}` OTel increment on
374- // every trigger — `triggerTask` is the highest-throughput code path
375- // in the system. The check goes through a DI'd predicate so unit
376- // tests that inject a custom `evaluateGate` can also override the
377- // gate-on check (the default reads `env.TRIGGER_MOLLIFIER_ENABLED`,
378- // which is "0" in CI where no .env file is present).
379- const mollifierOutcome : GateOutcome | null = this . isMollifierGloballyEnabled ( )
380- ? await this . evaluateGate ( {
381- envId : environment . id ,
382- orgId : environment . organizationId ,
383- taskId,
384- orgFeatureFlags :
385- ( environment . organization . featureFlags as Record < string , unknown > | null ) ?? null ,
386- options : {
387- debounce : body . options ?. debounce ,
388- oneTimeUseToken : options . oneTimeUseToken ,
389- parentTaskRunId : body . options ?. parentRunId ,
390- resumeParentOnCompletion : body . options ?. resumeParentOnCompletion ,
391- } ,
392- } )
393- : null ;
394-
395- // Phase 2: real divert path. When the gate says mollify, write the
396- // engine.trigger input snapshot into the Redis buffer and return a
397- // synthesised TriggerTaskServiceResult. The customer never waits on
398- // Postgres; the drainer materialises the run later by replaying
399- // engine.trigger against the snapshot. Skip traceRun entirely — the
400- // run span is created by the drainer when it eventually runs.
401- if ( mollifierOutcome ?. action === "mollify" ) {
402- const mollifierBuffer = this . getMollifierBuffer ( ) ;
403- if ( mollifierBuffer && ! body . options ?. debounce ) {
404- const synthetic = await startSpan (
405- this . tracer ,
406- "mollifier.queued" ,
407- async ( mollifierSpan ) => {
408- mollifierSpan . setAttribute ( "mollifier.reason" , mollifierOutcome . decision . reason ) ;
409- mollifierSpan . setAttribute ( "mollifier.count" , mollifierOutcome . decision . count ) ;
410- mollifierSpan . setAttribute (
411- "mollifier.threshold" ,
412- mollifierOutcome . decision . threshold
413- ) ;
414- mollifierSpan . setAttribute ( "runId" , runFriendlyId ) ;
415- mollifierSpan . setAttribute ( "taskRunId" , runFriendlyId ) ;
416-
417- const payloadPacket = await this . payloadProcessor . process ( triggerRequest ) ;
418- const taskEventStore = parentRun ?. taskEventStore ?? "taskEvent" ;
419- // Seed the W3C `traceparent` from the queued span so downstream
420- // `recordRunDebugLog` calls (engine QUEUED/EXECUTING/FINISHED,
421- // run:notify, etc.) emit TaskEvent rows that join the run's trace.
422- // Pass-through gets this for free via `traceEventConcern.traceRun`
423- // populating `event.traceContext`; the mollifier path skips that
424- // wrapper so we have to build the same shape ourselves.
425- const traceContext = this . #propagateExternalTraceContext(
426- {
427- traceparent : serializeTraceparent (
428- mollifierSpan . spanContext ( ) . traceId ,
429- mollifierSpan . spanContext ( ) . spanId
430- ) ,
431- } ,
432- parentRun ?. traceContext ,
433- undefined
434- ) ;
435-
436- const engineTriggerInput = this . #buildEngineTriggerInput( {
437- runFriendlyId,
438- environment,
439- idempotencyKey,
440- idempotencyKeyExpiresAt,
441- body,
442- options,
443- queueName,
444- lockedQueueId,
445- workerQueue,
446- enableFastPath,
447- lockedToBackgroundWorker : lockedToBackgroundWorker ?? undefined ,
448- delayUntil,
449- ttl,
450- metadataPacket,
451- tags,
452- depth,
453- parentRun : parentRun ?? undefined ,
454- annotations,
455- planType,
456- taskId,
457- payloadPacket,
458- traceContext,
459- traceId : mollifierSpan . spanContext ( ) . traceId ,
460- spanId : mollifierSpan . spanContext ( ) . spanId ,
461- parentSpanId : undefined ,
462- taskEventStore,
463- } ) ;
464-
465- const result = await mollifyTrigger ( {
466- runFriendlyId,
467- environmentId : environment . id ,
468- organizationId : environment . organizationId ,
469- engineTriggerInput,
470- decision : mollifierOutcome . decision ,
471- buffer : mollifierBuffer ,
472- // Idempotency-key triple wires the buffer's SETNX into
473- // the trigger-time dedup symmetric with PG (Q5).
474- idempotencyKey,
475- taskIdentifier : taskId ,
476- } ) ;
477-
478- logger . info ( "mollifier.buffered" , {
479- runId : runFriendlyId ,
480- envId : environment . id ,
481- orgId : environment . organizationId ,
482- taskId,
483- reason : mollifierOutcome . decision . reason ,
484- } ) ;
485-
486- return result ;
487- }
488- ) ;
489- // Synthetic result is structurally narrower than the full TaskRun;
490- // the route handler only reads `result.run.friendlyId`.
491- return synthetic as unknown as TriggerTaskServiceResult ;
492- }
493- if ( ! mollifierBuffer ) {
494- logger . warn (
495- "mollifier gate said mollify but buffer is null — falling through to pass-through"
496- ) ;
497- }
498- }
499-
500370 try {
501371 return await this . traceEventConcern . traceRun (
502372 triggerRequest ,
@@ -507,6 +377,126 @@ export class RunEngineTriggerTaskService {
507377 event . setAttribute ( "runId" , runFriendlyId ) ;
508378 span . setAttribute ( "runId" , runFriendlyId ) ;
509379
380+ // Short-circuit when mollifier is globally off (the default
381+ // for every deployment that hasn't opted in). Avoids the
382+ // GateInputs allocation, the deps spread inside `evaluateGate`,
383+ // and the `mollifier.decisions{outcome=pass_through}` OTel
384+ // increment on every trigger — `triggerTask` is the
385+ // highest-throughput code path in the system. The check goes
386+ // through a DI'd predicate so unit tests that inject a custom
387+ // `evaluateGate` can also override the gate-on check (the
388+ // default reads `env.TRIGGER_MOLLIFIER_ENABLED`, which is "0"
389+ // in CI where no .env file is present).
390+ const mollifierOutcome : GateOutcome | null = this . isMollifierGloballyEnabled ( )
391+ ? await this . evaluateGate ( {
392+ envId : environment . id ,
393+ orgId : environment . organizationId ,
394+ taskId,
395+ orgFeatureFlags :
396+ ( environment . organization . featureFlags as Record < string , unknown > | null ) ??
397+ null ,
398+ options : {
399+ debounce : body . options ?. debounce ,
400+ oneTimeUseToken : options . oneTimeUseToken ,
401+ parentTaskRunId : body . options ?. parentRunId ,
402+ resumeParentOnCompletion : body . options ?. resumeParentOnCompletion ,
403+ } ,
404+ } )
405+ : null ;
406+
407+ // When the gate says mollify, write the engine.trigger input
408+ // snapshot into the Redis buffer and return a synthesised
409+ // TriggerTaskServiceResult. The customer never waits on
410+ // Postgres; the drainer materialises the run later by replaying
411+ // engine.trigger against the snapshot. The run span has already
412+ // been opened by traceRun above (PARTIAL event in ClickHouse),
413+ // so its traceId/spanId live in the snapshot and the drainer's
414+ // `mollifier.drained` span parents on the same trace — buffered
415+ // runs become visible in the dashboard's trace view immediately,
416+ // not only after the drainer fires.
417+ if ( mollifierOutcome ?. action === "mollify" ) {
418+ const mollifierBuffer = this . getMollifierBuffer ( ) ;
419+ if ( mollifierBuffer && ! body . options ?. debounce ) {
420+ event . setAttribute ( "mollifier.reason" , mollifierOutcome . decision . reason ) ;
421+ event . setAttribute ( "mollifier.count" , String ( mollifierOutcome . decision . count ) ) ;
422+ event . setAttribute (
423+ "mollifier.threshold" ,
424+ String ( mollifierOutcome . decision . threshold )
425+ ) ;
426+ event . setAttribute ( "taskRunId" , runFriendlyId ) ;
427+
428+ const payloadPacket = await this . payloadProcessor . process ( triggerRequest ) ;
429+
430+ const engineTriggerInput = this . #buildEngineTriggerInput( {
431+ runFriendlyId,
432+ environment,
433+ idempotencyKey,
434+ idempotencyKeyExpiresAt,
435+ body,
436+ options,
437+ queueName,
438+ lockedQueueId,
439+ workerQueue,
440+ enableFastPath,
441+ lockedToBackgroundWorker : lockedToBackgroundWorker ?? undefined ,
442+ delayUntil,
443+ ttl,
444+ metadataPacket,
445+ tags,
446+ depth,
447+ parentRun : parentRun ?? undefined ,
448+ annotations,
449+ planType,
450+ taskId,
451+ payloadPacket,
452+ traceContext : this . #propagateExternalTraceContext(
453+ event . traceContext ,
454+ parentRun ?. traceContext ,
455+ event . traceparent ?. spanId
456+ ) ,
457+ traceId : event . traceId ,
458+ spanId : event . spanId ,
459+ parentSpanId :
460+ options . parentAsLinkType === "replay"
461+ ? undefined
462+ : event . traceparent ?. spanId ,
463+ taskEventStore : store ,
464+ } ) ;
465+
466+ const result = await mollifyTrigger ( {
467+ runFriendlyId,
468+ environmentId : environment . id ,
469+ organizationId : environment . organizationId ,
470+ engineTriggerInput,
471+ decision : mollifierOutcome . decision ,
472+ buffer : mollifierBuffer ,
473+ // Idempotency-key triple wires the buffer's SETNX into
474+ // the trigger-time dedup symmetric with PG (Q5).
475+ idempotencyKey,
476+ taskIdentifier : taskId ,
477+ } ) ;
478+
479+ logger . info ( "mollifier.buffered" , {
480+ runId : runFriendlyId ,
481+ envId : environment . id ,
482+ orgId : environment . organizationId ,
483+ taskId,
484+ reason : mollifierOutcome . decision . reason ,
485+ } ) ;
486+
487+ // Synthetic result is structurally narrower than the full
488+ // TaskRun; the route handler only reads
489+ // `result.run.friendlyId`. traceRun flushes the PARTIAL
490+ // run-span event to ClickHouse on callback return.
491+ return result as unknown as TriggerTaskServiceResult ;
492+ }
493+ if ( ! mollifierBuffer ) {
494+ logger . warn (
495+ "mollifier gate said mollify but buffer is null — falling through to pass-through"
496+ ) ;
497+ }
498+ }
499+
510500 const payloadPacket = await this . payloadProcessor . process ( triggerRequest ) ;
511501
512502 const baseEngineInput = this . #buildEngineTriggerInput( {
0 commit comments