From 8661efd41758f7735bb1f0fb5b25b582beccabbe Mon Sep 17 00:00:00 2001 From: sandeep-mst Date: Wed, 6 May 2026 20:13:08 +0530 Subject: [PATCH 1/2] offsets check is updated and test is added --- .../pulsar/io/kafka/connect/PulsarKafkaSinkTaskContext.java | 2 +- .../apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkTaskContext.java b/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkTaskContext.java index 76ff8c0c0a..8567c50215 100644 --- a/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkTaskContext.java +++ b/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkTaskContext.java @@ -126,7 +126,7 @@ private Long currentOffset(TopicPartition topicPartition) { public Map currentOffsets() { Map snapshot = Maps.newHashMapWithExpectedSize(currentOffsets.size()); currentOffsets.forEach((topicPartition, offset) -> { - if (offset > 0) { + if (offset >= 0) { snapshot.put(topicPartition, new OffsetAndMetadata(offset, Optional.empty(), null)); } diff --git a/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java b/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java index 74af038b31..bf06f31e23 100644 --- a/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java +++ b/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java @@ -1207,6 +1207,8 @@ public void offsetTest() throws Exception { // offset is 0 for the first written record assertEquals(sink.currentOffset(topicName, partition), 0); + // currentOffsets() should include the first record (offset 0) when processed by ackUntil + assertEquals(sink.taskContext.currentOffsets().size(), 1); entryId.set(1); sink.write(record); From 0f5d97b50634d36c3cf1e20614ac67f2a0e796cd Mon Sep 17 00:00:00 2001 From: sandeep-mst Date: Wed, 6 May 2026 20:45:05 +0530 Subject: [PATCH 2/2] updated the test for checking the offset value returned --- .../pulsar/io/kafka/connect/KafkaConnectSinkTest.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java b/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java index bf06f31e23..a7e45de775 100644 --- a/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java +++ b/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java @@ -1207,8 +1207,10 @@ public void offsetTest() throws Exception { // offset is 0 for the first written record assertEquals(sink.currentOffset(topicName, partition), 0); - // currentOffsets() should include the first record (offset 0) when processed by ackUntil - assertEquals(sink.taskContext.currentOffsets().size(), 1); + // current offsets map returned by the PulsarKafkaSinkTaskContext should contain the record with offset 0 + assertEquals( + sink.taskContext.currentOffsets().get(new TopicPartition(topicName, partition)).offset(), 0 + ); entryId.set(1); sink.write(record);