diff --git a/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java b/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java index 471dc3e56035..c4c625a8e0c1 100644 --- a/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java +++ b/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java @@ -20,7 +20,6 @@ import java.io.Closeable; import java.io.IOException; -import java.io.UncheckedIOException; import java.util.List; import java.util.Map; import org.apache.iceberg.DataFile; @@ -43,8 +42,12 @@ import org.apache.iceberg.util.StructProjection; import org.apache.iceberg.util.Tasks; import org.apache.iceberg.util.ThreadPools; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public abstract class BaseTaskWriter implements TaskWriter { + private static final Logger LOG = LoggerFactory.getLogger(BaseTaskWriter.class); + private final List completedDataFiles = Lists.newArrayList(); private final List completedDeleteFiles = Lists.newArrayList(); private final CharSequenceSet referencedDataFiles = CharSequenceSet.empty(); @@ -344,12 +347,17 @@ private void closeCurrent() throws IOException { currentWriter.close(); if (currentRows == 0L) { - try { - io.deleteFile(currentFile.encryptingOutputFile()); - } catch (UncheckedIOException e) { - // the file may not have been created, and it isn't worth failing the job to clean up, - // skip deleting - } + // the file may not have been created or cannot be deleted, and it isn't worth failing + // the job to clean up, skip deleting + Tasks.foreach(currentFile.encryptingOutputFile()) + .suppressFailureWhenFinished() + .onFailure( + (file, exc) -> + LOG.warn( + "Failed to delete the uncommitted empty file during writer clean up: {}", + file, + exc)) + .run(io::deleteFile); } else { complete(currentWriter); } diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java index bf5b59a0f025..ae26beec69d7 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java @@ -28,6 +28,7 @@ import java.util.regex.Pattern; import java.util.stream.Collectors; import org.apache.iceberg.IcebergBuild; +import org.apache.iceberg.connect.data.ErrorTolerance; import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.base.Splitter; @@ -79,6 +80,8 @@ public class IcebergSinkConfig extends AbstractConfig { "iceberg.tables.schema-force-optional"; private static final String TABLES_SCHEMA_CASE_INSENSITIVE_PROP = "iceberg.tables.schema-case-insensitive"; + private static final String ERROR_TOLERANCE = "errors.tolerance"; + private static final String ERROR_LOG_INCLUDE_MESSAGES = "errors.log.include.messages"; private static final String CONTROL_TOPIC_PROP = "iceberg.control.topic"; private static final String COMMIT_INTERVAL_MS_PROP = "iceberg.control.commit.interval-ms"; private static final int COMMIT_INTERVAL_MS_DEFAULT = 300_000; @@ -91,6 +94,9 @@ public class IcebergSinkConfig extends AbstractConfig { private static final String NAME_PROP = "name"; private static final String BOOTSTRAP_SERVERS_PROP = "bootstrap.servers"; + private static final String DEFAULT_ERROR_TOLERANCE = ErrorTolerance.NONE.toString(); + private static final String DEFAULT_ERROR_LOG_INCLUDE_MESSAGES = "false"; + private static final String DEFAULT_CATALOG_NAME = "iceberg"; private static final String DEFAULT_CONTROL_TOPIC = "control-iceberg"; public static final String DEFAULT_CONTROL_GROUP_PREFIX = "cg-control-"; @@ -174,6 +180,18 @@ private static ConfigDef newConfigDef() { DEFAULT_CATALOG_NAME, Importance.MEDIUM, "Iceberg catalog name"); + configDef.define( + ERROR_TOLERANCE, + ConfigDef.Type.STRING, + DEFAULT_ERROR_TOLERANCE, + Importance.MEDIUM, + "Behavior for tolerating errors during connector operation. 'none' is the default value and signals that any error will result in an immediate connector task failure; 'all' changes the behavior to skip over problematic records."); + configDef.define( + ERROR_LOG_INCLUDE_MESSAGES, + ConfigDef.Type.BOOLEAN, + DEFAULT_ERROR_LOG_INCLUDE_MESSAGES, + Importance.MEDIUM, + "If true, write each error and the details of the failed operation and problematic record to the Connect application log. This is 'false' by default, so that only errors that are not tolerated are reported."); configDef.define( CONTROL_TOPIC_PROP, ConfigDef.Type.STRING, @@ -406,6 +424,14 @@ public JsonConverter jsonConverter() { return jsonConverter; } + public String errorTolerance() { + return getString(ERROR_TOLERANCE); + } + + public boolean errorLogIncludeMessages() { + return getBoolean(ERROR_LOG_INCLUDE_MESSAGES); + } + @VisibleForTesting static boolean checkClassName(String className) { return (className.matches(".*\\.ConnectDistributed.*") diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CommitterImpl.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CommitterImpl.java index 53b7b76e8ea0..dab2fc0312bf 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CommitterImpl.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CommitterImpl.java @@ -78,6 +78,7 @@ public void start(Catalog catalog, IcebergSinkConfig config, SinkTaskContext con LOG.info("Starting commit worker"); SinkWriter sinkWriter = new SinkWriter(catalog, config); + sinkWriter.setReporter(context.errantRecordReporter()); worker = new Worker(config, clientFactory, sinkWriter, context); worker.start(); } diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/ErrorTolerance.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/ErrorTolerance.java new file mode 100644 index 000000000000..cc4cc2599479 --- /dev/null +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/ErrorTolerance.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.connect.data; + +import java.util.Locale; + +public enum ErrorTolerance { + + /** Tolerate no errors. */ + NONE, + + /** Tolerate all errors. */ + ALL; + + public String value() { + return name().toLowerCase(Locale.ROOT); + } +} diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriter.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriter.java index b5be5b3a0047..a0a7d60b366b 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriter.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriter.java @@ -57,22 +57,34 @@ private void initNewWriter() { @Override public void write(SinkRecord record) { + Record row = null; try { // ignore tombstones... if (record.value() != null) { - Record row = convertToRow(record); - writer.write(row); + row = convertToRow(record); } } catch (Exception e) { + String recordData = + this.config.errorLogIncludeMessages() + ? String.format(", record: %s", record.value().toString()) + : ""; throw new DataException( String.format( Locale.ROOT, - "An error occurred converting record, topic: %s, partition, %d, offset: %d", + "An error occurred converting record, topic: %s, partition, %d, offset: %d%s", record.topic(), record.kafkaPartition(), - record.kafkaOffset()), + record.kafkaOffset(), + recordData), e); } + if (row != null) { + try { + writer.write(row); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } } private Record convertToRow(SinkRecord record) { diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SinkWriter.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SinkWriter.java index f81155e13777..6fe0fcfd6558 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SinkWriter.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SinkWriter.java @@ -32,13 +32,21 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.connect.errors.DataException; +import org.apache.kafka.connect.sink.ErrantRecordReporter; import org.apache.kafka.connect.sink.SinkRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class SinkWriter { + + private static final Logger LOG = LoggerFactory.getLogger(SinkWriter.class); + private final IcebergSinkConfig config; private final IcebergWriterFactory writerFactory; private final Map writers; private final Map sourceOffsets; + private ErrantRecordReporter reporter; public SinkWriter(Catalog catalog, IcebergSinkConfig config) { this.config = config; @@ -47,6 +55,10 @@ public SinkWriter(Catalog catalog, IcebergSinkConfig config) { this.sourceOffsets = Maps.newHashMap(); } + public void setReporter(ErrantRecordReporter reporter) { + this.reporter = reporter; + } + public void close() { writers.values().forEach(RecordWriter::close); } @@ -65,7 +77,23 @@ public SinkWriterResult completeWrite() { } public void save(Collection sinkRecords) { - sinkRecords.forEach(this::save); + for (SinkRecord record : sinkRecords) { + try { + this.save(record); + } catch (DataException ex) { + if (this.reporter != null) { + this.reporter.report(record, ex); + } + if (this.config.errorTolerance().equalsIgnoreCase(ErrorTolerance.ALL.toString())) { + LOG.error( + "Data exception encountered while saving record but tolerated due to error tolerance settings. " + + "To change this behavior, set 'errors.tolerance' to 'none':", + ex); + } else { + throw ex; + } + } + } } private void save(SinkRecord record) { diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/SinkWriterTest.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/SinkWriterTest.java index 4a17b926fc56..cc6fa7810e1a 100644 --- a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/SinkWriterTest.java +++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/SinkWriterTest.java @@ -20,6 +20,7 @@ import static org.apache.iceberg.types.Types.NestedField.optional; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.Mockito.mock; @@ -44,10 +45,13 @@ import org.apache.iceberg.types.Types; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.connect.errors.DataException; +import org.apache.kafka.connect.sink.ErrantRecordReporter; import org.apache.kafka.connect.sink.SinkRecord; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.mockito.Mockito; public class SinkWriterTest { @@ -169,6 +173,11 @@ public void testDynamicNoRoute() { private List sinkWriterTest( Map value, IcebergSinkConfig config) { + return sinkWriterTest(value, config, null); + } + + private List sinkWriterTest( + Map value, IcebergSinkConfig config, ErrantRecordReporter reporter) { IcebergWriterResult writeResult = new IcebergWriterResult( TableIdentifier.parse(TABLE_NAME), @@ -182,7 +191,7 @@ private List sinkWriterTest( when(writerFactory.createWriter(any(), any(), anyBoolean())).thenReturn(writer); SinkWriter sinkWriter = new SinkWriter(catalog, config); - + sinkWriter.setReporter(reporter); // save a record Instant now = Instant.now().truncatedTo(ChronoUnit.MILLIS); SinkRecord rec = @@ -207,4 +216,75 @@ private List sinkWriterTest( return result.writerResults(); } + + @Test + public void testErrorToleranceAll() { + IcebergSinkConfig config = mock(IcebergSinkConfig.class); + when(config.tables()).thenReturn(ImmutableList.of(TABLE_IDENTIFIER.toString())); + when(config.tableConfig(any())).thenReturn(mock(TableSinkConfig.class)); + when(config.errorTolerance()).thenReturn(ErrorTolerance.ALL.toString()); + when(config.errorLogIncludeMessages()).thenReturn(true); + + ErrantRecordReporter reporter = mock(ErrantRecordReporter.class); + when(reporter.report(any(), any())) + .then( + invocation -> { + return null; + }); + Map badValue = ImmutableMap.of("id", "abc"); + List writerResults1 = sinkWriterTest(badValue, config, reporter); + assertThat(writerResults1.size()).isEqualTo(1); + } + + @Test + public void testErrorToleranceNone() { + IcebergSinkConfig config = mock(IcebergSinkConfig.class); + when(config.tables()).thenReturn(ImmutableList.of(TABLE_IDENTIFIER.toString())); + when(config.tableConfig(any())).thenReturn(mock(TableSinkConfig.class)); + when(config.errorTolerance()).thenReturn(ErrorTolerance.NONE.toString()); + + Map badValue = ImmutableMap.of("id", "abc"); + assertThatThrownBy(() -> sinkWriterTest(badValue, config)) + .isInstanceOf(DataException.class) + .hasMessage("An error occurred converting record, topic: topic, partition, 1, offset: 100"); + } + + @Test + public void testErrorToleranceNoneErrorLogIncludeMessages() { + IcebergSinkConfig config = mock(IcebergSinkConfig.class); + when(config.tables()).thenReturn(ImmutableList.of(TABLE_IDENTIFIER.toString())); + when(config.tableConfig(any())).thenReturn(mock(TableSinkConfig.class)); + when(config.errorTolerance()).thenReturn(ErrorTolerance.NONE.toString()); + when(config.errorLogIncludeMessages()).thenReturn(true); + + Map badValue = ImmutableMap.of("id", "abc"); + assertThatThrownBy(() -> sinkWriterTest(badValue, config)) + .isInstanceOf(DataException.class) + .hasStackTraceContaining( + "Caused by: java.lang.NumberFormatException: For input string: \"abc\"\n") + .hasMessage( + "An error occurred converting record, topic: topic, partition, 1, offset: 100, record: {id=abc}"); + } + + @Test + public void testErrantRecordReporter() { + IcebergSinkConfig config = mock(IcebergSinkConfig.class); + when(config.tables()).thenReturn(ImmutableList.of(TABLE_IDENTIFIER.toString())); + when(config.tableConfig(any())).thenReturn(mock(TableSinkConfig.class)); + when(config.errorTolerance()).thenReturn(ErrorTolerance.ALL.toString()); + + ErrantRecordReporter reporter = mock(ErrantRecordReporter.class); + when(reporter.report(any(), any())) + .then( + invocation -> { + return null; + }); + + Map badValue = ImmutableMap.of("id", "abc"); + List writerResults1 = sinkWriterTest(badValue, config, reporter); + assertThat(writerResults1.size()).isEqualTo(1); + + // Verify report function was called once + Mockito.verify(reporter, Mockito.times(1)).report(any(), any()); + } }