diff --git a/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java b/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java index 47f691af12..77f513daf4 100644 --- a/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java +++ b/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java @@ -43,6 +43,8 @@ import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.source.SourceTask; import org.apache.kafka.connect.source.SourceTaskContext; +import org.apache.kafka.connect.json.JsonConverter; +import org.apache.kafka.connect.json.JsonConverterConfig; import org.apache.kafka.connect.storage.Converter; import org.apache.kafka.connect.storage.OffsetBackingStore; import org.apache.kafka.connect.storage.OffsetStorageReader; @@ -114,6 +116,15 @@ public void open(Map config, SourceContext sourceContext) throws keyConverter.configure(config, true); valueConverter.configure(config, false); + // initialize offset converters + Converter offsetKeyConverter = new JsonConverter(); + Converter offsetValueConverter = new JsonConverter(); + + Map offsetConverterConfig = new HashMap<>(); + offsetConverterConfig.put(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, false); + offsetKeyConverter.configure(offsetConverterConfig, true); + offsetValueConverter.configure(offsetConverterConfig, false); + offsetStore = new PulsarOffsetBackingStore(sourceContext.getPulsarClient()); PulsarKafkaWorkerConfig pulsarKafkaWorkerConfig = new PulsarKafkaWorkerConfig(stringConfig); offsetStore.configure(pulsarKafkaWorkerConfig); @@ -122,14 +133,14 @@ public void open(Map config, SourceContext sourceContext) throws offsetReader = new OffsetStorageReaderImpl( offsetStore, "pulsar-kafka-connect-adaptor", - keyConverter, - valueConverter + offsetKeyConverter, + offsetValueConverter ); offsetWriter = new OffsetStorageWriter( offsetStore, "pulsar-kafka-connect-adaptor", - keyConverter, - valueConverter + offsetKeyConverter, + offsetValueConverter ); sourceTaskContext = new PulsarIOSourceTaskContext(offsetReader, pulsarKafkaWorkerConfig);