From 0b3c97d36e924c9fc35c16deb23abee2be0ec7d6 Mon Sep 17 00:00:00 2001 From: xiaoyao0120 Date: Wed, 13 May 2026 11:53:04 +0800 Subject: [PATCH] fix: resolve join check error --- ...ExampleConfigurationCompatibilityTest.java | 54 +++++++++++++++++++ .../base/jdbc/JdbcDatasetHandle.java | 21 +++++--- .../base/jdbc/JdbcDatasetHandleTest.java | 2 +- .../core/segment/TableSegmenter.java | 15 +++++- .../strategy/FallbackSegmentStrategy.java | 2 +- .../core/segmentation/CheckpointSelector.java | 18 +++++-- .../segmentation/IntelligentSegmenter.java | 6 ++- .../core/segment/TableSegmenterTest.java | 54 +++++++++++++++++++ examples/minimal-mysql-to-pg.yaml | 2 +- examples/same-db-mysql-comparison.yaml | 32 +++++++++-- 10 files changed, 183 insertions(+), 23 deletions(-) create mode 100644 consilens-core/src/test/java/com/consilens/core/segment/TableSegmenterTest.java diff --git a/consilens-cli/src/test/java/com/consilens/cli/config/ExampleConfigurationCompatibilityTest.java b/consilens-cli/src/test/java/com/consilens/cli/config/ExampleConfigurationCompatibilityTest.java index 79e966e..26fb981 100644 --- a/consilens-cli/src/test/java/com/consilens/cli/config/ExampleConfigurationCompatibilityTest.java +++ b/consilens-cli/src/test/java/com/consilens/cli/config/ExampleConfigurationCompatibilityTest.java @@ -1,6 +1,8 @@ package com.consilens.cli.config; import com.consilens.cli.model.CliConfiguration; +import com.consilens.cli.service.CompareRequestFactory; +import com.consilens.connector.api.normalization.DefaultNormalizationSpecValidator; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.api.Test; @@ -73,6 +75,54 @@ void shouldCoverStructuredConfigurationOptionsAcrossExamples() throws Exception } } + @Test + void shouldCoverAllNormalizationRuleParametersInSameDbExample() throws Exception { + String exampleText = Files.readString(sameDbExamplePath()); + List requiredTokens = List.of( + "precision:", + "rounding:", + "format:", + "timezone:", + "comparisonMode:", + "encoding:", + "uppercase:", + "trueValue:", + "falseValue:", + "nullValue:"); + + for (String token : requiredTokens) { + assertTrue(exampleText.contains(token), + "same-db-mysql-comparison.yaml 缺少 normalization 参数覆盖: " + token); + } + } + + @Test + void shouldBuildValidNormalizationSpecForSameDbExample() throws Exception { + ConfigurationManager configurationManager = new ConfigurationManager(testEnvironment()); + CliConfiguration config = configurationManager.loadConfiguration(sameDbExamplePath().toString(), false); + + CompareRequestFactory factory = new CompareRequestFactory(); + new DefaultNormalizationSpecValidator().validate(factory.create(config).getNormalizationSpec()); + } + + @Test + void shouldEnableCsvDiffRecordSinkForSameDbExample() throws Exception { + ConfigurationManager configurationManager = new ConfigurationManager(testEnvironment()); + CliConfiguration config = configurationManager.loadConfiguration(sameDbExamplePath().toString(), false); + + assertNotNull(config.getResult()); + assertNotNull(config.getResult().getSinks()); + + var csvSink = config.getResult().getSinks().stream() + .filter(sink -> "csv".equalsIgnoreCase(sink.getFormat())) + .filter(sink -> "diff-record".equalsIgnoreCase(sink.getType())) + .findFirst() + .orElseThrow(); + + assertTrue(csvSink.isEnabled(), "same-db-mysql-comparison.yaml 的 csv diff-record sink 应默认启用"); + assertTrue(csvSink.getProperties().contains("\"path\":\"./output/orders-diff.csv\"")); + } + private static Stream exampleConfigurationPaths() throws IOException { Path examplesDirectory = Paths.get("..", "examples").toAbsolutePath().normalize(); List paths; @@ -91,6 +141,10 @@ private static Stream exampleConfigurationPaths() throws IOException { return paths.stream(); } + private static Path sameDbExamplePath() { + return Paths.get("..", "examples", "same-db-mysql-comparison.yaml").toAbsolutePath().normalize(); + } + private Map testEnvironment() { Map env = new HashMap<>(); env.put("MYSQL_USER", "test_user"); diff --git a/consilens-connector/consilens-connector-plugins/consilens-connector-base/src/main/java/com/consilens/conncetor/base/jdbc/JdbcDatasetHandle.java b/consilens-connector/consilens-connector-plugins/consilens-connector-base/src/main/java/com/consilens/conncetor/base/jdbc/JdbcDatasetHandle.java index b6a57c3..c6b5001 100644 --- a/consilens-connector/consilens-connector-plugins/consilens-connector-base/src/main/java/com/consilens/conncetor/base/jdbc/JdbcDatasetHandle.java +++ b/consilens-connector/consilens-connector-plugins/consilens-connector-base/src/main/java/com/consilens/conncetor/base/jdbc/JdbcDatasetHandle.java @@ -227,17 +227,22 @@ private DatasetMetadata createMetadata(String connectorName, attributes.putAll(discoverDorisPartitionAttributes(resource)); } + EnumSet capabilities = EnumSet.of( + ConnectorCapability.SCHEMA_DISCOVERY, + ConnectorCapability.FILTER_PUSHDOWN, + ConnectorCapability.PROJECTION_PUSHDOWN, + ConnectorCapability.SERVER_SIDE_HASH, + ConnectorCapability.ORDERED_SCAN, + ConnectorCapability.STREAM_SCAN + ); + if (!isSqlResource(resource)) { + capabilities.add(ConnectorCapability.SERVER_SIDE_JOIN); + } + return DatasetMetadata.builder() .logicalName(resource != null ? (resource.getName() != null ? resource.getName() : resource.getPath()) : null) .executionDomainId(buildExecutionDomainId(connectorType, connection)) - .capabilities(new CapabilitySet(EnumSet.of( - ConnectorCapability.SCHEMA_DISCOVERY, - ConnectorCapability.FILTER_PUSHDOWN, - ConnectorCapability.PROJECTION_PUSHDOWN, - ConnectorCapability.SERVER_SIDE_HASH, - ConnectorCapability.ORDERED_SCAN, - ConnectorCapability.STREAM_SCAN - ))) + .capabilities(new CapabilitySet(capabilities)) .attributes(attributes) .build(); } diff --git a/consilens-connector/consilens-connector-plugins/consilens-connector-base/src/test/java/com/consilens/conncetor/base/jdbc/JdbcDatasetHandleTest.java b/consilens-connector/consilens-connector-plugins/consilens-connector-base/src/test/java/com/consilens/conncetor/base/jdbc/JdbcDatasetHandleTest.java index e3f57a7..874922e 100644 --- a/consilens-connector/consilens-connector-plugins/consilens-connector-base/src/test/java/com/consilens/conncetor/base/jdbc/JdbcDatasetHandleTest.java +++ b/consilens-connector/consilens-connector-plugins/consilens-connector-base/src/test/java/com/consilens/conncetor/base/jdbc/JdbcDatasetHandleTest.java @@ -67,7 +67,7 @@ void shouldExposeRelationalSupportWithoutLeakingExecutionInputsToMetadata() { assertTrue(handle.getSupport(RelationalDatasetSupport.class).isPresent()); assertTrue(handle.getMetadata().getCapabilities().supports(ConnectorCapability.SERVER_SIDE_HASH)); - assertFalse(handle.getMetadata().getCapabilities().supports(ConnectorCapability.SERVER_SIDE_JOIN)); + assertTrue(handle.getMetadata().getCapabilities().supports(ConnectorCapability.SERVER_SIDE_JOIN)); assertNull(handle.getMetadata().getAttributes().get("readOptions")); assertNull(handle.getMetadata().getAttributes().get("connection")); } diff --git a/consilens-core/src/main/java/com/consilens/core/segment/TableSegmenter.java b/consilens-core/src/main/java/com/consilens/core/segment/TableSegmenter.java index 0879bce..16b5691 100644 --- a/consilens-core/src/main/java/com/consilens/core/segment/TableSegmenter.java +++ b/consilens-core/src/main/java/com/consilens/core/segment/TableSegmenter.java @@ -1,6 +1,7 @@ package com.consilens.core.segment; import com.consilens.core.database.adpter.DatabaseAdapter; +import com.consilens.core.segmentation.CheckpointSelector; import com.consilens.core.segmentation.IntelligentSegmenter; import com.consilens.core.segment.strategy.FallbackSegmentStrategy; import com.consilens.core.segment.strategy.RangeSegmentStrategy; @@ -9,7 +10,6 @@ import com.consilens.core.segment.strategy.SegmentStrategyType; import com.consilens.core.thread.ConcurrencyConfig; import com.consilens.core.thread.ExecutorProvider; -import com.consilens.core.thread.ExecutorProvider; import lombok.Getter; import lombok.extern.slf4j.Slf4j; @@ -36,7 +36,11 @@ public TableSegmenter(DatabaseAdapter database, SegmenterConfig config) { public TableSegmenter(DatabaseAdapter database, SegmenterConfig config, ExecutorProvider executorProvider) { this.config = config; - this.intelligentSegmenter = new IntelligentSegmenter(database); + this.intelligentSegmenter = new IntelligentSegmenter( + database, + buildCheckpointSelector(config), + config.getSampleSize(), + true); this.rangeStrategy = new RangeSegmentStrategy(intelligentSegmenter); this.rowSampleStrategy = new RowSampleSegmentStrategy(); this.fallbackStrategy = new FallbackSegmentStrategy(config); @@ -97,6 +101,13 @@ private Executor getIoExecutor() { return executorProvider.getIoExecutor(); } + private CheckpointSelector buildCheckpointSelector(SegmenterConfig config) { + int maxSegments = Math.max(2, config.getMaxSegmentCount()); + long rawThreshold = config.getBisectionThreshold(); + int threshold = (int) Math.max(1L, Math.min(Integer.MAX_VALUE, rawThreshold)); + return new CheckpointSelector(maxSegments, threshold); + } + private SegmentStrategy selectStrategy(TableSegment table) { SegmentStrategyType strategyType = config.getStrategyType(); diff --git a/consilens-core/src/main/java/com/consilens/core/segment/strategy/FallbackSegmentStrategy.java b/consilens-core/src/main/java/com/consilens/core/segment/strategy/FallbackSegmentStrategy.java index 67309da..82feb27 100644 --- a/consilens-core/src/main/java/com/consilens/core/segment/strategy/FallbackSegmentStrategy.java +++ b/consilens-core/src/main/java/com/consilens/core/segment/strategy/FallbackSegmentStrategy.java @@ -50,7 +50,7 @@ private List fallbackToBasicSegmentation(TableSegment table, int m try { if (table.isBounded()) { - List> checkpoints = table.chooseCheckpoints(maxSegments); + List> checkpoints = table.chooseCheckpoints(Math.max(0, maxSegments - 1)); return table.segmentByCheckpoints(checkpoints); } return List.of(table); diff --git a/consilens-core/src/main/java/com/consilens/core/segmentation/CheckpointSelector.java b/consilens-core/src/main/java/com/consilens/core/segmentation/CheckpointSelector.java index 8ca1874..0df39fd 100644 --- a/consilens-core/src/main/java/com/consilens/core/segmentation/CheckpointSelector.java +++ b/consilens-core/src/main/java/com/consilens/core/segmentation/CheckpointSelector.java @@ -26,6 +26,13 @@ public CheckpointSelector(int bisectionFactor, int bisectionThreshold) { * Choose optimal checkpoints for the given key range. */ public List chooseCheckpoints(KeyVector minKey, KeyVector maxKey, int preferredCount) { + return chooseCheckpoints(minKey, maxKey, preferredCount, false); + } + + public List chooseCheckpoints(KeyVector minKey, + KeyVector maxKey, + int preferredCount, + boolean forceSplit) { if (minKey.isGreaterThan(maxKey)) { throw new IllegalArgumentException("minKey must not be greater than maxKey"); } @@ -37,7 +44,7 @@ public List chooseCheckpoints(KeyVector minKey, KeyVector maxKey, int long spaceSize = space.calculateSpaceSize(minKey, maxKey); // If space is too small, don't split - if (spaceSize <= bisectionThreshold) { + if (!forceSplit && spaceSize <= bisectionThreshold) { return List.of(minKey, maxKey); } @@ -70,10 +77,13 @@ private int calculateEffectiveCheckpointCount(int preferredCount, int dimensions /** * Choose checkpoints with adaptive strategy based on data distribution. */ - public List chooseAdaptiveCheckpoints(KeyVector minKey, KeyVector maxKey, - long estimatedRows, SamplingResult samplingResult) { + public List chooseAdaptiveCheckpoints(KeyVector minKey, + KeyVector maxKey, + int preferredCount, + long estimatedRows, + SamplingResult samplingResult) { // Start with basic uniform distribution - List basicCheckpoints = chooseCheckpoints(minKey, maxKey, bisectionFactor); + List basicCheckpoints = chooseCheckpoints(minKey, maxKey, preferredCount, true); // If we have sampling information, adjust checkpoints if (samplingResult != null && samplingResult.hasDistributionInfo()) { diff --git a/consilens-core/src/main/java/com/consilens/core/segmentation/IntelligentSegmenter.java b/consilens-core/src/main/java/com/consilens/core/segmentation/IntelligentSegmenter.java index 5591b64..37e2178 100644 --- a/consilens-core/src/main/java/com/consilens/core/segmentation/IntelligentSegmenter.java +++ b/consilens-core/src/main/java/com/consilens/core/segmentation/IntelligentSegmenter.java @@ -126,7 +126,11 @@ private List segmentTableInternal(TableSegment table, int maxSegme // Choose checkpoints List checkpoints = checkpointSelector.chooseAdaptiveCheckpoints( - minKey, maxKey, estimateTableSize(table), samplingResult); + minKey, + maxKey, + Math.max(2, maxSegments + 1), + estimateTableSize(table), + samplingResult); // Validate checkpoints if (!checkpointSelector.validateCheckpoints(checkpoints, minKey, maxKey)) { diff --git a/consilens-core/src/test/java/com/consilens/core/segment/TableSegmenterTest.java b/consilens-core/src/test/java/com/consilens/core/segment/TableSegmenterTest.java new file mode 100644 index 0000000..8003a15 --- /dev/null +++ b/consilens-core/src/test/java/com/consilens/core/segment/TableSegmenterTest.java @@ -0,0 +1,54 @@ +package com.consilens.core.segment; + +import com.consilens.connector.api.model.TablePath; +import com.consilens.core.database.adpter.DatabaseAdapter; +import com.consilens.core.thread.ExecutorProvider; +import org.junit.jupiter.api.Test; + +import java.util.List; +import java.util.Optional; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +class TableSegmenterTest { + + @Test + void shouldHonorBinarySplitRequestForThresholdSizedBoundedRange() { + DatabaseAdapter adapter = mock(DatabaseAdapter.class); + when(adapter.count(any(TableSegment.class))).thenReturn(5001L); + + TableSegment table = TableSegment.builder() + .database(adapter) + .tablePath(TablePath.of("users")) + .keyColumns(List.of("id")) + .minKey(Optional.of(List.of(1L))) + .maxKey(Optional.of(List.of(5001L))) + .upperBoundInclusive(true) + .build(); + + ExecutorService executor = Executors.newFixedThreadPool(2, r -> { + Thread thread = new Thread(r, "table-segmenter-test"); + thread.setDaemon(true); + return thread; + }); + ExecutorProvider executorProvider = new ExecutorProvider(executor, executor, false); + + TableSegmenter segmenter = new TableSegmenter( + adapter, + new TableSegmenter.SegmenterConfig(5000, 100, 16), + executorProvider); + + List segments = segmenter.createOptimalSegments(table, 2, 5000).join(); + + assertEquals(2, segments.size()); + assertEquals(Optional.of(List.of(1L)), segments.get(0).getMinKey()); + assertEquals(Optional.of(List.of(5001L)), segments.get(1).getMaxKey()); + assertNotEquals(Optional.of(List.of(5001L)), segments.get(0).getMaxKey()); + } +} diff --git a/examples/minimal-mysql-to-pg.yaml b/examples/minimal-mysql-to-pg.yaml index c7e86a2..a0937ec 100644 --- a/examples/minimal-mysql-to-pg.yaml +++ b/examples/minimal-mysql-to-pg.yaml @@ -16,7 +16,7 @@ target: type: postgresql name: target-postgresql connection: - url: jdbc:postgresql://localhost:5432/mydb + url: jdbc:postgresql://localhost:5432/postgres?currentSchema=public username: ${env.PG_USER} password: ${env.PG_PASSWORD} resource: diff --git a/examples/same-db-mysql-comparison.yaml b/examples/same-db-mysql-comparison.yaml index 6a832f6..aa2846f 100644 --- a/examples/same-db-mysql-comparison.yaml +++ b/examples/same-db-mysql-comparison.yaml @@ -1,6 +1,6 @@ # 同库 join 比对配置 - MySQL 内部两张表比对 # 适用于同一 JDBC URL 下的迁移验证,也覆盖 readOptions / extraColumns / -# normalization / result.failOnSinkError / sink.enabled 等显式配置项 +# normalization(所有规则参数)/ result.failOnSinkError / sink.enabled 等显式配置项 source: type: mysql @@ -72,16 +72,35 @@ normalization: decimal: precision: 2 rounding: true + float: + precision: 4 + rounding: false + date: + format: yyyy-MM-dd + comparisonMode: DATE_ONLY + time: + format: HH:mm:ss datetime: format: yyyy-MM-dd HH:mm:ss timezone: Asia/Shanghai - comparisonMode: instant + comparisonMode: "TRUNCATE_TO_SECOND" + timestamp: + format: yyyy-MM-dd HH:mm:ss + timezone: Asia/Shanghai + comparisonMode: "TRUNCATE_TO_SECOND" + binary: + encoding: hex + uppercase: true + boolean: + trueValue: "Y" + falseValue: "N" + nullValue: "" source: varchar: - nullValue: "" + nullValue: "" target: varchar: - nullValue: "" + nullValue: "" result: failOnSinkError: false @@ -90,7 +109,7 @@ result: type: result - format: csv type: diff-record - enabled: false + enabled: true properties: path: ./output/orders-diff.csv delimiter: "," @@ -99,3 +118,6 @@ result: columns: - name: env value: staging + - format: console + type: result + enabled: false