From 0ecc271cc50e4625912985c9285ae28692380159 Mon Sep 17 00:00:00 2001 From: Devendra Parhate Date: Mon, 8 Dec 2025 15:32:15 +0530 Subject: [PATCH 1/4] Add errorLogIncludeMessages test --- .../iceberg/connect/IcebergSinkConfig.java | 39 +++++++++++++++++++ .../connect/channel/CommitterImpl.java | 2 +- .../iceberg/connect/data/SinkWriter.java | 26 ++++++++++++- 3 files changed, 64 insertions(+), 3 deletions(-) diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java index 9650ce16270c..43dccd96779f 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java @@ -28,6 +28,7 @@ import java.util.regex.Pattern; import java.util.stream.Collectors; import org.apache.iceberg.IcebergBuild; +import org.apache.iceberg.connect.data.ErrorTolerance; import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.base.Splitter; @@ -79,6 +80,10 @@ public class IcebergSinkConfig extends AbstractConfig { "iceberg.tables.schema-force-optional"; private static final String TABLES_SCHEMA_CASE_INSENSITIVE_PROP = "iceberg.tables.schema-case-insensitive"; + private static final String ERROR_TOLERANCE = "errors.tolerance"; + private static final String ERROR_LOG_INCLUDE_MESSAGES = "errors.log.include.messages"; + private static final String ERROR_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG = + "errors.deadletterqueue.topic.name"; private static final String CONTROL_TOPIC_PROP = "iceberg.control.topic"; private static final String CONTROL_GROUP_ID_PREFIX_PROP = "iceberg.control.group-id-prefix"; private static final String COMMIT_INTERVAL_MS_PROP = "iceberg.control.commit.interval-ms"; @@ -95,6 +100,10 @@ public class IcebergSinkConfig extends AbstractConfig { private static final String TASK_ID = "task.id"; private static final String BOOTSTRAP_SERVERS_PROP = "bootstrap.servers"; + private static final String DEFAULT_ERROR_TOLERANCE = ErrorTolerance.NONE.toString(); + private static final String DEFAULT_ERROR_LOG_INCLUDE_MESSAGES = "false"; + private static final String DEFAULT_ERROR_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG = ""; + private static final String DEFAULT_CATALOG_NAME = "iceberg"; private static final String DEFAULT_CONTROL_TOPIC = "control-iceberg"; public static final String DEFAULT_CONTROL_GROUP_PREFIX = "cg-control-"; @@ -181,6 +190,24 @@ private static ConfigDef newConfigDef() { DEFAULT_CATALOG_NAME, Importance.MEDIUM, "Iceberg catalog name"); + configDef.define( + ERROR_TOLERANCE, + ConfigDef.Type.STRING, + DEFAULT_ERROR_TOLERANCE, + Importance.MEDIUM, + "Behavior for tolerating errors during connector operation. 'none' is the default value and signals that any error will result in an immediate connector task failure; 'all' changes the behavior to skip over problematic records."); + configDef.define( + ERROR_LOG_INCLUDE_MESSAGES, + ConfigDef.Type.BOOLEAN, + DEFAULT_ERROR_LOG_INCLUDE_MESSAGES, + Importance.MEDIUM, + "If true, write each error and the details of the failed operation and problematic record to the Connect application log. This is 'false' by default, so that only errors that are not tolerated are reported."); + configDef.define( + ERROR_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG, + ConfigDef.Type.STRING, + DEFAULT_ERROR_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG, + Importance.MEDIUM, + "The name of the topic to be used as the dead letter queue (DLQ) for messages that result in an error when processed by this sink connector, or its transformations or converters. The topic name is blank by default, which means that no messages are to be recorded in the DLQ"); configDef.define( CONTROL_TOPIC_PROP, ConfigDef.Type.STRING, @@ -452,6 +479,18 @@ public JsonConverter jsonConverter() { return jsonConverter; } + public String errorTolerance() { + return getString(ERROR_TOLERANCE); + } + + public boolean errorLogIncludeMessages() { + return getBoolean(ERROR_LOG_INCLUDE_MESSAGES); + } + + public String errorDeadLetterQueueTopicNameConfig() { + return getString(ERROR_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG); + } + @VisibleForTesting static boolean checkClassName(String className) { return (className.matches(".*\\.ConnectDistributed.*") diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CommitterImpl.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CommitterImpl.java index 04602a66a5e1..b283e67c1379 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CommitterImpl.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CommitterImpl.java @@ -193,7 +193,7 @@ private void processControlEvents() { private void startWorker() { if (null == this.worker) { LOG.info("Starting commit worker {}-{}", config.connectorName(), config.taskId()); - SinkWriter sinkWriter = new SinkWriter(catalog, config); + SinkWriter sinkWriter = new SinkWriter(catalog, config, context.errantRecordReporter()); worker = new Worker(config, clientFactory, sinkWriter, context); worker.start(); } diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SinkWriter.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SinkWriter.java index f81155e13777..71d8af518b95 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SinkWriter.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SinkWriter.java @@ -32,19 +32,28 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.connect.errors.DataException; +import org.apache.kafka.connect.sink.ErrantRecordReporter; import org.apache.kafka.connect.sink.SinkRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class SinkWriter { + + private static final Logger LOG = LoggerFactory.getLogger(SinkWriter.class); + private final IcebergSinkConfig config; private final IcebergWriterFactory writerFactory; private final Map writers; private final Map sourceOffsets; + private final ErrantRecordReporter reporter; - public SinkWriter(Catalog catalog, IcebergSinkConfig config) { + public SinkWriter(Catalog catalog, IcebergSinkConfig config, ErrantRecordReporter reporter) { this.config = config; this.writerFactory = new IcebergWriterFactory(catalog, config); this.writers = Maps.newHashMap(); this.sourceOffsets = Maps.newHashMap(); + this.reporter = reporter; } public void close() { @@ -65,7 +74,20 @@ public SinkWriterResult completeWrite() { } public void save(Collection sinkRecords) { - sinkRecords.forEach(this::save); + for (SinkRecord record : sinkRecords) { + try { + this.save(record); + } catch (DataException ex) { + if (this.reporter != null) { + this.reporter.report(record, ex); + } + if (this.config.errorTolerance().equalsIgnoreCase(ErrorTolerance.ALL.toString())) { + LOG.error("An error occurred converting record...", ex); + } else { + throw ex; + } + } + } } private void save(SinkRecord record) { From c95e3789057942228520dc910c2869ce559e9f35 Mon Sep 17 00:00:00 2001 From: Devendra Parhate Date: Mon, 8 Dec 2025 15:32:24 +0530 Subject: [PATCH 2/4] Implement error handling mechanism for data exception --- .../iceberg/connect/data/ErrorTolerance.java | 36 +++++++++++++++++++ .../iceberg/connect/data/IcebergWriter.java | 20 ++++++++--- 2 files changed, 52 insertions(+), 4 deletions(-) create mode 100644 kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/ErrorTolerance.java diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/ErrorTolerance.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/ErrorTolerance.java new file mode 100644 index 000000000000..1f24cab1b73d --- /dev/null +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/ErrorTolerance.java @@ -0,0 +1,36 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one + * * or more contributor license agreements. See the NOTICE file + * * distributed with this work for additional information + * * regarding copyright ownership. The ASF licenses this file + * * to you under the Apache License, Version 2.0 (the + * * "License"); you may not use this file except in compliance + * * with the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, + * * software distributed under the License is distributed on an + * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * * KIND, either express or implied. See the License for the + * * specific language governing permissions and limitations + * * under the License. + * + */ +package org.apache.iceberg.connect.data; + +import java.util.Locale; + +public enum ErrorTolerance { + + /** Tolerate no errors. */ + NONE, + + /** Tolerate all errors. */ + ALL; + + public String value() { + return name().toLowerCase(Locale.ROOT); + } +} diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriter.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriter.java index b5be5b3a0047..9f540fe4f2f1 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriter.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriter.java @@ -57,22 +57,34 @@ private void initNewWriter() { @Override public void write(SinkRecord record) { + Record row = null; try { // ignore tombstones... if (record.value() != null) { - Record row = convertToRow(record); - writer.write(row); + row = convertToRow(record); } } catch (Exception e) { + String recordData = ""; + if (this.config.errorLogIncludeMessages()) { + recordData = String.format(", record: %s", record.value().toString()); + } throw new DataException( String.format( Locale.ROOT, - "An error occurred converting record, topic: %s, partition, %d, offset: %d", + "An error occurred converting record, topic: %s, partition, %d, offset: %d%s", record.topic(), record.kafkaPartition(), - record.kafkaOffset()), + record.kafkaOffset(), + recordData), e); } + if (row != null) { + try { + writer.write(row); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } } private Record convertToRow(SinkRecord record) { From 2384f6a86007259a15e87adbaab572f9d9f6b3f7 Mon Sep 17 00:00:00 2001 From: Devendra Parhate Date: Mon, 8 Dec 2025 15:34:33 +0530 Subject: [PATCH 3/4] Add error tolerance test --- .../iceberg/connect/data/TestSinkWriter.java | 42 +++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestSinkWriter.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestSinkWriter.java index a14ebcab7336..9772fd352dbe 100644 --- a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestSinkWriter.java +++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestSinkWriter.java @@ -207,4 +207,46 @@ private List sinkWriterTest( return result.writerResults(); } + + + @Test + public void testErrorToleranceAll() { + IcebergSinkConfig config = mock(IcebergSinkConfig.class); + when(config.tables()).thenReturn(ImmutableList.of(TABLE_IDENTIFIER.toString())); + when(config.tableConfig(any())).thenReturn(mock(TableSinkConfig.class)); + when(config.errorTolerance()).thenReturn(ErrorTolerance.ALL.toString()); + when(config.errorLogIncludeMessages()).thenReturn(true); + + Map badValue = ImmutableMap.of("id", "abc"); + List writerResults1 = sinkWriterTest(badValue, config); + assertThat(writerResults1.size()).isEqualTo(1); + } + + @Test + public void testErrorToleranceNone() { + IcebergSinkConfig config = mock(IcebergSinkConfig.class); + when(config.tables()).thenReturn(ImmutableList.of(TABLE_IDENTIFIER.toString())); + when(config.tableConfig(any())).thenReturn(mock(TableSinkConfig.class)); + when(config.errorTolerance()).thenReturn(ErrorTolerance.NONE.toString()); + + Map badValue = ImmutableMap.of("id", "abc"); + assertThatThrownBy(() -> sinkWriterTest(badValue, config)) + .isInstanceOf(DataException.class) + .hasMessage("An error occurred converting record, topic: topic, partition, 1, offset: 100"); + } + + @Test + public void testErrorToleranceNoneErrorLogIncludeMessages() { + IcebergSinkConfig config = mock(IcebergSinkConfig.class); + when(config.tables()).thenReturn(ImmutableList.of(TABLE_IDENTIFIER.toString())); + when(config.tableConfig(any())).thenReturn(mock(TableSinkConfig.class)); + when(config.errorTolerance()).thenReturn(ErrorTolerance.NONE.toString()); + when(config.errorLogIncludeMessages()).thenReturn(true); + + Map badValue = ImmutableMap.of("id", "abc"); + assertThatThrownBy(() -> sinkWriterTest(badValue, config)) + .isInstanceOf(DataException.class) + .hasMessage("An error occurred converting record, topic: topic, partition, 1, offset: 100, record: {id=abc}"); + } + } From 90695de2bf60ad4f426b0a12de45525e8a1b6fa7 Mon Sep 17 00:00:00 2001 From: Devendra Parhate Date: Mon, 8 Dec 2025 15:37:12 +0530 Subject: [PATCH 4/4] Add error tolerance test --- .../java/org/apache/iceberg/connect/data/TestSinkWriter.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestSinkWriter.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestSinkWriter.java index 9772fd352dbe..da7ba9906277 100644 --- a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestSinkWriter.java +++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestSinkWriter.java @@ -20,6 +20,7 @@ import static org.apache.iceberg.types.Types.NestedField.optional; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.Mockito.mock; @@ -44,6 +45,7 @@ import org.apache.iceberg.types.Types; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.connect.errors.DataException; import org.apache.kafka.connect.sink.SinkRecord; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -181,7 +183,7 @@ private List sinkWriterTest( IcebergWriterFactory writerFactory = mock(IcebergWriterFactory.class); when(writerFactory.createWriter(any(), any(), anyBoolean())).thenReturn(writer); - SinkWriter sinkWriter = new SinkWriter(catalog, config); + SinkWriter sinkWriter = new SinkWriter(catalog, config, null); // save a record Instant now = Instant.now().truncatedTo(ChronoUnit.MILLIS);