diff --git a/build.gradle b/build.gradle index aa9e464f..e2f1f5f1 100644 --- a/build.gradle +++ b/build.gradle @@ -67,7 +67,7 @@ subprojects { // Freezing this to 1.X until https://github.com/spring-projects/spring-boot/issues/12649 is resolved slf4jVersion = "1.7.36" protobufVersion = "3.22.3" - kafkaVersion = "3.2.3" + kafkaVersion = "3.9.2" micrometerVersion = "1.12.4" micrometerTracingVersion = "1.2.4" lombokVersion = "1.18.30" diff --git a/testing/build.gradle b/testing/build.gradle index ccae003b..0597aabb 100644 --- a/testing/build.gradle +++ b/testing/build.gradle @@ -25,6 +25,7 @@ dependencies { implementation "org.apache.kafka:kafka-clients:$kafkaVersion" implementation "org.apache.kafka:kafka_2.12:$kafkaVersion" + implementation "org.apache.kafka:kafka-server:$kafkaVersion" implementation "org.apache.kafka:kafka-clients:$kafkaVersion:test" implementation "org.apache.kafka:kafka_2.12:$kafkaVersion:test" diff --git a/testing/src/main/java/com/linecorp/decaton/testing/EmbeddedKafkaCluster.java b/testing/src/main/java/com/linecorp/decaton/testing/EmbeddedKafkaCluster.java index 98acb156..1a63abfe 100644 --- a/testing/src/main/java/com/linecorp/decaton/testing/EmbeddedKafkaCluster.java +++ b/testing/src/main/java/com/linecorp/decaton/testing/EmbeddedKafkaCluster.java @@ -89,7 +89,8 @@ private static Properties createBrokerConfig(int brokerId, String zkConnect) { 1, // logDir count false, // disable delegation token 1, // num partitions - (short) 1 // default replication factor + (short) 1, // default replication factor + false // disable fetch from follower ); } diff --git a/testing/src/main/java/com/linecorp/decaton/testing/processor/ProducerAdaptor.java b/testing/src/main/java/com/linecorp/decaton/testing/processor/ProducerAdaptor.java index 2298e09b..52e078c9 100644 --- a/testing/src/main/java/com/linecorp/decaton/testing/processor/ProducerAdaptor.java +++ b/testing/src/main/java/com/linecorp/decaton/testing/processor/ProducerAdaptor.java @@ -31,6 +31,7 @@ import org.apache.kafka.common.MetricName; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; import org.apache.kafka.common.errors.ProducerFencedException; public abstract class ProducerAdaptor implements Producer { @@ -97,6 +98,11 @@ public List partitionsFor(String topic) { return delegate.metrics(); } + @Override + public Uuid clientInstanceId(Duration timeout) { + return delegate.clientInstanceId(timeout); + } + @Override public void close() { delegate.close();