From 1400dc0f123c6e43e70759073f39002dc89d4801 Mon Sep 17 00:00:00 2001 From: Jaroslav Kaspar Date: Wed, 27 May 2026 13:17:58 +0200 Subject: [PATCH] [fix][io] Create new JSON converters for offset storage in Kafka Connect Adaptor The Pulsar Kafka Connect adaptor previously used the same converters for both data and offset storage. This could cause various issues. For example, when data were using AvroConverter, offsets were serialized using MockSchemaRegistryClient (in memory only). After a connector restart, the fresh MockSchemaRegistryClient had no schema records, causing deserialization to fail with "Subject Not Found; error code: 40401" and the connector losing its offset position. Kafka Connect does not reuse the data converters for offsets, but creates new JSON converters configured with `schemas.enable` set to `false`. Thus the adaptor was changed to have the same behavior. This is a breaking change for connectors that previously stored offsets in a non-JSON format; those offsets may not be readable after the upgrade. The offsets probably weren't readable even before this fix (as we can see for the Avro converter). But since we cannot be sure what converters users used and how they behaved, this change should probably be included in a major release. Note: The tests for the adaptor currently do not compile. We are waiting for a matching Pulsar artifact to be published. The fix was verified E2E by manual testing. 
Fixes #30 --- .../connect/AbstractKafkaConnectSource.java | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java b/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java index 47f691af12..77f513daf4 100644 --- a/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java +++ b/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java @@ -43,6 +43,8 @@ import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.source.SourceTask; import org.apache.kafka.connect.source.SourceTaskContext; +import org.apache.kafka.connect.json.JsonConverter; +import org.apache.kafka.connect.json.JsonConverterConfig; import org.apache.kafka.connect.storage.Converter; import org.apache.kafka.connect.storage.OffsetBackingStore; import org.apache.kafka.connect.storage.OffsetStorageReader; @@ -114,6 +116,15 @@ public void open(Map config, SourceContext sourceContext) throws keyConverter.configure(config, true); valueConverter.configure(config, false); + // initialize offset converters + Converter offsetKeyConverter = new JsonConverter(); + Converter offsetValueConverter = new JsonConverter(); + + Map offsetConverterConfig = new HashMap<>(); + offsetConverterConfig.put(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, false); + offsetKeyConverter.configure(offsetConverterConfig, true); + offsetValueConverter.configure(offsetConverterConfig, false); + offsetStore = new PulsarOffsetBackingStore(sourceContext.getPulsarClient()); PulsarKafkaWorkerConfig pulsarKafkaWorkerConfig = new PulsarKafkaWorkerConfig(stringConfig); offsetStore.configure(pulsarKafkaWorkerConfig); @@ -122,14 +133,14 @@ public void open(Map config, SourceContext sourceContext) throws offsetReader = new OffsetStorageReaderImpl( offsetStore, 
"pulsar-kafka-connect-adaptor", - keyConverter, - valueConverter + offsetKeyConverter, + offsetValueConverter ); offsetWriter = new OffsetStorageWriter( offsetStore, "pulsar-kafka-connect-adaptor", - keyConverter, - valueConverter + offsetKeyConverter, + offsetValueConverter ); sourceTaskContext = new PulsarIOSourceTaskContext(offsetReader, pulsarKafkaWorkerConfig);