Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package com.consilens.cli.config;

import com.consilens.cli.model.CliConfiguration;
import com.consilens.cli.service.CompareRequestFactory;
import com.consilens.connector.api.normalization.DefaultNormalizationSpecValidator;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -73,6 +75,54 @@ void shouldCoverStructuredConfigurationOptionsAcrossExamples() throws Exception
}
}

@Test
void shouldCoverAllNormalizationRuleParametersInSameDbExample() throws Exception {
String exampleText = Files.readString(sameDbExamplePath());
List<String> requiredTokens = List.of(
"precision:",
"rounding:",
"format:",
"timezone:",
"comparisonMode:",
"encoding:",
"uppercase:",
"trueValue:",
"falseValue:",
"nullValue:");

for (String token : requiredTokens) {
assertTrue(exampleText.contains(token),
"same-db-mysql-comparison.yaml 缺少 normalization 参数覆盖: " + token);
}
}

@Test
void shouldBuildValidNormalizationSpecForSameDbExample() throws Exception {
ConfigurationManager configurationManager = new ConfigurationManager(testEnvironment());
CliConfiguration config = configurationManager.loadConfiguration(sameDbExamplePath().toString(), false);

CompareRequestFactory factory = new CompareRequestFactory();
new DefaultNormalizationSpecValidator().validate(factory.create(config).getNormalizationSpec());
}

@Test
void shouldEnableCsvDiffRecordSinkForSameDbExample() throws Exception {
ConfigurationManager configurationManager = new ConfigurationManager(testEnvironment());
CliConfiguration config = configurationManager.loadConfiguration(sameDbExamplePath().toString(), false);

assertNotNull(config.getResult());
assertNotNull(config.getResult().getSinks());

var csvSink = config.getResult().getSinks().stream()
.filter(sink -> "csv".equalsIgnoreCase(sink.getFormat()))
.filter(sink -> "diff-record".equalsIgnoreCase(sink.getType()))
.findFirst()
.orElseThrow();

assertTrue(csvSink.isEnabled(), "same-db-mysql-comparison.yaml 的 csv diff-record sink 应默认启用");
assertTrue(csvSink.getProperties().contains("\"path\":\"./output/orders-diff.csv\""));
}

