+ Map<Integer, Integer> freqDistribution = new HashMap<>();
+ for (int freq : idCounts.values()) {
+ freqDistribution.merge(freq, 1, Integer::sum);
+ }
+
+ System.out.println("\n=== 重复 ID 频次分布 ===");
+ freqDistribution.entrySet().stream()
+ .sorted(Map.Entry.comparingByKey())
+ .forEach(e -> System.out.printf(
+ "出现 %d 次的 ID 有 %d 个%n", e.getKey(), e.getValue()));
+
+ // Compute the actual duplicate rate: unique ID count vs. total record count
+ int uniqueIds = idCounts.size();
+ int totalRecords = idCounts.values().stream().mapToInt(Integer::intValue).sum();
+ int duplicateRecords = totalRecords - uniqueIds;
+ double actualDuplicateRate = (double) duplicateRecords / totalRecords * 100;
+
+ System.out.printf("%n实际重复率分析:%n");
+ System.out.printf(" 总记录数: %,d%n", totalRecords);
+ System.out.printf(" 唯一 ID 数: %,d%n", uniqueIds);
+ System.out.printf(" 重复记录数: %,d%n", duplicateRecords);
+ System.out.printf(" 实际重复率: %.2f%%%n", actualDuplicateRate);
+
+ // With the configured 70% duplicate rate, the measured rate should land within 60%-80%
+ assertTrue(actualDuplicateRate >= 60 && actualDuplicateRate <= 80,
+ String.format("Actual duplicate rate %.2f%% should be within 60%%-80%% (configured: 70%%)", actualDuplicateRate));
+ }
+ }
+}
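
For reference, a minimal self-contained sketch of the duplicate-rate computation the assertions above exercise, assuming a plain list of generated IDs (class and variable names are illustrative):

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DuplicateRateSketch {
    public static void main(String[] args) {
        List<Long> ids = Arrays.asList(1L, 2L, 2L, 3L, 3L, 3L);

        // Count occurrences per ID, as the test's idCounts map does.
        Map<Long, Integer> idCounts = new HashMap<>();
        for (long id : ids) {
            idCounts.merge(id, 1, Integer::sum);
        }

        // Duplicate rate = records beyond the first occurrence of each ID.
        int uniqueIds = idCounts.size();
        int totalRecords = ids.size();
        int duplicateRecords = totalRecords - uniqueIds;
        double duplicateRate = (double) duplicateRecords / totalRecords * 100;

        System.out.printf("duplicate rate: %.2f%%%n", duplicateRate); // prints 50.00%
    }
}
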
diff --git a/connectors/connector-perf-test/src/test/java/io/tapdata/connector/paimon/perf/S3FileScanDebugTest.java b/connectors/connector-perf-test/src/test/java/io/tapdata/connector/paimon/perf/S3FileScanDebugTest.java
new file mode 100644
index 00000000..a5c2efa0
--- /dev/null
+++ b/connectors/connector-perf-test/src/test/java/io/tapdata/connector/paimon/perf/S3FileScanDebugTest.java
@@ -0,0 +1,176 @@
+package io.tapdata.connector.paimon.perf;
+
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import com.amazonaws.services.s3.model.ObjectListing;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.client.builder.AwsClientBuilder;
+
+/**
+ * S3 file scan debug test (using the AWS S3 SDK)
+ */
+public class S3FileScanDebugTest {
+
+ // S3 configuration (adjust to your environment)
+ private static final String S3_ENDPOINT = "http://192.168.1.184:9081";
+ private static final String S3_ACCESS_KEY = "admin";
+ private static final String S3_SECRET_KEY = "admin123";
+ private static final String S3_REGION = "us-east-1";
+ private static final String S3_BUCKET = "luke";
+ private static final String S3_WAREHOUSE = "warehouse-paimon-perf";
+
+ public static void main(String[] args) {
+ System.out.println("=== S3 文件扫描调试测试(AWS S3 SDK) ===\n");
+
+ try {
+ test1_S3Connection();
+ test2_ListBucket();
+ test3_ScanWarehouse();
+
+ System.out.println("\n=== ✅ 所有测试通过 ===");
+ } catch (Exception e) {
+ System.err.println("\n❌ 测试失败: " + e.getMessage());
+ e.printStackTrace();
+ }
+ }
+
+ /**
+ * Test 1: Verify the S3 connection
+ */
+ private static void test1_S3Connection() {
+ System.out.println("[测试 1] 验证 S3 连接...");
+ System.out.println(" Endpoint: " + S3_ENDPOINT);
+ System.out.println(" Bucket: " + S3_BUCKET);
+ System.out.println(" Region: " + S3_REGION);
+
+ AmazonS3 s3Client = createS3Client();
+
+ try {
+ // Check whether the bucket exists
+ boolean exists = s3Client.doesBucketExistV2(S3_BUCKET);
+ System.out.println(" Bucket exists: " + exists);
+
+ if (exists) {
+ System.out.println(" ✅ S3 connection succeeded");
+ } else {
+ System.out.println(" ⚠️ Bucket does not exist; check the configuration");
+ }
+ } catch (Exception e) {
+ System.out.println(" ❌ Connection failed: " + e.getMessage());
+ } finally {
+ s3Client.shutdown();
+ }
+ }
+
+ /**
+ * Test 2: List bucket contents
+ */
+ private static void test2_ListBucket() {
+ System.out.println("\n[测试 2] 列出 Bucket 内容...");
+ AmazonS3 s3Client = createS3Client();
+
+ try {
+ ObjectListing listing = s3Client.listObjects(S3_BUCKET);
+
+ System.out.println(" Total objects: " + listing.getObjectSummaries().size());
+
+ if (!listing.getObjectSummaries().isEmpty()) {
+ System.out.println(" First 10 objects:");
+ int count = 0;
+ for (S3ObjectSummary summary : listing.getObjectSummaries()) {
+ if (count >= 10) break;
+ System.out.printf(" - %s (%s)%n", summary.getKey(), formatSize(summary.getSize()));
+ count++;
+ }
+ }
+
+ System.out.println(" ✅ Bucket listed successfully");
+ } catch (Exception e) {
+ System.out.println(" ❌ Listing failed: " + e.getMessage());
+ } finally {
+ s3Client.shutdown();
+ }
+ }
+
+ /**
+ * Test 3: Scan the warehouse directory
+ */
+ private static void test3_ScanWarehouse() {
+ System.out.println("\n[测试 3] 扫描 Warehouse 目录...");
+ String warehousePath = S3_WAREHOUSE;
+ System.out.println(" Warehouse: " + warehousePath);
+
+ AmazonS3 s3Client = createS3Client();
+
+ try {
+ // List all objects under the warehouse prefix, page by page
+ int fileCount = 0;
+ long totalSize = 0;
+ int parquetCount = 0;
+
+ ObjectListing listing = null;
+ do {
+ if (listing == null) {
+ listing = s3Client.listObjects(S3_BUCKET, warehousePath);
+ } else {
+ listing = s3Client.listNextBatchOfObjects(listing);
+ }
+
+ for (S3ObjectSummary summary : listing.getObjectSummaries()) {
+ String key = summary.getKey();
+ fileCount++;
+ totalSize += summary.getSize();
+
+ if (key.endsWith(".parquet")) {
+ parquetCount++;
+ }
+
+ // 显示前 10 个 parquet 文件
+ if (parquetCount <= 10 && key.endsWith(".parquet")) {
+ System.out.printf(" 📄 %s (%s)%n", key, formatSize(summary.getSize()));
+ }
+ }
+ } while (listing.isTruncated());
+
+ System.out.println("\n 统计信息:");
+ System.out.printf(" 总文件数: %d%n", fileCount);
+ System.out.printf(" Parquet 文件: %d%n", parquetCount);
+ System.out.printf(" 总大小: %s%n", formatSize(totalSize));
+
+ if (fileCount > 0) {
+ System.out.println(" ✅ Warehouse 扫描成功");
+ } else {
+ System.out.println(" ⚠️ 未找到文件");
+ }
+
+ } catch (Exception e) {
+ System.out.println(" ❌ 扫描失败: " + e.getMessage());
+ e.printStackTrace();
+ } finally {
+ s3Client.shutdown();
+ }
+ }
+
+ /**
+ * Create the S3 client
+ */
+ private static AmazonS3 createS3Client() {
+ BasicAWSCredentials awsCreds = new BasicAWSCredentials(S3_ACCESS_KEY, S3_SECRET_KEY);
+
+ return AmazonS3ClientBuilder.standard()
+ .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(S3_ENDPOINT, S3_REGION))
+ .withCredentials(new AWSStaticCredentialsProvider(awsCreds))
+ .withPathStyleAccessEnabled(true) // required for MinIO
+ .disableChunkedEncoding()
+ .build();
+ }
+
+ private static String formatSize(long size) {
+ if (size < 1024) return size + " B";
+ if (size < 1024 * 1024) return String.format("%.2f KB", size / 1024.0);
+ if (size < 1024 * 1024 * 1024) return String.format("%.2f MB", size / (1024.0 * 1024));
+ return String.format("%.2f GB", size / (1024.0 * 1024 * 1024));
+ }
+}
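
The warehouse scan above pages through results with listObjects/listNextBatchOfObjects. The same traversal can be expressed with the v1 SDK's ListObjectsV2 API, which carries an explicit continuation token; a sketch under that assumption (bucket and prefix are placeholders):

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.ListObjectsV2Request;
import com.amazonaws.services.s3.model.ListObjectsV2Result;
import com.amazonaws.services.s3.model.S3ObjectSummary;

public class ListV2Sketch {
    // Sums object sizes under a prefix, one page at a time.
    static long totalSize(AmazonS3 s3, String bucket, String prefix) {
        long total = 0;
        ListObjectsV2Request request = new ListObjectsV2Request()
                .withBucketName(bucket)
                .withPrefix(prefix);
        ListObjectsV2Result result;
        do {
            result = s3.listObjectsV2(request);
            for (S3ObjectSummary summary : result.getObjectSummaries()) {
                total += summary.getSize();
            }
            // Resume from where the previous page ended.
            request.setContinuationToken(result.getNextContinuationToken());
        } while (result.isTruncated());
        return total;
    }
}
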
diff --git a/connectors/paimon-connector/src/main/java/io/tapdata/connector/paimon/PaimonConnector.java b/connectors/paimon-connector/src/main/java/io/tapdata/connector/paimon/PaimonConnector.java
index c7709949..57fc17ea 100644
--- a/connectors/paimon-connector/src/main/java/io/tapdata/connector/paimon/PaimonConnector.java
+++ b/connectors/paimon-connector/src/main/java/io/tapdata/connector/paimon/PaimonConnector.java
@@ -77,7 +77,7 @@ public void onStart(TapConnectionContext connectionContext) throws Throwable {
connectionContext.getLog().info("Flush offset callback registered for StarRocks connector");
}
// Initialize Paimon service
- paimonService = new PaimonService(paimonConfig);
+ paimonService = new PaimonService(paimonConfig, connectionContext.getLog());
paimonService.setFlushOffsetCallback(flushOffsetCallback);
paimonService.init();
@@ -248,7 +248,7 @@ private CreateTableOptions createTable(TapConnectorContext connectorContext,
TapTable table = createTableEvent.getTable();
log.info("Creating Paimon table: " + table.getName());
- boolean created = paimonService.createTable(table, log);
+ boolean created = paimonService.createTable(table);
if (created) {
log.info("Table created successfully: " + table.getName());
} else {
diff --git a/connectors/paimon-connector/src/main/java/io/tapdata/connector/paimon/config/PaimonConfig.java b/connectors/paimon-connector/src/main/java/io/tapdata/connector/paimon/config/PaimonConfig.java
index 218e5d0e..09a087cc 100644
--- a/connectors/paimon-connector/src/main/java/io/tapdata/connector/paimon/config/PaimonConfig.java
+++ b/connectors/paimon-connector/src/main/java/io/tapdata/connector/paimon/config/PaimonConfig.java
@@ -94,8 +94,8 @@ public class PaimonConfig extends CommonDbConfig implements Serializable {
// Compaction merges small files for better query performance
private Boolean enableAutoCompaction = true;
- // Compaction interval in minutes (default: 30 minutes)
- private Integer compactionIntervalMinutes = 30;
+ // Full-compaction interval in minutes (default: 60)
+ private Integer compactionIntervalMinutes = 60;
// Target file size in MB (default: 128MB)
// Paimon will try to create files of this size
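
For context, this interval feeds Paimon's compaction.optimization-interval table option when the schema is built (the createTable hunk further down appends "min"). A minimal sketch of that mapping, assuming Paimon's Schema builder API:

import org.apache.paimon.schema.Schema;
import org.apache.paimon.types.DataTypes;

public class CompactionIntervalSketch {
    public static void main(String[] args) {
        int compactionIntervalMinutes = 60; // the new default above

        // Paimon takes the interval as a duration string.
        Schema schema = Schema.newBuilder()
                .column("id", DataTypes.INT())
                .option("compaction.optimization-interval", compactionIntervalMinutes + "min")
                .build();

        System.out.println(schema.options()); // {compaction.optimization-interval=60min}
    }
}
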
diff --git a/connectors/paimon-connector/src/main/java/io/tapdata/connector/paimon/service/ManagedIOStreamTableWrite.java b/connectors/paimon-connector/src/main/java/io/tapdata/connector/paimon/service/ManagedIOStreamTableWrite.java
new file mode 100644
index 00000000..cc20c9bf
--- /dev/null
+++ b/connectors/paimon-connector/src/main/java/io/tapdata/connector/paimon/service/ManagedIOStreamTableWrite.java
@@ -0,0 +1,119 @@
+package io.tapdata.connector.paimon.service;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.io.BundleRecords;
+import org.apache.paimon.memory.MemoryPoolFactory;
+import org.apache.paimon.metrics.MetricRegistry;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.StreamTableWrite;
+import org.apache.paimon.table.sink.TableWrite;
+import org.apache.paimon.types.RowType;
+
+import java.util.List;
+
+/**
+ * StreamTableWrite wrapper that owns IOManager lifecycle for non-Flink usage.
+ *
+ * Close order is strict: the write is closed first, then the IOManager; if both closes fail, the IOManager failure is attached as suppressed.
+ */
+public class ManagedIOStreamTableWrite implements StreamTableWrite {
+
+ private final StreamTableWrite delegate;
+ private final IOManager ioManager;
+
+ public ManagedIOStreamTableWrite(StreamTableWrite delegate, IOManager ioManager) {
+ this.delegate = delegate;
+ this.ioManager = ioManager;
+ }
+
+ @Override
+ public TableWrite withIOManager(IOManager ioManager) {
+ delegate.withIOManager(ioManager);
+ return this;
+ }
+
+ @Override
+ public TableWrite withWriteType(RowType writeType) {
+ delegate.withWriteType(writeType);
+ return this;
+ }
+
+ @Override
+ public TableWrite withMemoryPoolFactory(MemoryPoolFactory memoryPoolFactory) {
+ delegate.withMemoryPoolFactory(memoryPoolFactory);
+ return this;
+ }
+
+ @Override
+ public BinaryRow getPartition(InternalRow row) {
+ return delegate.getPartition(row);
+ }
+
+ @Override
+ public int getBucket(InternalRow row) {
+ return delegate.getBucket(row);
+ }
+
+ @Override
+ public void write(InternalRow row) throws Exception {
+ delegate.write(row);
+ }
+
+ @Override
+ public void write(InternalRow row, int bucket) throws Exception {
+ delegate.write(row, bucket);
+ }
+
+ @Override
+ public void writeBundle(BinaryRow partition, int bucket, BundleRecords bundle) throws Exception {
+ delegate.writeBundle(partition, bucket, bundle);
+ }
+
+ @Override
+ public void compact(BinaryRow partition, int bucket, boolean fullCompaction) throws Exception {
+ delegate.compact(partition, bucket, fullCompaction);
+ }
+
+ @Override
+ public TableWrite withMetricRegistry(MetricRegistry registry) {
+ delegate.withMetricRegistry(registry);
+ return this;
+ }
+
+ @Override
+ public List<CommitMessage> prepareCommit(boolean waitCompaction, long commitIdentifier) throws Exception {
+ return delegate.prepareCommit(waitCompaction, commitIdentifier);
+ }
+
+ @Override
+ public void close() throws Exception {
+ Exception writeError = null;
+ try {
+ delegate.close();
+ } catch (Exception e) {
+ writeError = e;
+ }
+
+ Exception ioManagerError = null;
+ if (ioManager != null) {
+ try {
+ ioManager.close();
+ } catch (Exception e) {
+ ioManagerError = e;
+ }
+ }
+
+ if (writeError != null) {
+ if (ioManagerError != null) {
+ writeError.addSuppressed(ioManagerError);
+ }
+ throw writeError;
+ }
+ if (ioManagerError != null) {
+ throw ioManagerError;
+ }
+ }
+}
+
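
A usage sketch mirroring createStreamWriter in the PaimonService hunk below: the wrapper takes ownership of the IOManager it was built with, so a single close() tears both down in the documented order (helper class, spill directory, and commit identifier are illustrative):

import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.StreamTableWrite;

import java.util.List;

public class ManagedWriteUsageSketch {
    static List<CommitMessage> writeOnce(Table table, InternalRow row) throws Exception {
        IOManager ioManager = IOManager.create(new String[] {"/tmp/paimon-spill"}); // placeholder dir
        StreamTableWrite write = (StreamTableWrite) table.newStreamWriteBuilder()
                .newWrite()
                .withIOManager(ioManager);
        ManagedIOStreamTableWrite managed = new ManagedIOStreamTableWrite(write, ioManager);
        try {
            managed.write(row);
            // Commit messages are handed to a StreamTableCommit elsewhere.
            return managed.prepareCommit(false, 1L);
        } finally {
            managed.close(); // closes the write first, then the IOManager
        }
    }
}
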
diff --git a/connectors/paimon-connector/src/main/java/io/tapdata/connector/paimon/service/PaimonService.java b/connectors/paimon-connector/src/main/java/io/tapdata/connector/paimon/service/PaimonService.java
index c0d2f8e4..45c6079d 100644
--- a/connectors/paimon-connector/src/main/java/io/tapdata/connector/paimon/service/PaimonService.java
+++ b/connectors/paimon-connector/src/main/java/io/tapdata/connector/paimon/service/PaimonService.java
@@ -9,7 +9,6 @@
import io.tapdata.entity.event.dml.TapRecordEvent;
import io.tapdata.entity.event.dml.TapUpdateRecordEvent;
import io.tapdata.entity.logger.Log;
-import io.tapdata.entity.logger.TapLogger;
import io.tapdata.entity.schema.TapField;
import io.tapdata.entity.schema.TapIndex;
import io.tapdata.entity.schema.TapIndexField;
@@ -34,10 +33,7 @@
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.Table;
-import org.apache.paimon.table.sink.CommitMessage;
-import org.apache.paimon.table.sink.StreamTableCommit;
-import org.apache.paimon.table.sink.StreamTableWrite;
-import org.apache.paimon.table.sink.StreamWriteBuilder;
+import org.apache.paimon.table.sink.*;
import org.apache.paimon.table.source.*;
import org.apache.paimon.table.source.TableScan.Plan;
import org.apache.paimon.types.*;
@@ -131,8 +127,13 @@ protected boolean removeEldestEntry(Map.Entry> elde
}
}
);
+ /**
+ * Logger supplied by the connection context
+ */
+ private Log log;
- public PaimonService(PaimonConfig config) {
+ public PaimonService(PaimonConfig config, Log log) {
+ this.log = log;
this.config = config;
this.firstOffsetByTable = Collections.synchronizedMap(new LinkedHashMap<>());
}
@@ -158,7 +159,7 @@ public void init() throws Exception {
// Create catalog
catalog = CatalogFactory.createCatalog(context);
-// // Initialize async commit if enabled
+ // Initialize async commit if enabled
initAsyncCommit();
}
@@ -196,7 +197,7 @@ private void initAsyncCommit() {
}
} catch (Exception e) {
// Log error but don't stop the scheduler
- System.err.println("Error in async commit: " + e.getMessage());
+ log.error("Error in async commit: {}", e.getMessage(), e);
}
}, commitInterval, commitInterval, TimeUnit.MILLISECONDS);
}
@@ -223,6 +224,14 @@ private void configureStorage(Options options) {
options.set("s3.upload.part-size", "16mb");
options.set("s3.fast-upload", "true");
options.set("s3.accelerate-mode", "true");
+ // Mitigate connection resets: lower concurrency, raise timeouts
+// options.set("fs.s3a.connection.maximum", "32");
+// options.set("fs.s3a.connection.timeout", "300000");
+// options.set("fs.s3a.socket.timeout", "300000");
+// // Retry settings (handle transient connection failures)
+// options.set("fs.s3a.retry.limit", "5");
+// options.set("fs.s3a.retry.interval", "1000");
+
break;
case "hdfs":
options.set("fs.defaultFS", "hdfs://" + config.getHdfsHost() + ":" + config.getHdfsPort());
@@ -477,7 +486,7 @@ private String convertDataType(DataType dataType) {
* @return true if created, false if already exists
* @throws Exception if creation fails
*/
- public boolean createTable(TapTable tapTable, Log log) throws Exception {
+ public boolean createTable(TapTable tapTable) throws Exception {
String database = config.getDatabase();
String tableName = tapTable.getName();
@@ -574,7 +583,6 @@ public boolean createTable(TapTable tapTable, Log log) throws Exception {
if (Boolean.TRUE.equals(config.getDiskOverflowWrite())) {
schemaBuilder.option("write-buffer-spillable", "true");
schemaBuilder.option("write-buffer-spill.max-disk-size", config.getDiskMaxSize() + "gb");
- schemaBuilder.option("write-buffer-spill.tmp-dirs", config.getDiskTmpDir(tableName));
}
// 2. Target file size - Paimon will try to create files of this size
@@ -587,25 +595,24 @@ public boolean createTable(TapTable tapTable, Log log) throws Exception {
if (config.getEnableAutoCompaction(tableName) != null) {
if (config.getEnableAutoCompaction(tableName)) {
// Enable full compaction for better query performance
- schemaBuilder.option("compaction.async.enabled", "true");
schemaBuilder.option("compaction.optimization-interval", config.getCompactionIntervalMinutes(tableName) + "min");
// Set compaction strategy
schemaBuilder.option("changelog-producer", "input");
// Compact small files more aggressively
- schemaBuilder.option("num-sorted-run.compaction-trigger", "3");
- schemaBuilder.option("num-sorted-run.stop-trigger", "5");
+ schemaBuilder.option("num-sorted-run.compaction-trigger", "30");
+ schemaBuilder.option("num-sorted-run.stop-trigger", "2147483647");
} else {
// Disable auto compaction
- schemaBuilder.option("compaction.optimization-interval", "0");
+ schemaBuilder.option("write-only", "true");
}
}
// 4. Snapshot settings for better performance
// Keep more snapshots in memory for faster access
- schemaBuilder.option("snapshot.num-retained.min", "5");
- schemaBuilder.option("snapshot.num-retained.max", "50");
+ schemaBuilder.option("snapshot.num-retained.min", "2");
+ schemaBuilder.option("snapshot.num-retained.max", "5");
schemaBuilder.option("snapshot.time-retained", "30min");
// 5. Commit settings
@@ -622,7 +629,15 @@ public boolean createTable(TapTable tapTable, Log log) throws Exception {
schemaBuilder.option("sink.parallelism", String.valueOf(config.getWriteThreads()));
if (EmptyKit.isNotEmpty(config.getTableProperties(tableName))) {
- config.getTableProperties(tableName).forEach(v -> schemaBuilder.option(v.get("propKey"), v.get("propValue")));
+ config.getTableProperties(tableName).forEach(v -> {
+ if (StringUtils.isEmpty(v.get("propKey"))
+ || StringUtils.isEmpty(v.get("propValue"))
+ ) {
+ log.warn("tapdata paimon config error", "key or value exists null in tableProperties");
+ } else {
+ schemaBuilder.option(v.get("propKey"), v.get("propValue"));
+ }
+ });
}
// Create table
catalog.createTable(identifier, schemaBuilder.build(), false);
@@ -985,6 +1000,7 @@ private WriteListResult writeRecordsWithStreamWriteInternal(List
// Double-check if we still need to commit (another thread might have committed)
int finalCount = recordCount.get();
if (finalCount > 0) {
+ long commitStartTime = System.currentTimeMillis();
// Prepare commit with commitIdentifier
// Use atomic counter to generate unique, incrementing commit identifier
long commitIdentifier = commitIdentifierGenerator.incrementAndGet();
@@ -997,8 +1013,9 @@ private WriteListResult writeRecordsWithStreamWriteInternal(List
recordCount.set(0);
lastCommit.set(System.currentTimeMillis());
- connectorContext.getLog().debug("Committed {} accumulated records for table {}",
- finalCount, tableKey);
+ long commitDuration = System.currentTimeMillis() - commitStartTime;
+ connectorContext.getLog().debug("Committed {} accumulated records for table {} in {} ms",
+ finalCount, tableKey, commitDuration);
}
}
}
@@ -1009,14 +1026,14 @@ private WriteListResult writeRecordsWithStreamWriteInternal(List
} catch (Exception e) {
if (retryCount < maxRetries) {
if (isThreadGroupDestroyedError(e)) {
- connectorContext.getLog().warn("ThreadGroup destroyed in stream write, retrying... (attempt {}/{})", retryCount + 1, maxRetries);
+ connectorContext.getLog().warn("ThreadGroup destroyed in stream write, retrying... (attempt {}/{})", retryCount + 1, maxRetries, e);
} else if (isPaimonConflict(e)) {
connectorContext.getLog().warn("Commit conflict detected, retrying... (attempt {}/{})", retryCount + 1, maxRetries, e);
} else {
- connectorContext.getLog().warn("Failed to write records to table {}, error message: {}, retrying... (attempt {}/{})", tableName, e.getMessage(), retryCount + 1, maxRetries);
+ connectorContext.getLog().warn("Failed to write records to table {}, error message: {}, retrying... (attempt {}/{})", tableName, e.getMessage(), retryCount + 1, maxRetries, e);
}
retryCount++;
- reinitCatalog();
+// reinitCatalog();
CommonUtils.ignoreAnyError(() -> TimeUnit.SECONDS.sleep(1L), TAG);
continue;
}
@@ -1231,10 +1248,11 @@ private StreamTableWrite createStreamWriter(Identifier identifier) throws Except
StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder();
String tmpDirs = config.getDiskTmpDir(table.name());
if (StringUtils.isEmpty(tmpDirs)) {
- return writeBuilder.newWrite();
+ return new ManagedIOStreamTableWrite(writeBuilder.newWrite(), null);
} else {
- return (StreamTableWrite) writeBuilder.newWrite()
- .withIOManager(IOManager.create(splitPaths(tmpDirs)));
+ IOManager ioManager = IOManager.create(splitPaths(tmpDirs));
+ StreamTableWrite streamTableWrite = (StreamTableWrite) writeBuilder.newWrite().withIOManager(ioManager);
+ return new ManagedIOStreamTableWrite(streamTableWrite, ioManager);
}
}
@@ -1338,7 +1356,7 @@ private void handleStreamInsert(TapInsertRecordEvent event, StreamTableWrite wri
String database = config.getDatabase();
Identifier identifier = Identifier.create(database, table.getName());
GenericRow row = convertToGenericRow(after, table, identifier);
- if (config.getBucketMode(table.getName()).equals("fixed") || config.getBucketCount() == -2) {
+ if (config.getBucketMode(table.getName()).equals("fixed")) {
writer.write(row);
} else {
int bucket = selectBucketForDynamic(row, table);
@@ -2700,7 +2718,7 @@ private void commitCallback(String tableName) {
try {
flushOffsetCallback.accept(offsetToSave);
} catch (Exception e) {
- TapLogger.warn("Failed to flush offset callback: {}", e.getMessage(), e);
+ log.warn("Failed to flush offset callback: {}", e.getMessage());
}
}
}
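
A minimal, self-contained sketch of the tableProperties guard added in the createTable hunk above, assuming the connector's list-of-maps shape with propKey/propValue entries (class name and messages are illustrative):

import org.apache.commons.lang3.StringUtils;

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class TablePropertiesGuardSketch {
    public static void main(String[] args) {
        List<Map<String, String>> tableProperties = new ArrayList<>();
        tableProperties.add(prop("write-buffer-size", "256mb"));
        tableProperties.add(prop(null, "zstd")); // invalid: missing key

        // Mirror the guard: skip and warn instead of passing a null
        // key or value to schemaBuilder.option(...).
        Map<String, String> options = new LinkedHashMap<>();
        for (Map<String, String> p : tableProperties) {
            if (StringUtils.isEmpty(p.get("propKey")) || StringUtils.isEmpty(p.get("propValue"))) {
                System.err.println("skipping table property with null key or value: " + p);
            } else {
                options.put(p.get("propKey"), p.get("propValue"));
            }
        }
        System.out.println(options); // {write-buffer-size=256mb}
    }

    private static Map<String, String> prop(String key, String value) {
        Map<String, String> m = new LinkedHashMap<>();
        m.put("propKey", key);
        m.put("propValue", value);
        return m;
    }
}
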
diff --git a/connectors/paimon-connector/src/test/java/io/tapdata/connector/paimon/perf/TestCase.java b/connectors/paimon-connector/src/test/java/io/tapdata/connector/paimon/perf/TestCase.java
deleted file mode 100644
index 33d2b59d..00000000
--- a/connectors/paimon-connector/src/test/java/io/tapdata/connector/paimon/perf/TestCase.java
+++ /dev/null
@@ -1,473 +0,0 @@
-package io.tapdata.connector.paimon.perf;
-
-import java.util.*;
-
-import static io.tapdata.connector.paimon.perf.PerformanceTestRunner.TOTAL_RECORDS;
-import static io.tapdata.connector.paimon.perf.PerformanceTestRunner.BASE_TEST_DIR;
-
-/**
- * Performance test case definitions.
- * Parameter keys correspond directly to Paimon CoreOptions table option names and can be injected via tableProperties.
- *
- * Note: every test case's dataSize comes from PerformanceTestRunner.TOTAL_RECORDS, so the test data volume is controlled in one place.
- */
-public class TestCase {
- private final String id;
- private final String name;
- private final String group;
- private final Map<String, String> parameters;
- private final int primaryKeyDuplicateRate; // 0-100
- private final int qps; // 0 = unlimited
- private final String description;
-
- public TestCase(String id, String name, String group,
- Map<String, String> parameters,
- int primaryKeyDuplicateRate, int qps,
- String description) {
- this.id = id;
- this.name = name;
- this.group = group;
- this.parameters = parameters;
- this.primaryKeyDuplicateRate = primaryKeyDuplicateRate;
- this.qps = qps;
- this.description = description;
- }
-
- // ─── Accessors ────────────────────────────────────────────────────────────
-
- public String getId() { return id; }
- public String getName() { return name; }
- public String getGroup() { return group; }
- public Map<String, String> getParameters() { return parameters; }
-
- /**
- * Data volume: managed centrally through PerformanceTestRunner.TOTAL_RECORDS,
- * so every test case writes the same number of records.
- */
- public long getDataSize() { return TOTAL_RECORDS; }
-
- public int getPrimaryKeyDuplicateRate() { return primaryKeyDuplicateRate; }
- public int getQps() { return qps; }
- public String getDescription() { return description; }
-
- @Override
- public String toString() {
- return String.format("[%s] %s (%s) - %s [数据量: %,d]",
- id, name, group, description, TOTAL_RECORDS);
- }
-
- // ─── Builder helper ───────────────────────────────────────────────────────
-
- private static Map<String, String> params(String... kvs) {
- Map<String, String> m = new LinkedHashMap<>();
- for (int i = 0; i + 1 < kvs.length; i += 2) m.put(kvs[i], kvs[i + 1]);
- return m;
- }
-
- // ─── 5.1 Basic tests ──────────────────────────────────────────────────────
-
- public static List<TestCase> createBasicTests() {
- return Arrays.asList(
- new TestCase("TC-01", "Baseline (default configuration)", "Basic",
- params(
- "write-buffer-size", "256mb",
- "target-file-size", "128mb",
- "file.format", "parquet",
- "file.compression", "zstd",
- "bucket", "-1",
- "compaction.async.enabled", "true",
- "num-sorted-run.compaction-trigger", "5",
- "num-sorted-run.stop-trigger", "8",
- "write-only", "false"
- ),
- 0, 0,
- "默认配置基线,建立性能参考点"),
-
- new TestCase("TC-02", "大数据量测试", "基础测试",
- params(
- "write-buffer-size", "512mb",
- "write-buffer-spillable", "true",
- "target-file-size", "256mb",
- "file.format", "parquet",
- "file.compression", "zstd",
- "bucket", "-1",
- "compaction.async.enabled", "false",
- "write-only", "true"
- ),
- 0, 0,
- "大数据量写入,验证稳定性和文件大小"),
-
- new TestCase("TC-03", "小批量测试", "基础测试",
- params(
- "write-buffer-size", "64mb",
- "target-file-size", "64mb",
- "file.format", "parquet",
- "bucket", "-1",
- "compaction.async.enabled", "true"
- ),
- 0, 0,
- "小数据量,验证最小写入场景")
- );
- }
-
- // ─── 5.2 Write buffer tests ───────────────────────────────────────────────
-
- public static List<TestCase> createWriteBufferTests() {
- return Arrays.asList(
- new TestCase("TC-10", "Buffer 64MB", "Write buffer",
- params("write-buffer-size", "64mb", "write-buffer-spillable", "true",
- "target-file-size", "128mb", "bucket", "-1",
- "compaction.async.enabled", "false", "write-only", "true"),
- 0, 0, "64MB buffer; verifies small-buffer write behavior and file count"),
-
- new TestCase("TC-11", "缓冲区-128MB", "写入缓冲区",
- params("write-buffer-size", "128mb", "write-buffer-spillable", "true",
- "target-file-size", "128mb", "bucket", "-1",
- "compaction.async.enabled", "false", "write-only", "true"),
- 0, 0, "128MB缓冲区,中等缓冲区写入行为"),
-
- new TestCase("TC-12", "缓冲区-256MB(默认)", "写入缓冲区",
- params("write-buffer-size", "256mb", "write-buffer-spillable", "true",
- "target-file-size", "128mb", "bucket", "-1",
- "compaction.async.enabled", "false", "write-only", "true"),
- 0, 0, "256MB缓冲区(默认值)"),
-
- new TestCase("TC-13", "缓冲区-512MB", "写入缓冲区",
- params("write-buffer-size", "512mb", "write-buffer-spillable", "true",
- "target-file-size", "256mb", "bucket", "-1",
- "compaction.async.enabled", "false", "write-only", "true"),
- 0, 0, "512MB缓冲区,大缓冲区减少flush次数"),
-
- new TestCase("TC-14", "缓冲区-1024MB", "写入缓冲区",
- params("write-buffer-size", "1024mb", "write-buffer-spillable", "true",
- "target-file-size", "512mb", "bucket", "-1",
- "compaction.async.enabled", "false", "write-only", "true"),
- 0, 0, "1GB缓冲区,超大缓冲区验证极限"),
-
- new TestCase("TC-15", "缓冲区-不可溢出", "写入缓冲区",
- params("write-buffer-size", "256mb", "write-buffer-spillable", "false",
- "target-file-size", "128mb", "bucket", "-1",
- "compaction.async.enabled", "false", "write-only", "true"),
- 0, 0, "关闭溢写,缓冲区满时触发flush,验证OOM风险"),
-
- new TestCase("TC-16", "缓冲区-可溢出", "写入缓冲区",
- params("write-buffer-size", "64mb", "write-buffer-spillable", "true",
- "write-buffer-spill.max-disk-size", "1gb",
- "target-file-size", "128mb", "bucket", "-1",
- "compaction.async.enabled", "false", "write-only", "true"),
- 0, 0, "小缓冲区+溢写磁盘,验证溢写行为和最终文件大小")
- );
- }
-
- // ─── 5.3 Target file size tests ───────────────────────────────────────────
-
- public static List<TestCase> createTargetFileSizeTests() {
- return Arrays.asList(
- new TestCase("TC-20", "Target file 64MB", "Target file size",
- params("write-buffer-size", "512mb", "write-buffer-spillable", "true",
- "target-file-size", "64mb", "bucket", "-1",
- "compaction.async.enabled", "false", "write-only", "true"),
- 0, 0, "64MB target file; verifies file count with a small target"),
-
- new TestCase("TC-21", "目标文件-128MB(默认)", "目标文件大小",
- params("write-buffer-size", "512mb", "write-buffer-spillable", "true",
- "target-file-size", "128mb", "bucket", "-1",
- "compaction.async.enabled", "false", "write-only", "true"),
- 0, 0, "128MB目标文件(默认值)"),
-
- new TestCase("TC-22", "目标文件-256MB", "目标文件大小",
- params("write-buffer-size", "512mb", "write-buffer-spillable", "true",
- "target-file-size", "256mb", "bucket", "-1",
- "compaction.async.enabled", "false", "write-only", "true"),
- 0, 0, "256MB目标文件,减少文件数量"),
-
- new TestCase("TC-23", "目标文件-512MB", "目标文件大小",
- params("write-buffer-size", "1024mb", "write-buffer-spillable", "true",
- "target-file-size", "512mb", "bucket", "-1",
- "compaction.async.enabled", "false", "write-only", "true"),
- 0, 0, "512MB目标文件,超大目标文件")
- );
- }
-
- // ─── 5.4 Bucketing strategy tests ─────────────────────────────────────────
-
- public static List<TestCase> createBucketTests() {
- return Arrays.asList(
- new TestCase("TC-30", "Bucket=-2 (postponed bucketing)", "Bucketing",
- params("write-buffer-size", "512mb", "write-buffer-spillable", "true",
- "target-file-size", "256mb", "bucket", "-2",
- "compaction.async.enabled", "false", "write-only", "true"),
- 0, 0, "bucket=-2 postponed bucketing; verifies auto-adjusted bucketing performance and file layout"),
-
- new TestCase("TC-31", "分桶-动态(bucket=-1)", "分桶策略",
- params("write-buffer-size", "512mb", "write-buffer-spillable", "true",
- "target-file-size", "256mb", "bucket", "-1",
- "compaction.async.enabled", "false", "write-only", "true"),
- 0, 0, "bucket=-1 动态分桶,基准对比"),
-
- new TestCase("TC-32", "分桶-固定4桶", "分桶策略",
- params("write-buffer-size", "512mb", "write-buffer-spillable", "true",
- "target-file-size", "256mb", "bucket", "4",
- "compaction.async.enabled", "false", "write-only", "true"),
- 0, 0, "固定4桶,验证固定分桶文件分布"),
-
- new TestCase("TC-33", "分桶-固定8桶", "分桶策略",
- params("write-buffer-size", "512mb", "write-buffer-spillable", "true",
- "target-file-size", "256mb", "bucket", "8",
- "compaction.async.enabled", "false", "write-only", "true"),
- 0, 0, "固定8桶,验证更多分桶的文件分布"),
-
- new TestCase("TC-34", "分桶-固定16桶", "分桶策略",
- params("write-buffer-size", "512mb", "write-buffer-spillable", "true",
- "target-file-size", "256mb", "bucket", "16",
- "compaction.async.enabled", "false", "write-only", "true"),
- 0, 0, "固定16桶"),
-
- new TestCase("TC-35", "分桶-固定32桶", "分桶策略",
- params("write-buffer-size", "512mb", "write-buffer-spillable", "true",
- "target-file-size", "256mb", "bucket", "32",
- "compaction.async.enabled", "false", "write-only", "true"),
- 0, 0, "固定32桶,桶数过多时的文件碎片化")
- );
- }
-
- // ─── 5.5 Compaction strategy tests ────────────────────────────────────────
-
- public static List<TestCase> createCompactionTests() {
- return Arrays.asList(
- new TestCase("TC-40", "Compaction aggressive (trigger=2)", "Compaction",
- params("write-buffer-size", "256mb", "target-file-size", "128mb",
- "bucket", "-1",
- "compaction.async.enabled", "true",
- "num-sorted-run.compaction-trigger", "2",
- "num-sorted-run.stop-trigger", "5",
- "compaction.size-ratio", "1",
- "write-only", "false"),
- 0, 0, "Aggressive compaction: trigger=2 compacts most often; verifies compaction overhead"),
-
- new TestCase("TC-41", "合并-默认(trigger=5)", "合并策略",
- params("write-buffer-size", "256mb", "target-file-size", "128mb",
- "bucket", "-1",
- "compaction.async.enabled", "true",
- "num-sorted-run.compaction-trigger", "5",
- "num-sorted-run.stop-trigger", "8",
- "write-only", "false"),
- 0, 0, "默认合并参数"),
-
- new TestCase("TC-42", "合并-保守(trigger=10)", "合并策略",
- params("write-buffer-size", "256mb", "target-file-size", "128mb",
- "bucket", "-1",
- "compaction.async.enabled", "true",
- "num-sorted-run.compaction-trigger", "10",
- "num-sorted-run.stop-trigger", "20",
- "write-only", "false"),
- 0, 0, "保守合并:trigger=10,减少合并次数"),
-
- new TestCase("TC-43", "合并-超保守(trigger=100)", "合并策略",
- params("write-buffer-size", "512mb", "target-file-size", "256mb",
- "bucket", "-1",
- "compaction.async.enabled", "true",
- "num-sorted-run.compaction-trigger", "100",
- "num-sorted-run.stop-trigger", "200",
- "write-only", "false"),
- 0, 0, "超保守:trigger=100,几乎不触发合并,接近write-only效果"),
-
- new TestCase("TC-44", "合并-禁用异步", "合并策略",
- params("write-buffer-size", "256mb", "target-file-size", "128mb",
- "bucket", "-1",
- "compaction.async.enabled", "false",
- "write-only", "false"),
- 0, 0, "关闭异步合并,合并仅在写入时同步执行"),
-
- new TestCase("TC-45", "合并-write-only模式", "合并策略",
- params("write-buffer-size", "512mb", "write-buffer-spillable", "true",
- "target-file-size", "256mb", "bucket", "-1",
- "compaction.async.enabled", "false",
- "write-only", "true"),
- 0, 0, "write-only=true:完全跳过合并,最大化写入吞吐")
- );
- }
-
- // ─── 5.6 No-small-files write tests (key group) ───────────────────────────
-
- public static List<TestCase> createNoSmallFileTests() {
- return Arrays.asList(
- new TestCase("TC-50", "No small files: big buffer, no compaction", "No small files",
- params("write-buffer-size", "512mb", "write-buffer-spillable", "true",
- "target-file-size", "256mb", "bucket", "1",
- "compaction.async.enabled", "false", "write-only", "true",
- "num-sorted-run.compaction-trigger", "100",
- "num-sorted-run.stop-trigger", "200"),
- 0, 0,
- "Large buffer (512MB) + large target file (256MB) + compaction disabled; single-pass write verification"),
-
- new TestCase("TC-51", "无小文件-溢写验证", "无小文件",
- params("write-buffer-size", "128mb", "write-buffer-spillable", "true",
- "write-buffer-spill.max-disk-size", "2gb",
- "write-buffer-spill.tmp-dirs", BASE_TEST_DIR + "/tmp," + BASE_TEST_DIR + "/tmp2",
- "target-file-size", "256mb", "bucket", "1",
- "compaction.async.enabled", "false", "write-only", "true"),
- 0, 0,
- "小缓冲(128MB)+磁盘溢写,验证溢写后文件是否仍然大"),
-
- new TestCase("TC-52", "无小文件-bucket=-2组合", "无小文件",
- params("write-buffer-size", "512mb", "write-buffer-spillable", "true",
- "target-file-size", "256mb", "bucket", "-2",
- "compaction.async.enabled", "false", "write-only", "true"),
- 0, 0,
- "性能最好配置:bucket=-2+大缓冲+禁止合并,验证延迟分桶的无小文件效果,但需要compaction数据才能可见"),
-
- new TestCase("TC-53", "无小文件-增加local-merge-buffer", "无小文件",
- params("write-buffer-size", "512mb", "write-buffer-spillable", "true",
- "target-file-size", "512mb", "bucket", "1",
- "compaction.async.enabled", "false", "write-only", "true",
- "local-merge-buffer-size", "64mb",
- "sort-spill-buffer-size", "128mb",
- "num-sorted-run.compaction-trigger", "100",
- "num-sorted-run.stop-trigger", "200"),
- 0, 0,
- "多bucket最优生产配置:所有参数协同优化,预期最少文件数、最大文件"),
- new TestCase("TC-54", "无小文件-去除changelog", "无小文件",
- params("write-buffer-size", "512mb", "write-buffer-spillable", "true",
- "target-file-size", "512mb", "bucket", "1",
- "compaction.async.enabled", "false", "write-only", "true",
- "local-merge-buffer-size", "64mb",
- "sort-spill-buffer-size", "128mb",
- "num-sorted-run.compaction-trigger", "100",
- "num-sorted-run.stop-trigger", "200",
- "changelog-producer", "input"),
- 0, 0,
- "多bucket最优生产配置:所有参数协同优化,预期最少文件数、最大文件")
- );
- }
-
- // ─── 5.7 File format and compression tests ────────────────────────────────
-
- public static List<TestCase> createFormatCompressionTests() {
- return Arrays.asList(
- new TestCase("TC-60", "Format Parquet+ZSTD (default)", "File format",
- params("write-buffer-size", "256mb", "target-file-size", "128mb",
- "file.format", "parquet", "file.compression", "zstd",
- "bucket", "-1", "write-only", "true"),
- 0, 0, "Parquet+ZSTD; default configuration baseline"),
-
- new TestCase("TC-61", "格式-Parquet+LZ4", "文件格式",
- params("write-buffer-size", "256mb", "target-file-size", "128mb",
- "file.format", "parquet", "file.compression", "lz4",
- "bucket", "-1", "write-only", "true"),
- 0, 0, "Parquet+LZ4,更快压缩速度但压缩率稍低"),
-
- new TestCase("TC-62", "格式-Parquet+无压缩", "文件格式",
- params("write-buffer-size", "256mb", "target-file-size", "128mb",
- "file.format", "parquet", "file.compression", "none",
- "bucket", "-1", "write-only", "true"),
- 0, 0, "Parquet+无压缩,最快写入速度但文件最大"),
-
- new TestCase("TC-63", "格式-ORC+ZSTD", "文件格式",
- params("write-buffer-size", "256mb", "target-file-size", "128mb",
- "file.format", "orc", "file.compression", "zstd",
- "bucket", "-1", "write-only", "true"),
- 0, 0, "ORC+ZSTD,列存格式写入性能对比"),
-
- new TestCase("TC-64", "格式-ORC+LZ4", "文件格式",
- params("write-buffer-size", "256mb", "target-file-size", "128mb",
- "file.format", "orc", "file.compression", "lz4",
- "bucket", "-1", "write-only", "true"),
- 0, 0, "ORC+LZ4")
- );
- }
-
- // ─── 5.8 Primary key update tests ─────────────────────────────────────────
-
- public static List<TestCase> createPrimaryKeyUpdateTests() {
- return Arrays.asList(
- new TestCase("TC-70", "PK pure insert (0% duplicates)", "Primary key update",
- params("write-buffer-size", "256mb", "target-file-size", "128mb",
- "bucket", "-1", "compaction.async.enabled", "true",
- "write-only", "false"),
- 0, 0, "0% primary key duplication; pure INSERT performance baseline"),
-
- new TestCase("TC-71", "主键-10%重复", "主键更新",
- params("write-buffer-size", "256mb", "target-file-size", "128mb",
- "bucket", "-1", "compaction.async.enabled", "true",
- "write-only", "false"),
- 10, 0, "10%主键重复(低更新率),触发部分UPDATE合并"),
-
- new TestCase("TC-72", "主键-30%重复", "主键更新",
- params("write-buffer-size", "256mb", "target-file-size", "128mb",
- "bucket", "-1", "compaction.async.enabled", "true",
- "write-only", "false"),
- 30, 0, "30%主键重复(中等更新率)"),
-
- new TestCase("TC-73", "主键-50%重复", "主键更新",
- params("write-buffer-size", "256mb", "target-file-size", "128mb",
- "bucket", "-1", "compaction.async.enabled", "true",
- "write-only", "false"),
- 50, 0, "50%主键重复(高更新率),大量合并开销")
- );
- }
-
- // ─── 5.9 Parallelism tests ────────────────────────────────────────────────
-
- public static List<TestCase> createParallelismTests() {
- return Arrays.asList(
- new TestCase("TC-80", "Parallelism: 1 thread", "Write parallelism",
- params("write-buffer-size", "512mb", "target-file-size", "256mb",
- "bucket", "1", "sink.parallelism", "1",
- "compaction.async.enabled", "false", "write-only", "true"),
- 0, 0, "Single-threaded write; fewest files"),
-
- new TestCase("TC-81", "并行度-2线程", "写入并行度",
- params("write-buffer-size", "512mb", "target-file-size", "256mb",
- "bucket", "2", "sink.parallelism", "2",
- "compaction.async.enabled", "false", "write-only", "true"),
- 0, 0, "2线程写入"),
-
- new TestCase("TC-82", "并行度-4线程", "写入并行度",
- params("write-buffer-size", "512mb", "target-file-size", "256mb",
- "bucket", "4", "sink.parallelism", "4",
- "compaction.async.enabled", "false", "write-only", "true"),
- 0, 0, "4线程写入(默认)"),
-
- new TestCase("TC-83", "并行度-8线程", "写入并行度",
- params("write-buffer-size", "512mb", "target-file-size", "256mb",
- "bucket", "8", "sink.parallelism", "8",
- "compaction.async.enabled", "false", "write-only", "true"),
- 0, 0, "8线程写入")
- );
- }
-
- // ─── Composite factory methods ────────────────────────────────────────────
-
- public static List<TestCase> createAllTests() {
- List<TestCase> all = new ArrayList<>();
- all.addAll(createBasicTests());
- all.addAll(createWriteBufferTests());
- all.addAll(createTargetFileSizeTests());
- all.addAll(createBucketTests());
- all.addAll(createCompactionTests());
- all.addAll(createNoSmallFileTests());
- all.addAll(createFormatCompressionTests());
- all.addAll(createPrimaryKeyUpdateTests());
- all.addAll(createParallelismTests());
- return all;
- }
-
- /**
- * Returns all supported test group names
- */
- public static Map<String, String> getGroupDescriptions() {
- Map<String, String> m = new LinkedHashMap<>();
- m.put("basic", "Basic test cases (TC-01~03) - default configuration baseline");
- m.put("buffer", "Write buffer tests (TC-10~16) - write-buffer-size / spillable");
- m.put("target", "Target file size tests (TC-20~23) - target-file-size");
- m.put("bucket", "Bucketing strategy tests (TC-30~35) - bucket=-2/-1/fixed");
- m.put("compaction", "Compaction strategy tests (TC-40~45) - compaction trigger/stop/write-only");
- m.put("nosmallfile", "No-small-files write tests (TC-50~54) - optimal production configuration");
- m.put("format", "File format & compression tests (TC-60~64) - parquet/orc + zstd/lz4");
- m.put("pkupdate", "Primary key update tests (TC-70~73) - duplicate rate 0%/10%/30%/50%");
- m.put("parallelism", "Write parallelism tests (TC-80~83) - sink.parallelism");
- m.put("all", "Full run (all groups)");
- return m;
- }
-}
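
The deleted params(...) helper built flat maps of Paimon CoreOptions keys, and the file's javadoc notes they were injected through tableProperties. A sketch of that translation into the propKey/propValue list shape the connector reads (names are illustrative):

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ParamsToTablePropertiesSketch {
    public static void main(String[] args) {
        // Flat CoreOptions map, as TestCase.params(...) produced.
        Map<String, String> params = new LinkedHashMap<>();
        params.put("write-buffer-size", "512mb");
        params.put("target-file-size", "256mb");

        // The connector consumes a list of {propKey, propValue} maps.
        List<Map<String, String>> tableProperties = new ArrayList<>();
        params.forEach((key, value) -> {
            Map<String, String> entry = new LinkedHashMap<>();
            entry.put("propKey", key);
            entry.put("propValue", value);
            tableProperties.add(entry);
        });

        System.out.println(tableProperties);
    }
}
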
diff --git a/connectors/paimon-connector/src/test/java/io/tapdata/connector/paimon/service/ManagedIOStreamTableWriteTest.java b/connectors/paimon-connector/src/test/java/io/tapdata/connector/paimon/service/ManagedIOStreamTableWriteTest.java
new file mode 100644
index 00000000..089dc896
--- /dev/null
+++ b/connectors/paimon-connector/src/test/java/io/tapdata/connector/paimon/service/ManagedIOStreamTableWriteTest.java
@@ -0,0 +1,209 @@
+package io.tapdata.connector.paimon.service;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.disk.BufferFileReader;
+import org.apache.paimon.disk.BufferFileWriter;
+import org.apache.paimon.disk.FileIOChannel;
+import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.io.BundleRecords;
+import org.apache.paimon.memory.MemoryPoolFactory;
+import org.apache.paimon.metrics.MetricRegistry;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.StreamTableWrite;
+import org.apache.paimon.table.sink.TableWrite;
+import org.apache.paimon.types.RowType;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+class ManagedIOStreamTableWriteTest {
+
+ @Test
+ void closeShouldCloseWriteThenIoManager() throws Exception {
+ List<String> order = new ArrayList<>();
+ StreamTableWrite delegate = new FakeStreamTableWrite(order, null);
+ IOManager ioManager = new FakeIOManager(order, null);
+ ManagedIOStreamTableWrite writer = new ManagedIOStreamTableWrite(delegate, ioManager);
+
+ writer.close();
+
+ assertEquals(2, order.size());
+ assertEquals("write.close", order.get(0));
+ assertEquals("io.close", order.get(1));
+ }
+
+ @Test
+ void closeShouldStillCloseIoManagerWhenWriteCloseFails() {
+ List<String> order = new ArrayList<>();
+ Exception writeError = new Exception("write close failed");
+ StreamTableWrite delegate = new FakeStreamTableWrite(order, writeError);
+ IOManager ioManager = new FakeIOManager(order, null);
+
+ ManagedIOStreamTableWrite writer = new ManagedIOStreamTableWrite(delegate, ioManager);
+
+ Exception thrown = assertThrows(Exception.class, writer::close);
+ assertSame(writeError, thrown);
+ assertEquals(2, order.size());
+ assertEquals("write.close", order.get(0));
+ assertEquals("io.close", order.get(1));
+ }
+
+ @Test
+ void closeShouldAttachIoManagerErrorAsSuppressedWhenBothFail() {
+ List<String> order = new ArrayList<>();
+ Exception writeError = new Exception("write close failed");
+ Exception ioError = new Exception("io close failed");
+ StreamTableWrite delegate = new FakeStreamTableWrite(order, writeError);
+ IOManager ioManager = new FakeIOManager(order, ioError);
+
+ ManagedIOStreamTableWrite writer = new ManagedIOStreamTableWrite(delegate, ioManager);
+
+ Exception thrown = assertThrows(Exception.class, writer::close);
+ assertSame(writeError, thrown);
+ assertArrayEquals(new Throwable[] {ioError}, thrown.getSuppressed());
+ assertEquals(2, order.size());
+ assertEquals("write.close", order.get(0));
+ assertEquals("io.close", order.get(1));
+ }
+
+ @Test
+ void closeShouldWorkWithoutIoManager() throws Exception {
+ List<String> order = new ArrayList<>();
+ StreamTableWrite delegate = new FakeStreamTableWrite(order, null);
+ ManagedIOStreamTableWrite writer = new ManagedIOStreamTableWrite(delegate, null);
+
+ writer.close();
+
+ assertEquals(Collections.singletonList("write.close"), order);
+ }
+
+ private static class FakeStreamTableWrite implements StreamTableWrite {
+
+ private final List<String> order;
+ private final Exception closeError;
+
+ private FakeStreamTableWrite(List<String> order, Exception closeError) {
+ this.order = order;
+ this.closeError = closeError;
+ }
+
+ @Override
+ public List<CommitMessage> prepareCommit(boolean waitCompaction, long commitIdentifier) {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public TableWrite withIOManager(IOManager ioManager) {
+ return this;
+ }
+
+ @Override
+ public TableWrite withWriteType(RowType writeType) {
+ return this;
+ }
+
+ @Override
+ public TableWrite withMemoryPoolFactory(MemoryPoolFactory memoryPoolFactory) {
+ return this;
+ }
+
+ @Override
+ public BinaryRow getPartition(InternalRow row) {
+ return null;
+ }
+
+ @Override
+ public int getBucket(InternalRow row) {
+ return 0;
+ }
+
+ @Override
+ public void write(InternalRow row) {
+ }
+
+ @Override
+ public void write(InternalRow row, int bucket) {
+ }
+
+ @Override
+ public void writeBundle(BinaryRow partition, int bucket, BundleRecords bundle) {
+ }
+
+ @Override
+ public void compact(BinaryRow partition, int bucket, boolean fullCompaction) {
+ }
+
+ @Override
+ public TableWrite withMetricRegistry(MetricRegistry registry) {
+ return this;
+ }
+
+ @Override
+ public void close() throws Exception {
+ order.add("write.close");
+ if (closeError != null) {
+ throw closeError;
+ }
+ }
+ }
+
+ private static class FakeIOManager implements IOManager {
+
+ private final List<String> order;
+ private final Exception closeError;
+
+ private FakeIOManager(List<String> order, Exception closeError) {
+ this.order = order;
+ this.closeError = closeError;
+ }
+
+ @Override
+ public FileIOChannel.ID createChannel() {
+ return null;
+ }
+
+ @Override
+ public FileIOChannel.ID createChannel(String prefix) {
+ return null;
+ }
+
+ @Override
+ public String[] tempDirs() {
+ return new String[0];
+ }
+
+ @Override
+ public FileIOChannel.Enumerator createChannelEnumerator() {
+ return null;
+ }
+
+ @Override
+ public BufferFileWriter createBufferFileWriter(FileIOChannel.ID channelID) {
+ return null;
+ }
+
+ @Override
+ public BufferFileReader createBufferFileReader(FileIOChannel.ID channelID) {
+ return null;
+ }
+
+ @Override
+ public void close() throws Exception {
+ order.add("io.close");
+ if (closeError != null) {
+ throw closeError;
+ }
+ }
+ }
+}
+
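
The close-ordering and suppression contract these tests pin down matches what Java's try-with-resources provides for two separately acquired resources; a small sketch of the equivalent semantics:

public class SuppressionSketch {
    public static void main(String[] args) {
        try {
            // Resources close in reverse order of acquisition: write first,
            // then io. If both closes throw, the later failure is attached
            // as suppressed, the same contract ManagedIOStreamTableWrite.close()
            // implements by hand.
            try (AutoCloseable io = () -> { throw new Exception("io close failed"); };
                 AutoCloseable write = () -> { throw new Exception("write close failed"); }) {
                // body intentionally empty
            }
        } catch (Exception e) {
            System.out.println(e.getMessage());                    // write close failed
            System.out.println(e.getSuppressed()[0].getMessage()); // io close failed
        }
    }
}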
+
+