diff --git a/connectors/connector-perf-test/pom.xml b/connectors/connector-perf-test/pom.xml new file mode 100644 index 00000000..db7b3c10 --- /dev/null +++ b/connectors/connector-perf-test/pom.xml @@ -0,0 +1,194 @@ + + + 4.0.0 + + io.tapdata + connector-perf-test + 1.0-SNAPSHOT + jar + + Paimon Connector Performance Test + Paimon 写入性能参数调优测试 - 独立 Maven 工程 + + + + 11 + 11 + UTF-8 + 11 + + 1.3.1 + 3.3.6 + 2.5-SNAPSHOT + 2.0.6-SNAPSHOT + 1.12.600 + 4.11.0 + 5.8.1 + + + + + + org.slf4j + slf4j-api + 2.0.12 + + + ch.qos.logback + logback-classic + 1.5.13 + + + + io.tapdata + paimon-connector + 1.0-SNAPSHOT + + + + + io.tapdata + tapdata-pdk-runner + ${tapdata.pdk.runner.version} + + + + + io.tapdata + tapdata-pdk-api + ${tapdata.api.version} + + + + + io.tapdata + tapdata-api + ${tapdata.api.version} + + + + org.apache.paimon + paimon-core + ${paimon.version} + + + org.apache.paimon + paimon-common + ${paimon.version} + + + org.apache.paimon + paimon-format + ${paimon.version} + + + + org.apache.hadoop + hadoop-common + ${hadoop.version} + + log4jlog4j + org.slf4jslf4j-log4j12 + + + + org.apache.hadoop + hadoop-hdfs + ${hadoop.version} + + + + com.amazonaws + aws-java-sdk-s3 + ${aws.sdk.version} + + + + org.apache.commons + commons-lang3 + 3.12.0 + + + + + org.mockito + mockito-core + ${mockito.version} + + + + org.junit.jupiter + junit-jupiter + ${junit.jupiter.version} + test + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.11.0 + + ${java.version} + ${java.version} + UTF-8 + + + + + org.apache.maven.plugins + maven-surefire-plugin + 2.22.2 + + + + **/S3FileScanDebugTest.java + + 300 + + + + + + org.codehaus.mojo + exec-maven-plugin + 3.1.0 + + io.tapdata.connector.paimon.perf.PerformanceTestRunner + false + compile + + + + + + + + + nexus-releases + nexus-maven-release + https://nexus.tapdata.net/repository/maven-releases/ + true + false + + + nexus-snapshots + nexus-maven-snapshot + https://nexus.tapdata.net/repository/maven-snapshots/ + false + 
true + + + tapdata-tapdata-maven + https://tapdata-maven.pkg.coding.net/repository/tapdata/maven/ + true + true + + + diff --git a/connectors/paimon-connector/run-perf-test.sh b/connectors/connector-perf-test/run-perf-test.sh similarity index 78% rename from connectors/paimon-connector/run-perf-test.sh rename to connectors/connector-perf-test/run-perf-test.sh index e4e0f451..1020a96e 100755 --- a/connectors/paimon-connector/run-perf-test.sh +++ b/connectors/connector-perf-test/run-perf-test.sh @@ -1,9 +1,9 @@ #!/bin/bash ################################################################################ -# Paimon 写入性能参数调优测试 - 一键启动脚本 -# +# Paimon 写入性能参数调优测试 - 一键启动脚本(独立工程版) +# # 测试模式配置已统一到 TestModeConfig.java -# 新增测试模式只需修改 TestModeConfig.ALL_MODES 列表 +# 新增测试模式只需修改 TestModeConfig.java,无需修改此脚本 # # 用法: ./run-perf-test.sh [模式] # @@ -34,14 +34,6 @@ YELLOW='\033[1;33m' BLUE='\033[0;34m' NC='\033[0m' -# ═══════════════════════════════════════════════════════════════════════ -# 测试模式配置(与 TestModeConfig.java 保持一致) -# 新增模式请修改 TestModeConfig.java,无需修改此脚本 -# ═══════════════════════════════════════════════════════════════════════ - -# 所有可用的命令行别名(用于帮助提示) -VALID_MODES="basic all nosmallfile single bucket compaction buffer target format pkupdate parallelism auto" - echo -e "${BLUE}" echo "════════════════════════════════════════════════════════════" echo " Paimon 1.3.1 写入性能参数调优测试" @@ -65,7 +57,7 @@ if [ "$1" = "--help" ] || [ "$1" = "-h" ]; then echo " 9 format - 文件格式压缩测试(TC-60~64)" echo " 10 pkupdate - 主键更新测试(TC-70~73)" echo " 11 parallelism - 写入并行度测试(TC-80~83)" - echo " auto - 全自动运行(无需交互)" + echo " auto - 全自动运行(无需按回车)" echo "" echo "默认模式: basic (1)" exit 0 @@ -73,7 +65,7 @@ fi # ── 环境检查 ────────────────────────────────────────────────────────────────── if ! 
command -v java &>/dev/null; then - echo -e "${RED}[ERROR] 未找到 Java,请先安装 JDK 8+${NC}"; exit 1 + echo -e "${RED}[ERROR] 未找到 Java,请先安装 JDK 11+${NC}"; exit 1 fi echo -e "${GREEN}[INFO] Java : $(java -version 2>&1 | head -1)${NC}" @@ -86,15 +78,15 @@ echo "" # ── 获取测试模式 ──────────────────────────────────────────────────────────── TEST_MODE="${1:-}" -# ── 编译主工程 ────────────────────────────────────────────────────────────── +# ── 编译 paimon-connector 主工程(兄弟目录) ────────────────────────────── echo -e "${YELLOW}[1/3] 编译 paimon-connector 主工程...${NC}" -mvn clean install -DskipTests -q -f pom.xml +mvn clean install -DskipTests -q -f ../paimon-connector/pom.xml echo -e "${GREEN} 主工程编译完成${NC}" echo "" -# ── 编译测试代码 ──────────────────────────────────────────────────────────── +# ── 编译性能测试代码(src/main/java,标准 compile goal) ──────────────────── echo -e "${YELLOW}[2/3] 编译性能测试代码...${NC}" -mvn test-compile -q -f pom-perf-test.xml +mvn compile -q echo -e "${GREEN} 测试代码编译完成${NC}" echo "" @@ -108,13 +100,11 @@ JVM_OPTS="-Xmx4g -Xms512m -XX:+UseG1GC" if [ -n "$TEST_MODE" ]; then echo -e "${BLUE}>> 测试模式: ${TEST_MODE}${NC}" mvn exec:java \ - -f pom-perf-test.xml \ - -Dexec.mainClass="io.tapdata.connector.paimon.perf.PerformanceTestRunner" \ -Dexec.cleanupDaemonThreads=false \ -Dexec.args="${TEST_MODE}" \ -Dexec.jvmArgs="${JVM_OPTS}" else - # 交互式菜单(与 TestModeConfig.java 保持一致) + # 交互式菜单 echo -e "${BLUE}请选择测试模式 (直接回车默认 1):${NC}" echo " 1 basic - 基础用例组(TC-01~03)" echo " 2 all - 全量测试(所有组)" @@ -134,8 +124,6 @@ else echo "" echo -e "${BLUE}>> 测试模式: ${CHOICE}${NC}" mvn exec:java \ - -f pom-perf-test.xml \ - -Dexec.mainClass="io.tapdata.connector.paimon.perf.PerformanceTestRunner" \ -Dexec.cleanupDaemonThreads=false \ -Dexec.args="${CHOICE}" \ -Dexec.jvmArgs="${JVM_OPTS}" diff --git a/connectors/paimon-connector/src/test/java/io/tapdata/connector/paimon/perf/AutoTestRunner.java b/connectors/connector-perf-test/src/main/java/io/tapdata/connector/paimon/perf/AutoTestRunner.java similarity index 100% 
rename from connectors/paimon-connector/src/test/java/io/tapdata/connector/paimon/perf/AutoTestRunner.java rename to connectors/connector-perf-test/src/main/java/io/tapdata/connector/paimon/perf/AutoTestRunner.java diff --git a/connectors/paimon-connector/src/test/java/io/tapdata/connector/paimon/perf/DataGenerator.java b/connectors/connector-perf-test/src/main/java/io/tapdata/connector/paimon/perf/DataGenerator.java similarity index 63% rename from connectors/paimon-connector/src/test/java/io/tapdata/connector/paimon/perf/DataGenerator.java rename to connectors/connector-perf-test/src/main/java/io/tapdata/connector/paimon/perf/DataGenerator.java index 9776de69..94879c2c 100644 --- a/connectors/paimon-connector/src/test/java/io/tapdata/connector/paimon/perf/DataGenerator.java +++ b/connectors/connector-perf-test/src/main/java/io/tapdata/connector/paimon/perf/DataGenerator.java @@ -14,12 +14,57 @@ /** * 数据生成器 - 支持配置总数据量、写入QPS、主键重复率 + * + *

内存优化设计(支持 10 亿+ 规模): + *

+ * + *

内存对比: + *

+ *   优化前(HashSet + ArrayList):
+ *     1 亿条 → ~5 GB
+ *    10 亿条 → OOM (>50 GB)
+ *
+ *   优化后(确定性伪随机):
+ *     1 亿条 → < 1 MB
+ *    10 亿条 → < 1 MB ✅
+ *   100 亿条 → < 1 MB ✅
+ * 
+ * + *

重复 ID 生成原理: + *

+ *   当需要重复 ID 时:
+ *     1. 确定重复池大小 = min(当前已生成唯一 ID 数, 上限值)
+ *     2. 使用 MurmurHash3(currentId + seed) 计算哈希
+ *     3. 取模映射到重复池:hash % poolSize
+ *     4. 确定性:相同输入 → 相同输出,无需存储
+ *
+ *   优点:
+ *     - 均匀分布:哈希函数保证重复 ID 分散
+ *     - 可重现:固定 seed 下结果一致,便于调试
+ *     - 零存储:不保留任何历史 ID
+ * 
*/ public class DataGenerator { private final Random random; private final AtomicLong idGenerator; - private final Set generatedIds; - private final List generatedIdList; // 用于快速随机访问 + + /** + * 确定性重复 ID 生成的种子(固定值保证可重现) + * 可修改此值改变重复分布,但相同种子下结果一致 + */ + private static final long DUPLICATE_SEED = 0x517cc1b727220a95L; + + /** + * 重复 ID 池大小上限 + * 当已生成唯一 ID 超过此值时,重复池固定为此大小 + * 避免取模运算溢出,同时保证重复 ID 分布均匀 + */ + private static final int MAX_DUPLICATE_POOL = 100_000_000; // 1 亿 + private final int primaryKeyDuplicateRate; private final String tableName; @@ -30,12 +75,48 @@ public DataGenerator(int primaryKeyDuplicateRate) { public DataGenerator(int primaryKeyDuplicateRate, String tableName) { this.random = new Random(System.currentTimeMillis()); this.idGenerator = new AtomicLong(1); - this.generatedIds = new HashSet<>(); - this.generatedIdList = new ArrayList<>(); this.primaryKeyDuplicateRate = Math.max(0, Math.min(100, primaryKeyDuplicateRate)); this.tableName = tableName; } + /** + * MurmurHash3 32-bit 简化实现 + * 用于确定性伪随机映射:相同输入 → 相同输出 + */ + private static int murmurHash3_32(long key) { + long h = key ^ (key >>> 33); + h *= 0xff51afd7ed558ccdL; + h ^= h >>> 33; + h *= 0xc4ceb9fe1a85ec53L; + h ^= h >>> 33; + return (int) h; + } + + /** + * 确定性生成一个重复 ID(无需存储历史) + * + * @param currentId 当前记录的 ID + * @param poolSize 重复池大小(已生成的唯一 ID 数) + * @return 映射到重复池中的 ID(1 ~ poolSize) + */ + private static long deterministicDuplicateId(long currentId, long poolSize) { + if (poolSize <= 0) { + return currentId; // 无可用重复 ID + } + // 使用 MurmurHash3 计算确定性哈希 + int hash = murmurHash3_32(currentId ^ DUPLICATE_SEED); + // 映射到 [1, poolSize] 范围 + long offset = (hash & 0x7fffffffL) % poolSize; + return 1 + offset; + } + + /** + * 将 long ID 转为 String(按需创建,不缓存) + */ + private static String idToString(long id) { + return Long.toString(id); + } + /** * 生成单个记录 */ @@ -43,19 +124,24 @@ public Map generateRecord() { Map record = new HashMap<>(); // 生成主键 - String id; - if (primaryKeyDuplicateRate > 0 && 
!generatedIdList.isEmpty() + long idLong; + long currentUniqueCount = idGenerator.get(); // 当前已生成的唯一 ID 数(未包含本次) + + if (primaryKeyDuplicateRate > 0 && currentUniqueCount > 1 && random.nextInt(100) < primaryKeyDuplicateRate) { - // 生成重复主键(用于UPDATE场景) - id = generatedIdList.get(random.nextInt(generatedIdList.size())); + // 生成重复主键(用于 UPDATE 场景) + // 使用确定性伪随机映射,无需存储历史 ID + long poolSize = Math.min(currentUniqueCount - 1, MAX_DUPLICATE_POOL); + // 使用当前时间戳 + 随机值作为种子,确保每次调用有不同的哈希输入 + long seed = System.nanoTime() ^ random.nextLong(); + idLong = deterministicDuplicateId(seed, poolSize); + // 注意:不增加 idGenerator,因为这是重复 ID } else { // 生成新主键 - id = String.valueOf(idGenerator.getAndIncrement()); - if (generatedIds.add(id)) { - generatedIdList.add(id); - } + idLong = idGenerator.getAndIncrement(); } + String id = idToString(idLong); record.put("id", id); record.put("name", "name-" + id + "-" + System.currentTimeMillis()); record.put("value", random.nextInt(1000000)); @@ -86,11 +172,16 @@ public List generateRecordEvents(int batchSize) { * 生成更新事件 */ public TapUpdateRecordEvent generateUpdateEvent() { - if (generatedIdList.isEmpty()) { + long currentUniqueCount = idGenerator.get(); + if (currentUniqueCount <= 1) { return null; } - String id = generatedIdList.get(random.nextInt(generatedIdList.size())); + // 使用确定性伪随机生成重复 ID + long poolSize = Math.min(currentUniqueCount - 1, MAX_DUPLICATE_POOL); + long dummyId = random.nextLong(); + long idLong = deterministicDuplicateId(dummyId, poolSize); + String id = idToString(idLong); Map before = new HashMap<>(); before.put("id", id); @@ -222,9 +313,11 @@ public void generateRecordsWithRate(long totalRecords, int qps, Consumer返回实际生成的唯一 ID 总数(非窗口限制值) */ - public int getUniqueIdsCount() { - return generatedIds.size(); + public long getUniqueIdsCount() { + return idGenerator.get() - 1; } /** diff --git a/connectors/paimon-connector/src/test/java/io/tapdata/connector/paimon/perf/PaimonDataGenerator.java 
b/connectors/connector-perf-test/src/main/java/io/tapdata/connector/paimon/perf/PaimonDataGenerator.java similarity index 100% rename from connectors/paimon-connector/src/test/java/io/tapdata/connector/paimon/perf/PaimonDataGenerator.java rename to connectors/connector-perf-test/src/main/java/io/tapdata/connector/paimon/perf/PaimonDataGenerator.java diff --git a/connectors/paimon-connector/src/test/java/io/tapdata/connector/paimon/perf/PaimonFileObserver.java b/connectors/connector-perf-test/src/main/java/io/tapdata/connector/paimon/perf/PaimonFileObserver.java similarity index 100% rename from connectors/paimon-connector/src/test/java/io/tapdata/connector/paimon/perf/PaimonFileObserver.java rename to connectors/connector-perf-test/src/main/java/io/tapdata/connector/paimon/perf/PaimonFileObserver.java diff --git a/connectors/paimon-connector/src/test/java/io/tapdata/connector/paimon/perf/PerformanceTestRunner.java b/connectors/connector-perf-test/src/main/java/io/tapdata/connector/paimon/perf/PerformanceTestRunner.java similarity index 83% rename from connectors/paimon-connector/src/test/java/io/tapdata/connector/paimon/perf/PerformanceTestRunner.java rename to connectors/connector-perf-test/src/main/java/io/tapdata/connector/paimon/perf/PerformanceTestRunner.java index 20f2a77f..4c87e608 100644 --- a/connectors/paimon-connector/src/test/java/io/tapdata/connector/paimon/perf/PerformanceTestRunner.java +++ b/connectors/connector-perf-test/src/main/java/io/tapdata/connector/paimon/perf/PerformanceTestRunner.java @@ -9,6 +9,7 @@ import io.tapdata.entity.utils.DataMap; import io.tapdata.pdk.apis.context.TapConnectorContext; import io.tapdata.pdk.apis.spec.TapNodeSpecification; +import org.apache.commons.lang3.StringUtils; import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.CatalogContext; import org.apache.paimon.catalog.CatalogFactory; @@ -38,12 +39,12 @@ public class PerformanceTestRunner { // ─── 常量 
───────────────────────────────────────────────────────────────── - public static final String BASE_TEST_DIR = "/tmp/paimon-perf-test"; + public static final String BASE_TEST_DIR = "/tmp/paimon-perf-test/"; private static final String DATABASE = "default"; private static final String TABLE_NAME = "test_table"; public static final int TOTAL_RECORDS = 5_000_000; // 数据集总大小 - private static final int BATCH_SIZE = 1_000_00; // 每批次写入记录数,也是PaimonService 累积批次大小 - private static final int INIT_TOTAL_RECORDS = 0; //模拟初始化阶段全表数据量 + private static final int BATCH_SIZE = 100_000; // 每批次写入记录数,也是PaimonService 累积批次大小 + private static final int INIT_TOTAL_RECORDS = 5_000_000; //模拟初始化阶段全表数据量 // ─── S3 测试配置 ────────────────────────────────────────────────────────── @@ -52,6 +53,7 @@ public class PerformanceTestRunner { /** S3 端点地址 */ private static final String S3_ENDPOINT = "http://192.168.1.184:9080"; +// private static final String S3_ENDPOINT = "http://113.98.206.142:9080"; /** S3 访问密钥 */ private static final String S3_ACCESS_KEY = "admin"; @@ -92,7 +94,7 @@ public void setInteractive(boolean interactive) { private static Log buildConsoleLog() { return new Log() { - @Override public void debug(String m, Object... p) {} + @Override public void debug(String m, Object... p) { print("[DEBUG] ", m, p);} @Override public void info(String m, Object... p) { print("[INFO] ", m, p); } @Override public void warn(String m, Object... p) { print("[WARN] ", m, p); } @Override public void error(String m, Object... 
p) { print("[ERROR]", m, p); } @@ -117,7 +119,23 @@ private String warehouseForCase(TestCase tc) { private PaimonService buildPaimonService(TestCase tc) throws Exception { PaimonConfig config = new PaimonConfig(); - + + // ── 核心参数 → PaimonConfig setters(会自动填充 tableProperties)────────── + applyConfigSetters(config, tc); + applyConfigSettersGlobal(config, tc); + + PaimonService service = new PaimonService(config, logger); + service.init(); + return service; + } + + /** + * 全局参数生效:优先级高于TestCase中 + * + * @param config + * @param tc + */ + private void applyConfigSettersGlobal(PaimonConfig config, TestCase tc) { // ── 存储类型和仓库路径 ────────────────────────────────────────────── if (ENABLE_S3) { // 使用 S3 存储 @@ -132,37 +150,35 @@ private PaimonService buildPaimonService(TestCase tc) throws Exception { // 使用本地文件系统 config.setStorageType("local"); config.setWarehouse(warehouseForCase(tc)); - System.out.println(" >> 存储类型: 本地文件系统"); + System.out.println(" >> 存储类型: 本地文件系统,地址:" + config.getWarehouse()); } - + config.setDatabase(database); - config.setBatchAccumulationSize(BATCH_SIZE); - config.setCommitIntervalMs(0); // 关闭时间触发,依靠数量触发 - config.setEnableAsyncCommit(false); // 测试中关闭异步 commit +// config.setBatchAccumulationSize(BATCH_SIZE); + //模拟全量+增量模式使用: config.setCreateAutoInc(true); + config.setDiskTmpDir(BASE_TEST_DIR + "/tmp," + BASE_TEST_DIR + "/tmp2"); + //为了验证Paimon参数的效果,测试中关闭: + config.setEnableAutoCompaction(false); - // 设置 bucketMode & bucketCount(先给一个安全默认,后续 tableProperties 可覆盖) - Map params = tc.getParameters(); - String bucketStr = params.getOrDefault("bucket", "-1"); - int bucket; - try { bucket = Integer.parseInt(bucketStr); } catch (NumberFormatException e) { bucket = -1; } - - if (bucket > 0) { - config.setBucketMode("fixed"); - config.setBucketCount(bucket); - } else if (bucket == -2) { -// config.setBucketMode("fixed"); - config.setBucketCount(bucket); - } else { - config.setBucketMode("dynamic"); - } + } - // ── 核心参数 → PaimonConfig setters 
────────────────────────────────── - applyConfigSetters(config, params); + /** + * 将参数 Map 中的所有值自动写入到 PaimonConfig 的对应属性中 + * 使用反射机制自动匹配参数名和 setter 方法,支持特殊映射和类型转换 + * + * 支持的参数映射规则: + * 1. 特殊映射:通过 specialSetters 定义复杂映射关系(如单位转换、多属性设置等) + * 2. 反射自动映射:参数名转驼峰命名后匹配 setter 方法 + * 3. 表属性降级:未匹配的参数放入 tableProperties,用于构建 Paimon 表选项 + */ + private void applyConfigSetters(PaimonConfig config, TestCase tc) { - // ── 所有参数追加到 tableProperties(最高优先级覆盖)───────────────── - List> tableProps = buildTableProperties(params); - config.setTableProperties(tableProps); + // 批次累积大小 + config.setBatchAccumulationSize(tc.getBatchSize()); + Map params = tc.getParameters(); + + if (params == null || params.isEmpty()) return; // ── 写入线程 / 并行度 ───────────────────────────────────────────────── String parallelism = params.get("sink.parallelism"); @@ -171,52 +187,203 @@ private PaimonService buildPaimonService(TestCase tc) throws Exception { catch (NumberFormatException ignored) {} } - PaimonService service = new PaimonService(config); - service.init(); - return service; - } + // 特殊参数映射表:参数名 → Setter 方法调用逻辑 + Map specialSetters = new HashMap<>(); + + // 缓冲区大小(MB) + specialSetters.put("write-buffer-size", (cfg, val) -> cfg.setWriteBufferSize(parseSizeMb(val))); + // 目标文件大小(MB) + specialSetters.put("target-file-size", (cfg, val) -> cfg.setTargetFileSize(parseSizeMb(val))); + // 分桶策略 + specialSetters.put("bucket", (cfg, val) -> { + int bucket = Integer.parseInt(val); + if (bucket > 0) { + cfg.setBucketMode("fixed"); + cfg.setBucketCount(bucket); + } else if (bucket == -1 || bucket == -2) { + cfg.setBucketMode("dynamic"); + cfg.setBucketCount(bucket); + } + }); + // 写入线程/并行度 + specialSetters.put("sink.parallelism", (cfg, val) -> cfg.setWriteThreads(Integer.parseInt(val))); + // 仅写入模式(禁用自动合并):注释掉的原因为可以走下面的反射 +// specialSetters.put("enableAutoCompaction", (cfg, val) -> cfg.setEnableAutoCompaction(!Boolean.parseBoolean(val))); + // 缓冲区溢写 + specialSetters.put("write-buffer-spillable", (cfg, val) -> 
cfg.setDiskOverflowWrite(Boolean.parseBoolean(val))); + // 溢写最大磁盘大小(GB) + specialSetters.put("write-buffer-spill.max-disk-size", (cfg, val) -> cfg.setDiskMaxSize(parseSizeGb(val))); + // 溢写临时目录 + specialSetters.put("write-buffer-spill.tmp-dirs", (cfg, val) -> cfg.setDiskTmpDir(val)); + // 提交间隔 + specialSetters.put("commit-interval-ms", (cfg, val) -> cfg.setCommitIntervalMs(Integer.parseInt(val))); + // 异步提交 + specialSetters.put("enable-async-commit", (cfg, val) -> cfg.setEnableAsyncCommit(Boolean.parseBoolean(val))); + // 合并间隔 + specialSetters.put("compaction-interval-minutes", (cfg, val) -> cfg.setCompactionIntervalMinutes(Integer.parseInt(val))); + // 主键更新 + specialSetters.put("enable-primary-key-update", (cfg, val) -> cfg.setEnablePrimaryKeyUpdate(Boolean.parseBoolean(val))); + // 分区键(逗号分隔) + specialSetters.put("partition-key", (cfg, val) -> cfg.setPartitionKey(Arrays.asList(val.split(",")))); - private void applyConfigSetters(PaimonConfig config, Map params) { - // write-buffer-size: e.g. "256mb" → 256 - String wbs = params.get("write-buffer-size"); - if (wbs != null) { - try { config.setWriteBufferSize(parseSizeMb(wbs)); } catch (Exception ignored) {} - } - String tfs = params.get("target-file-size"); - if (tfs != null) { - try { config.setTargetFileSize(parseSizeMb(tfs)); } catch (Exception ignored) {} + for (Map.Entry entry : params.entrySet()) { + String key = entry.getKey(); + String value = entry.getValue(); + if (value == null || value.isEmpty()) continue; + + try { + // 1. 优先检查特殊映射 + ConfigSetter specialSetter = specialSetters.get(key); + if (specialSetter != null) { + specialSetter.set(config, value); + System.out.println(" [INFO] 参数='" + key + "',值="+ value +",通过Tapdata属性设置"); + continue; + } + + // 2. 尝试通过反射自动设置 + if (applyConfigPropertyByReflection(config, key, value)) { + System.out.println(" [WARN] 参数='" + key + "' ',值="+ value +",尝试通过反射自动设置成功"); + continue; + } + + // 3. 
未匹配的参数放入 tableProperties(用于构建 Paimon 表选项) + put2TableProperties(config, key, value); + } catch (Exception e) { + System.err.println(" [ERROR] 设置配置属性失败: " + key + "=" + value + " - " + e.getMessage()); + } } - config.setProperties(map2prop(params)); + } + /** + * 通过反射自动调用 PaimonConfig 的 setter 方法 + * + * @param config PaimonConfig 实例 + * @param key 参数名(支持横线和下划线分隔,会自动转为驼峰) + * @param value 参数值(String 类型,会自动类型转换) + * @return true 如果成功找到并调用了对应的 setter 方法 + */ + private boolean applyConfigPropertyByReflection(PaimonConfig config, String key, String value) { + // 将参数名转为驼峰命名(例:write-buffer-size → writeBufferSize) + String camelKey = toCamelCase(key); + + // 构造 setter 方法名 + String setterName = "set" + Character.toUpperCase(camelKey.charAt(0)) + camelKey.substring(1); + + try { + // 遍历所有可能的参数类型 + Class[] paramTypes = {String.class, Integer.class, Boolean.class, List.class}; + + for (Class paramType : paramTypes) { + try { + java.lang.reflect.Method method = PaimonConfig.class.getMethod(setterName, paramType); + Object convertedValue = convertValue(value, paramType); + if (convertedValue != null) { + method.invoke(config, convertedValue); + return true; + } + } catch (NoSuchMethodException e) { + // 继续尝试下一个类型 + } + } + } catch (Exception e) { + // 反射调用失败,返回 false 交由上层处理 + } + + return false; } - private Properties map2prop(Map params) { - Properties props = new Properties(); - for (Map.Entry e : params.entrySet()) { - props.setProperty(e.getKey(), e.getValue()); + /** + * 将横线/下划线分隔的字符串转为驼峰命名 + * 例:write-buffer-size → writeBufferSize + * s3_endpoint → s3Endpoint + */ + private String toCamelCase(String str) { + StringBuilder result = new StringBuilder(); + boolean nextUpper = false; + + for (int i = 0; i < str.length(); i++) { + char c = str.charAt(i); + if (c == '-' || c == '_' || c == '.') { + nextUpper = true; + } else { + if (nextUpper) { + result.append(Character.toUpperCase(c)); + nextUpper = false; + } else { + result.append(c); + } + } } - return props; + + return 
result.toString(); } /** - * 将所有参数转为 tableProperties 键值对列表(最终覆盖 createTable 中的硬编码值) + * 将 String 值转换为目标类型 */ - private List> buildTableProperties(Map params) { - List> props = new ArrayList<>(); - for (Map.Entry e : params.entrySet()) { - LinkedHashMap m = new LinkedHashMap<>(); - m.put("propKey", e.getKey()); - m.put("propValue", e.getValue()); - props.add(m); + private Object convertValue(String value, Class targetType) { + if (targetType == String.class) { + return value; + } else if (targetType == Integer.class) { + try { + return Integer.parseInt(value); + } catch (NumberFormatException e) { + return null; + } + } else if (targetType == Boolean.class) { + return Boolean.parseBoolean(value); + } else if (targetType == List.class) { + return Arrays.asList(value.split(",")); } - return props; + return null; + } + + /** + * 配置设置器函数式接口 + */ + @FunctionalInterface + private interface ConfigSetter { + void set(PaimonConfig config, String value); + } + + /** + * 将未匹配的参数放入 PaimonConfig.tableProperties 中 + * 这些属性将直接写入 Paimon 表的 OPTIONS 中,用于构建表配置 + * + * @param config PaimonConfig 实例 + * @param key 参数名 + * @param value 参数值 + */ + private void put2TableProperties(PaimonConfig config, String key, String value) { + List> tableProperties = config.getTableProperties(); + + LinkedHashMap kv = new LinkedHashMap<>(); + // 将参数添加到第一个 LinkedHashMap 中 + kv.put("propKey", key); + kv.put("propValue", value); + tableProperties.add(kv); + System.out.println(" [INFO] 参数='" + key + "' ',值="+ value +",通过paimon属性设置"); } - /** 解析带单位的大小,返回 MB 数(如 "512mb" → 512,"1gb" → 1024) */ + /** + * 解析带单位的大小,返回 MB 数(如 "512mb" → 512,"1gb" → 1024) + */ private static int parseSizeMb(String s) { s = s.trim().toLowerCase(); - if (s.endsWith("gb")) return Integer.parseInt(s.replace("gb", "").trim()) * 1024; + if (s.endsWith("gb")) return (int) (Double.parseDouble(s.replace("gb", "").trim()) * 1024); if (s.endsWith("mb")) return Integer.parseInt(s.replace("mb", "").trim()); + if (s.endsWith("kb")) return (int) 
(Double.parseDouble(s.replace("kb", "").trim()) / 1024); + return Integer.parseInt(s); + } + + /** + * 解析带单位的大小,返回 GB 数 + */ + private static int parseSizeGb(String s) { + s = s.trim().toLowerCase(); + if (s.endsWith("tb")) return (int) (Double.parseDouble(s.replace("tb", "").trim()) * 1024); + if (s.endsWith("gb")) return Integer.parseInt(s.replace("gb", "").trim()); + if (s.endsWith("mb")) return (int) (Double.parseDouble(s.replace("mb", "").trim()) / 1024); return Integer.parseInt(s); } @@ -230,7 +397,7 @@ private TapTable createTapTable() { private void createFreshTable(PaimonService service) throws Exception { try { service.dropTable(tableName); } catch (Exception ignored) {} TapTable table = createTapTable(); - service.createTable(table, logger); + service.createTable(table); } // ─── 核心执行方法 ────────────────────────────────────────────────────────── @@ -289,18 +456,36 @@ public TestResult runTestCase(TestCase tc) { tc.getDataSize(), tc.getPrimaryKeyDuplicateRate(), tc.getQps() > 0 ? tc.getQps() + "" : "无限制"); + // 显示用例级别的参数覆盖 + if (tc.getBatchSize() != null || tc.getInitTotalRecords() != null) { + System.out.println(" >> 用例级别参数覆盖:"); + if (tc.getBatchSize() != null) { + System.out.printf(" - batchSize = %,d (全局: %,d)%n", tc.getBatchSize(), BATCH_SIZE); + } + if (tc.getInitTotalRecords() != null) { + System.out.printf(" - initTotalRecords = %,d (全局: %,d)%n", tc.getInitTotalRecords(), INIT_TOTAL_RECORDS); + } + } + // 2. 执行写入 TapTable tapTable = createTapTable(); DataGenerator gen = new DataGenerator(tc.getPrimaryKeyDuplicateRate(), tableName); + // 使用 TestCase 独立的参数(如果设置了),否则使用全局常量 + long totalRecordsToWrite = tc.getDataSize(); + Integer caseBatchSize = tc.getBatchSize(); + int effectiveBatchSize = caseBatchSize != null ? caseBatchSize : BATCH_SIZE; + Integer caseInitTotal = tc.getInitTotalRecords(); + int effectiveInitTotal = caseInitTotal != null ? 
caseInitTotal : INIT_TOTAL_RECORDS; + startMs = System.currentTimeMillis(); - long total = tc.getDataSize(); + long total = totalRecordsToWrite; long remain = total; long qpsSlotStartMs = System.currentTimeMillis(); long qpsSlotWritten = 0; while (remain > 0) { - int batchSz = (int) Math.min(BATCH_SIZE, remain); + int batchSz = (int) Math.min(effectiveBatchSize, remain); List batch = new ArrayList<>(batchSz); for (int i = 0; i < batchSz; i++) { Map rec = gen.generateRecord(); @@ -312,7 +497,7 @@ public TestResult runTestCase(TestCase tc) { info.put("batchOffset",i); evt.setInfo(info); batch.add(evt); - if (qpsSlotWritten + BATCH_SIZE >= INIT_TOTAL_RECORDS) { + if (qpsSlotWritten + effectiveBatchSize >= effectiveInitTotal) { // 模拟 已写数据量 大于 初始化阶段全表数据量 为增量cdc: evt.getInfo().put(TapRecordEvent.INFO_KEY_SYNC_STAGE, "CDC"); } @@ -334,7 +519,7 @@ public TestResult runTestCase(TestCase tc) { // 进度打印 long pct = (written.get() * 100) / total; - if (written.get() % (BATCH_SIZE * 10) == 0 || remain == 0) { + if (written.get() % (effectiveBatchSize * 10) == 0 || remain == 0) { double elapsed = (System.currentTimeMillis() - startMs) / 1000.0; double throughput = elapsed > 0 ? written.get() / elapsed : 0; System.out.printf(" >> 进度: %,d/%,d (%d%%) | 吞吐: %.0f 条/秒%n", @@ -421,7 +606,7 @@ public List runTestGroup(String groupName) throws Exception { // ─── 打印工具 ────────────────────────────────────────────────────────────── private static void printSeparator(String ch) { - System.out.println(ch.repeat(70)); + System.out.println(StringUtils.repeat(ch, 70)); } /** @@ -817,6 +1002,11 @@ private void printResult(TestResult r, PaimonFileObserver observer) { // ─── 报告生成 ────────────────────────────────────────────────────────────── + /** 转义 Markdown 表元格中的管道符,防止破坏表格结构 */ + private static String escapePipe(String s) { + return s == null ? 
"" : s.replace("|", "\\|"); + } + public String generateReport(List results, String reportPath) throws IOException { StringBuilder sb = new StringBuilder(); String now = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()); @@ -849,12 +1039,19 @@ public String generateReport(List results, String reportPath) throws sb.append("|--------|------|-----------|---------|--------|--------|----------|------|\n"); for (TestResult r : entry.getValue()) { String status = r.error == null ? "✅" : "❌"; - sb.append(String.format("| %s | %s | %.0f | %.2f | %d | %s | %s | %s |\n", - r.testCase.getId(), r.testCase.getName(), r.throughput, - r.durationMs / 1000.0, r.fileCount, - PaimonFileObserver.formatSize(r.totalFileSize), - r.fileCount > 0 ? PaimonFileObserver.formatSize(r.totalFileSize / r.fileCount) : "N/A", - status)); + String name = escapePipe(r.testCase.getName()); + // 失败用例显示 '-' 避免误导 + if (r.error != null) { + sb.append(String.format("| %s | %s | - | - | - | - | - | %s |\n", + r.testCase.getId(), name, status)); + } else { + sb.append(String.format("| %s | %s | %.0f | %.2f | %d | %s | %s | %s |\n", + r.testCase.getId(), name, r.throughput, + r.durationMs / 1000.0, r.fileCount, + PaimonFileObserver.formatSize(r.totalFileSize), + r.fileCount > 0 ? 
PaimonFileObserver.formatSize(r.totalFileSize / r.fileCount) : "N/A", + status)); + } } sb.append("\n"); } @@ -864,15 +1061,20 @@ public String generateReport(List results, String reportPath) throws sb.append("| 用例ID | <1KB | 1KB-1MB | 1MB-10MB | 10MB-100MB | 100MB-500MB | >500MB |\n"); sb.append("|--------|------|---------|----------|-----------|-------------|--------|\n"); for (TestResult r : results) { - Map d = r.sizeDistribution; - sb.append(String.format("| %s | %d | %d | %d | %d | %d | %d |\n", - r.testCase.getId(), - d.getOrDefault("< 1KB", 0L), - d.getOrDefault("1KB - 1MB", 0L), - d.getOrDefault("1MB - 10MB", 0L), - d.getOrDefault("10MB-100MB", 0L), - d.getOrDefault("100MB-500MB", 0L), - d.getOrDefault("> 500MB", 0L))); + if (r.error != null) { + // 失败用例显示 '-' + sb.append(String.format("| %s | - | - | - | - | - | - |\n", r.testCase.getId())); + } else { + Map d = r.sizeDistribution; + sb.append(String.format("| %s | %d | %d | %d | %d | %d | %d |\n", + r.testCase.getId(), + d.getOrDefault("< 1KB", 0L), + d.getOrDefault("1KB - 1MB", 0L), + d.getOrDefault("1MB - 10MB", 0L), + d.getOrDefault("10MB-100MB", 0L), + d.getOrDefault("100MB-500MB", 0L), + d.getOrDefault("> 500MB", 0L))); + } } sb.append("\n"); diff --git a/connectors/connector-perf-test/src/main/java/io/tapdata/connector/paimon/perf/TestCase.java b/connectors/connector-perf-test/src/main/java/io/tapdata/connector/paimon/perf/TestCase.java new file mode 100644 index 00000000..de8e83cf --- /dev/null +++ b/connectors/connector-perf-test/src/main/java/io/tapdata/connector/paimon/perf/TestCase.java @@ -0,0 +1,799 @@ +package io.tapdata.connector.paimon.perf; + +import java.util.*; + +import static io.tapdata.connector.paimon.perf.PerformanceTestRunner.TOTAL_RECORDS; +import static io.tapdata.connector.paimon.perf.PerformanceTestRunner.BASE_TEST_DIR; + +/** + * 性能测试用例定义 + * 参数键直接对应 Paimon CoreOptions 表选项名称,可通过 tableProperties 直接注入 + * + *

用例级别的全局参数覆盖

+ *

+ * 默认情况下,所有测试用例使用 PerformanceTestRunner 中定义的全局常量: + *

    + *
  • {@code TOTAL_RECORDS} = 100,000,000(数据集总大小)
  • + *
  • {@code BATCH_SIZE} = 100,000(每批次写入记录数)
  • + *
  • {@code INIT_TOTAL_RECORDS} = 95,000,000(初始化阶段全表数据量)
  • + *
+ *

+ * 如果单个 TestCase 需要灵活控制这些值,可以使用 builder 方法覆盖:
+ * <pre>{@code
+ * new TestCase("TC-XX", "用例名称", "测试组",
+ *     params(...),
+ *     0, 0,
+ *     "用例描述")
+ *     .totalRecords(5_000_000)        // 覆盖全局 TOTAL_RECORDS
+ *     .batchSize(50_000)              // 覆盖全局 BATCH_SIZE
+ *     .initTotalRecords(4_500_000);   // 覆盖全局 INIT_TOTAL_RECORDS
+ * }</pre>
+ *

+ * <p><b>优先级规则</b></p>
+ * <ol>
+ *   <li>如果 TestCase 中设置了值,以 TestCase 中的值为准</li>
+ *   <li>如果未设置,使用 PerformanceTestRunner 中的全局常量</li>
+ * </ol>
+ * + *

+ * <p><b>使用场景</b></p>
+ * <ul>
+ *   <li>快速验证:用小数据量快速验证配置是否正确</li>
+ *   <li>对比测试:不同数据量下的性能表现对比</li>
+ *   <li>边界测试:测试极端数据量下的系统行为</li>
+ * </ul>
+ */ +public class TestCase { + private final String id; + private final String name; + private final String group; + private final Map parameters; + private final int primaryKeyDuplicateRate; // 0-100 + private final int qps; // 0 = unlimited + private final String description; + + // ─── 用例级别的全局参数覆盖 ───────────────────────────────────────────── + // 如果设置了这些值,以 TestCase 中的值为准;否则使用 PerformanceTestRunner 的全局常量 + + /** 用例独立的数据集总大小(覆盖 PerformanceTestRunner.TOTAL_RECORDS) */ + private Integer totalRecords = null; + + /** 用例独立的批次大小(覆盖 PerformanceTestRunner.BATCH_SIZE) */ + private Integer batchSize = null; + + /** 用例独立的初始化阶段全表数据量(覆盖 PerformanceTestRunner.INIT_TOTAL_RECORDS) */ + private Integer initTotalRecords = null; + + public TestCase(String id, String name, String group, + Map parameters, + int primaryKeyDuplicateRate, int qps, + String description) { + this.id = id; + this.name = name; + this.group = group; + this.parameters = parameters; + this.primaryKeyDuplicateRate = primaryKeyDuplicateRate; + this.qps = qps; + this.description = description; + } + + // ─── Accessors ──────────────────────────────────────────────────────────── + + public String getId() { return id; } + public String getName() { return name; } + public String getGroup() { return group; } + public Map getParameters() { return parameters; } + + /** + * 获取数据量:默认使用 PerformanceTestRunner.TOTAL_RECORDS, + * 但如果当前 TestCase 设置了 totalRecords,则以用例中的值为准 + */ + public long getDataSize() { + return totalRecords != null ? 
totalRecords.longValue() : TOTAL_RECORDS; + } + + /** + * 获取当前用例的批次大小 + * 如果未设置,返回 null,调用方应使用 PerformanceTestRunner.BATCH_SIZE + */ + public Integer getBatchSize() { + return batchSize; + } + + /** + * 获取当前用例的初始化阶段全表数据量 + * 如果未设置,返回 null,调用方应使用 PerformanceTestRunner.INIT_TOTAL_RECORDS + */ + public Integer getInitTotalRecords() { + return initTotalRecords; + } + + public int getPrimaryKeyDuplicateRate() { return primaryKeyDuplicateRate; } + public int getQps() { return qps; } + public String getDescription() { return description; } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append(String.format("[%s] %s (%s) - %s [数据量: %,d", + id, name, group, description, getDataSize())); + + if (batchSize != null) { + sb.append(", batchSize: ").append(batchSize); + } + if (initTotalRecords != null) { + sb.append(", initTotal: ").append(initTotalRecords); + } + sb.append("]"); + return sb.toString(); + } + + // ─── Builder helper ─────────────────────────────────────────────────────── + + private static Map params(String... 
kvs) { + Map m = new LinkedHashMap<>(); + for (int i = 0; i + 1 < kvs.length; i += 2) m.put(kvs[i], kvs[i + 1]); + return m; + } + + // ─── 用例级别全局参数覆盖的 Builder 方法 ───────────────────────────────── + + /** + * 设置当前用例的数据集总大小(覆盖 PerformanceTestRunner.TOTAL_RECORDS) + */ + public TestCase totalRecords(int records) { + this.totalRecords = records; + return this; + } + + /** + * 设置当前用例的批次大小(覆盖 PerformanceTestRunner.BATCH_SIZE) + */ + public TestCase batchSize(int size) { + this.batchSize = size; + return this; + } + + /** + * 设置当前用例的初始化阶段全表数据量(覆盖 PerformanceTestRunner.INIT_TOTAL_RECORDS) + */ + public TestCase initTotalRecords(int records) { + this.initTotalRecords = records; + return this; + } + + // ─── 5.1 基础测试用例组 ──────────────────────────────────────────────────── + + public static List createBasicTests() { + return Arrays.asList( + new TestCase("TC-01", "基准测试(默认配置)", "基础测试", + params( + "write-buffer-size", "256mb", + "target-file-size", "128mb", + "file.format", "parquet", + "file.compression", "zstd", + "bucket", "-1", + "compaction.async.enabled", "true", + "num-sorted-run.compaction-trigger", "5", + "num-sorted-run.stop-trigger", "8", + "write-only", "false" + ), + 0, 0, + "默认配置基线,建立性能参考点"), + + new TestCase("TC-02", "大数据量测试", "基础测试", + params( + "write-buffer-size", "512mb", + "write-buffer-spillable", "true", + "target-file-size", "256mb", + "file.format", "parquet", + "file.compression", "zstd", + "bucket", "-1", + "compaction.async.enabled", "false", + "write-only", "true" + ), + 0, 0, + "大数据量写入,验证稳定性和文件大小"), + + new TestCase("TC-03", "小批量测试", "基础测试", + params( + "write-buffer-size", "64mb", + "target-file-size", "64mb", + "file.format", "parquet", + "bucket", "-1", + "compaction.async.enabled", "true" + ), + 0, 0, + "小数据量,验证最小写入场景"), + + // 示例:使用用例级别的参数覆盖全局值 + new TestCase("TC-04", "用例独立参数覆盖", "基础测试", + params( + "write-buffer-size", "256mb", + "target-file-size", "128mb", + "file.format", "parquet", + "file.compression", "zstd", + "bucket", "-1", + 
"compaction.async.enabled", "true", + "write-only", "false" + ), + 0, 0, + "示例:使用 totalRecords()/batchSize()/initTotalRecords() 覆盖全局参数") + .totalRecords(5_000_000) // 覆盖全局 TOTAL_RECORDS(1亿) + .batchSize(50_000) // 覆盖全局 BATCH_SIZE(10万) + .initTotalRecords(4_500_000) // 覆盖全局 INIT_TOTAL_RECORDS(9500万) + ); + } + + // ─── 5.2 写入缓冲区测试组 ───────────────────────────────────────────────── + + public static List createWriteBufferTests() { + return Arrays.asList( + new TestCase("TC-10", "缓冲区-64MB", "写入缓冲区", + params("write-buffer-size", "64mb", "write-buffer-spillable", "true", + "target-file-size", "128mb", "bucket", "-1", + "compaction.async.enabled", "false", "write-only", "true"), + 0, 0, "64MB缓冲区,验证小缓冲区写入行为和文件数量"), + + new TestCase("TC-11", "缓冲区-128MB", "写入缓冲区", + params("write-buffer-size", "128mb", "write-buffer-spillable", "true", + "target-file-size", "128mb", "bucket", "-1", + "compaction.async.enabled", "false", "write-only", "true"), + 0, 0, "128MB缓冲区,中等缓冲区写入行为"), + + new TestCase("TC-12", "缓冲区-256MB(默认)", "写入缓冲区", + params("write-buffer-size", "256mb", "write-buffer-spillable", "true", + "target-file-size", "128mb", "bucket", "-1", + "compaction.async.enabled", "false", "write-only", "true"), + 0, 0, "256MB缓冲区(默认值)"), + + new TestCase("TC-13", "缓冲区-512MB", "写入缓冲区", + params("write-buffer-size", "512mb", "write-buffer-spillable", "true", + "target-file-size", "256mb", "bucket", "-1", + "compaction.async.enabled", "false", "write-only", "true"), + 0, 0, "512MB缓冲区,大缓冲区减少flush次数"), + + new TestCase("TC-14", "缓冲区-1024MB", "写入缓冲区", + params("write-buffer-size", "1024mb", "write-buffer-spillable", "true", + "target-file-size", "512mb", "bucket", "-1", + "compaction.async.enabled", "false", "write-only", "true"), + 0, 0, "1GB缓冲区,超大缓冲区验证极限"), + + new TestCase("TC-15", "缓冲区-不可溢出", "写入缓冲区", + params("write-buffer-size", "256mb", "write-buffer-spillable", "false", + "target-file-size", "128mb", "bucket", "-1", + "compaction.async.enabled", "false", "write-only", "true"), + 
0, 0, "关闭溢写,缓冲区满时触发flush,验证OOM风险"), + + new TestCase("TC-16", "缓冲区-可溢出", "写入缓冲区", + params("write-buffer-size", "64mb", "write-buffer-spillable", "true", + "write-buffer-spill.max-disk-size", "1gb", + "target-file-size", "128mb", "bucket", "-1", + "compaction.async.enabled", "false", "write-only", "true"), + 0, 0, "小缓冲区+溢写磁盘,验证溢写行为和最终文件大小") + ); + } + + // ─── 5.3 目标文件大小测试组 ─────────────────────────────────────────────── + + public static List createTargetFileSizeTests() { + return Arrays.asList( + new TestCase("TC-20", "目标文件-64MB", "目标文件大小", + params("write-buffer-size", "512mb", "write-buffer-spillable", "true", + "target-file-size", "64mb", "bucket", "-1", + "compaction.async.enabled", "false", "write-only", "true"), + 0, 0, "64MB目标文件,验证小目标文件的文件数量"), + + new TestCase("TC-21", "目标文件-128MB(默认)", "目标文件大小", + params("write-buffer-size", "512mb", "write-buffer-spillable", "true", + "target-file-size", "128mb", "bucket", "-1", + "compaction.async.enabled", "false", "write-only", "true"), + 0, 0, "128MB目标文件(默认值)"), + + new TestCase("TC-22", "目标文件-256MB", "目标文件大小", + params("write-buffer-size", "512mb", "write-buffer-spillable", "true", + "target-file-size", "256mb", "bucket", "-1", + "compaction.async.enabled", "false", "write-only", "true"), + 0, 0, "256MB目标文件,减少文件数量"), + + new TestCase("TC-23", "目标文件-512MB", "目标文件大小", + params("write-buffer-size", "1024mb", "write-buffer-spillable", "true", + "target-file-size", "512mb", "bucket", "-1", + "compaction.async.enabled", "false", "write-only", "true"), + 0, 0, "512MB目标文件,超大目标文件") + ); + } + + // ─── 5.4 分桶策略测试组 ─────────────────────────────────────────────────── + + public static List createBucketTests() { + return Arrays.asList( + new TestCase("TC-30", "分桶-bucket=-2(延迟分桶)", "分桶策略", + params("write-buffer-size", "512mb", "write-buffer-spillable", "true", + "target-file-size", "256mb", "bucket", "-2", + "compaction.async.enabled", "false", "write-only", "true"), + 0, 0, "bucket=-2 延迟分桶,验证自动调整分桶性能和文件布局"), + + new 
TestCase("TC-31", "分桶-动态(bucket=-1)", "分桶策略", + params("write-buffer-size", "512mb", "write-buffer-spillable", "true", + "target-file-size", "256mb", "bucket", "-1", + "compaction.async.enabled", "false", "write-only", "true"), + 0, 0, "bucket=-1 动态分桶,基准对比"), + + new TestCase("TC-32", "分桶-固定4桶", "分桶策略", + params("write-buffer-size", "512mb", "write-buffer-spillable", "true", + "target-file-size", "256mb", "bucket", "4", + "compaction.async.enabled", "false", "write-only", "true"), + 0, 0, "固定4桶,验证固定分桶文件分布"), + + new TestCase("TC-33", "分桶-固定8桶", "分桶策略", + params("write-buffer-size", "512mb", "write-buffer-spillable", "true", + "target-file-size", "256mb", "bucket", "8", + "compaction.async.enabled", "false", "write-only", "true"), + 0, 0, "固定8桶,验证更多分桶的文件分布"), + + new TestCase("TC-34", "分桶-固定16桶", "分桶策略", + params("write-buffer-size", "512mb", "write-buffer-spillable", "true", + "target-file-size", "256mb", "bucket", "16", + "compaction.async.enabled", "false", "write-only", "true"), + 0, 0, "固定16桶"), + + new TestCase("TC-35", "分桶-固定32桶", "分桶策略", + params("write-buffer-size", "512mb", "write-buffer-spillable", "true", + "target-file-size", "256mb", "bucket", "32", + "compaction.async.enabled", "false", "write-only", "true"), + 0, 0, "固定32桶,桶数过多时的文件碎片化"), + new TestCase("TC-36", "分桶-固定1桶", "分桶策略", + params("write-buffer-size", "512mb", "write-buffer-spillable", "true", + "target-file-size", "256mb", "bucket", "1", + "compaction.async.enabled", "false", "write-only", "true"), + 0, 0, "固定1桶,桶数少时的文件碎片化") + ); + } + + // ─── 5.5 合并(Compaction)策略测试组 ────────────────────────────────────── + + public static List createCompactionTests() { + return Arrays.asList( + new TestCase("TC-40", "无小合并|定期大合并|增量写入", "合并策略对比测试", + params("write-buffer-size", "128mb", + "write-buffer-spillable", "true", + "target-file-size", "128mb", + "bucket", "1", + "diskMaxSize", "10", + //sorted runs 过多时 merge 防 OOM: + "sort-spill-threshold", "100", + "sort-spill-buffer-size", "128mb", + "write-only", 
"false", + "compaction.optimization-interval", "5min",//10sec + //保持默认即可:因为新生成的文件不可能是大文件,当前最小的几个 sorted run 加在一起,是否比下一个 sorted run 小到一定比例 + "compaction.size-ratio", "0", + //文件写放大倍数:即使太多小文件也不触发Full compaction: + "compaction.max-size-amplification-percent", "50000", + "full-compaction.delta-commits", "200", +// "compaction.total-size-threshold", "100", +// "compaction.force-rewrite-all-files", "true", + "num-sorted-run.compaction-trigger", "200", + //# 写入永不因 sorted runs 过多而暂停(默认 = trigger+3 = 8) + "num-sorted-run.stop-trigger", "2147483647"), + 0, 0, + "无小合并|定期大合并|增量写入"), + new TestCase("TC-41", "小合并-默认(trigger=5)|定期大合并|增量写入", "合并策略对比测试", + params("write-buffer-size", "128mb", + "write-buffer-spillable", "true", + "target-file-size", "128mb", + "bucket", "1", + "diskMaxSize", "10", + //sorted runs 过多时 merge 防 OOM: + "sort-spill-threshold", "100", + "sort-spill-buffer-size", "128mb", + "write-only", "false", + "compaction.optimization-interval", "5min",//10sec + //保持默认即可:因为新生成的文件不可能是大文件,当前最小的几个 sorted run 加在一起,是否比下一个 sorted run 小到一定比例 + "compaction.size-ratio", "0", + //文件写放大倍数:即使太多小文件也不触发Full compaction: + "compaction.max-size-amplification-percent", "50000", + "full-compaction.delta-commits", "200", +// "compaction.total-size-threshold", "100", +// "compaction.force-rewrite-all-files", "true", + "num-sorted-run.compaction-trigger", "5", + //# 写入永不因 sorted runs 过多而暂停(默认 = trigger+3 = 8) + "num-sorted-run.stop-trigger", "8"), + 0, 0, + "小合并-默认(trigger=5)|定期大合并|增量写入:合并最频繁,验证合并开销"), + new TestCase("TC-42", "小合并10|定期大合并|增量写入", "合并策略对比测试", + params("write-buffer-size", "128mb", + "write-buffer-spillable", "true", + "target-file-size", "128mb", + "bucket", "1", + "diskMaxSize", "10", + //sorted runs 过多时 merge 防 OOM: + "sort-spill-threshold", "100", + "sort-spill-buffer-size", "128mb", + "write-only", "false", + "compaction.optimization-interval", "5min",//10sec + //保持默认即可:因为新生成的文件不可能是大文件,当前最小的几个 sorted run 加在一起,是否比下一个 sorted run 小到一定比例 + "compaction.size-ratio", "0", + 
//文件写放大倍数:即使太多小文件也不触发Full compaction: + "compaction.max-size-amplification-percent", "50000", + "full-compaction.delta-commits", "200", +// "compaction.total-size-threshold", "100", +// "compaction.force-rewrite-all-files", "true", + "num-sorted-run.compaction-trigger", "10", + //# 写入永不因 sorted runs 过多而暂停(默认 = trigger+3 = 8) + "num-sorted-run.stop-trigger", "20"), + 0, 0, + "10个文件一次小合并|定期大合并|增量写入"), + new TestCase("TC-43", "保守小合并20|定期大合并|增量写入", "合并策略对比测试", + params("write-buffer-size", "128mb", + "write-buffer-spillable", "true", + "target-file-size", "128mb", + "bucket", "1", + "diskMaxSize", "10", + //sorted runs 过多时 merge 防 OOM: + "sort-spill-threshold", "100", + "sort-spill-buffer-size", "128mb", + "write-only", "false", + "compaction.optimization-interval", "5min",//10sec + //保持默认即可:因为新生成的文件不可能是大文件,当前最小的几个 sorted run 加在一起,是否比下一个 sorted run 小到一定比例 + "compaction.size-ratio", "0", + //文件写放大倍数:即使太多小文件也不触发Full compaction: + "compaction.max-size-amplification-percent", "50000", + "full-compaction.delta-commits", "200", +// "compaction.total-size-threshold", "100", +// "compaction.force-rewrite-all-files", "true", + "num-sorted-run.compaction-trigger", "20", + //# 写入永不因 sorted runs 过多而暂停(默认 = trigger+3 = 8) + "num-sorted-run.stop-trigger", "30"), + 0, 0, + "保守小合并20|定期大合并|增量写入"), + new TestCase("TC-44", "合并-write-only模式|增量写入", "合并策略对比测试", + params("write-buffer-size", "128mb", + "write-buffer-spillable", "true", + "target-file-size", "128mb", + "bucket", "1", + "diskMaxSize", "10", + //sorted runs 过多时 merge 防 OOM: + "sort-spill-threshold", "100", + "sort-spill-buffer-size", "128mb", + "write-only", "true", + "compaction.optimization-interval", "5min",//10sec + //保持默认即可:因为新生成的文件不可能是大文件,当前最小的几个 sorted run 加在一起,是否比下一个 sorted run 小到一定比例 + "compaction.size-ratio", "0", + //文件写放大倍数:即使太多小文件也不触发Full compaction: + "compaction.max-size-amplification-percent", "50000", + "full-compaction.delta-commits", "200", +// "compaction.total-size-threshold", "100", +// 
"compaction.force-rewrite-all-files", "true", + "num-sorted-run.compaction-trigger", "200", + //# 写入永不因 sorted runs 过多而暂停(默认 = trigger+3 = 8) + "num-sorted-run.stop-trigger", "2147483647"), + 0, 0, + "write-only=true:完全跳过合并,最大化写入吞吐"), + new TestCase("TC-45", "参数最佳实践:30小合并|1h大合并|增量10亿数据写入", "合并策略对比测试", + params("write-buffer-size", "128mb", + "write-buffer-spillable", "true", + "target-file-size", "128mb", + //bucket越少越好: + "bucket", "1", + "diskMaxSize", "10", + //sorted runs 过多时 merge 防 OOM: + "sort-spill-threshold","100", + "sort-spill-buffer-size","128mb", + "write-only", "false", + "compaction.optimization-interval","60min",//10sec + //保持默认即可:因为新生成的文件(L0)不可能是大文件,只要保证合并都是L0的就没写放大:当前最小的几个 sorted run 加在一起,是否比下一个 sorted run 小到一定比例 + "compaction.size-ratio","1", + //文件写放大倍数:即使太多小文件也不触发Full compaction: + "compaction.max-size-amplification-percent","400", +// "full-compaction.delta-commits", "200", + //只合并本次待合并的文件总大小没达到158m就合并,而不是所有文件总大小达到158mb才合并: +// "compaction.total-size-threshold", "158mb", +// "compaction.force-rewrite-all-files", "true", + "num-sorted-run.compaction-trigger", "30", + //# 写入永不因 sorted runs 过多而暂停(默认 = trigger+3 = 8) + "num-sorted-run.stop-trigger", "200", + "snapshot.num-retained.min", "1", + "snapshot.num-retained.max", "1", + "snapshot.time-retained", "30min"), + 0, 0, + "30小合并|1h大合并|增量10亿增量写入") +// new TestCase("TC-45", "参数最佳实践:30小合并|1h大合并|增量10亿数据写入", "合并策略对比测试", +// params("write-buffer-size", "128mb", +// "write-buffer-spillable", "true", +// "target-file-size", "128mb", +// //bucket越少越好: +// "bucket", "1", +// "diskMaxSize", "10", +// //sorted runs 过多时 merge 防 OOM: +// "sort-spill-threshold","100", +// "sort-spill-buffer-size","128mb", +// "write-only", "false", +// "compaction.optimization-interval","60min",//10sec +// //保持默认即可:因为新生成的文件(L0)不可能是大文件,只要保证合并都是L0的就没写放大:当前最小的几个 sorted run 加在一起,是否比下一个 sorted run 小到一定比例 +// "compaction.size-ratio","1", +// //文件写放大倍数:即使太多小文件也不触发Full compaction: +// 
"compaction.max-size-amplification-percent","50000", +//// "full-compaction.delta-commits", "200", +// //只合并本次待合并的文件总大小没达到158m就合并,而不是所有文件总大小达到158mb才合并: +//// "compaction.total-size-threshold", "158mb", +//// "compaction.force-rewrite-all-files", "true", +// "num-sorted-run.compaction-trigger", "30", +// //# 写入永不因 sorted runs 过多而暂停(默认 = trigger+3 = 8) +// "num-sorted-run.stop-trigger", "2147483647", +// "snapshot.num-retained.min", "1", +// "snapshot.num-retained.max", "1", +// "snapshot.time-retained", "30min"), +// 0, 0, +// "30小合并|1h大合并|增量10亿增量写入") +// .batchSize(100_000) +// .initTotalRecords(0) +// .totalRecords(1_000_000_000) + ); + } + + // ─── 5.6 无小文件写入测试组(重点) ──────────────────────────────────────── + + public static List createNoSmallFileTests() { + return Arrays.asList( + new TestCase("TC-50", "无小文件-大缓冲|无小合并|定期大合并|一次性写入", "无小文件", + params("write-buffer-size", "2gb", + "write-buffer-spillable", "true", + "target-file-size", "128mb", + "bucket", "1", + "write-buffer-spillable", "true", + "diskMaxSize", "10", + "compaction.async.enabled111", "true", + "write-only", "false", + "compaction.optimization-interval","5min",//10sec + //保持默认即可:因为新生成的文件不可能是大文件,当前最小的几个 sorted run 加在一起,是否比下一个 sorted run 小到一定比例 + "compaction.size-ratio","0", + //文件写放大倍数:即使太多小文件也不触发Full compaction: + "compaction.max-size-amplification-percent","50000", + //sorted runs 过多时 merge 防 OOM: + "sort-spill-threshold","100", + "sort-spill-buffer-size","128mb", + "num-sorted-run.compaction-trigger", "200", + //# 写入永不因 sorted runs 过多而暂停(默认 = trigger+3 = 8) + "num-sorted-run.stop-trigger", "2147483647"), + 0, 0, + "大缓冲(512MB)+大目标文件(128MB)+禁止小合并,一次性写入验证") + .batchSize(100_000) + .initTotalRecords(100_000_000) + .totalRecords(100_000_000), + + new TestCase("TC-51", ":无小文件-小缓冲|无小合并|定期大合并|全量&增量", "无小文件", + params("write-buffer-size", "128mb", + "write-buffer-spillable", "true", + "target-file-size", "128mb", + "bucket", "1", + "write-buffer-spillable", "true", + "diskMaxSize", "1", + "write-only", 
"false", + "compaction.optimization-interval","5min",//10sec + //保持默认即可:因为新生成的文件不可能是大文件,当前最小的几个 sorted run 加在一起,是否比下一个 sorted run 小到一定比例 + "compaction.size-ratio","0", + //文件写放大倍数:即使太多小文件也不触发Full compaction: + "compaction.max-size-amplification-percent","50000", + //sorted runs 过多时 merge 防 OOM: + "sort-spill-threshold","100", + "sort-spill-buffer-size","128mb", + "num-sorted-run.compaction-trigger", "200", + //# 写入永不因 sorted runs 过多而暂停(默认 = trigger+3 = 8) + "num-sorted-run.stop-trigger", "2147483647"), + 0, 0, + "小缓冲(128MB)+大目标文件(128MB)+禁止小合并,全量&增量") + .batchSize(100_000) + .initTotalRecords(90_000_000) + .totalRecords(100_000_000), + + new TestCase("TC-52", ":无小文件-小缓冲|无小合并|定期大合并|全量&增量|bucket=-2", "无小文件", + params("write-buffer-size", "128mb", + "write-buffer-spillable", "true", + "target-file-size", "128mb", + //性能最好: + "bucket", "1", + "write-buffer-spillable", "true", + "diskMaxSize", "1", + "write-only", "false", + "compaction.optimization-interval","5min",//10sec + //保持默认即可:因为新生成的文件不可能是大文件,当前最小的几个 sorted run 加在一起,是否比下一个 sorted run 小到一定比例 + "compaction.size-ratio","0", + //文件写放大倍数:即使太多小文件也不触发Full compaction: + "compaction.max-size-amplification-percent","50000", + //sorted runs 过多时 merge 防 OOM: + "sort-spill-threshold","100", + "sort-spill-buffer-size","128mb", + "num-sorted-run.compaction-trigger", "200", + //# 写入永不因 sorted runs 过多而暂停(默认 = trigger+3 = 8) + "num-sorted-run.stop-trigger", "2147483647"), + 0, 0, + "小缓冲(128MB)+大目标文件(128MB)+禁止小合并,全量&增量,bucket=-2,验证延迟分桶的无小文件效果,但需要Full compaction后才能可见") + .batchSize(100_000) + .initTotalRecords(90_000_000) + .totalRecords(100_000_000), + + new TestCase("TC-53", ":无小文件-小缓冲|无小合并|定期大合并|全量&增量|增加local-merge-buffer", "无小文件", + params("local-merge-buffer-size", "64mb", + "write-buffer-size", "128mb", + "write-buffer-spillable", "true", + "target-file-size", "128mb", + "bucket", "1", + "write-buffer-spillable", "true", + "diskMaxSize", "1", + "write-only", "false", + "compaction.optimization-interval","5min",//10sec + 
//保持默认即可:因为新生成的文件不可能是大文件,当前最小的几个 sorted run 加在一起,是否比下一个 sorted run 小到一定比例 + "compaction.size-ratio","0", + //文件写放大倍数:即使太多小文件也不触发Full compaction: + "compaction.max-size-amplification-percent","50000", + //sorted runs 过多时 merge 防 OOM: + "sort-spill-threshold","100", + "sort-spill-buffer-size","128mb", + "num-sorted-run.compaction-trigger", "200", + //# 写入永不因 sorted runs 过多而暂停(默认 = trigger+3 = 8) + "num-sorted-run.stop-trigger", "2147483647"), + 0, 0, + "小缓冲(128MB)+大目标文件(128MB)+禁止小合并,全量&增量|增加local-merge-buffer") + .batchSize(100_000) + .initTotalRecords(90_000_000) + .totalRecords(100_000_000), + + new TestCase("TC-54", ":无小文件-小缓冲|无小合并|定期大合并|全量&增量|增加local-merge-buffer|changelog-producer", "无小文件", + params("local-merge-buffer-size", "64mb", + "changelog-producer", "input", + "write-buffer-size", "128mb", + "write-buffer-spillable", "true", + "target-file-size", "128mb", + "bucket", "1", + "write-buffer-spillable", "true", + "diskMaxSize", "1", + "write-only", "false", + "compaction.optimization-interval","5min",//10sec + //保持默认即可:因为新生成的文件不可能是大文件,当前最小的几个 sorted run 加在一起,是否比下一个 sorted run 小到一定比例 + "compaction.size-ratio","0", + //文件写放大倍数:即使太多小文件也不触发Full compaction: + "compaction.max-size-amplification-percent","50000", + //sorted runs 过多时 merge 防 OOM: + "sort-spill-threshold","100", + "sort-spill-buffer-size","128mb", + "num-sorted-run.compaction-trigger", "200", + //# 写入永不因 sorted runs 过多而暂停(默认 = trigger+3 = 8) + "num-sorted-run.stop-trigger", "2147483647"), + 0, 0, + "小缓冲(128MB)+大目标文件(128MB)+禁止小合并,全量&增量|增加local-merge-buffer|changelog-producer") + .batchSize(100_000) + .initTotalRecords(90_000_000) + .totalRecords(100_000_000) + ); + } + + // ─── 5.7 文件格式和压缩测试组 ───────────────────────────────────────────── + + public static List createFormatCompressionTests() { + return Arrays.asList( + new TestCase("TC-60", "格式-Parquet+ZSTD(默认)", "文件格式", + params("write-buffer-size", "256mb", "target-file-size", "128mb", + "file.format", "parquet", "file.compression", "zstd", + 
"bucket", "-1", "write-only", "true"), + 0, 0, "Parquet+ZSTD,默认配置基准"), + + new TestCase("TC-61", "格式-Parquet+LZ4", "文件格式", + params("write-buffer-size", "256mb", "target-file-size", "128mb", + "file.format", "parquet", "file.compression", "lz4", + "bucket", "-1", "write-only", "true"), + 0, 0, "Parquet+LZ4,更快压缩速度但压缩率稍低"), + + new TestCase("TC-62", "格式-Parquet+无压缩", "文件格式", + params("write-buffer-size", "256mb", "target-file-size", "128mb", + "file.format", "parquet", "file.compression", "gzip", + "bucket", "-1", "write-only", "true"), + 0, 0, "Parquet+无压缩,最快写入速度但文件最大"), + + new TestCase("TC-63", "格式-ORC+ZSTD", "文件格式", + params("write-buffer-size", "256mb", "target-file-size", "128mb", + "file.format", "orc", "file.compression", "zstd", + "bucket", "-1", "write-only", "true"), + 0, 0, "ORC+ZSTD,列存格式写入性能对比"), + + new TestCase("TC-64", "格式-ORC+LZ4", "文件格式", + params("write-buffer-size", "256mb", "target-file-size", "128mb", + "file.format", "orc", "file.compression", "lz4", + "bucket", "-1", "write-only", "true"), + 0, 0, "ORC+LZ4") + ); + } + + // ─── 5.8 主键更新测试组 ─────────────────────────────────────────────────── + + public static List createPrimaryKeyUpdateTests() { + return Arrays.asList( + new TestCase("TC-70", "主键-纯插入(0%重复)", "主键更新", + params("write-buffer-size", "256mb", "target-file-size", "128mb", + "bucket", "-1", "compaction.async.enabled", "true", + "write-only", "false"), + 0, 0, "0%主键重复,纯INSERT性能基准"), + + new TestCase("TC-71", "主键-10%重复", "主键更新", + params("write-buffer-size", "256mb", "target-file-size", "128mb", + "bucket", "-1", "compaction.async.enabled", "true", + "write-only", "false"), + 10, 0, "10%主键重复(低更新率),触发部分UPDATE合并"), + + new TestCase("TC-72", "主键-30%重复", "主键更新", + params("write-buffer-size", "256mb", "target-file-size", "128mb", + "bucket", "-1", "compaction.async.enabled", "true", + "write-only", "false"), + 30, 0, "30%主键重复(中等更新率)"), + + new TestCase("TC-73", "主键-50%重复", "主键更新", + params("write-buffer-size", "256mb", "target-file-size", 
"128mb", + "bucket", "-1", "compaction.async.enabled", "true", + "write-only", "false"), + 50, 0, "50%主键重复(高更新率),大量合并开销") + ); + } + + // ─── 5.9 并行度测试组 ───────────────────────────────────────────────────── + + public static List createParallelismTests() { + return Arrays.asList( + new TestCase("TC-80", "并行度-1线程", "写入并行度", + params("write-buffer-size", "512mb", "target-file-size", "256mb", + "bucket", "1", "sink.parallelism", "1", + "compaction.async.enabled", "false", "write-only", "true"), + 0, 0, "单线程写入,最少文件数"), + + new TestCase("TC-81", "并行度-2线程", "写入并行度", + params("write-buffer-size", "512mb", "target-file-size", "256mb", + "bucket", "2", "sink.parallelism", "2", + "compaction.async.enabled", "false", "write-only", "true"), + 0, 0, "2线程写入"), + + new TestCase("TC-82", "并行度-4线程", "写入并行度", + params("write-buffer-size", "512mb", "target-file-size", "256mb", + "bucket", "4", "sink.parallelism", "4", + "compaction.async.enabled", "false", "write-only", "true"), + 0, 0, "4线程写入(默认)"), + + new TestCase("TC-83", "并行度-8线程", "写入并行度", + params("write-buffer-size", "512mb", "target-file-size", "256mb", + "bucket", "8", "sink.parallelism", "8", + "compaction.async.enabled", "false", "write-only", "true"), + 0, 0, "8线程写入") + ); + } + + // ─── 组合工厂方法 ───────────────────────────────────────────────────────── + + public static List createAllTests() { + List all = new ArrayList<>(); + all.addAll(createBasicTests()); + all.addAll(createWriteBufferTests()); + all.addAll(createTargetFileSizeTests()); + all.addAll(createBucketTests()); + all.addAll(createCompactionTests()); + all.addAll(createNoSmallFileTests()); + all.addAll(createFormatCompressionTests()); + all.addAll(createPrimaryKeyUpdateTests()); + all.addAll(createParallelismTests()); + return all; + } + + /** + * 获取所有支持的测试组名称 + */ + public static Map getGroupDescriptions() { + Map m = new LinkedHashMap<>(); + m.put("basic", "基础测试用例(TC-01~03) - 默认配置基线"); + m.put("buffer", "写入缓冲区测试(TC-10~16) - write-buffer-size / spillable"); + 
m.put("target", "目标文件大小测试(TC-20~23) - target-file-size"); + m.put("bucket", "分桶策略测试(TC-30~35) - bucket=-2/-1/固定"); + m.put("compaction", "合并策略测试(TC-40~45) - compaction trigger/stop/write-only"); + m.put("nosmallfile", "无小文件写入测试(TC-50~53) - 生产最优配置"); + m.put("format", "文件格式&压缩测试(TC-60~64) - parquet/orc + zstd/lz4"); + m.put("pkupdate", "主键更新测试(TC-70~73) - 重复率 0%/10%/30%/50%"); + m.put("parallelism", "写入并行度测试(TC-80~83) - sink.parallelism"); + m.put("all", "全量测试(所有组)"); + return m; + } +} diff --git a/connectors/paimon-connector/src/test/java/io/tapdata/connector/paimon/perf/TestModeConfig.java b/connectors/connector-perf-test/src/main/java/io/tapdata/connector/paimon/perf/TestModeConfig.java similarity index 99% rename from connectors/paimon-connector/src/test/java/io/tapdata/connector/paimon/perf/TestModeConfig.java rename to connectors/connector-perf-test/src/main/java/io/tapdata/connector/paimon/perf/TestModeConfig.java index f2140053..ad497a94 100644 --- a/connectors/paimon-connector/src/test/java/io/tapdata/connector/paimon/perf/TestModeConfig.java +++ b/connectors/connector-perf-test/src/main/java/io/tapdata/connector/paimon/perf/TestModeConfig.java @@ -162,9 +162,9 @@ public static String resolveToGroupKey(String input) { // 特殊处理 auto 模式 if (AUTO_MODE_KEY.equals(normalized)) { - return "all"; // auto 模式默认运行全量 + return AUTO_MODE_KEY; // auto 模式默认运行全量 } - + // 遍历所有模式查找匹配 for (TestModeEntry mode : ALL_MODES) { if (mode.getId().equals(normalized) || diff --git a/connectors/paimon-connector/src/test/java/io/tapdata/connector/paimon/perf/UnifiedFileSystem.java b/connectors/connector-perf-test/src/main/java/io/tapdata/connector/paimon/perf/UnifiedFileSystem.java similarity index 100% rename from connectors/paimon-connector/src/test/java/io/tapdata/connector/paimon/perf/UnifiedFileSystem.java rename to connectors/connector-perf-test/src/main/java/io/tapdata/connector/paimon/perf/UnifiedFileSystem.java diff --git 
a/connectors/paimon-connector/src/test/resources/logback.xml b/connectors/connector-perf-test/src/main/resources/logback.xml similarity index 97% rename from connectors/paimon-connector/src/test/resources/logback.xml rename to connectors/connector-perf-test/src/main/resources/logback.xml index d0d4e7ee..e613d2a6 100644 --- a/connectors/paimon-connector/src/test/resources/logback.xml +++ b/connectors/connector-perf-test/src/main/resources/logback.xml @@ -25,7 +25,7 @@ - + @@ -51,7 +51,7 @@ !-- 日志输出格式 --> - + diff --git a/connectors/connector-perf-test/src/test/java/io/tapdata/connector/paimon/perf/DataGeneratorTest.java b/connectors/connector-perf-test/src/test/java/io/tapdata/connector/paimon/perf/DataGeneratorTest.java new file mode 100644 index 00000000..de65fd42 --- /dev/null +++ b/connectors/connector-perf-test/src/test/java/io/tapdata/connector/paimon/perf/DataGeneratorTest.java @@ -0,0 +1,667 @@ +package io.tapdata.connector.paimon.perf; + +import org.junit.jupiter.api.*; +import org.junit.jupiter.api.Timeout; + +import java.util.*; +import java.util.concurrent.*; +import java.util.stream.Collectors; + +import io.tapdata.entity.event.dml.TapInsertRecordEvent; +import io.tapdata.entity.event.dml.TapRecordEvent; +import io.tapdata.entity.event.dml.TapUpdateRecordEvent; + +import static org.junit.jupiter.api.Assertions.*; + +/** + * DataGenerator 单元测试 + * + *

+ * <p><b>验证目标:</b></p>
+ * <ul>
+ *   <li>ID 生成正确性(连续性、唯一性)</li>
+ *   <li>重复率合理性(实际重复率 ≈ 配置重复率)</li>
+ *   <li>重复 ID 分布均匀性(无热点)</li>
+ *   <li>大规模数据生成(10亿+ 内存恒定)</li>
+ *   <li>确定性伪随机可重现性</li>
+ * </ul>
+ */ +@DisplayName("DataGenerator 单元测试") +class DataGeneratorTest { + + // ═══════════════════════════════════════════════════════════ + // 1. 新 ID 生成测试 + // ═══════════════════════════════════════════════════════════ + + @Nested + @DisplayName("新 ID 生成") + class NewIdGeneration { + + @Test + @DisplayName("0%重复率 - 所有ID唯一且递增") + void allIdsUniqueAndIncremental() { + DataGenerator gen = new DataGenerator(0, "test_table"); + Set seenIds = new HashSet<>(); + + for (int i = 0; i < 100_000; i++) { + Map record = gen.generateRecord(); + String id = (String) record.get("id"); + + assertNotNull(id, "ID 不应为 null"); + assertFalse(seenIds.contains(id), "ID 不应重复: " + id); + seenIds.add(id); + } + + assertEquals(100_000, seenIds.size(), "应生成 100K 唯一 ID"); + assertEquals(100_000, gen.getUniqueIdsCount(), "唯一 ID 计数应匹配"); + } + + @Test + @DisplayName("ID 内容为数字字符串") + void idsAreNumericStrings() { + DataGenerator gen = new DataGenerator(0); + + for (int i = 0; i < 1000; i++) { + Map record = gen.generateRecord(); + String id = (String) record.get("id"); + assertDoesNotThrow(() -> Long.parseLong(id), "ID 应为合法数字: " + id); + } + } + + @Test + @DisplayName("记录包含所有必需字段") + void recordHasAllFields() { + DataGenerator gen = new DataGenerator(0); + Map record = gen.generateRecord(); + + assertTrue(record.containsKey("id"), "应包含 id 字段"); + assertTrue(record.containsKey("name"), "应包含 name 字段"); + assertTrue(record.containsKey("value"), "应包含 value 字段"); + assertTrue(record.containsKey("ts"), "应包含 ts 字段"); + + assertNotNull(record.get("name")); + assertNotNull(record.get("value")); + assertNotNull(record.get("ts")); + } + } + + // ═══════════════════════════════════════════════════════════ + // 2. 
重复率合理性测试 + // ═══════════════════════════════════════════════════════════ + + @Nested + @DisplayName("重复率验证") + class DuplicateRateValidation { + + @Test + @DisplayName("0%重复率 - 无重复ID") + void zeroPercentDuplicate() { + DataGenerator gen = new DataGenerator(0); + Set ids = new HashSet<>(); + int count = 50_000; + + for (int i = 0; i < count; i++) { + ids.add((String) gen.generateRecord().get("id")); + } + + assertEquals(count, ids.size(), "0% 重复率下不应有重复 ID"); + } + + @Test + @DisplayName("50%重复率 - 实际重复率应在 45%-55% 范围内") + void fiftyPercentDuplicate() { + DataGenerator gen = new DataGenerator(50); + Map idCounts = new HashMap<>(); + int count = 100_000; + int warmup = 1000; // 预热期,不统计 + + // 预热阶段 + for (int i = 0; i < warmup; i++) { + gen.generateRecord(); + } + + // 统计阶段 + for (int i = 0; i < count; i++) { + String id = (String) gen.generateRecord().get("id"); + idCounts.put(id, idCounts.getOrDefault(id, 0) + 1); + } + + long duplicateCount = idCounts.values().stream() + .filter(c -> c > 1) + .mapToLong(c -> c - 1) + .sum(); + + double actualRate = (double) duplicateCount / count * 100; + + // 允许 ±5% 误差(统计学波动) + assertTrue(actualRate >= 45 && actualRate <= 55, + String.format("实际重复率 %.2f%% 应在 45%%-55%% 范围内", actualRate)); + } + + @Test + @DisplayName("100%重复率 - 所有ID都来自重复池") + void hundredPercentDuplicate() { + DataGenerator gen = new DataGenerator(100); + Set ids = new HashSet<>(); + int count = 10_000; + int warmup = 100; + + // 预热:生成一些初始 ID + for (int i = 0; i < warmup; i++) { + ids.add((String) gen.generateRecord().get("id")); + } + + int initialUniqueCount = ids.size(); + + // 统计:100% 重复率下,新 ID 应全部来自重复池 + for (int i = 0; i < count; i++) { + String id = (String) gen.generateRecord().get("id"); + ids.add(id); + } + + int finalUniqueCount = ids.size(); + int newUniqueIds = finalUniqueCount - initialUniqueCount; + + // 100% 重复率下,新 ID 应极少(统计误差范围内) + // 允许少量误差(预热期后仍有新 ID 生成的边界情况) + assertTrue(newUniqueIds < count * 0.05, + String.format("100%% 重复率下新唯一 ID 数 %d 应 < 5%%", 
newUniqueIds)); + } + + @Test + @DisplayName("10%重复率 - 低重复场景") + void tenPercentDuplicate() { + DataGenerator gen = new DataGenerator(10); + Map idCounts = new HashMap<>(); + int count = 50_000; + int warmup = 500; + + // 预热 + for (int i = 0; i < warmup; i++) { + gen.generateRecord(); + } + + // 统计 + for (int i = 0; i < count; i++) { + String id = (String) gen.generateRecord().get("id"); + idCounts.put(id, idCounts.getOrDefault(id, 0) + 1); + } + + long duplicateCount = idCounts.values().stream() + .filter(c -> c > 1) + .mapToLong(c -> c - 1) + .sum(); + + double actualRate = (double) duplicateCount / count * 100; + + // 允许 ±3% 误差 + assertTrue(actualRate >= 7 && actualRate <= 13, + String.format("实际重复率 %.2f%% 应在 7%%-13%% 范围内", actualRate)); + } + } + + // ═══════════════════════════════════════════════════════════ + // 3. 重复 ID 分布均匀性测试 + // ═══════════════════════════════════════════════════════════ + + @Nested + @DisplayName("重复 ID 分布均匀性") + class DuplicateDistribution { + + @Test + @DisplayName("50%重复率 - ID访问分布均匀") + void evenDistributionAcrossIdSpace() { + DataGenerator gen = new DataGenerator(50); + int warmup = 10_000; + int count = 100_000; + int bucketCount = 100; + int[] buckets = new int[bucketCount]; + + // 预热 + for (int i = 0; i < warmup; i++) { + gen.generateRecord(); + } + + // 统计重复 ID 的分布 + for (int i = 0; i < count; i++) { + String idStr = (String) gen.generateRecord().get("id"); + long id = Long.parseLong(idStr); + int bucket = (int) (id % bucketCount); + buckets[bucket]++; + } + + // 计算每个桶的期望值和实际偏差 + double expected = (double) count / bucketCount; + double maxDeviation = 0; + + for (int bucket : buckets) { + double deviation = Math.abs(bucket - expected) / expected * 100; + maxDeviation = Math.max(maxDeviation, deviation); + } + + // 最大偏差不应超过 20%(哈希函数保证均匀分布) + assertTrue(maxDeviation < 20, + String.format("ID 分布最大偏差 %.2f%% 应 < 20%%", maxDeviation)); + } + + @Test + @DisplayName("重复ID不集中在某个小区间") + void noHotspotInDuplicateIds() { + DataGenerator gen 
= new DataGenerator(80); + Map idCounts = new HashMap<>(); + int warmup = 5000; + int count = 50_000; + + // 预热 + for (int i = 0; i < warmup; i++) { + gen.generateRecord(); + } + + // 统计 + for (int i = 0; i < count; i++) { + String id = (String) gen.generateRecord().get("id"); + idCounts.put(id, idCounts.getOrDefault(id, 0) + 1); + } + + // 找出最频繁的 ID + int maxFreq = idCounts.values().stream().max(Integer::compareTo).orElse(0); + double maxFreqRate = (double) maxFreq / count * 100; + + // 单个 ID 不应占总数的 > 1%(证明无热点) + assertTrue(maxFreqRate < 1.0, + String.format("最频繁 ID 占比 %.2f%% 应 < 1%%", maxFreqRate)); + } + + @Test + @DisplayName("100%重复率 - 所有ID均匀分布") + void uniformDistributionAtHundredPercent() { + DataGenerator gen = new DataGenerator(100); + int warmup = 1000; + int count = 20_000; + Map idCounts = new LinkedHashMap<>(); + + // 预热 + for (int i = 0; i < warmup; i++) { + gen.generateRecord(); + } + + // 统计 + for (int i = 0; i < count; i++) { + String id = (String) gen.generateRecord().get("id"); + idCounts.put(id, idCounts.getOrDefault(id, 0) + 1); + } + + // 卡方检验简化版:各桶计数应接近期望 + int uniqueIds = idCounts.size(); + double expected = (double) count / uniqueIds; + double chiSquare = 0; + + for (int observed : idCounts.values()) { + double diff = observed - expected; + chiSquare += (diff * diff) / expected; + } + + // 自由度 = uniqueIds - 1,p=0.01 临界值约为 uniqueIds + 3*sqrt(2*uniqueIds) + double criticalValue = uniqueIds + 3 * Math.sqrt(2 * uniqueIds); + assertTrue(chiSquare < criticalValue, + String.format("卡方值 %.2f 应 < 临界值 %.2f(分布均匀)", chiSquare, criticalValue)); + } + } + + // ═══════════════════════════════════════════════════════════ + // 4. 
确定性伪随机可重现性测试 + // ═══════════════════════════════════════════════════════════ + + @Nested + @DisplayName("确定性伪随机") + class DeterministicPseudoRandom { + + @Test + @DisplayName("确定性伪随机可重现性验证") + void deterministicMappingReproducible() { + // MurmurHash3 确定性验证:相同输入产生相同输出 + // 由于 random 是 private,我们验证 ID 生成逻辑的确定性部分 + + // 验证 deterministicDuplicateId 的确定性 + // 通过生成相同模式的记录,验证 ID 分布一致性 + DataGenerator gen1 = new DataGenerator(50, "test_table"); + DataGenerator gen2 = new DataGenerator(50, "test_table"); + + // 生成相同数量的预热记录 + for (int i = 0; i < 1000; i++) { + gen1.generateRecord(); + gen2.generateRecord(); + } + + // 统计后续生成中重复 ID 的分布特征 + Map dist1 = new HashMap<>(); + Map dist2 = new HashMap<>(); + + for (int i = 0; i < 5000; i++) { + String id1 = (String) gen1.generateRecord().get("id"); + String id2 = (String) gen2.generateRecord().get("id"); + dist1.put(id1, dist1.getOrDefault(id1, 0) + 1); + dist2.put(id2, dist2.getOrDefault(id2, 0) + 1); + } + + // 两个生成器应有相似的重复率(统计意义上) + double rate1 = dist1.values().stream().filter(c -> c > 1).mapToInt(Integer::intValue).sum() / 50.0; + double rate2 = dist2.values().stream().filter(c -> c > 1).mapToInt(Integer::intValue).sum() / 50.0; + + // 两者重复率应接近(误差 < 10%) + assertTrue(Math.abs(rate1 - rate2) < 10, + String.format("重复率差异 %.2f%% 应 < 10%%", Math.abs(rate1 - rate2))); + } + } + + // ═══════════════════════════════════════════════════════════ + // 5. 
大规模数据生成测试 + // ═══════════════════════════════════════════════════════════ + + @Nested + @DisplayName("大规模数据生成") + class LargeScaleGeneration { + + @Test + @Timeout(value = 60, unit = TimeUnit.SECONDS) + @DisplayName("1000万条生成 - 验证性能") + void tenMillionRecords() { + DataGenerator gen = new DataGenerator(30); + int count = 10_000_000; + + long start = System.currentTimeMillis(); + for (int i = 0; i < count; i++) { + gen.generateRecord(); + } + long elapsed = System.currentTimeMillis() - start; + + double throughput = (double) count / elapsed * 1000; + System.out.printf("1000万条生成耗时: %d ms, 吞吐: %.0f 条/秒%n", elapsed, throughput); + + assertTrue(elapsed < 30_000, "1000万条应在 30s 内完成,实际: " + elapsed + "ms"); + assertTrue(throughput > 100_000, "吞吐应 > 100K 条/秒,实际: " + throughput); + } + + @Test + @DisplayName("内存占用恒定 - 1亿条无OOM") + @Timeout(value = 120, unit = TimeUnit.SECONDS) + void constantMemoryUsage() { + DataGenerator gen = new DataGenerator(50); + int count = 100_000_000; // 1 亿条 + + Runtime rt = Runtime.getRuntime(); + long startMem = rt.totalMemory() - rt.freeMemory(); + + // 生成数据 + for (int i = 0; i < count; i++) { + gen.generateRecord(); + } + + System.gc(); // 提示 GC + long endMem = rt.totalMemory() - rt.freeMemory(); + long memIncrease = endMem - startMem; + + // 内存增长应 < 100 MB(证明无累积) + assertTrue(memIncrease < 100_000_000, + String.format("1亿条后内存增长 %d MB 应 < 100 MB", memIncrease / 1_000_000)); + } + } + + // ═══════════════════════════════════════════════════════════ + // 6. 
generateUpdateEvent 测试 + // ═══════════════════════════════════════════════════════════ + + @Nested + @DisplayName("generateUpdateEvent") + class UpdateEventGeneration { + + @Test + @DisplayName("无预热时返回null") + void returnsNullWithoutWarmup() { + DataGenerator gen = new DataGenerator(0); + assertNull(gen.generateUpdateEvent(), "无历史记录时应返回 null"); + } + + @Test + @DisplayName("预热后生成有效更新事件") + void generatesValidUpdateAfterWarmup() { + DataGenerator gen = new DataGenerator(0); + + // 预热 + for (int i = 0; i < 100; i++) { + gen.generateRecord(); + } + + // 生成更新事件 + for (int i = 0; i < 50; i++) { + var event = gen.generateUpdateEvent(); + assertNotNull(event, "应生成更新事件"); + assertNotNull(event.getBefore(), "应包含 before 数据"); + assertNotNull(event.getAfter(), "应包含 after 数据"); + assertEquals("test_table", event.getTableId(), "表名应匹配"); + + // before 和 after 的 ID 应相同 + assertEquals( + event.getBefore().get("id"), + event.getAfter().get("id"), + "更新前后 ID 应一致" + ); + } + } + + @Test + @DisplayName("更新事件的ID来自历史生成范围") + void updateEventIdsFromHistory() { + DataGenerator gen = new DataGenerator(0); + int warmup = 10_000; + Set historicalIds = new HashSet<>(); + + // 预热并记录历史 ID + for (int i = 0; i < warmup; i++) { + String id = (String) gen.generateRecord().get("id"); + historicalIds.add(id); + } + + // 生成更新事件,验证 ID 来自历史范围 + for (int i = 0; i < 1000; i++) { + var event = gen.generateUpdateEvent(); + String id = (String) event.getBefore().get("id"); + long idNum = Long.parseLong(id); + + // ID 应在 [1, warmup] 范围内 + assertTrue(idNum >= 1 && idNum <= warmup, + String.format("更新事件 ID %d 应在历史范围内 [1, %d]", idNum, warmup)); + } + } + } + + // ═══════════════════════════════════════════════════════════ + // 7. 
边界条件测试 + // ═══════════════════════════════════════════════════════════ + + @Nested + @DisplayName("边界条件") + class BoundaryConditions { + + @Test + @DisplayName("重复率限制在0-100范围") + void duplicateRateClamped() { + // 负值应被限制为 0 + DataGenerator gen1 = new DataGenerator(-10); + // 通过生成大量记录验证无重复 + Set ids = new HashSet<>(); + for (int i = 0; i < 1000; i++) { + ids.add((String) gen1.generateRecord().get("id")); + } + assertEquals(1000, ids.size(), "负重复率应被限制为 0"); + + // 超100应被限制为 100 + DataGenerator gen2 = new DataGenerator(150); + // 预热后应全部重复 + for (int i = 0; i < 100; i++) gen2.generateRecord(); + Set ids2 = new HashSet<>(); + for (int i = 0; i < 1000; i++) { + ids2.add((String) gen2.generateRecord().get("id")); + } + assertTrue(ids2.size() < 100, "150% 重复率应被限制为 100%"); + } + + @Test + @DisplayName("批量生成方法正确性") + void batchGeneration() { + DataGenerator gen = new DataGenerator(0); + int batchSize = 500; + + List events = gen.generateRecordEvents(batchSize); + + assertEquals(batchSize, events.size(), "批量大小应匹配"); + for (TapRecordEvent event : events) { + // generateRecordEvents 生成的是 TapInsertRecordEvent + assertTrue(event instanceof TapInsertRecordEvent, "事件类型应为 TapInsertRecordEvent"); + TapInsertRecordEvent insertEvent = (TapInsertRecordEvent) event; + assertNotNull(insertEvent.getAfter(), "事件应包含 after 数据"); + assertEquals("test_table", event.getTableId(), "表名应匹配"); + } + } + + @Test + @DisplayName("generateTapTable 生成有效表结构") + void generateValidTapTable() { + DataGenerator gen = new DataGenerator(0); + var table = gen.generateTapTable(); + + assertNotNull(table); + assertEquals("test_table", table.getName()); + assertEquals("test_table", table.getId()); + assertTrue(table.primaryKeys().contains("id"), "id 应为主键"); + } + + @Test + @DisplayName("generateRecordsWithRate 回调正确性") + void generateRecordsWithRateCallback() { + DataGenerator gen = new DataGenerator(0); + List receivedBatchSizes = new ArrayList<>(); + int totalRecords = 5000; + int qps = 0; // 不限制 QPS + + 
gen.generateRecordsWithRate(totalRecords, qps, events -> { + receivedBatchSizes.add(events.size()); + }); + + int totalReceived = receivedBatchSizes.stream().mapToInt(Integer::intValue).sum(); + assertEquals(totalRecords, totalReceived, "总接收记录数应匹配"); + } + } + + // ═══════════════════════════════════════════════════════════ + // 8. 统计分析辅助测试 + // ═══════════════════════════════════════════════════════════ + + @Nested + @DisplayName("统计分析") + class StatisticalAnalysis { + + @Test + @DisplayName("getUniqueIdsCount 准确性") + void uniqueIdsCountAccuracy() { + DataGenerator gen = new DataGenerator(0); + int count = 50_000; + + for (int i = 0; i < count; i++) { + gen.generateRecord(); + } + + assertEquals(count, gen.getUniqueIdsCount(), + "唯一 ID 计数应等于生成次数(0% 重复)"); + } + + @Test + @DisplayName("getTotalGenerated 返回唯一ID数(非调用次数)") + void totalGeneratedAccuracy() { + DataGenerator gen = new DataGenerator(0); + int count = 10_000; + + for (int i = 0; i < count; i++) { + gen.generateRecord(); + } + + // 0% 重复率下,getTotalGenerated 应等于调用次数 + assertEquals(count, gen.getTotalGenerated(), + "0%% 重复率下总生成计数应等于调用次数"); + + // 测试 50% 重复率 + DataGenerator gen2 = new DataGenerator(50); + int warmup = 1000; + int statCount = 10_000; + + // 预热 + for (int i = 0; i < warmup; i++) { + gen2.generateRecord(); + } + + long uniqueBefore = gen2.getTotalGenerated(); + + // 统计 + for (int i = 0; i < statCount; i++) { + gen2.generateRecord(); + } + + long uniqueAfter = gen2.getTotalGenerated(); + long uniqueAdded = uniqueAfter - uniqueBefore; + + // 50% 重复率下,新增唯一 ID 数应约为 statCount * 0.5 + double expectedUnique = statCount * 0.5; + double tolerance = statCount * 0.15; // 允许 15% 误差 + assertTrue(Math.abs(uniqueAdded - expectedUnique) < tolerance, + String.format("50%% 重复率下新增唯一 ID %d 应接近 %.0f (±%.0f)", + uniqueAdded, expectedUnique, tolerance)); + } + + @Test + @DisplayName("重复ID的ID值分布统计") + void duplicateIdValueDistribution() { + DataGenerator gen = new DataGenerator(70); + int warmup = 10000; + int count = 
50000; + Map idCounts = new HashMap<>(); + + // 预热 + for (int i = 0; i < warmup; i++) { + gen.generateRecord(); + } + + // 统计 + for (int i = 0; i < count; i++) { + String id = (String) gen.generateRecord().get("id"); + idCounts.put(id, idCounts.getOrDefault(id, 0) + 1); + } + + // 分析重复 ID 的频次分布 + Map freqDistribution = new HashMap<>(); + for (int freq : idCounts.values()) { + freqDistribution.merge(freq, 1, Integer::sum); + } + + System.out.println("\n=== 重复 ID 频次分布 ==="); + freqDistribution.entrySet().stream() + .sorted(Map.Entry.comparingByKey()) + .forEach(e -> System.out.printf( + "出现 %d 次的 ID 有 %d 个%n", e.getKey(), e.getValue())); + + // 计算实际重复率:基于唯一 ID 数 vs 总记录数 + int uniqueIds = idCounts.size(); + int totalRecords = idCounts.values().stream().mapToInt(Integer::intValue).sum(); + int duplicateRecords = totalRecords - uniqueIds; + double actualDuplicateRate = (double) duplicateRecords / totalRecords * 100; + + System.out.printf("%n实际重复率分析:%n"); + System.out.printf(" 总记录数: %,d%n", totalRecords); + System.out.printf(" 唯一 ID 数: %,d%n", uniqueIds); + System.out.printf(" 重复记录数: %,d%n", duplicateRecords); + System.out.printf(" 实际重复率: %.2f%%%n", actualDuplicateRate); + + // 70% 重复率下,实际重复率应在 60%-80% 范围 + assertTrue(actualDuplicateRate >= 60 && actualDuplicateRate <= 80, + String.format("实际重复率 %.2f%% 应在 60%%-80%% 范围内(配置 70%%)", actualDuplicateRate)); + } + } +} diff --git a/connectors/connector-perf-test/src/test/java/io/tapdata/connector/paimon/perf/S3FileScanDebugTest.java b/connectors/connector-perf-test/src/test/java/io/tapdata/connector/paimon/perf/S3FileScanDebugTest.java new file mode 100644 index 00000000..a5c2efa0 --- /dev/null +++ b/connectors/connector-perf-test/src/test/java/io/tapdata/connector/paimon/perf/S3FileScanDebugTest.java @@ -0,0 +1,176 @@ +package io.tapdata.connector.paimon.perf; + +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3ClientBuilder; +import com.amazonaws.services.s3.model.ObjectMetadata; +import 
com.amazonaws.services.s3.model.S3ObjectSummary; +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.client.builder.AwsClientBuilder; + +/** + * S3 文件扫描调试测试(使用 AWS S3 SDK) + */ +public class S3FileScanDebugTest { + + // S3 配置(根据你的实际情况修改) + private static final String S3_ENDPOINT = "http://192.168.1.184:9081"; + private static final String S3_ACCESS_KEY = "admin"; + private static final String S3_SECRET_KEY = "admin123"; + private static final String S3_REGION = "us-east-1"; + private static final String S3_BUCKET = "luke"; + private static final String S3_WAREHOUSE = "warehouse-paimon-perf"; + + public static void main(String[] args) { + System.out.println("=== S3 文件扫描调试测试(AWS S3 SDK) ===\n"); + + try { + test1_S3Connection(); + test2_ListBucket(); + test3_ScanWarehouse(); + + System.out.println("\n=== ✅ 所有测试通过 ==="); + } catch (Exception e) { + System.err.println("\n❌ 测试失败: " + e.getMessage()); + e.printStackTrace(); + } + } + + /** + * 测试 1: S3 连接验证 + */ + private static void test1_S3Connection() { + System.out.println("[测试 1] 验证 S3 连接..."); + System.out.println(" Endpoint: " + S3_ENDPOINT); + System.out.println(" Bucket: " + S3_BUCKET); + System.out.println(" Region: " + S3_REGION); + + AmazonS3 s3Client = createS3Client(); + + try { + // 验证 bucket 是否存在 + boolean exists = s3Client.doesBucketExistV2(S3_BUCKET); + System.out.println(" Bucket 存在: " + exists); + + if (exists) { + System.out.println(" ✅ S3 连接成功"); + } else { + System.out.println(" ⚠️ Bucket 不存在,请检查配置"); + } + } catch (Exception e) { + System.out.println(" ❌ 连接失败: " + e.getMessage()); + } finally { + s3Client.shutdown(); + } + } + + /** + * 测试 2: 列出 Bucket 内容 + */ + private static void test2_ListBucket() { + System.out.println("\n[测试 2] 列出 Bucket 内容..."); + AmazonS3 s3Client = createS3Client(); + + try { + com.amazonaws.services.s3.model.ObjectListing listing = s3Client.listObjects(S3_BUCKET); + + System.out.println(" 对象总数: " + 
listing.getObjectSummaries().size()); + + if (!listing.getObjectSummaries().isEmpty()) { + System.out.println(" 前 10 个对象:"); + int count = 0; + for (S3ObjectSummary summary : listing.getObjectSummaries()) { + if (count >= 10) break; + System.out.printf(" - %s (%s)%n", summary.getKey(), formatSize(summary.getSize())); + count++; + } + } + + System.out.println(" ✅ Bucket 列出成功"); + } catch (Exception e) { + System.out.println(" ❌ 列出失败: " + e.getMessage()); + } finally { + s3Client.shutdown(); + } + } + + /** + * 测试 3: 扫描 Warehouse 目录 + */ + private static void test3_ScanWarehouse() { + System.out.println("\n[测试 3] 扫描 Warehouse 目录..."); + String warehousePath = S3_WAREHOUSE; + System.out.println(" Warehouse: " + warehousePath); + + AmazonS3 s3Client = createS3Client(); + + try { + // 递归列出所有对象 + int fileCount = 0; + long totalSize = 0; + int parquetCount = 0; + + com.amazonaws.services.s3.model.ObjectListing listing = null; + do { + if (listing == null) { + listing = s3Client.listObjects(S3_BUCKET, warehousePath); + } else { + listing = s3Client.listNextBatchOfObjects(listing); + } + + for (S3ObjectSummary summary : listing.getObjectSummaries()) { + String key = summary.getKey(); + fileCount++; + totalSize += summary.getSize(); + + if (key.endsWith(".parquet")) { + parquetCount++; + } + + // 显示前 10 个 parquet 文件 + if (parquetCount <= 10 && key.endsWith(".parquet")) { + System.out.printf(" 📄 %s (%s)%n", key, formatSize(summary.getSize())); + } + } + } while (listing.isTruncated()); + + System.out.println("\n 统计信息:"); + System.out.printf(" 总文件数: %d%n", fileCount); + System.out.printf(" Parquet 文件: %d%n", parquetCount); + System.out.printf(" 总大小: %s%n", formatSize(totalSize)); + + if (fileCount > 0) { + System.out.println(" ✅ Warehouse 扫描成功"); + } else { + System.out.println(" ⚠️ 未找到文件"); + } + + } catch (Exception e) { + System.out.println(" ❌ 扫描失败: " + e.getMessage()); + e.printStackTrace(); + } finally { + s3Client.shutdown(); + } + } + + /** + * 创建 S3 客户端 + */ + private 
static AmazonS3 createS3Client() { + BasicAWSCredentials awsCreds = new BasicAWSCredentials(S3_ACCESS_KEY, S3_SECRET_KEY); + + return AmazonS3ClientBuilder.standard() + .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(S3_ENDPOINT, S3_REGION)) + .withCredentials(new AWSStaticCredentialsProvider(awsCreds)) + .withPathStyleAccessEnabled(true) // MinIO 需要 + .disableChunkedEncoding() + .build(); + } + + private static String formatSize(long size) { + if (size < 1024) return size + " B"; + if (size < 1024 * 1024) return String.format("%.2f KB", size / 1024.0); + if (size < 1024 * 1024 * 1024) return String.format("%.2f MB", size / (1024.0 * 1024)); + return String.format("%.2f GB", size / (1024.0 * 1024 * 1024)); + } +} diff --git a/connectors/paimon-connector/src/main/java/io/tapdata/connector/paimon/PaimonConnector.java b/connectors/paimon-connector/src/main/java/io/tapdata/connector/paimon/PaimonConnector.java index c7709949..57fc17ea 100644 --- a/connectors/paimon-connector/src/main/java/io/tapdata/connector/paimon/PaimonConnector.java +++ b/connectors/paimon-connector/src/main/java/io/tapdata/connector/paimon/PaimonConnector.java @@ -77,7 +77,7 @@ public void onStart(TapConnectionContext connectionContext) throws Throwable { connectionContext.getLog().info("Flush offset callback registered for StarRocks connector"); } // Initialize Paimon service - paimonService = new PaimonService(paimonConfig); + paimonService = new PaimonService(paimonConfig, connectionContext.getLog()); paimonService.setFlushOffsetCallback(flushOffsetCallback); paimonService.init(); @@ -248,7 +248,7 @@ private CreateTableOptions createTable(TapConnectorContext connectorContext, TapTable table = createTableEvent.getTable(); log.info("Creating Paimon table: " + table.getName()); - boolean created = paimonService.createTable(table, log); + boolean created = paimonService.createTable(table); if (created) { log.info("Table created successfully: " + table.getName()); } else { diff 
--git a/connectors/paimon-connector/src/main/java/io/tapdata/connector/paimon/config/PaimonConfig.java b/connectors/paimon-connector/src/main/java/io/tapdata/connector/paimon/config/PaimonConfig.java index 218e5d0e..09a087cc 100644 --- a/connectors/paimon-connector/src/main/java/io/tapdata/connector/paimon/config/PaimonConfig.java +++ b/connectors/paimon-connector/src/main/java/io/tapdata/connector/paimon/config/PaimonConfig.java @@ -94,8 +94,8 @@ public class PaimonConfig extends CommonDbConfig implements Serializable { // Compaction merges small files for better query performance private Boolean enableAutoCompaction = true; - // Compaction interval in minutes (default: 30 minutes) - private Integer compactionIntervalMinutes = 30; + // Full Compaction interval in minutes (default: 60 minutes) + private Integer compactionIntervalMinutes = 60; // Target file size in MB (default: 128MB) // Paimon will try to create files of this size diff --git a/connectors/paimon-connector/src/main/java/io/tapdata/connector/paimon/service/ManagedIOStreamTableWrite.java b/connectors/paimon-connector/src/main/java/io/tapdata/connector/paimon/service/ManagedIOStreamTableWrite.java new file mode 100644 index 00000000..cc20c9bf --- /dev/null +++ b/connectors/paimon-connector/src/main/java/io/tapdata/connector/paimon/service/ManagedIOStreamTableWrite.java @@ -0,0 +1,119 @@ +package io.tapdata.connector.paimon.service; + +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.disk.IOManager; +import org.apache.paimon.io.BundleRecords; +import org.apache.paimon.memory.MemoryPoolFactory; +import org.apache.paimon.metrics.MetricRegistry; +import org.apache.paimon.table.sink.CommitMessage; +import org.apache.paimon.table.sink.StreamTableWrite; +import org.apache.paimon.table.sink.TableWrite; +import org.apache.paimon.types.RowType; + +import java.util.List; + +/** + * StreamTableWrite wrapper that owns IOManager lifecycle for non-Flink 
usage. + * + *

Close order is strict: close write first, then close IOManager in finally block. + */ +public class ManagedIOStreamTableWrite implements StreamTableWrite { + + private final StreamTableWrite delegate; + private final IOManager ioManager; + + public ManagedIOStreamTableWrite(StreamTableWrite delegate, IOManager ioManager) { + this.delegate = delegate; + this.ioManager = ioManager; + } + + @Override + public TableWrite withIOManager(IOManager ioManager) { + delegate.withIOManager(ioManager); + return this; + } + + @Override + public TableWrite withWriteType(RowType writeType) { + delegate.withWriteType(writeType); + return this; + } + + @Override + public TableWrite withMemoryPoolFactory(MemoryPoolFactory memoryPoolFactory) { + delegate.withMemoryPoolFactory(memoryPoolFactory); + return this; + } + + @Override + public BinaryRow getPartition(InternalRow row) { + return delegate.getPartition(row); + } + + @Override + public int getBucket(InternalRow row) { + return delegate.getBucket(row); + } + + @Override + public void write(InternalRow row) throws Exception { + delegate.write(row); + } + + @Override + public void write(InternalRow row, int bucket) throws Exception { + delegate.write(row, bucket); + } + + @Override + public void writeBundle(BinaryRow partition, int bucket, BundleRecords bundle) throws Exception { + delegate.writeBundle(partition, bucket, bundle); + } + + @Override + public void compact(BinaryRow partition, int bucket, boolean fullCompaction) throws Exception { + delegate.compact(partition, bucket, fullCompaction); + } + + @Override + public TableWrite withMetricRegistry(MetricRegistry registry) { + delegate.withMetricRegistry(registry); + return this; + } + + @Override + public List prepareCommit(boolean waitCompaction, long commitIdentifier) throws Exception { + return delegate.prepareCommit(waitCompaction, commitIdentifier); + } + + @Override + public void close() throws Exception { + Exception writeError = null; + try { + delegate.close(); + } 
catch (Exception e) { + writeError = e; + } + + Exception ioManagerError = null; + if (ioManager != null) { + try { + ioManager.close(); + } catch (Exception e) { + ioManagerError = e; + } + } + + if (writeError != null) { + if (ioManagerError != null) { + writeError.addSuppressed(ioManagerError); + } + throw writeError; + } + if (ioManagerError != null) { + throw ioManagerError; + } + } +} + diff --git a/connectors/paimon-connector/src/main/java/io/tapdata/connector/paimon/service/PaimonService.java b/connectors/paimon-connector/src/main/java/io/tapdata/connector/paimon/service/PaimonService.java index c0d2f8e4..45c6079d 100644 --- a/connectors/paimon-connector/src/main/java/io/tapdata/connector/paimon/service/PaimonService.java +++ b/connectors/paimon-connector/src/main/java/io/tapdata/connector/paimon/service/PaimonService.java @@ -9,7 +9,6 @@ import io.tapdata.entity.event.dml.TapRecordEvent; import io.tapdata.entity.event.dml.TapUpdateRecordEvent; import io.tapdata.entity.logger.Log; -import io.tapdata.entity.logger.TapLogger; import io.tapdata.entity.schema.TapField; import io.tapdata.entity.schema.TapIndex; import io.tapdata.entity.schema.TapIndexField; @@ -34,10 +33,7 @@ import org.apache.paimon.reader.RecordReader; import org.apache.paimon.schema.Schema; import org.apache.paimon.table.Table; -import org.apache.paimon.table.sink.CommitMessage; -import org.apache.paimon.table.sink.StreamTableCommit; -import org.apache.paimon.table.sink.StreamTableWrite; -import org.apache.paimon.table.sink.StreamWriteBuilder; +import org.apache.paimon.table.sink.*; import org.apache.paimon.table.source.*; import org.apache.paimon.table.source.TableScan.Plan; import org.apache.paimon.types.*; @@ -131,8 +127,13 @@ protected boolean removeEldestEntry(Map.Entry> elde } } ); + /** + * save tapContext log + */ + private Log log; - public PaimonService(PaimonConfig config) { + public PaimonService(PaimonConfig config, Log log) { + this.log = log; this.config = config; 
this.firstOffsetByTable = Collections.synchronizedMap(new LinkedHashMap<>()); } @@ -158,7 +159,7 @@ public void init() throws Exception { // Create catalog catalog = CatalogFactory.createCatalog(context); -// // Initialize async commit if enabled + // Initialize async commit if enabled initAsyncCommit(); } @@ -196,7 +197,7 @@ private void initAsyncCommit() { } } catch (Exception e) { // Log error but don't stop the scheduler - System.err.println("Error in async commit: " + e.getMessage()); + log.error("Error in async commit: {}", e.getMessage(), e); } }, commitInterval, commitInterval, TimeUnit.MILLISECONDS); } @@ -223,6 +224,14 @@ private void configureStorage(Options options) { options.set("s3.upload.part-size", "16mb"); options.set("s3.fast-upload", "true"); options.set("s3.accelerate-mode", "true"); + // 解决连接重置:调低并发、增大超时 +// options.set("fs.s3a.connection.maximum", "32"); +// options.set("fs.s3a.connection.timeout", "300000"); +// options.set("fs.s3a.socket.timeout", "300000"); +// // 重试机制(解决临时连接失败) +// options.set("fs.s3a.retry.limit", "5"); +// options.set("fs.s3a.retry.interval", "1000"); + break; case "hdfs": options.set("fs.defaultFS", "hdfs://" + config.getHdfsHost() + ":" + config.getHdfsPort()); @@ -477,7 +486,7 @@ private String convertDataType(DataType dataType) { * @return true if created, false if already exists * @throws Exception if creation fails */ - public boolean createTable(TapTable tapTable, Log log) throws Exception { + public boolean createTable(TapTable tapTable) throws Exception { String database = config.getDatabase(); String tableName = tapTable.getName(); @@ -574,7 +583,6 @@ public boolean createTable(TapTable tapTable, Log log) throws Exception { if (Boolean.TRUE.equals(config.getDiskOverflowWrite())) { schemaBuilder.option("write-buffer-spillable", "true"); schemaBuilder.option("write-buffer-spill.max-disk-size", config.getDiskMaxSize() + "gb"); - schemaBuilder.option("write-buffer-spill.tmp-dirs", config.getDiskTmpDir(tableName)); 
} // 2. Target file size - Paimon will try to create files of this size @@ -587,25 +595,24 @@ public boolean createTable(TapTable tapTable, Log log) throws Exception { if (config.getEnableAutoCompaction(tableName) != null) { if (config.getEnableAutoCompaction(tableName)) { // Enable full compaction for better query performance - schemaBuilder.option("compaction.async.enabled", "true"); schemaBuilder.option("compaction.optimization-interval", config.getCompactionIntervalMinutes(tableName) + "min"); // Set compaction strategy schemaBuilder.option("changelog-producer", "input"); // Compact small files more aggressively - schemaBuilder.option("num-sorted-run.compaction-trigger", "3"); - schemaBuilder.option("num-sorted-run.stop-trigger", "5"); + schemaBuilder.option("num-sorted-run.compaction-trigger", "30"); + schemaBuilder.option("num-sorted-run.stop-trigger", "2147483647"); } else { // Disable auto compaction - schemaBuilder.option("compaction.optimization-interval", "0"); + schemaBuilder.option("write-only", "true"); } } // 4. Snapshot settings for better performance // Keep more snapshots in memory for faster access - schemaBuilder.option("snapshot.num-retained.min", "5"); - schemaBuilder.option("snapshot.num-retained.max", "50"); + schemaBuilder.option("snapshot.num-retained.min", "2"); + schemaBuilder.option("snapshot.num-retained.max", "5"); schemaBuilder.option("snapshot.time-retained", "30min"); // 5. 
Commit settings @@ -622,7 +629,15 @@ public boolean createTable(TapTable tapTable, Log log) throws Exception { schemaBuilder.option("sink.parallelism", String.valueOf(config.getWriteThreads())); if (EmptyKit.isNotEmpty(config.getTableProperties(tableName))) { - config.getTableProperties(tableName).forEach(v -> schemaBuilder.option(v.get("propKey"), v.get("propValue"))); + config.getTableProperties(tableName).forEach(v -> { + if (StringUtils.isEmpty(v.get("propKey")) + || StringUtils.isEmpty(v.get("propValue")) + ) { + log.warn("tapdata paimon config error", "key or value exists null in tableProperties"); + } else { + schemaBuilder.option(v.get("propKey"), v.get("propValue")); + } + }); } // Create table catalog.createTable(identifier, schemaBuilder.build(), false); @@ -985,6 +1000,7 @@ private WriteListResult writeRecordsWithStreamWriteInternal(List // Double-check if we still need to commit (another thread might have committed) int finalCount = recordCount.get(); if (finalCount > 0) { + long commitStartTime = System.currentTimeMillis(); // Prepare commit with commitIdentifier // Use atomic counter to generate unique, incrementing commit identifier long commitIdentifier = commitIdentifierGenerator.incrementAndGet(); @@ -997,8 +1013,9 @@ private WriteListResult writeRecordsWithStreamWriteInternal(List recordCount.set(0); lastCommit.set(System.currentTimeMillis()); - connectorContext.getLog().debug("Committed {} accumulated records for table {}", - finalCount, tableKey); + long commitDuration = System.currentTimeMillis() - commitStartTime; + connectorContext.getLog().debug("Committed {} accumulated records for table {} in {} ms", + finalCount, tableKey, commitDuration); } } } @@ -1009,14 +1026,14 @@ private WriteListResult writeRecordsWithStreamWriteInternal(List } catch (Exception e) { if (retryCount < maxRetries) { if (isThreadGroupDestroyedError(e)) { - connectorContext.getLog().warn("ThreadGroup destroyed in stream write, retrying... 
(attempt {}/{})", retryCount + 1, maxRetries); + connectorContext.getLog().warn("ThreadGroup destroyed in stream write, retrying... (attempt {}/{})", retryCount + 1, maxRetries, e); } else if (isPaimonConflict(e)) { connectorContext.getLog().warn("Commit conflict detected, retrying... (attempt {}/{})", retryCount + 1, maxRetries, e); } else { - connectorContext.getLog().warn("Failed to write records to table {}, error message: {}, retrying... (attempt {}/{})", tableName, e.getMessage(), retryCount + 1, maxRetries); + connectorContext.getLog().warn("Failed to write records to table {}, error message: {}, retrying... (attempt {}/{})", tableName, e.getMessage(), retryCount + 1, maxRetries, e); } retryCount++; - reinitCatalog(); +// reinitCatalog(); CommonUtils.ignoreAnyError(() -> TimeUnit.SECONDS.sleep(1L), TAG); continue; } @@ -1231,10 +1248,11 @@ private StreamTableWrite createStreamWriter(Identifier identifier) throws Except StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder(); String tmpDirs = config.getDiskTmpDir(table.name()); if (StringUtils.isEmpty(tmpDirs)) { - return writeBuilder.newWrite(); + return new ManagedIOStreamTableWrite(writeBuilder.newWrite(), null); } else { - return (StreamTableWrite) writeBuilder.newWrite() - .withIOManager(IOManager.create(splitPaths(tmpDirs))); + IOManager ioManager = IOManager.create(splitPaths(tmpDirs)); + StreamTableWrite streamTableWrite = (StreamTableWrite) writeBuilder.newWrite().withIOManager(ioManager); + return new ManagedIOStreamTableWrite(streamTableWrite, ioManager); } } @@ -1338,7 +1356,7 @@ private void handleStreamInsert(TapInsertRecordEvent event, StreamTableWrite wri String database = config.getDatabase(); Identifier identifier = Identifier.create(database, table.getName()); GenericRow row = convertToGenericRow(after, table, identifier); - if (config.getBucketMode(table.getName()).equals("fixed") || config.getBucketCount() == -2) { + if (config.getBucketMode(table.getName()).equals("fixed")) { 
writer.write(row); } else { int bucket = selectBucketForDynamic(row, table); @@ -2700,7 +2718,7 @@ private void commitCallback(String tableName) { try { flushOffsetCallback.accept(offsetToSave); } catch (Exception e) { - TapLogger.warn("Failed to flush offset callback: {}", e.getMessage(), e); + log.warn("Failed to flush offset callback: {}", e.getMessage()); } } } diff --git a/connectors/paimon-connector/src/test/java/io/tapdata/connector/paimon/perf/TestCase.java b/connectors/paimon-connector/src/test/java/io/tapdata/connector/paimon/perf/TestCase.java deleted file mode 100644 index 33d2b59d..00000000 --- a/connectors/paimon-connector/src/test/java/io/tapdata/connector/paimon/perf/TestCase.java +++ /dev/null @@ -1,473 +0,0 @@ -package io.tapdata.connector.paimon.perf; - -import java.util.*; - -import static io.tapdata.connector.paimon.perf.PerformanceTestRunner.TOTAL_RECORDS; -import static io.tapdata.connector.paimon.perf.PerformanceTestRunner.BASE_TEST_DIR; - -/** - * 性能测试用例定义 - * 参数键直接对应 Paimon CoreOptions 表选项名称,可通过 tableProperties 直接注入 - * - * 注意:所有测试用例的 dataSize 统一使用 BATCH_SIZE,便于集中控制测试数据量 - */ -public class TestCase { - private final String id; - private final String name; - private final String group; - private final Map parameters; - private final int primaryKeyDuplicateRate; // 0-100 - private final int qps; // 0 = unlimited - private final String description; - - public TestCase(String id, String name, String group, - Map parameters, - int primaryKeyDuplicateRate, int qps, - String description) { - this.id = id; - this.name = name; - this.group = group; - this.parameters = parameters; - this.primaryKeyDuplicateRate = primaryKeyDuplicateRate; - this.qps = qps; - this.description = description; - } - - // ─── Accessors ──────────────────────────────────────────────────────────── - - public String getId() { return id; } - public String getName() { return name; } - public String getGroup() { return group; } - public Map getParameters() { return parameters; 
} - - /** - * 获取数据量:统一使用 BATCH_SIZE 控制 - * 所有测试用例的数据量都由 PerformanceTestRunner.BATCH_SIZE 统一管理 - */ - public long getDataSize() { return TOTAL_RECORDS; } - - public int getPrimaryKeyDuplicateRate() { return primaryKeyDuplicateRate; } - public int getQps() { return qps; } - public String getDescription() { return description; } - - @Override - public String toString() { - return String.format("[%s] %s (%s) - %s [数据量: %,d]", - id, name, group, description, TOTAL_RECORDS); - } - - // ─── Builder helper ─────────────────────────────────────────────────────── - - private static Map params(String... kvs) { - Map m = new LinkedHashMap<>(); - for (int i = 0; i + 1 < kvs.length; i += 2) m.put(kvs[i], kvs[i + 1]); - return m; - } - - // ─── 5.1 基础测试用例组 ──────────────────────────────────────────────────── - - public static List createBasicTests() { - return Arrays.asList( - new TestCase("TC-01", "基准测试(默认配置)", "基础测试", - params( - "write-buffer-size", "256mb", - "target-file-size", "128mb", - "file.format", "parquet", - "file.compression", "zstd", - "bucket", "-1", - "compaction.async.enabled", "true", - "num-sorted-run.compaction-trigger", "5", - "num-sorted-run.stop-trigger", "8", - "write-only", "false" - ), - 0, 0, - "默认配置基线,建立性能参考点"), - - new TestCase("TC-02", "大数据量测试", "基础测试", - params( - "write-buffer-size", "512mb", - "write-buffer-spillable", "true", - "target-file-size", "256mb", - "file.format", "parquet", - "file.compression", "zstd", - "bucket", "-1", - "compaction.async.enabled", "false", - "write-only", "true" - ), - 0, 0, - "大数据量写入,验证稳定性和文件大小"), - - new TestCase("TC-03", "小批量测试", "基础测试", - params( - "write-buffer-size", "64mb", - "target-file-size", "64mb", - "file.format", "parquet", - "bucket", "-1", - "compaction.async.enabled", "true" - ), - 0, 0, - "小数据量,验证最小写入场景") - ); - } - - // ─── 5.2 写入缓冲区测试组 ───────────────────────────────────────────────── - - public static List createWriteBufferTests() { - return Arrays.asList( - new TestCase("TC-10", "缓冲区-64MB", 
"写入缓冲区", - params("write-buffer-size", "64mb", "write-buffer-spillable", "true", - "target-file-size", "128mb", "bucket", "-1", - "compaction.async.enabled", "false", "write-only", "true"), - 0, 0, "64MB缓冲区,验证小缓冲区写入行为和文件数量"), - - new TestCase("TC-11", "缓冲区-128MB", "写入缓冲区", - params("write-buffer-size", "128mb", "write-buffer-spillable", "true", - "target-file-size", "128mb", "bucket", "-1", - "compaction.async.enabled", "false", "write-only", "true"), - 0, 0, "128MB缓冲区,中等缓冲区写入行为"), - - new TestCase("TC-12", "缓冲区-256MB(默认)", "写入缓冲区", - params("write-buffer-size", "256mb", "write-buffer-spillable", "true", - "target-file-size", "128mb", "bucket", "-1", - "compaction.async.enabled", "false", "write-only", "true"), - 0, 0, "256MB缓冲区(默认值)"), - - new TestCase("TC-13", "缓冲区-512MB", "写入缓冲区", - params("write-buffer-size", "512mb", "write-buffer-spillable", "true", - "target-file-size", "256mb", "bucket", "-1", - "compaction.async.enabled", "false", "write-only", "true"), - 0, 0, "512MB缓冲区,大缓冲区减少flush次数"), - - new TestCase("TC-14", "缓冲区-1024MB", "写入缓冲区", - params("write-buffer-size", "1024mb", "write-buffer-spillable", "true", - "target-file-size", "512mb", "bucket", "-1", - "compaction.async.enabled", "false", "write-only", "true"), - 0, 0, "1GB缓冲区,超大缓冲区验证极限"), - - new TestCase("TC-15", "缓冲区-不可溢出", "写入缓冲区", - params("write-buffer-size", "256mb", "write-buffer-spillable", "false", - "target-file-size", "128mb", "bucket", "-1", - "compaction.async.enabled", "false", "write-only", "true"), - 0, 0, "关闭溢写,缓冲区满时触发flush,验证OOM风险"), - - new TestCase("TC-16", "缓冲区-可溢出", "写入缓冲区", - params("write-buffer-size", "64mb", "write-buffer-spillable", "true", - "write-buffer-spill.max-disk-size", "1gb", - "target-file-size", "128mb", "bucket", "-1", - "compaction.async.enabled", "false", "write-only", "true"), - 0, 0, "小缓冲区+溢写磁盘,验证溢写行为和最终文件大小") - ); - } - - // ─── 5.3 目标文件大小测试组 ─────────────────────────────────────────────── - - public static List createTargetFileSizeTests() { - return 
Arrays.asList( - new TestCase("TC-20", "目标文件-64MB", "目标文件大小", - params("write-buffer-size", "512mb", "write-buffer-spillable", "true", - "target-file-size", "64mb", "bucket", "-1", - "compaction.async.enabled", "false", "write-only", "true"), - 0, 0, "64MB目标文件,验证小目标文件的文件数量"), - - new TestCase("TC-21", "目标文件-128MB(默认)", "目标文件大小", - params("write-buffer-size", "512mb", "write-buffer-spillable", "true", - "target-file-size", "128mb", "bucket", "-1", - "compaction.async.enabled", "false", "write-only", "true"), - 0, 0, "128MB目标文件(默认值)"), - - new TestCase("TC-22", "目标文件-256MB", "目标文件大小", - params("write-buffer-size", "512mb", "write-buffer-spillable", "true", - "target-file-size", "256mb", "bucket", "-1", - "compaction.async.enabled", "false", "write-only", "true"), - 0, 0, "256MB目标文件,减少文件数量"), - - new TestCase("TC-23", "目标文件-512MB", "目标文件大小", - params("write-buffer-size", "1024mb", "write-buffer-spillable", "true", - "target-file-size", "512mb", "bucket", "-1", - "compaction.async.enabled", "false", "write-only", "true"), - 0, 0, "512MB目标文件,超大目标文件") - ); - } - - // ─── 5.4 分桶策略测试组 ─────────────────────────────────────────────────── - - public static List createBucketTests() { - return Arrays.asList( - new TestCase("TC-30", "分桶-bucket=-2(延迟分桶)", "分桶策略", - params("write-buffer-size", "512mb", "write-buffer-spillable", "true", - "target-file-size", "256mb", "bucket", "-2", - "compaction.async.enabled", "false", "write-only", "true"), - 0, 0, "bucket=-2 延迟分桶,验证自动调整分桶性能和文件布局"), - - new TestCase("TC-31", "分桶-动态(bucket=-1)", "分桶策略", - params("write-buffer-size", "512mb", "write-buffer-spillable", "true", - "target-file-size", "256mb", "bucket", "-1", - "compaction.async.enabled", "false", "write-only", "true"), - 0, 0, "bucket=-1 动态分桶,基准对比"), - - new TestCase("TC-32", "分桶-固定4桶", "分桶策略", - params("write-buffer-size", "512mb", "write-buffer-spillable", "true", - "target-file-size", "256mb", "bucket", "4", - "compaction.async.enabled", "false", "write-only", "true"), - 0, 0, 
"固定4桶,验证固定分桶文件分布"), - - new TestCase("TC-33", "分桶-固定8桶", "分桶策略", - params("write-buffer-size", "512mb", "write-buffer-spillable", "true", - "target-file-size", "256mb", "bucket", "8", - "compaction.async.enabled", "false", "write-only", "true"), - 0, 0, "固定8桶,验证更多分桶的文件分布"), - - new TestCase("TC-34", "分桶-固定16桶", "分桶策略", - params("write-buffer-size", "512mb", "write-buffer-spillable", "true", - "target-file-size", "256mb", "bucket", "16", - "compaction.async.enabled", "false", "write-only", "true"), - 0, 0, "固定16桶"), - - new TestCase("TC-35", "分桶-固定32桶", "分桶策略", - params("write-buffer-size", "512mb", "write-buffer-spillable", "true", - "target-file-size", "256mb", "bucket", "32", - "compaction.async.enabled", "false", "write-only", "true"), - 0, 0, "固定32桶,桶数过多时的文件碎片化") - ); - } - - // ─── 5.5 合并(Compaction)策略测试组 ────────────────────────────────────── - - public static List createCompactionTests() { - return Arrays.asList( - new TestCase("TC-40", "合并-激进(trigger=2)", "合并策略", - params("write-buffer-size", "256mb", "target-file-size", "128mb", - "bucket", "-1", - "compaction.async.enabled", "true", - "num-sorted-run.compaction-trigger", "2", - "num-sorted-run.stop-trigger", "5", - "compaction.size-ratio", "1", - "write-only", "false"), - 0, 0, "激进合并:trigger=2,合并最频繁,验证合并开销"), - - new TestCase("TC-41", "合并-默认(trigger=5)", "合并策略", - params("write-buffer-size", "256mb", "target-file-size", "128mb", - "bucket", "-1", - "compaction.async.enabled", "true", - "num-sorted-run.compaction-trigger", "5", - "num-sorted-run.stop-trigger", "8", - "write-only", "false"), - 0, 0, "默认合并参数"), - - new TestCase("TC-42", "合并-保守(trigger=10)", "合并策略", - params("write-buffer-size", "256mb", "target-file-size", "128mb", - "bucket", "-1", - "compaction.async.enabled", "true", - "num-sorted-run.compaction-trigger", "10", - "num-sorted-run.stop-trigger", "20", - "write-only", "false"), - 0, 0, "保守合并:trigger=10,减少合并次数"), - - new TestCase("TC-43", "合并-超保守(trigger=100)", "合并策略", - 
params("write-buffer-size", "512mb", "target-file-size", "256mb", - "bucket", "-1", - "compaction.async.enabled", "true", - "num-sorted-run.compaction-trigger", "100", - "num-sorted-run.stop-trigger", "200", - "write-only", "false"), - 0, 0, "超保守:trigger=100,几乎不触发合并,接近write-only效果"), - - new TestCase("TC-44", "合并-禁用异步", "合并策略", - params("write-buffer-size", "256mb", "target-file-size", "128mb", - "bucket", "-1", - "compaction.async.enabled", "false", - "write-only", "false"), - 0, 0, "关闭异步合并,合并仅在写入时同步执行"), - - new TestCase("TC-45", "合并-write-only模式", "合并策略", - params("write-buffer-size", "512mb", "write-buffer-spillable", "true", - "target-file-size", "256mb", "bucket", "-1", - "compaction.async.enabled", "false", - "write-only", "true"), - 0, 0, "write-only=true:完全跳过合并,最大化写入吞吐") - ); - } - - // ─── 5.6 无小文件写入测试组(重点) ──────────────────────────────────────── - - public static List createNoSmallFileTests() { - return Arrays.asList( - new TestCase("TC-50", "无小文件-大缓冲无合并", "无小文件", - params("write-buffer-size", "512mb", "write-buffer-spillable", "true", - "target-file-size", "256mb", "bucket", "1", - "compaction.async.enabled", "false", "write-only", "true", - "num-sorted-run.compaction-trigger", "100", - "num-sorted-run.stop-trigger", "200"), - 0, 0, - "大缓冲(512MB)+大目标文件(256MB)+禁止合并,一次性写入验证"), - - new TestCase("TC-51", "无小文件-溢写验证", "无小文件", - params("write-buffer-size", "128mb", "write-buffer-spillable", "true", - "write-buffer-spill.max-disk-size", "2gb", - "write-buffer-spill.tmp-dirs", BASE_TEST_DIR + "/tmp," + BASE_TEST_DIR + "/tmp2", - "target-file-size", "256mb", "bucket", "1", - "compaction.async.enabled", "false", "write-only", "true"), - 0, 0, - "小缓冲(128MB)+磁盘溢写,验证溢写后文件是否仍然大"), - - new TestCase("TC-52", "无小文件-bucket=-2组合", "无小文件", - params("write-buffer-size", "512mb", "write-buffer-spillable", "true", - "target-file-size", "256mb", "bucket", "-2", - "compaction.async.enabled", "false", "write-only", "true"), - 0, 0, - 
"性能最好配置:bucket=-2+大缓冲+禁止合并,验证延迟分桶的无小文件效果,但需要compaction数据才能可见"), - - new TestCase("TC-53", "无小文件-增加local-merge-buffer", "无小文件", - params("write-buffer-size", "512mb", "write-buffer-spillable", "true", - "target-file-size", "512mb", "bucket", "1", - "compaction.async.enabled", "false", "write-only", "true", - "local-merge-buffer-size", "64mb", - "sort-spill-buffer-size", "128mb", - "num-sorted-run.compaction-trigger", "100", - "num-sorted-run.stop-trigger", "200"), - 0, 0, - "多bucket最优生产配置:所有参数协同优化,预期最少文件数、最大文件"), - new TestCase("TC-54", "无小文件-去除changelog", "无小文件", - params("write-buffer-size", "512mb", "write-buffer-spillable", "true", - "target-file-size", "512mb", "bucket", "1", - "compaction.async.enabled", "false", "write-only", "true", - "local-merge-buffer-size", "64mb", - "sort-spill-buffer-size", "128mb", - "num-sorted-run.compaction-trigger", "100", - "num-sorted-run.stop-trigger", "200", - "changelog-producer", "input"), - 0, 0, - "多bucket最优生产配置:所有参数协同优化,预期最少文件数、最大文件") - ); - } - - // ─── 5.7 文件格式和压缩测试组 ───────────────────────────────────────────── - - public static List createFormatCompressionTests() { - return Arrays.asList( - new TestCase("TC-60", "格式-Parquet+ZSTD(默认)", "文件格式", - params("write-buffer-size", "256mb", "target-file-size", "128mb", - "file.format", "parquet", "file.compression", "zstd", - "bucket", "-1", "write-only", "true"), - 0, 0, "Parquet+ZSTD,默认配置基准"), - - new TestCase("TC-61", "格式-Parquet+LZ4", "文件格式", - params("write-buffer-size", "256mb", "target-file-size", "128mb", - "file.format", "parquet", "file.compression", "lz4", - "bucket", "-1", "write-only", "true"), - 0, 0, "Parquet+LZ4,更快压缩速度但压缩率稍低"), - - new TestCase("TC-62", "格式-Parquet+无压缩", "文件格式", - params("write-buffer-size", "256mb", "target-file-size", "128mb", - "file.format", "parquet", "file.compression", "none", - "bucket", "-1", "write-only", "true"), - 0, 0, "Parquet+无压缩,最快写入速度但文件最大"), - - new TestCase("TC-63", "格式-ORC+ZSTD", "文件格式", - params("write-buffer-size", "256mb", 
"target-file-size", "128mb", - "file.format", "orc", "file.compression", "zstd", - "bucket", "-1", "write-only", "true"), - 0, 0, "ORC+ZSTD,列存格式写入性能对比"), - - new TestCase("TC-64", "格式-ORC+LZ4", "文件格式", - params("write-buffer-size", "256mb", "target-file-size", "128mb", - "file.format", "orc", "file.compression", "lz4", - "bucket", "-1", "write-only", "true"), - 0, 0, "ORC+LZ4") - ); - } - - // ─── 5.8 主键更新测试组 ─────────────────────────────────────────────────── - - public static List createPrimaryKeyUpdateTests() { - return Arrays.asList( - new TestCase("TC-70", "主键-纯插入(0%重复)", "主键更新", - params("write-buffer-size", "256mb", "target-file-size", "128mb", - "bucket", "-1", "compaction.async.enabled", "true", - "write-only", "false"), - 0, 0, "0%主键重复,纯INSERT性能基准"), - - new TestCase("TC-71", "主键-10%重复", "主键更新", - params("write-buffer-size", "256mb", "target-file-size", "128mb", - "bucket", "-1", "compaction.async.enabled", "true", - "write-only", "false"), - 10, 0, "10%主键重复(低更新率),触发部分UPDATE合并"), - - new TestCase("TC-72", "主键-30%重复", "主键更新", - params("write-buffer-size", "256mb", "target-file-size", "128mb", - "bucket", "-1", "compaction.async.enabled", "true", - "write-only", "false"), - 30, 0, "30%主键重复(中等更新率)"), - - new TestCase("TC-73", "主键-50%重复", "主键更新", - params("write-buffer-size", "256mb", "target-file-size", "128mb", - "bucket", "-1", "compaction.async.enabled", "true", - "write-only", "false"), - 50, 0, "50%主键重复(高更新率),大量合并开销") - ); - } - - // ─── 5.9 并行度测试组 ───────────────────────────────────────────────────── - - public static List createParallelismTests() { - return Arrays.asList( - new TestCase("TC-80", "并行度-1线程", "写入并行度", - params("write-buffer-size", "512mb", "target-file-size", "256mb", - "bucket", "1", "sink.parallelism", "1", - "compaction.async.enabled", "false", "write-only", "true"), - 0, 0, "单线程写入,最少文件数"), - - new TestCase("TC-81", "并行度-2线程", "写入并行度", - params("write-buffer-size", "512mb", "target-file-size", "256mb", - "bucket", "2", 
"sink.parallelism", "2", - "compaction.async.enabled", "false", "write-only", "true"), - 0, 0, "2线程写入"), - - new TestCase("TC-82", "并行度-4线程", "写入并行度", - params("write-buffer-size", "512mb", "target-file-size", "256mb", - "bucket", "4", "sink.parallelism", "4", - "compaction.async.enabled", "false", "write-only", "true"), - 0, 0, "4线程写入(默认)"), - - new TestCase("TC-83", "并行度-8线程", "写入并行度", - params("write-buffer-size", "512mb", "target-file-size", "256mb", - "bucket", "8", "sink.parallelism", "8", - "compaction.async.enabled", "false", "write-only", "true"), - 0, 0, "8线程写入") - ); - } - - // ─── 组合工厂方法 ───────────────────────────────────────────────────────── - - public static List createAllTests() { - List all = new ArrayList<>(); - all.addAll(createBasicTests()); - all.addAll(createWriteBufferTests()); - all.addAll(createTargetFileSizeTests()); - all.addAll(createBucketTests()); - all.addAll(createCompactionTests()); - all.addAll(createNoSmallFileTests()); - all.addAll(createFormatCompressionTests()); - all.addAll(createPrimaryKeyUpdateTests()); - all.addAll(createParallelismTests()); - return all; - } - - /** - * 获取所有支持的测试组名称 - */ - public static Map getGroupDescriptions() { - Map m = new LinkedHashMap<>(); - m.put("basic", "基础测试用例(TC-01~03) - 默认配置基线"); - m.put("buffer", "写入缓冲区测试(TC-10~16) - write-buffer-size / spillable"); - m.put("target", "目标文件大小测试(TC-20~23) - target-file-size"); - m.put("bucket", "分桶策略测试(TC-30~35) - bucket=-2/-1/固定"); - m.put("compaction", "合并策略测试(TC-40~45) - compaction trigger/stop/write-only"); - m.put("nosmallfile", "无小文件写入测试(TC-50~53) - 生产最优配置"); - m.put("format", "文件格式&压缩测试(TC-60~64) - parquet/orc + zstd/lz4"); - m.put("pkupdate", "主键更新测试(TC-70~73) - 重复率 0%/10%/30%/50%"); - m.put("parallelism", "写入并行度测试(TC-80~83) - sink.parallelism"); - m.put("all", "全量测试(所有组)"); - return m; - } -} diff --git a/connectors/paimon-connector/src/test/java/io/tapdata/connector/paimon/service/ManagedIOStreamTableWriteTest.java 
b/connectors/paimon-connector/src/test/java/io/tapdata/connector/paimon/service/ManagedIOStreamTableWriteTest.java new file mode 100644 index 00000000..089dc896 --- /dev/null +++ b/connectors/paimon-connector/src/test/java/io/tapdata/connector/paimon/service/ManagedIOStreamTableWriteTest.java @@ -0,0 +1,209 @@ +package io.tapdata.connector.paimon.service; + +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.disk.BufferFileReader; +import org.apache.paimon.disk.BufferFileWriter; +import org.apache.paimon.disk.FileIOChannel; +import org.apache.paimon.disk.IOManager; +import org.apache.paimon.io.BundleRecords; +import org.apache.paimon.memory.MemoryPoolFactory; +import org.apache.paimon.metrics.MetricRegistry; +import org.apache.paimon.table.sink.CommitMessage; +import org.apache.paimon.table.sink.StreamTableWrite; +import org.apache.paimon.table.sink.TableWrite; +import org.apache.paimon.types.RowType; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertThrows; + +class ManagedIOStreamTableWriteTest { + + @Test + void closeShouldCloseWriteThenIoManager() throws Exception { + List order = new ArrayList<>(); + StreamTableWrite delegate = new FakeStreamTableWrite(order, null); + IOManager ioManager = new FakeIOManager(order, null); + ManagedIOStreamTableWrite writer = new ManagedIOStreamTableWrite(delegate, ioManager); + + writer.close(); + + assertEquals(2, order.size()); + assertEquals("write.close", order.get(0)); + assertEquals("io.close", order.get(1)); + } + + @Test + void closeShouldStillCloseIoManagerWhenWriteCloseFails() { + List order = new ArrayList<>(); + Exception writeError = new 
Exception("write close failed"); + StreamTableWrite delegate = new FakeStreamTableWrite(order, writeError); + IOManager ioManager = new FakeIOManager(order, null); + + ManagedIOStreamTableWrite writer = new ManagedIOStreamTableWrite(delegate, ioManager); + + Exception thrown = assertThrows(Exception.class, writer::close); + assertSame(writeError, thrown); + assertEquals(2, order.size()); + assertEquals("write.close", order.get(0)); + assertEquals("io.close", order.get(1)); + } + + @Test + void closeShouldAttachIoManagerErrorAsSuppressedWhenBothFail() { + List order = new ArrayList<>(); + Exception writeError = new Exception("write close failed"); + Exception ioError = new Exception("io close failed"); + StreamTableWrite delegate = new FakeStreamTableWrite(order, writeError); + IOManager ioManager = new FakeIOManager(order, ioError); + + ManagedIOStreamTableWrite writer = new ManagedIOStreamTableWrite(delegate, ioManager); + + Exception thrown = assertThrows(Exception.class, writer::close); + assertSame(writeError, thrown); + assertArrayEquals(new Throwable[] {ioError}, thrown.getSuppressed()); + assertEquals(2, order.size()); + assertEquals("write.close", order.get(0)); + assertEquals("io.close", order.get(1)); + } + + @Test + void closeShouldWorkWithoutIoManager() throws Exception { + List order = new ArrayList<>(); + StreamTableWrite delegate = new FakeStreamTableWrite(order, null); + ManagedIOStreamTableWrite writer = new ManagedIOStreamTableWrite(delegate, null); + + writer.close(); + + assertEquals(Collections.singletonList("write.close"), order); + } + + private static class FakeStreamTableWrite implements StreamTableWrite { + + private final List order; + private final Exception closeError; + + private FakeStreamTableWrite(List order, Exception closeError) { + this.order = order; + this.closeError = closeError; + } + + @Override + public List prepareCommit(boolean waitCompaction, long commitIdentifier) { + return Collections.emptyList(); + } + + @Override + 
public TableWrite withIOManager(IOManager ioManager) { + return this; + } + + @Override + public TableWrite withWriteType(RowType writeType) { + return this; + } + + @Override + public TableWrite withMemoryPoolFactory(MemoryPoolFactory memoryPoolFactory) { + return this; + } + + @Override + public BinaryRow getPartition(InternalRow row) { + return null; + } + + @Override + public int getBucket(InternalRow row) { + return 0; + } + + @Override + public void write(InternalRow row) { + } + + @Override + public void write(InternalRow row, int bucket) { + } + + @Override + public void writeBundle(BinaryRow partition, int bucket, BundleRecords bundle) { + } + + @Override + public void compact(BinaryRow partition, int bucket, boolean fullCompaction) { + } + + @Override + public TableWrite withMetricRegistry(MetricRegistry registry) { + return this; + } + + @Override + public void close() throws Exception { + order.add("write.close"); + if (closeError != null) { + throw closeError; + } + } + } + + private static class FakeIOManager implements IOManager { + + private final List order; + private final Exception closeError; + + private FakeIOManager(List order, Exception closeError) { + this.order = order; + this.closeError = closeError; + } + + @Override + public FileIOChannel.ID createChannel() { + return null; + } + + @Override + public FileIOChannel.ID createChannel(String prefix) { + return null; + } + + @Override + public String[] tempDirs() { + return new String[0]; + } + + @Override + public FileIOChannel.Enumerator createChannelEnumerator() { + return null; + } + + @Override + public BufferFileWriter createBufferFileWriter(FileIOChannel.ID channelID) { + return null; + } + + @Override + public BufferFileReader createBufferFileReader(FileIOChannel.ID channelID) { + return null; + } + + @Override + public void close() throws Exception { + order.add("io.close"); + if (closeError != null) { + throw closeError; + } + } + } +} + + +