private static Stream<Path> exampleConfigurationPaths() throws IOException {
Path examplesDirectory = Paths.get("..", "examples").toAbsolutePath().normalize();
List<Path> paths;
Expand All @@ -91,6 +141,10 @@ private static Stream<Path> exampleConfigurationPaths() throws IOException {
return paths.stream();
}

private static Path sameDbExamplePath() {
return Paths.get("..", "examples", "same-db-mysql-comparison.yaml").toAbsolutePath().normalize();
}

private Map<String, String> testEnvironment() {
Map<String, String> env = new HashMap<>();
env.put("MYSQL_USER", "test_user");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,17 +227,22 @@ private DatasetMetadata createMetadata(String connectorName,
attributes.putAll(discoverDorisPartitionAttributes(resource));
}

EnumSet<ConnectorCapability> capabilities = EnumSet.of(
ConnectorCapability.SCHEMA_DISCOVERY,
ConnectorCapability.FILTER_PUSHDOWN,
ConnectorCapability.PROJECTION_PUSHDOWN,
ConnectorCapability.SERVER_SIDE_HASH,
ConnectorCapability.ORDERED_SCAN,
ConnectorCapability.STREAM_SCAN
);
if (!isSqlResource(resource)) {
capabilities.add(ConnectorCapability.SERVER_SIDE_JOIN);
}

return DatasetMetadata.builder()
.logicalName(resource != null ? (resource.getName() != null ? resource.getName() : resource.getPath()) : null)
.executionDomainId(buildExecutionDomainId(connectorType, connection))
.capabilities(new CapabilitySet(EnumSet.of(
ConnectorCapability.SCHEMA_DISCOVERY,
ConnectorCapability.FILTER_PUSHDOWN,
ConnectorCapability.PROJECTION_PUSHDOWN,
ConnectorCapability.SERVER_SIDE_HASH,
ConnectorCapability.ORDERED_SCAN,
ConnectorCapability.STREAM_SCAN
)))
.capabilities(new CapabilitySet(capabilities))
.attributes(attributes)
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ void shouldExposeRelationalSupportWithoutLeakingExecutionInputsToMetadata() {

assertTrue(handle.getSupport(RelationalDatasetSupport.class).isPresent());
assertTrue(handle.getMetadata().getCapabilities().supports(ConnectorCapability.SERVER_SIDE_HASH));
assertFalse(handle.getMetadata().getCapabilities().supports(ConnectorCapability.SERVER_SIDE_JOIN));
assertTrue(handle.getMetadata().getCapabilities().supports(ConnectorCapability.SERVER_SIDE_JOIN));
assertNull(handle.getMetadata().getAttributes().get("readOptions"));
assertNull(handle.getMetadata().getAttributes().get("connection"));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.consilens.core.segment;

import com.consilens.core.database.adpter.DatabaseAdapter;
import com.consilens.core.segmentation.CheckpointSelector;
import com.consilens.core.segmentation.IntelligentSegmenter;
import com.consilens.core.segment.strategy.FallbackSegmentStrategy;
import com.consilens.core.segment.strategy.RangeSegmentStrategy;
Expand All @@ -9,7 +10,6 @@
import com.consilens.core.segment.strategy.SegmentStrategyType;
import com.consilens.core.thread.ConcurrencyConfig;
import com.consilens.core.thread.ExecutorProvider;
import com.consilens.core.thread.ExecutorProvider;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

Expand All @@ -36,7 +36,11 @@ public TableSegmenter(DatabaseAdapter database, SegmenterConfig config) {

public TableSegmenter(DatabaseAdapter database, SegmenterConfig config, ExecutorProvider executorProvider) {
this.config = config;
this.intelligentSegmenter = new IntelligentSegmenter(database);
this.intelligentSegmenter = new IntelligentSegmenter(
database,
buildCheckpointSelector(config),
config.getSampleSize(),
true);
this.rangeStrategy = new RangeSegmentStrategy(intelligentSegmenter);
this.rowSampleStrategy = new RowSampleSegmentStrategy();
this.fallbackStrategy = new FallbackSegmentStrategy(config);
Expand Down Expand Up @@ -97,6 +101,13 @@ private Executor getIoExecutor() {
return executorProvider.getIoExecutor();
}

private CheckpointSelector buildCheckpointSelector(SegmenterConfig config) {
int maxSegments = Math.max(2, config.getMaxSegmentCount());
long rawThreshold = config.getBisectionThreshold();
int threshold = (int) Math.max(1L, Math.min(Integer.MAX_VALUE, rawThreshold));
return new CheckpointSelector(maxSegments, threshold);
}

private SegmentStrategy selectStrategy(TableSegment table) {
SegmentStrategyType strategyType = config.getStrategyType();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ private List<TableSegment> fallbackToBasicSegmentation(TableSegment table, int m

try {
if (table.isBounded()) {
List<List<Object>> checkpoints = table.chooseCheckpoints(maxSegments);
List<List<Object>> checkpoints = table.chooseCheckpoints(Math.max(0, maxSegments - 1));
return table.segmentByCheckpoints(checkpoints);
}
return List.of(table);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,13 @@ public CheckpointSelector(int bisectionFactor, int bisectionThreshold) {
* Choose optimal checkpoints for the given key range.
*/
public List<KeyVector> chooseCheckpoints(KeyVector minKey, KeyVector maxKey, int preferredCount) {
return chooseCheckpoints(minKey, maxKey, preferredCount, false);
}

public List<KeyVector> chooseCheckpoints(KeyVector minKey,
KeyVector maxKey,
int preferredCount,
boolean forceSplit) {
if (minKey.isGreaterThan(maxKey)) {
throw new IllegalArgumentException("minKey must not be greater than maxKey");
}
Expand All @@ -37,7 +44,7 @@ public List<KeyVector> chooseCheckpoints(KeyVector minKey, KeyVector maxKey, int
long spaceSize = space.calculateSpaceSize(minKey, maxKey);

// If space is too small, don't split
if (spaceSize <= bisectionThreshold) {
if (!forceSplit && spaceSize <= bisectionThreshold) {
return List.of(minKey, maxKey);
}

Expand Down Expand Up @@ -70,10 +77,13 @@ private int calculateEffectiveCheckpointCount(int preferredCount, int dimensions
/**
* Choose checkpoints with adaptive strategy based on data distribution.
*/
public List<KeyVector> chooseAdaptiveCheckpoints(KeyVector minKey, KeyVector maxKey,
long estimatedRows, SamplingResult samplingResult) {
public List<KeyVector> chooseAdaptiveCheckpoints(KeyVector minKey,
KeyVector maxKey,
int preferredCount,
long estimatedRows,
SamplingResult samplingResult) {
// Start with basic uniform distribution
List<KeyVector> basicCheckpoints = chooseCheckpoints(minKey, maxKey, bisectionFactor);
List<KeyVector> basicCheckpoints = chooseCheckpoints(minKey, maxKey, preferredCount, true);

// If we have sampling information, adjust checkpoints
if (samplingResult != null && samplingResult.hasDistributionInfo()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,11 @@ private List<TableSegment> segmentTableInternal(TableSegment table, int maxSegme

// Choose checkpoints
List<KeyVector> checkpoints = checkpointSelector.chooseAdaptiveCheckpoints(
minKey, maxKey, estimateTableSize(table), samplingResult);
minKey,
maxKey,
Math.max(2, maxSegments + 1),
estimateTableSize(table),
samplingResult);

// Validate checkpoints
if (!checkpointSelector.validateCheckpoints(checkpoints, minKey, maxKey)) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package com.consilens.core.segment;

import com.consilens.connector.api.model.TablePath;
import com.consilens.core.database.adpter.DatabaseAdapter;
import com.consilens.core.thread.ExecutorProvider;
import org.junit.jupiter.api.Test;

import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

class TableSegmenterTest {

@Test
void shouldHonorBinarySplitRequestForThresholdSizedBoundedRange() {
DatabaseAdapter adapter = mock(DatabaseAdapter.class);
when(adapter.count(any(TableSegment.class))).thenReturn(5001L);

TableSegment table = TableSegment.builder()
.database(adapter)
.tablePath(TablePath.of("users"))
.keyColumns(List.of("id"))
.minKey(Optional.of(List.of(1L)))
.maxKey(Optional.of(List.of(5001L)))
.upperBoundInclusive(true)
.build();

ExecutorService executor = Executors.newFixedThreadPool(2, r -> {
Thread thread = new Thread(r, "table-segmenter-test");
thread.setDaemon(true);
return thread;
});
ExecutorProvider executorProvider = new ExecutorProvider(executor, executor, false);

TableSegmenter segmenter = new TableSegmenter(
adapter,
new TableSegmenter.SegmenterConfig(5000, 100, 16),
executorProvider);

List<TableSegment> segments = segmenter.createOptimalSegments(table, 2, 5000).join();

assertEquals(2, segments.size());
assertEquals(Optional.of(List.of(1L)), segments.get(0).getMinKey());
assertEquals(Optional.of(List.of(5001L)), segments.get(1).getMaxKey());
assertNotEquals(Optional.of(List.of(5001L)), segments.get(0).getMaxKey());
}
}
2 changes: 1 addition & 1 deletion examples/minimal-mysql-to-pg.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ target:
type: postgresql
name: target-postgresql
connection:
url: jdbc:postgresql://localhost:5432/mydb
url: jdbc:postgresql://localhost:5432/postgres?currentSchema=public
username: ${env.PG_USER}
password: ${env.PG_PASSWORD}
resource:
Expand Down
32 changes: 27 additions & 5 deletions examples/same-db-mysql-comparison.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# 同库 join 比对配置 - MySQL 内部两张表比对
# 适用于同一 JDBC URL 下的迁移验证,也覆盖 readOptions / extraColumns /
# normalization / result.failOnSinkError / sink.enabled 等显式配置项
# normalization(所有规则参数)/ result.failOnSinkError / sink.enabled 等显式配置项

source:
type: mysql
Expand Down Expand Up @@ -72,16 +72,35 @@ normalization:
decimal:
precision: 2
rounding: true
float:
precision: 4
rounding: false
date:
format: yyyy-MM-dd
comparisonMode: DATE_ONLY
time:
format: HH:mm:ss
datetime:
format: yyyy-MM-dd HH:mm:ss
timezone: Asia/Shanghai
comparisonMode: instant
comparisonMode: "TRUNCATE_TO_SECOND"
timestamp:
format: yyyy-MM-dd HH:mm:ss
timezone: Asia/Shanghai
comparisonMode: "TRUNCATE_TO_SECOND"
binary:
encoding: hex
uppercase: true
boolean:
trueValue: "Y"
falseValue: "N"
nullValue: "<BOOL_NULL>"
source:
varchar:
nullValue: ""
nullValue: "<SRC_NULL>"
target:
varchar:
nullValue: ""
nullValue: "<TGT_NULL>"

result:
failOnSinkError: false
Expand All @@ -90,7 +109,7 @@ result:
type: result
- format: csv
type: diff-record
enabled: false
enabled: true
properties:
path: ./output/orders-diff.csv
delimiter: ","
Expand All @@ -99,3 +118,6 @@ result:
columns:
- name: env
value: staging
- format: console
type: result
enabled: false
Loading