Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.iceberg.IcebergBuild;
import org.apache.iceberg.connect.data.ErrorTolerance;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.base.Splitter;
Expand Down Expand Up @@ -79,6 +80,10 @@ public class IcebergSinkConfig extends AbstractConfig {
"iceberg.tables.schema-force-optional";
private static final String TABLES_SCHEMA_CASE_INSENSITIVE_PROP =
"iceberg.tables.schema-case-insensitive";
private static final String ERROR_TOLERANCE = "errors.tolerance";
private static final String ERROR_LOG_INCLUDE_MESSAGES = "errors.log.include.messages";
private static final String ERROR_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG =
"errors.deadletterqueue.topic.name";
private static final String CONTROL_TOPIC_PROP = "iceberg.control.topic";
private static final String CONTROL_GROUP_ID_PREFIX_PROP = "iceberg.control.group-id-prefix";
private static final String COMMIT_INTERVAL_MS_PROP = "iceberg.control.commit.interval-ms";
Expand All @@ -95,6 +100,10 @@ public class IcebergSinkConfig extends AbstractConfig {
private static final String TASK_ID = "task.id";
private static final String BOOTSTRAP_SERVERS_PROP = "bootstrap.servers";

private static final String DEFAULT_ERROR_TOLERANCE = ErrorTolerance.NONE.toString();
private static final String DEFAULT_ERROR_LOG_INCLUDE_MESSAGES = "false";
private static final String DEFAULT_ERROR_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG = "";

private static final String DEFAULT_CATALOG_NAME = "iceberg";
private static final String DEFAULT_CONTROL_TOPIC = "control-iceberg";
public static final String DEFAULT_CONTROL_GROUP_PREFIX = "cg-control-";
Expand Down Expand Up @@ -181,6 +190,24 @@ private static ConfigDef newConfigDef() {
DEFAULT_CATALOG_NAME,
Importance.MEDIUM,
"Iceberg catalog name");
configDef.define(
ERROR_TOLERANCE,
ConfigDef.Type.STRING,
DEFAULT_ERROR_TOLERANCE,
Importance.MEDIUM,
"Behavior for tolerating errors during connector operation. 'none' is the default value and signals that any error will result in an immediate connector task failure; 'all' changes the behavior to skip over problematic records.");
configDef.define(
ERROR_LOG_INCLUDE_MESSAGES,
ConfigDef.Type.BOOLEAN,
DEFAULT_ERROR_LOG_INCLUDE_MESSAGES,
Importance.MEDIUM,
"If true, write each error and the details of the failed operation and problematic record to the Connect application log. This is 'false' by default, so that only errors that are not tolerated are reported.");
configDef.define(
ERROR_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG,
ConfigDef.Type.STRING,
DEFAULT_ERROR_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG,
Importance.MEDIUM,
"The name of the topic to be used as the dead letter queue (DLQ) for messages that result in an error when processed by this sink connector, or its transformations or converters. The topic name is blank by default, which means that no messages are to be recorded in the DLQ");
configDef.define(
CONTROL_TOPIC_PROP,
ConfigDef.Type.STRING,
Expand Down Expand Up @@ -452,6 +479,18 @@ public JsonConverter jsonConverter() {
return jsonConverter;
}

/**
 * Returns the raw {@code errors.tolerance} setting. Expected values are
 * {@code none} (the default: any error fails the task immediately) or
 * {@code all} (skip problematic records); callers compare it
 * case-insensitively against {@link ErrorTolerance} names.
 */
public String errorTolerance() {
  return getString(ERROR_TOLERANCE);
}

/**
 * Returns the {@code errors.log.include.messages} setting. When true, the
 * failed record's value is appended to conversion error messages; false by
 * default so tolerated errors do not leak record contents into logs.
 */
public boolean errorLogIncludeMessages() {
  return getBoolean(ERROR_LOG_INCLUDE_MESSAGES);
}

/**
 * Returns the {@code errors.deadletterqueue.topic.name} setting; blank by
 * default, meaning no dead letter queue is used.
 *
 * <p>NOTE(review): this value is defined but never read in the visible code
 * paths — DLQ routing appears to be delegated to the Connect framework's
 * errant record reporter; confirm the framework consumes this property.
 */
public String errorDeadLetterQueueTopicNameConfig() {
  return getString(ERROR_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG);
}

@VisibleForTesting
static boolean checkClassName(String className) {
return (className.matches(".*\\.ConnectDistributed.*")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ private void processControlEvents() {
private void startWorker() {
if (null == this.worker) {
LOG.info("Starting commit worker {}-{}", config.connectorName(), config.taskId());
SinkWriter sinkWriter = new SinkWriter(catalog, config);
SinkWriter sinkWriter = new SinkWriter(catalog, config, context.errantRecordReporter());
worker = new Worker(config, clientFactory, sinkWriter, context);
worker.start();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.iceberg.connect.data;

import java.util.Locale;

/**
 * Error handling modes for the Iceberg sink, mirroring Kafka Connect's
 * {@code errors.tolerance} configuration values.
 */
public enum ErrorTolerance {

  /** Tolerate no errors: any failure should fail the task. */
  NONE,

  /** Tolerate all errors: problematic records are skipped. */
  ALL;

  // Cached lower-case form of the constant name ("none" / "all").
  private final String configValue = name().toLowerCase(Locale.ROOT);

  /** Returns the lower-case configuration value for this tolerance mode. */
  public String value() {
    return configValue;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,22 +57,34 @@ private void initNewWriter() {

@Override
public void write(SinkRecord record) {
Record row = null;
try {
// ignore tombstones...
if (record.value() != null) {
Record row = convertToRow(record);
writer.write(row);
row = convertToRow(record);
}
} catch (Exception e) {
String recordData = "";
if (this.config.errorLogIncludeMessages()) {
recordData = String.format(", record: %s", record.value().toString());
}
throw new DataException(
String.format(
Locale.ROOT,
"An error occurred converting record, topic: %s, partition, %d, offset: %d",
"An error occurred converting record, topic: %s, partition, %d, offset: %d%s",
record.topic(),
record.kafkaPartition(),
record.kafkaOffset()),
record.kafkaOffset(),
recordData),
e);
}
if (row != null) {
try {
writer.write(row);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
}

private Record convertToRow(SinkRecord record) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,28 @@
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.sink.ErrantRecordReporter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SinkWriter {

private static final Logger LOG = LoggerFactory.getLogger(SinkWriter.class);

private final IcebergSinkConfig config;
private final IcebergWriterFactory writerFactory;
private final Map<String, RecordWriter> writers;
private final Map<TopicPartition, Offset> sourceOffsets;
private final ErrantRecordReporter reporter;

/**
 * Creates a SinkWriter.
 *
 * @param catalog Iceberg catalog used by the writer factory
 * @param config connector configuration
 * @param reporter Kafka Connect errant-record reporter; may be null when the
 *     runtime does not provide one
 */
public SinkWriter(Catalog catalog, IcebergSinkConfig config, ErrantRecordReporter reporter) {
  this.config = config;
  this.reporter = reporter;
  this.writerFactory = new IcebergWriterFactory(catalog, config);
  this.writers = Maps.newHashMap();
  this.sourceOffsets = Maps.newHashMap();
}

public void close() {
Expand All @@ -65,7 +74,20 @@ public SinkWriterResult completeWrite() {
}

/**
 * Writes a batch of records, applying the configured error tolerance:
 * conversion failures ({@link DataException}) are reported to the
 * errant-record reporter when one is available, then either skipped
 * ('all') or rethrown ('none').
 */
public void save(Collection<SinkRecord> sinkRecords) {
  sinkRecords.forEach(this::trySave);
}

/** Saves one record, honoring the errors.tolerance setting on failure. */
private void trySave(SinkRecord record) {
  try {
    save(record);
  } catch (DataException ex) {
    if (reporter != null) {
      reporter.report(record, ex);
    }
    boolean tolerateAll =
        config.errorTolerance().equalsIgnoreCase(ErrorTolerance.ALL.toString());
    if (!tolerateAll) {
      throw ex;
    }
    LOG.error("An error occurred converting record...", ex);
  }
}

private void save(SinkRecord record) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.Mockito.mock;
Expand All @@ -44,6 +45,7 @@
import org.apache.iceberg.types.Types;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -181,7 +183,7 @@ private List<IcebergWriterResult> sinkWriterTest(
IcebergWriterFactory writerFactory = mock(IcebergWriterFactory.class);
when(writerFactory.createWriter(any(), any(), anyBoolean())).thenReturn(writer);

SinkWriter sinkWriter = new SinkWriter(catalog, config);
SinkWriter sinkWriter = new SinkWriter(catalog, config, null);

// save a record
Instant now = Instant.now().truncatedTo(ChronoUnit.MILLIS);
Expand All @@ -207,4 +209,46 @@ private List<IcebergWriterResult> sinkWriterTest(

return result.writerResults();
}


@Test
public void testErrorToleranceAll() {
  // With errors.tolerance=all, a record that fails conversion is skipped
  // and the remaining batch still produces a writer result.
  IcebergSinkConfig config = mock(IcebergSinkConfig.class);
  when(config.tables()).thenReturn(ImmutableList.of(TABLE_IDENTIFIER.toString()));
  when(config.tableConfig(any())).thenReturn(mock(TableSinkConfig.class));
  when(config.errorTolerance()).thenReturn(ErrorTolerance.ALL.toString());
  when(config.errorLogIncludeMessages()).thenReturn(true);

  List<IcebergWriterResult> writerResults =
      sinkWriterTest(ImmutableMap.of("id", "abc"), config);
  assertThat(writerResults).hasSize(1);
}

@Test
public void testErrorToleranceNone() {
  // With errors.tolerance=none (the default), the conversion failure must
  // surface as a DataException without record contents in the message.
  IcebergSinkConfig config = mock(IcebergSinkConfig.class);
  when(config.tables()).thenReturn(ImmutableList.of(TABLE_IDENTIFIER.toString()));
  when(config.tableConfig(any())).thenReturn(mock(TableSinkConfig.class));
  when(config.errorTolerance()).thenReturn(ErrorTolerance.NONE.toString());

  assertThatThrownBy(() -> sinkWriterTest(ImmutableMap.of("id", "abc"), config))
      .isInstanceOf(DataException.class)
      .hasMessage("An error occurred converting record, topic: topic, partition, 1, offset: 100");
}

@Test
public void testErrorToleranceNoneErrorLogIncludeMessages() {
  // With errors.log.include.messages=true, the thrown DataException also
  // carries the failed record's value.
  IcebergSinkConfig config = mock(IcebergSinkConfig.class);
  when(config.tables()).thenReturn(ImmutableList.of(TABLE_IDENTIFIER.toString()));
  when(config.tableConfig(any())).thenReturn(mock(TableSinkConfig.class));
  when(config.errorTolerance()).thenReturn(ErrorTolerance.NONE.toString());
  when(config.errorLogIncludeMessages()).thenReturn(true);

  assertThatThrownBy(() -> sinkWriterTest(ImmutableMap.of("id", "abc"), config))
      .isInstanceOf(DataException.class)
      .hasMessage("An error occurred converting record, topic: topic, partition, 1, offset: 100, record: {id=abc}");
}

}
Loading