1+ package com .helltractor .demo .container ;
2+
3+ import lombok .SneakyThrows ;
4+ import org .awaitility .Awaitility ;
5+ import org .testcontainers .containers .Container ;
6+ import org .testcontainers .containers .GenericContainer ;
7+ import org .testcontainers .containers .KafkaContainer ;
8+ import org .testcontainers .containers .Network ;
9+ import org .testcontainers .lifecycle .Startable ;
10+ import org .testcontainers .utility .DockerImageName ;
11+
12+ import java .time .Duration ;
13+ import java .util .Collection ;
14+ import java .util .stream .Collectors ;
15+ import java .util .stream .IntStream ;
16+ import java .util .stream .Stream ;
17+
18+ import static org .assertj .core .api .Assertions .assertThat ;
19+
20+ /**
21+ * Provides an easy way to launch a Kafka cluster with multiple brokers.
22+ */
23+ public class KafkaContainerCluster implements Startable {
24+
25+ private final int brokersNum ;
26+
27+ private final Network network ;
28+
29+ private final GenericContainer <?> zookeeper ;
30+
31+ private final Collection <KafkaContainer > brokers ;
32+
33+ public KafkaContainerCluster (String confluentPlatformVersion , int brokersNum , int internalTopicsRf ) {
34+ if (brokersNum < 0 ) {
35+ throw new IllegalArgumentException ("brokersNum '" + brokersNum + "' must be greater than 0" );
36+ }
37+ if (internalTopicsRf < 0 || internalTopicsRf > brokersNum ) {
38+ throw new IllegalArgumentException (
39+ "internalTopicsRf '" + internalTopicsRf + "' must be less than brokersNum and greater than 0"
40+ );
41+ }
42+
43+ this .brokersNum = brokersNum ;
44+ this .network = Network .newNetwork ();
45+
46+ this .zookeeper =
47+ new GenericContainer <>(DockerImageName .parse ("confluentinc/cp-zookeeper" ).withTag (confluentPlatformVersion ))
48+ .withNetwork (network )
49+ .withNetworkAliases ("zookeeper" )
50+ .withEnv ("ZOOKEEPER_CLIENT_PORT" , String .valueOf (KafkaContainer .ZOOKEEPER_PORT ));
51+
52+ this .brokers =
53+ IntStream
54+ .range (0 , this .brokersNum )
55+ .mapToObj (brokerNum -> {
56+ return new KafkaContainer (
57+ DockerImageName .parse ("confluentinc/cp-kafka" ).withTag (confluentPlatformVersion )
58+ )
59+ .withNetwork (this .network )
60+ .withNetworkAliases ("broker-" + brokerNum )
61+ .dependsOn (this .zookeeper )
62+ .withExternalZookeeper ("zookeeper:" + KafkaContainer .ZOOKEEPER_PORT )
63+ .withEnv ("KAFKA_BROKER_ID" , brokerNum + "" )
64+ .withEnv ("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR" , internalTopicsRf + "" )
65+ .withEnv ("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS" , internalTopicsRf + "" )
66+ .withEnv ("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR" , internalTopicsRf + "" )
67+ .withEnv ("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR" , internalTopicsRf + "" )
68+ .withStartupTimeout (Duration .ofMinutes (1 ));
69+ })
70+ .collect (Collectors .toList ());
71+ }
72+
73+ public Collection <KafkaContainer > getBrokers () {
74+ return this .brokers ;
75+ }
76+
77+ public String getBootstrapServers () {
78+ return brokers .stream ().map (KafkaContainer ::getBootstrapServers ).collect (Collectors .joining ("," ));
79+ }
80+
81+ private Stream <GenericContainer <?>> allContainers () {
82+ return Stream .concat (this .brokers .stream (), Stream .of (this .zookeeper ));
83+ }
84+
85+ @ Override
86+ @ SneakyThrows
87+ public void start () {
88+ // sequential start to avoid resource contention on CI systems with weaker hardware
89+ brokers .forEach (GenericContainer ::start );
90+
91+ Awaitility
92+ .await ()
93+ .atMost (Duration .ofSeconds (30 ))
94+ .untilAsserted (() -> {
95+ Container .ExecResult result =
96+ this .zookeeper .execInContainer (
97+ "sh" ,
98+ "-c" ,
99+ "zookeeper-shell zookeeper:" +
100+ KafkaContainer .ZOOKEEPER_PORT +
101+ " ls /brokers/ids | tail -n 1"
102+ );
103+ String brokers = result .getStdout ();
104+
105+ assertThat (brokers .split ("," )).hasSize (this .brokersNum );
106+ });
107+ }
108+
109+ @ Override
110+ public void stop () {
111+ allContainers ().parallel ().forEach (GenericContainer ::stop );
112+ }
113+ }
0 commit comments