From 1e0b1bb8ea5e094298662f81ac62cee81e9e665e Mon Sep 17 00:00:00 2001 From: Devendra Parhate Date: Thu, 8 May 2025 13:32:47 +0530 Subject: [PATCH 1/5] Implement error handling mechanism for data exception --- .../iceberg/connect/IcebergSinkConfig.java | 26 ++++++++++++++ .../iceberg/connect/data/ErrorTolerance.java | 34 +++++++++++++++++++ .../iceberg/connect/data/IcebergWriter.java | 34 ++++++++++++++----- 3 files changed, 85 insertions(+), 9 deletions(-) create mode 100644 kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/ErrorTolerance.java diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java index a4e15932f1a3..b98628beeb5c 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java @@ -28,6 +28,7 @@ import java.util.regex.Pattern; import java.util.stream.Collectors; import org.apache.iceberg.IcebergBuild; +import org.apache.iceberg.connect.data.ErrorTolerance; import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.base.Splitter; @@ -79,6 +80,8 @@ public class IcebergSinkConfig extends AbstractConfig { "iceberg.tables.schema-force-optional"; private static final String TABLES_SCHEMA_CASE_INSENSITIVE_PROP = "iceberg.tables.schema-case-insensitive"; + private static final String ERROR_TOLERANCE = "errors.tolerance"; + private static final String ERROR_LOG_INCLUDE_MESSAGES = "errors.log.include.messages"; private static final String CONTROL_TOPIC_PROP = "iceberg.control.topic"; private static final String CONTROL_GROUP_ID_PREFIX_PROP = "iceberg.control.group-id-prefix"; private static final String COMMIT_INTERVAL_MS_PROP = "iceberg.control.commit.interval-ms"; @@ -94,6 +97,9 @@ public class IcebergSinkConfig extends AbstractConfig { private static final String NAME_PROP = "name"; private static final String BOOTSTRAP_SERVERS_PROP = "bootstrap.servers"; + private static final String DEFAULT_ERROR_TOLERANCE = ErrorTolerance.NONE.toString(); + private static final String DEFAULT_ERROR_LOG_INCLUDE_MESSAGES = "false"; + private static final String DEFAULT_CATALOG_NAME = "iceberg"; private static final String DEFAULT_CONTROL_TOPIC = "control-iceberg"; public static final String DEFAULT_CONTROL_GROUP_PREFIX = "cg-control-"; @@ -177,6 +183,18 @@ private static ConfigDef newConfigDef() { DEFAULT_CATALOG_NAME, Importance.MEDIUM, "Iceberg catalog name"); + configDef.define( + ERROR_TOLERANCE, + ConfigDef.Type.STRING, + DEFAULT_ERROR_TOLERANCE, + Importance.MEDIUM, + "Behavior for tolerating errors during connector operation. 'none' is the default value and signals that any error will result in an immediate connector task failure; 'all' changes the behavior to skip over problematic records."); + configDef.define( + ERROR_LOG_INCLUDE_MESSAGES, + ConfigDef.Type.BOOLEAN, + DEFAULT_ERROR_LOG_INCLUDE_MESSAGES, + Importance.MEDIUM, + "If true, write each error and the details of the failed operation and problematic record to the Connect application log. This is 'false' by default, so that only errors that are not tolerated are reported."); configDef.define( CONTROL_TOPIC_PROP, ConfigDef.Type.STRING, @@ -434,6 +452,14 @@ public JsonConverter jsonConverter() { return jsonConverter; } + public String errorTolerance() { + return getString(ERROR_TOLERANCE); + } + + public boolean errorLogIncludeMessages() { + return getBoolean(ERROR_LOG_INCLUDE_MESSAGES); + } + @VisibleForTesting static boolean checkClassName(String className) { return (className.matches(".*\\.ConnectDistributed.*") diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/ErrorTolerance.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/ErrorTolerance.java new file mode 100644 index 000000000000..cc4cc2599479 --- /dev/null +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/ErrorTolerance.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.connect.data; + +import java.util.Locale; + +public enum ErrorTolerance { + + /** Tolerate no errors. */ + NONE, + + /** Tolerate all errors. */ + ALL; + + public String value() { + return name().toLowerCase(Locale.ROOT); + } +} diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriter.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriter.java index b5be5b3a0047..32ff698e7b76 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriter.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriter.java @@ -32,8 +32,13 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.kafka.connect.errors.DataException; import org.apache.kafka.connect.sink.SinkRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; class IcebergWriter implements RecordWriter { + + private static final Logger LOG = LoggerFactory.getLogger(IcebergWriter.class); + private final Table table; private final String tableName; private final IcebergSinkConfig config; @@ -56,7 +61,7 @@ private void initNewWriter() { } @Override - public void write(SinkRecord record) { + public void write(SinkRecord record) throws DataException { try { // ignore tombstones... if (record.value() != null) { @@ -64,14 +69,25 @@ public void write(SinkRecord record) { writer.write(row); } } catch (Exception e) { - throw new DataException( - String.format( - Locale.ROOT, - "An error occurred converting record, topic: %s, partition, %d, offset: %d", - record.topic(), - record.kafkaPartition(), - record.kafkaOffset()), - e); + String recordData = ""; + if (this.config.errorLogIncludeMessages()) { + recordData = String.format(", record: %s", record.value().toString()); + } + DataException ex = + new DataException( + String.format( + Locale.ROOT, + "topic: %s, partition, %d, offset: %d %s", + record.topic(), + record.kafkaPartition(), + record.kafkaOffset(), + recordData), + e); + if (this.config.errorTolerance().equalsIgnoreCase(ErrorTolerance.ALL.toString())) { + LOG.error("An error occurred converting record...", ex); + } else { + throw ex; + } } } From 4055bccd50c4a95f22e389c60023a1d4256c0a2c Mon Sep 17 00:00:00 2001 From: Devendra Parhate Date: Wed, 17 Dec 2025 12:47:22 +0530 Subject: [PATCH 2/5] Update DLQ implementation to use errant reporter --- .../connect/channel/CommitterImpl.java | 1 + .../iceberg/connect/data/IcebergWriter.java | 48 +++++++++---------- .../iceberg/connect/data/SinkWriter.java | 30 +++++++++++- 3 files changed, 52 insertions(+), 27 deletions(-) diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CommitterImpl.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CommitterImpl.java index 853a049f6bec..cc44d8170251 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CommitterImpl.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CommitterImpl.java @@ -166,6 +166,7 @@ private void startWorker() { if (null == this.worker) { LOG.info("Starting commit worker"); SinkWriter sinkWriter = new SinkWriter(catalog, config); + sinkWriter.setReporter(context.errantRecordReporter()); worker = new Worker(config, clientFactory, sinkWriter, context); worker.start(); } diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriter.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriter.java index 32ff698e7b76..a0a7d60b366b 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriter.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriter.java @@ -32,13 +32,8 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.kafka.connect.errors.DataException; import org.apache.kafka.connect.sink.SinkRecord; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; class IcebergWriter implements RecordWriter { - - private static final Logger LOG = LoggerFactory.getLogger(IcebergWriter.class); - private final Table table; private final String tableName; private final IcebergSinkConfig config; @@ -61,32 +56,33 @@ private void initNewWriter() { } @Override - public void write(SinkRecord record) throws DataException { + public void write(SinkRecord record) { + Record row = null; try { // ignore tombstones... if (record.value() != null) { - Record row = convertToRow(record); - writer.write(row); + row = convertToRow(record); } } catch (Exception e) { - String recordData = ""; - if (this.config.errorLogIncludeMessages()) { - recordData = String.format(", record: %s", record.value().toString()); - } - DataException ex = - new DataException( - String.format( - Locale.ROOT, - "topic: %s, partition, %d, offset: %d %s", - record.topic(), - record.kafkaPartition(), - record.kafkaOffset(), - recordData), - e); - if (this.config.errorTolerance().equalsIgnoreCase(ErrorTolerance.ALL.toString())) { - LOG.error("An error occurred converting record...", ex); - } else { - throw ex; + String recordData = + this.config.errorLogIncludeMessages() + ? String.format(", record: %s", record.value().toString()) + : ""; + throw new DataException( + String.format( + Locale.ROOT, + "An error occurred converting record, topic: %s, partition, %d, offset: %d%s", + record.topic(), + record.kafkaPartition(), + record.kafkaOffset(), + recordData), + e); + } + if (row != null) { + try { + writer.write(row); + } catch (IOException e) { + throw new UncheckedIOException(e); } } } diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SinkWriter.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SinkWriter.java index f81155e13777..6fe0fcfd6558 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SinkWriter.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SinkWriter.java @@ -32,13 +32,21 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.connect.errors.DataException; +import org.apache.kafka.connect.sink.ErrantRecordReporter; import org.apache.kafka.connect.sink.SinkRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class SinkWriter { + + private static final Logger LOG = LoggerFactory.getLogger(SinkWriter.class); + private final IcebergSinkConfig config; private final IcebergWriterFactory writerFactory; private final Map writers; private final Map sourceOffsets; + private ErrantRecordReporter reporter; public SinkWriter(Catalog catalog, IcebergSinkConfig config) { this.config = config; @@ -47,6 +55,10 @@ public SinkWriter(Catalog catalog, IcebergSinkConfig config) { this.sourceOffsets = Maps.newHashMap(); } + public void setReporter(ErrantRecordReporter reporter) { + this.reporter = reporter; + } + public void close() { writers.values().forEach(RecordWriter::close); } @@ -65,7 +77,23 @@ public SinkWriterResult completeWrite() { } public void save(Collection sinkRecords) { - sinkRecords.forEach(this::save); + for (SinkRecord record : sinkRecords) { + try { + this.save(record); + } catch (DataException ex) { + if (this.reporter != null) { + this.reporter.report(record, ex); + } + if (this.config.errorTolerance().equalsIgnoreCase(ErrorTolerance.ALL.toString())) { + LOG.error( + "Data exception encountered while saving record but tolerated due to error tolerance settings. " + + "To change this behavior, set 'errors.tolerance' to 'none':", + ex); + } else { + throw ex; + } + } + } } private void save(SinkRecord record) { From a994331a6e2d19cd1a1e741e761c4a1b111c412f Mon Sep 17 00:00:00 2001 From: Devendra Parhate Date: Wed, 17 Dec 2025 13:45:00 +0530 Subject: [PATCH 3/5] Add tests --- .../org/apache/iceberg/io/BaseTaskWriter.java | 21 +++-- .../iceberg/connect/data/SinkWriterTest.java | 82 ++++++++++++++++++- 2 files changed, 95 insertions(+), 8 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java b/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java index 0834c7156a9c..25047d1a45cd 100644 --- a/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java +++ b/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java @@ -44,8 +44,12 @@ import org.apache.iceberg.util.StructProjection; import org.apache.iceberg.util.Tasks; import org.apache.iceberg.util.ThreadPools; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public abstract class BaseTaskWriter implements TaskWriter { + private static final Logger LOG = LoggerFactory.getLogger(BaseTaskWriter.class); + private final List completedDataFiles = Lists.newArrayList(); private final List completedDeleteFiles = Lists.newArrayList(); private final CharSequenceSet referencedDataFiles = CharSequenceSet.empty(); @@ -345,12 +349,17 @@ private void closeCurrent() throws IOException { currentWriter.close(); if (currentRows == 0L) { - try { - io.deleteFile(currentFile.encryptingOutputFile()); - } catch (UncheckedIOException e) { - // the file may not have been created, and it isn't worth failing the job to clean up, - // skip deleting - } + // the file may not have been created or cannot be deleted, and it isn't worth failing + // the job to clean up, skip deleting + Tasks.foreach(currentFile.encryptingOutputFile()) + .suppressFailureWhenFinished() + .onFailure( + (file, exc) -> + LOG.warn( + "Failed to delete the uncommitted empty file during writer clean up: {}", + file, + exc)) + .run(io::deleteFile); } else { complete(currentWriter); } diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/SinkWriterTest.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/SinkWriterTest.java index 4a17b926fc56..161d1b121a22 100644 --- a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/SinkWriterTest.java +++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/SinkWriterTest.java @@ -20,6 +20,7 @@ import static org.apache.iceberg.types.Types.NestedField.optional; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.Mockito.mock; @@ -44,10 +45,13 @@ import org.apache.iceberg.types.Types; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.connect.errors.DataException; +import org.apache.kafka.connect.sink.ErrantRecordReporter; import org.apache.kafka.connect.sink.SinkRecord; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.mockito.Mockito; public class SinkWriterTest { @@ -168,7 +172,12 @@ public void testDynamicNoRoute() { } private List sinkWriterTest( - Map value, IcebergSinkConfig config) { + Map value, IcebergSinkConfig config) { + return sinkWriterTest(value, config, null); + } + + private List sinkWriterTest( + Map value, IcebergSinkConfig config, ErrantRecordReporter reporter) { IcebergWriterResult writeResult = new IcebergWriterResult( TableIdentifier.parse(TABLE_NAME), @@ -182,7 +191,7 @@ private List sinkWriterTest( when(writerFactory.createWriter(any(), any(), anyBoolean())).thenReturn(writer); SinkWriter sinkWriter = new SinkWriter(catalog, config); - + sinkWriter.setReporter(reporter); // save a record Instant now = Instant.now().truncatedTo(ChronoUnit.MILLIS); SinkRecord rec = @@ -207,4 +216,73 @@ private List sinkWriterTest( return result.writerResults(); } + + @Test + public void testErrorToleranceAll() { + + IcebergSinkConfig config = mock(IcebergSinkConfig.class); + when(config.tables()).thenReturn(ImmutableList.of(TABLE_IDENTIFIER.toString())); + when(config.tableConfig(any())).thenReturn(mock(TableSinkConfig.class)); + when(config.errorTolerance()).thenReturn(ErrorTolerance.ALL.toString()); + when(config.errorLogIncludeMessages()).thenReturn(true); + + + Map value = ImmutableMap.of("id", 1); + List writerResults = sinkWriterTest(value, config); + Map badValue = ImmutableMap.of("id", "abc"); + List writerResults1 = sinkWriterTest(badValue, config); + assertThat(writerResults1.size()).isEqualTo(1); + } + + @Test + public void testErrorToleranceNone() { + IcebergSinkConfig config = mock(IcebergSinkConfig.class); + when(config.tables()).thenReturn(ImmutableList.of(TABLE_IDENTIFIER.toString())); + when(config.tableConfig(any())).thenReturn(mock(TableSinkConfig.class)); + when(config.errorTolerance()).thenReturn(ErrorTolerance.NONE.toString()); + + Map badValue = ImmutableMap.of("id", "abc"); + assertThatThrownBy(() -> sinkWriterTest(badValue, config)) + .isInstanceOf(DataException.class) + .hasMessage("An error occurred converting record, topic: topic, partition, 1, offset: 100"); + } + + @Test + public void testErrorToleranceNoneErrorLogIncludeMessages() { + IcebergSinkConfig config = mock(IcebergSinkConfig.class); + when(config.tables()).thenReturn(ImmutableList.of(TABLE_IDENTIFIER.toString())); + when(config.tableConfig(any())).thenReturn(mock(TableSinkConfig.class)); + when(config.errorTolerance()).thenReturn(ErrorTolerance.NONE.toString()); + when(config.errorLogIncludeMessages()).thenReturn(true); + + Map badValue = ImmutableMap.of("id", "abc"); + assertThatThrownBy(() -> sinkWriterTest(badValue, config)) + .isInstanceOf(DataException.class) + .hasStackTraceContaining( + "Caused by: java.lang.NumberFormatException: For input string: \"abc\"\n") + .hasMessage( + "An error occurred converting record, topic: topic, partition, 1, offset: 100, record: {id=abc}"); + } + + @Test + public void testErrantRecordReporter() { + IcebergSinkConfig config = mock(IcebergSinkConfig.class); + when(config.tables()).thenReturn(ImmutableList.of(TABLE_IDENTIFIER.toString())); + when(config.tableConfig(any())).thenReturn(mock(TableSinkConfig.class)); + when(config.errorTolerance()).thenReturn(ErrorTolerance.ALL.toString()); + + ErrantRecordReporter reporter = mock(ErrantRecordReporter.class); + when(reporter.report(any(), any())) + .then( + invocation -> { + return null; + }); + + Map badValue = ImmutableMap.of("id", "abc"); + List writerResults1 = sinkWriterTest(badValue, config, reporter); + assertThat(writerResults1.size()).isEqualTo(1); + + // Verify report function was called once + Mockito.verify(reporter, Mockito.times(1)).report(any(), any()); + } } From 30795833c2579c2b8c414d167200f2087202d79a Mon Sep 17 00:00:00 2001 From: Devendra Parhate Date: Wed, 17 Dec 2025 13:52:52 +0530 Subject: [PATCH 4/5] Add tests --- core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java | 1 - .../java/org/apache/iceberg/connect/data/SinkWriterTest.java | 3 +-- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java b/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java index 25047d1a45cd..0edf3662a18a 100644 --- a/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java +++ b/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java @@ -20,7 +20,6 @@ import java.io.Closeable; import java.io.IOException; -import java.io.UncheckedIOException; import java.util.List; import java.util.Map; import org.apache.iceberg.DataFile; diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/SinkWriterTest.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/SinkWriterTest.java index 161d1b121a22..05f040a1385a 100644 --- a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/SinkWriterTest.java +++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/SinkWriterTest.java @@ -172,7 +172,7 @@ public void testDynamicNoRoute() { } private List sinkWriterTest( - Map value, IcebergSinkConfig config) { + Map value, IcebergSinkConfig config) { return sinkWriterTest(value, config, null); } @@ -226,7 +226,6 @@ public void testErrorToleranceAll() { when(config.errorTolerance()).thenReturn(ErrorTolerance.ALL.toString()); when(config.errorLogIncludeMessages()).thenReturn(true); - Map value = ImmutableMap.of("id", 1); List writerResults = sinkWriterTest(value, config); Map badValue = ImmutableMap.of("id", "abc"); From 3c761466338dc94fbef6c6c097137a443a19cfc0 Mon Sep 17 00:00:00 2001 From: Devendra Parhate Date: Wed, 17 Dec 2025 14:31:02 +0530 Subject: [PATCH 5/5] Add tests --- .../java/org/apache/iceberg/connect/data/SinkWriterTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/SinkWriterTest.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/SinkWriterTest.java index 05f040a1385a..f7a3a955a1c4 100644 --- a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/SinkWriterTest.java +++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/SinkWriterTest.java @@ -20,7 +20,7 @@ import static org.apache.iceberg.types.Types.NestedField.optional; import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.Mockito.mock;