99 createElectricContainer ,
1010 createPostgresContainer ,
1111 createRedisContainer ,
12- createMinIOContainer ,
1312 postgresUriWithDatabase ,
1413 pushDatabaseSchema ,
1514 useContainer ,
@@ -23,7 +22,7 @@ import {
2322 runClickhouseMigrations ,
2423 truncateClickhouseTables ,
2524} from "./clickhouse" ;
26- import { StartedMinIOContainer , type MinIOConnectionConfig } from "./minio" ;
25+ import { MinIOContainer , StartedMinIOContainer , type MinIOConnectionConfig } from "./minio" ;
2726import { ClickHouseClient , createClient } from "@clickhouse/client" ;
2827
2928export { assertNonNullable , createPostgresContainer } from "./utils" ;
@@ -141,26 +140,32 @@ type PostgresTestContext = {
141140// clone, redis = FLUSHALL, clickhouse = TRUNCATE) instead of re-booting. Reset fixtures are `auto`
142141// so they run for every test even if it doesn't destructure them.
143142
144- // Boot postgres once + push the schema into a dedicated template db that nothing else connects to
145- // (so CREATE DATABASE ... TEMPLATE never trips on an active session).
146- const bootWorkerPostgres = async ( { } , use : Use < StartedPostgreSqlContainer > ) => {
147- const container = await new PostgreSqlContainer ( "docker.io/postgres:14" )
148- . withCommand ( [ "-c" , "listen_addresses=*" , "-c" , "wal_level=logical" ] )
149- . start ( ) ;
150- await pushDatabaseSchema ( postgresUriWithDatabase ( container . getConnectionUri ( ) , POSTGRES_TEMPLATE_DB ) ) ;
151- try {
152- await use ( container ) ;
153- } finally {
154- await container . stop ( { timeout : 0 } ) ;
143+ // Boot postgres ONCE per worker (module singleton, reaped by Ryuk on worker exit) and push the
144+ // schema into a dedicated template db that nothing else connects to (so CREATE DATABASE ... TEMPLATE
145+ // never trips on an active session).
146+ let workerPostgresContainer : Promise < StartedPostgreSqlContainer > | undefined ;
147+ const getWorkerPostgresContainer = ( ) => {
148+ if ( ! workerPostgresContainer ) {
149+ workerPostgresContainer = ( async ( ) => {
150+ const container = await new PostgreSqlContainer ( "docker.io/postgres:14" )
151+ . withCommand ( [ "-c" , "listen_addresses=*" , "-c" , "wal_level=logical" ] )
152+ . start ( ) ;
153+ await pushDatabaseSchema (
154+ postgresUriWithDatabase ( container . getConnectionUri ( ) , POSTGRES_TEMPLATE_DB )
155+ ) ;
156+ return container ;
157+ } ) ( ) ;
155158 }
159+ return workerPostgresContainer ;
156160} ;
157161
158- // Per test: clone a fresh database from the template (fast filesystem copy) - isolated AND parallel-ready.
159- const templateClonePrisma = async (
160- { postgresContainer } : { postgresContainer : StartedPostgreSqlContainer } ,
161- use : Use < PrismaClient >
162- ) => {
163- const baseUri = postgresContainer . getConnectionUri ( ) ;
162+ // Per test: clone a fresh database from the template (fast filesystem copy), then hand back a view
163+ // of the shared container whose connection points at the clone. This keeps prisma AND any code that
164+ // reads postgresContainer.getConnectionUri()/getDatabase() (e.g. logical replication) on the SAME
165+ // isolated database - and it's parallel-ready (each test owns its db).
166+ const clonedPostgresContainer = async ( { } , use : Use < StartedPostgreSqlContainer > ) => {
167+ const container = await getWorkerPostgresContainer ( ) ;
168+ const baseUri = container . getConnectionUri ( ) ;
164169 const cloneDb = `test_${ pgCloneCounter ++ } ` ;
165170
166171 const admin = new PrismaClient ( {
@@ -169,8 +174,25 @@ const templateClonePrisma = async (
169174 await admin . $executeRawUnsafe ( `CREATE DATABASE "${ cloneDb } " TEMPLATE "${ POSTGRES_TEMPLATE_DB } "` ) ;
170175 await admin . $disconnect ( ) ;
171176
177+ const cloneUri = postgresUriWithDatabase ( baseUri , cloneDb ) ;
178+ const view = new Proxy ( container , {
179+ get ( target , prop , receiver ) {
180+ if ( prop === "getConnectionUri" ) return ( ) => cloneUri ;
181+ if ( prop === "getDatabase" ) return ( ) => cloneDb ;
182+ const value = Reflect . get ( target , prop , receiver ) ;
183+ return typeof value === "function" ? value . bind ( target ) : value ;
184+ } ,
185+ } ) ;
186+
187+ await use ( view ) ;
188+ } ;
189+
190+ const prismaFromContainer = async (
191+ { postgresContainer } : { postgresContainer : StartedPostgreSqlContainer } ,
192+ use : Use < PrismaClient >
193+ ) => {
172194 const prisma = new PrismaClient ( {
173- datasources : { db : { url : postgresUriWithDatabase ( baseUri , cloneDb ) } } ,
195+ datasources : { db : { url : postgresContainer . getConnectionUri ( ) } } ,
174196 } ) ;
175197 try {
176198 await use ( prisma ) ;
@@ -180,8 +202,8 @@ const templateClonePrisma = async (
180202} ;
181203
182204export const postgresTest = test . extend < PostgresTestContext > ( {
183- postgresContainer : [ bootWorkerPostgres , { scope : "worker" } ] ,
184- prisma : templateClonePrisma ,
205+ postgresContainer : clonedPostgresContainer ,
206+ prisma : prismaFromContainer ,
185207} ) ;
186208
187209export const redisContainer = async (
@@ -377,6 +399,9 @@ export const clickhouseTest = test.extend<ClickhouseTestContext>({
377399 clickhouseClient : scopedClickhouseClient ,
378400} ) ;
379401
402+ // NOTE: per-test containers (not worker-scoped) - the replication package does logical replication
403+ // (slots/publications/REPLICA IDENTITY), which doesn't play nicely with a shared container +
404+ // template-clone. A dedicated container per test is the correct, isolated choice here.
380405export const postgresAndRedisTest = test . extend < PostgresAndRedisContext > ( {
381406 network,
382407 postgresContainer,
@@ -400,8 +425,8 @@ type ContainerTestContext = {
400425// per test (postgres template-clone, redis FLUSHALL, clickhouse TRUNCATE) - no per-test boots, no
401426// shared docker network needed.
402427export const containerTest = test . extend < ContainerTestContext > ( {
403- postgresContainer : [ bootWorkerPostgres , { scope : "worker" } ] ,
404- prisma : templateClonePrisma ,
428+ postgresContainer : clonedPostgresContainer ,
429+ prisma : prismaFromContainer ,
405430 redisContainer : [ bootWorkerRedis , { scope : "worker" } ] ,
406431 resetRedis : [ flushRedis , { auto : true } ] ,
407432 redisOptions,
@@ -410,6 +435,20 @@ export const containerTest = test.extend<ContainerTestContext>({
410435 clickhouseClient : scopedClickhouseClient ,
411436} ) ;
412437
438+ // For tests that exercise the Postgres -> ClickHouse logical-replication pipeline (WAL slots,
439+ // publications, REPLICA IDENTITY). These need a dedicated Postgres per test - the worker-scoped +
440+ // template-clone model used by containerTest doesn't carry logical replication across cloned dbs.
441+ // Everything is per-test here (fully isolated, same as the pre-scoping containerTest).
442+ export const replicationContainerTest = test . extend < ContainerContext > ( {
443+ network,
444+ postgresContainer,
445+ prisma,
446+ redisContainer,
447+ redisOptions,
448+ clickhouseContainer,
449+ clickhouseClient,
450+ } ) ;
451+
413452export const containerWithElectricTest = test . extend < ContainerWithElectricContext > ( {
414453 network,
415454 postgresContainer,
@@ -428,17 +467,22 @@ export const containerWithElectricAndRedisTest = test.extend<ContainerWithElectr
428467 clickhouseClient,
429468} ) ;
430469
431- const minioContainer = async (
432- { network , task } : { network : StartedNetwork } & TestContext ,
433- use : Use < StartedMinIOContainer >
434- ) => {
435- const { container , metadata } = await withContainerSetup ( {
436- name : "minioContainer" ,
437- task ,
438- setup : createMinIOContainer ( network ) ,
439- } ) ;
470+ // Boot minio once per worker; reset the bucket per test (auto fixture).
471+ const bootWorkerMinio = async ( { } , use : Use < StartedMinIOContainer > ) => {
472+ const container = await new MinIOContainer ( ) . start ( ) ;
473+ try {
474+ await use ( container ) ;
475+ } finally {
476+ await container . stop ( { timeout : 0 } ) ;
477+ }
478+ } ;
440479
441- await useContainer ( "minioContainer" , { container, task, use : ( ) => use ( container ) } ) ;
480+ const minioReset = async (
481+ { minioContainer } : { minioContainer : StartedMinIOContainer } ,
482+ use : Use < void >
483+ ) => {
484+ await minioContainer . resetBucket ( ) ;
485+ await use ( ) ;
442486} ;
443487
444488const minioConfig = async (
@@ -448,18 +492,30 @@ const minioConfig = async (
448492 await use ( minioContainer . getConnectionConfig ( ) ) ;
449493} ;
450494
451- export const minioTest = test . extend < MinIOContext > ( {
452- network,
453- minioContainer,
495+ type MinioTestContext = {
496+ minioContainer : StartedMinIOContainer ;
497+ resetMinio : void ;
498+ minioConfig : MinIOConnectionConfig ;
499+ } ;
500+
501+ export const minioTest = test . extend < MinioTestContext > ( {
502+ minioContainer : [ bootWorkerMinio , { scope : "worker" } ] ,
503+ resetMinio : [ minioReset , { auto : true } ] ,
454504 minioConfig,
455505} ) ;
456506
457- type PostgresAndMinIOContext = NetworkContext & PostgresContext & MinIOContext ;
507+ type PostgresAndMinioTestContext = {
508+ postgresContainer : StartedPostgreSqlContainer ;
509+ prisma : PrismaClient ;
510+ minioContainer : StartedMinIOContainer ;
511+ resetMinio : void ;
512+ minioConfig : MinIOConnectionConfig ;
513+ } ;
458514
459- export const postgresAndMinioTest = test . extend < PostgresAndMinIOContext > ( {
460- network ,
461- postgresContainer ,
462- prisma ,
463- minioContainer ,
515+ export const postgresAndMinioTest = test . extend < PostgresAndMinioTestContext > ( {
516+ postgresContainer : clonedPostgresContainer ,
517+ prisma : prismaFromContainer ,
518+ minioContainer : [ bootWorkerMinio , { scope : "worker" } ] ,
519+ resetMinio : [ minioReset , { auto : true } ] ,
464520 minioConfig,
465521} ) ;
0 commit comments