@@ -14,6 +14,7 @@ import {
1414 generateLargeOutput ,
1515} from "./helpers/snapshotTestHelpers.js" ;
1616import { generateFriendlyId } from "@trigger.dev/core/v3/isomorphic" ;
17+ import type { PrismaReplicaClient } from "@trigger.dev/database" ;
1718
1819vi . setConfig ( { testTimeout : 120_000 } ) ;
1920
@@ -679,4 +680,171 @@ describe("RunEngine getSnapshotsSince", () => {
679680 }
680681 }
681682 ) ;
683+
684+ containerTest (
685+ "falls back to the primary when the replica is missing the since snapshot" ,
686+ async ( { prisma, schemaOnlyPrisma, redisOptions } ) => {
687+ const authenticatedEnvironment = await setupAuthenticatedEnvironment ( prisma , "PRODUCTION" ) ;
688+
689+ const engine = new RunEngine ( {
690+ prisma,
691+ // An empty (schema-only) database stands in for a read replica that has not
692+ // caught up: every lookup on it misses, so the engine must fall back to the
693+ // primary instead of failing the poll.
694+ readOnlyPrisma : schemaOnlyPrisma as PrismaReplicaClient ,
695+ readReplicaSnapshotsSinceEnabled : true ,
696+ worker : {
697+ redis : redisOptions ,
698+ workers : 1 ,
699+ tasksPerWorker : 10 ,
700+ pollIntervalMs : 100 ,
701+ } ,
702+ queue : {
703+ redis : redisOptions ,
704+ } ,
705+ runLock : {
706+ redis : redisOptions ,
707+ } ,
708+ machines : {
709+ defaultMachine : "small-1x" ,
710+ machines : {
711+ "small-1x" : {
712+ name : "small-1x" as const ,
713+ cpu : 0.5 ,
714+ memory : 0.5 ,
715+ centsPerMs : 0.0001 ,
716+ } ,
717+ } ,
718+ baseCostInCents : 0.0001 ,
719+ } ,
720+ tracer : trace . getTracer ( "test" , "0.0.0" ) ,
721+ } ) ;
722+
723+ try {
724+ const taskIdentifier = "test-task" ;
725+ await setupBackgroundWorker ( engine , authenticatedEnvironment , taskIdentifier ) ;
726+
727+ const runFriendlyId = generateFriendlyId ( "run" ) ;
728+ const run = await engine . trigger (
729+ {
730+ number : 1 ,
731+ friendlyId : runFriendlyId ,
732+ environment : authenticatedEnvironment ,
733+ taskIdentifier,
734+ payload : "{}" ,
735+ payloadType : "application/json" ,
736+ context : { } ,
737+ traceContext : { } ,
738+ traceId : "t_replica_fallback" ,
739+ spanId : "s_replica_fallback" ,
740+ workerQueue : "main" ,
741+ queue : "task/test-task" ,
742+ isTest : false ,
743+ tags : [ ] ,
744+ } ,
745+ prisma
746+ ) ;
747+
748+ await setTimeout ( 500 ) ;
749+ await engine . dequeueFromWorkerQueue ( {
750+ consumerId : "test_replica_fallback" ,
751+ workerQueue : "main" ,
752+ } ) ;
753+
754+ const allSnapshots = await prisma . taskRunExecutionSnapshot . findMany ( {
755+ where : { runId : run . id , isValid : true } ,
756+ orderBy : { createdAt : "asc" } ,
757+ } ) ;
758+ expect ( allSnapshots . length ) . toBeGreaterThan ( 1 ) ;
759+
760+ // The since-snapshot exists only on the primary - the replica misses it.
761+ const firstSnapshot = allSnapshots [ 0 ] ;
762+ const result = await engine . getSnapshotsSince ( {
763+ runId : run . id ,
764+ snapshotId : firstSnapshot . id ,
765+ } ) ;
766+
767+ // Served by the primary fallback, not a failed poll.
768+ expect ( result ) . not . toBeNull ( ) ;
769+ expect ( result ! . length ) . toBe ( allSnapshots . length - 1 ) ;
770+ expect ( result ! . map ( ( s ) => s . snapshot . id ) ) . toEqual ( allSnapshots . slice ( 1 ) . map ( ( s ) => s . id ) ) ;
771+ } finally {
772+ await engine . quit ( ) ;
773+ }
774+ }
775+ ) ;
776+
777+ containerTest (
778+ "returns null when the snapshot is missing on both replica and primary" ,
779+ async ( { prisma, schemaOnlyPrisma, redisOptions } ) => {
780+ const authenticatedEnvironment = await setupAuthenticatedEnvironment ( prisma , "PRODUCTION" ) ;
781+
782+ const engine = new RunEngine ( {
783+ prisma,
784+ readOnlyPrisma : schemaOnlyPrisma as PrismaReplicaClient ,
785+ readReplicaSnapshotsSinceEnabled : true ,
786+ worker : {
787+ redis : redisOptions ,
788+ workers : 1 ,
789+ tasksPerWorker : 10 ,
790+ pollIntervalMs : 100 ,
791+ } ,
792+ queue : {
793+ redis : redisOptions ,
794+ } ,
795+ runLock : {
796+ redis : redisOptions ,
797+ } ,
798+ machines : {
799+ defaultMachine : "small-1x" ,
800+ machines : {
801+ "small-1x" : {
802+ name : "small-1x" as const ,
803+ cpu : 0.5 ,
804+ memory : 0.5 ,
805+ centsPerMs : 0.0001 ,
806+ } ,
807+ } ,
808+ baseCostInCents : 0.0001 ,
809+ } ,
810+ tracer : trace . getTracer ( "test" , "0.0.0" ) ,
811+ } ) ;
812+
813+ try {
814+ const taskIdentifier = "test-task" ;
815+ await setupBackgroundWorker ( engine , authenticatedEnvironment , taskIdentifier ) ;
816+
817+ const runFriendlyId = generateFriendlyId ( "run" ) ;
818+ const run = await engine . trigger (
819+ {
820+ number : 1 ,
821+ friendlyId : runFriendlyId ,
822+ environment : authenticatedEnvironment ,
823+ taskIdentifier,
824+ payload : "{}" ,
825+ payloadType : "application/json" ,
826+ context : { } ,
827+ traceContext : { } ,
828+ traceId : "t_replica_both_miss" ,
829+ spanId : "s_replica_both_miss" ,
830+ workerQueue : "main" ,
831+ queue : "task/test-task" ,
832+ isTest : false ,
833+ tags : [ ] ,
834+ } ,
835+ prisma
836+ ) ;
837+
838+ // A snapshot id that exists nowhere - a genuine miss stays an error (null).
839+ const result = await engine . getSnapshotsSince ( {
840+ runId : run . id ,
841+ snapshotId : "snapshot_does_not_exist" ,
842+ } ) ;
843+
844+ expect ( result ) . toBeNull ( ) ;
845+ } finally {
846+ await engine . quit ( ) ;
847+ }
848+ }
849+ ) ;
682850} ) ;
0 commit comments