@@ -13,6 +13,10 @@ import {
1313 setupTestScenario ,
1414 generateLargeOutput ,
1515} from "./helpers/snapshotTestHelpers.js" ;
16+ import {
17+ copySnapshotsToReplica ,
18+ createTestMetricsMeter ,
19+ } from "./helpers/replicaTestHelpers.js" ;
1620import { generateFriendlyId } from "@trigger.dev/core/v3/isomorphic" ;
1721
1822vi . setConfig ( { testTimeout : 120_000 } ) ;
@@ -684,6 +688,7 @@ describe("RunEngine getSnapshotsSince", () => {
684688 "falls back to the primary when the replica is missing the since snapshot" ,
685689 async ( { prisma, schemaOnlyPrisma, redisOptions } ) => {
686690 const authenticatedEnvironment = await setupAuthenticatedEnvironment ( prisma , "PRODUCTION" ) ;
691+ const { meter, getCounterValue } = createTestMetricsMeter ( ) ;
687692
688693 const engine = new RunEngine ( {
689694 prisma,
@@ -717,6 +722,7 @@ describe("RunEngine getSnapshotsSince", () => {
717722 baseCostInCents : 0.0001 ,
718723 } ,
719724 tracer : trace . getTracer ( "test" , "0.0.0" ) ,
725+ meter,
720726 } ) ;
721727
722728 try {
@@ -771,6 +777,9 @@ describe("RunEngine getSnapshotsSince", () => {
771777 expect ( expectedSnapshots . length ) . toBeGreaterThan ( 0 ) ;
772778 expect ( result ! . length ) . toBe ( expectedSnapshots . length ) ;
773779 expect ( result ! . map ( ( s ) => s . snapshot . id ) ) . toEqual ( expectedSnapshots . map ( ( s ) => s . id ) ) ;
780+
781+ // The fallback fired exactly once - this counter is the prod rollout signal.
782+ expect ( await getCounterValue ( "run_engine.snapshots_since.replica_miss" ) ) . toBe ( 1 ) ;
774783 } finally {
775784 await engine . quit ( ) ;
776785 }
@@ -781,6 +790,7 @@ describe("RunEngine getSnapshotsSince", () => {
781790 "returns null when the snapshot is missing on both replica and primary" ,
782791 async ( { prisma, schemaOnlyPrisma, redisOptions } ) => {
783792 const authenticatedEnvironment = await setupAuthenticatedEnvironment ( prisma , "PRODUCTION" ) ;
793+ const { meter, getCounterValue } = createTestMetricsMeter ( ) ;
784794
785795 const engine = new RunEngine ( {
786796 prisma,
@@ -811,6 +821,7 @@ describe("RunEngine getSnapshotsSince", () => {
811821 baseCostInCents : 0.0001 ,
812822 } ,
813823 tracer : trace . getTracer ( "test" , "0.0.0" ) ,
824+ meter,
814825 } ) ;
815826
816827 try {
@@ -845,6 +856,317 @@ describe("RunEngine getSnapshotsSince", () => {
845856 } ) ;
846857
847858 expect ( result ) . toBeNull ( ) ;
859+
860+ // Permanent misses are deliberately NOT counted - the counter only tracks
861+ // reads actually served by the primary fallback.
862+ expect ( await getCounterValue ( "run_engine.snapshots_since.replica_miss" ) ) . toBe ( 0 ) ;
863+ } finally {
864+ await engine . quit ( ) ;
865+ }
866+ }
867+ ) ;
868+
// Scenario: the replica already holds the `since` snapshot but has not yet received the
// newest snapshot. The read is expected to be answered with the replica's (stale) view,
// and the replica-miss counter must stay at zero because no fallback is needed.
containerTest(
  "serves the replica's view when the replica has the since snapshot but lags behind the primary",
  async ({ prisma, schemaOnlyPrisma, redisOptions }) => {
    const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION");
    // Test meter so the counter value can be read back at the end of the test.
    const { meter, getCounterValue } = createTestMetricsMeter();

    const engine = new RunEngine({
      prisma,
      // The schema-only database stands in for a replica that has the since-snapshot
      // but lags behind the primary by one snapshot (the newest one is excluded below).
      readOnlyPrisma: schemaOnlyPrisma,
      readReplicaSnapshotsSinceEnabled: true,
      worker: {
        redis: redisOptions,
        workers: 1,
        tasksPerWorker: 10,
        pollIntervalMs: 100,
      },
      queue: {
        redis: redisOptions,
      },
      runLock: {
        redis: redisOptions,
      },
      machines: {
        defaultMachine: "small-1x",
        machines: {
          "small-1x": {
            name: "small-1x" as const,
            cpu: 0.5,
            memory: 0.5,
            centsPerMs: 0.0001,
          },
        },
        baseCostInCents: 0.0001,
      },
      tracer: trace.getTracer("test", "0.0.0"),
      meter,
    });

    try {
      const taskIdentifier = "test-task";
      await setupBackgroundWorker(engine, authenticatedEnvironment, taskIdentifier);

      const runFriendlyId = generateFriendlyId("run");
      const run = await engine.trigger(
        {
          number: 1,
          friendlyId: runFriendlyId,
          environment: authenticatedEnvironment,
          taskIdentifier,
          payload: "{}",
          payloadType: "application/json",
          context: {},
          traceContext: {},
          traceId: "t_replica_stale_tail",
          spanId: "s_replica_stale_tail",
          workerQueue: "main",
          queue: "task/test-task",
          isTest: false,
          tags: [],
        },
        prisma
      );

      // Pause before dequeuing, then dequeue so the run accumulates several snapshots.
      await setTimeout(500);
      await engine.dequeueFromWorkerQueue({
        consumerId: "test_replica_stale_tail",
        workerQueue: "main",
      });

      // Ground truth is read from the primary, oldest first.
      const allSnapshots = await prisma.taskRunExecutionSnapshot.findMany({
        where: { runId: run.id, isValid: true },
        orderBy: { createdAt: "asc" },
      });
      // At least three are needed: the since snapshot, one in between, and the excluded tail.
      expect(allSnapshots.length).toBeGreaterThanOrEqual(3);

      const since = allSnapshots[0];
      const tail = allSnapshots[allSnapshots.length - 1];

      // Replica has everything EXCEPT the newest snapshot - a lagging-but-usable replica.
      await copySnapshotsToReplica(prisma, schemaOnlyPrisma, run.id, {
        excludeSnapshotIds: [tail.id],
      });

      const result = await engine.getSnapshotsSince({
        runId: run.id,
        snapshotId: since.id,
      });

      expect(result).not.toBeNull();

      // The replica's view: everything after the since snapshot, minus the tail
      // it hasn't received yet. If reads were hitting the primary, the tail would
      // be present and these assertions would fail.
      const expectedSnapshots = allSnapshots.filter(
        (s) => s.createdAt.getTime() > since.createdAt.getTime() && s.id !== tail.id
      );
      // Guard against a vacuous pass: an empty expectation would let the comparisons
      // below succeed without proving the replica served any snapshot at all.
      expect(expectedSnapshots.length).toBeGreaterThan(0);
      expect(result!.map((s) => s.snapshot.id)).not.toContain(tail.id);
      expect(result!.map((s) => s.snapshot.id)).toEqual(expectedSnapshots.map((s) => s.id));

      // The since snapshot was on the replica - no fallback fired.
      expect(await getCounterValue("run_engine.snapshots_since.replica_miss")).toBe(0);
    } finally {
      // Always shut the engine down, even when an assertion above throws.
      await engine.quit();
    }
  }
);
977+
// Scenario: a replica client is configured but the feature flag is off. The replica
// database is left empty, so a correct answer can only have come from the primary.
containerTest(
  "reads from the primary when the flag is off even with a replica configured",
  async ({ prisma, schemaOnlyPrisma, redisOptions }) => {
    const env = await setupAuthenticatedEnvironment(prisma, "PRODUCTION");
    const { meter, getCounterValue } = createTestMetricsMeter();

    // Single small machine preset used as the engine's default.
    const machines = {
      defaultMachine: "small-1x",
      machines: {
        "small-1x": {
          name: "small-1x" as const,
          cpu: 0.5,
          memory: 0.5,
          centsPerMs: 0.0001,
        },
      },
      baseCostInCents: 0.0001,
    };

    const engine = new RunEngine({
      prisma,
      // Replica is wired in but holds no rows, and the replica-read flag is disabled.
      readOnlyPrisma: schemaOnlyPrisma,
      readReplicaSnapshotsSinceEnabled: false,
      worker: { redis: redisOptions, workers: 1, tasksPerWorker: 10, pollIntervalMs: 100 },
      queue: { redis: redisOptions },
      runLock: { redis: redisOptions },
      machines,
      tracer: trace.getTracer("test", "0.0.0"),
      meter,
    });

    try {
      const taskId = "test-task";
      await setupBackgroundWorker(engine, env, taskId);

      const friendlyId = generateFriendlyId("run");
      const triggeredRun = await engine.trigger(
        {
          number: 1,
          friendlyId,
          environment: env,
          taskIdentifier: taskId,
          payload: "{}",
          payloadType: "application/json",
          context: {},
          traceContext: {},
          traceId: "t_replica_flag_off",
          spanId: "s_replica_flag_off",
          workerQueue: "main",
          queue: "task/test-task",
          isTest: false,
          tags: [],
        },
        prisma
      );

      // Pause, then dequeue so the run has more than one snapshot to query.
      await setTimeout(500);
      await engine.dequeueFromWorkerQueue({
        consumerId: "test_replica_flag_off",
        workerQueue: "main",
      });

      // Reference data straight from the primary, oldest first.
      const primarySnapshots = await prisma.taskRunExecutionSnapshot.findMany({
        where: { runId: triggeredRun.id, isValid: true },
        orderBy: { createdAt: "asc" },
      });
      expect(primarySnapshots.length).toBeGreaterThan(1);

      const [sinceSnapshot] = primarySnapshots;
      const snapshotsSince = await engine.getSnapshotsSince({
        runId: triggeredRun.id,
        snapshotId: sinceSnapshot.id,
      });

      expect(snapshotsSince).not.toBeNull();

      // Everything created strictly after the since snapshot should come back.
      const sinceTime = sinceSnapshot.createdAt.getTime();
      const laterSnapshots = primarySnapshots.filter(
        (snapshot) => snapshot.createdAt.getTime() > sinceTime
      );
      expect(laterSnapshots.length).toBeGreaterThan(0);

      const returnedIds = snapshotsSince!.map((entry) => entry.snapshot.id);
      const expectedIds = laterSnapshots.map((snapshot) => snapshot.id);
      expect(returnedIds).toEqual(expectedIds);

      // With the flag off the replica is never consulted, so the miss counter stays at zero.
      const replicaMisses = await getCounterValue("run_engine.snapshots_since.replica_miss");
      expect(replicaMisses).toBe(0);
    } finally {
      await engine.quit();
    }
  }
);
1075+
1076+ containerTest (
1077+ "uses the provided transaction client and never falls back" ,
1078+ async ( { prisma, schemaOnlyPrisma, redisOptions } ) => {
1079+ const authenticatedEnvironment = await setupAuthenticatedEnvironment ( prisma , "PRODUCTION" ) ;
1080+ const { meter, getCounterValue } = createTestMetricsMeter ( ) ;
1081+
1082+ const engine = new RunEngine ( {
1083+ prisma,
1084+ // Flag ON with an EMPTY replica: if the provided tx didn't bypass the replica,
1085+ // the read would miss and (at best) be served by the fallback, incrementing
1086+ // the counter.
1087+ readOnlyPrisma : schemaOnlyPrisma ,
1088+ readReplicaSnapshotsSinceEnabled : true ,
1089+ worker : {
1090+ redis : redisOptions ,
1091+ workers : 1 ,
1092+ tasksPerWorker : 10 ,
1093+ pollIntervalMs : 100 ,
1094+ } ,
1095+ queue : {
1096+ redis : redisOptions ,
1097+ } ,
1098+ runLock : {
1099+ redis : redisOptions ,
1100+ } ,
1101+ machines : {
1102+ defaultMachine : "small-1x" ,
1103+ machines : {
1104+ "small-1x" : {
1105+ name : "small-1x" as const ,
1106+ cpu : 0.5 ,
1107+ memory : 0.5 ,
1108+ centsPerMs : 0.0001 ,
1109+ } ,
1110+ } ,
1111+ baseCostInCents : 0.0001 ,
1112+ } ,
1113+ tracer : trace . getTracer ( "test" , "0.0.0" ) ,
1114+ meter,
1115+ } ) ;
1116+
1117+ try {
1118+ const taskIdentifier = "test-task" ;
1119+ await setupBackgroundWorker ( engine , authenticatedEnvironment , taskIdentifier ) ;
1120+
1121+ const runFriendlyId = generateFriendlyId ( "run" ) ;
1122+ const run = await engine . trigger (
1123+ {
1124+ number : 1 ,
1125+ friendlyId : runFriendlyId ,
1126+ environment : authenticatedEnvironment ,
1127+ taskIdentifier,
1128+ payload : "{}" ,
1129+ payloadType : "application/json" ,
1130+ context : { } ,
1131+ traceContext : { } ,
1132+ traceId : "t_replica_tx_bypass" ,
1133+ spanId : "s_replica_tx_bypass" ,
1134+ workerQueue : "main" ,
1135+ queue : "task/test-task" ,
1136+ isTest : false ,
1137+ tags : [ ] ,
1138+ } ,
1139+ prisma
1140+ ) ;
1141+
1142+ await setTimeout ( 500 ) ;
1143+ await engine . dequeueFromWorkerQueue ( {
1144+ consumerId : "test_replica_tx_bypass" ,
1145+ workerQueue : "main" ,
1146+ } ) ;
1147+
1148+ const allSnapshots = await prisma . taskRunExecutionSnapshot . findMany ( {
1149+ where : { runId : run . id , isValid : true } ,
1150+ orderBy : { createdAt : "asc" } ,
1151+ } ) ;
1152+ expect ( allSnapshots . length ) . toBeGreaterThan ( 1 ) ;
1153+
1154+ const since = allSnapshots [ 0 ] ;
1155+ const result = await engine . getSnapshotsSince ( {
1156+ runId : run . id ,
1157+ snapshotId : since . id ,
1158+ tx : prisma ,
1159+ } ) ;
1160+
1161+ expect ( result ) . not . toBeNull ( ) ;
1162+ const expectedSnapshots = allSnapshots . filter (
1163+ ( s ) => s . createdAt . getTime ( ) > since . createdAt . getTime ( )
1164+ ) ;
1165+ expect ( expectedSnapshots . length ) . toBeGreaterThan ( 0 ) ;
1166+ expect ( result ! . map ( ( s ) => s . snapshot . id ) ) . toEqual ( expectedSnapshots . map ( ( s ) => s . id ) ) ;
1167+
1168+ // The tx served the read directly - the fallback path never ran.
1169+ expect ( await getCounterValue ( "run_engine.snapshots_since.replica_miss" ) ) . toBe ( 0 ) ;
8481170 } finally {
8491171 await engine . quit ( ) ;
8501172 }
0 commit comments