Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ public class TypeNormalizationRule {
*/
@JsonProperty("timezone")
private String timezone;

/**
* Temporal comparison mode for date/datetime/timestamp values.
*/
@JsonProperty("comparisonMode")
private String comparisonMode;

/**
* Encoding method (hex/base64).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,23 +53,26 @@ public class DiffService {
* @return diff result
*/
public CliDiffResult performDiff(CliConfiguration config) throws Exception {
log.info("Starting diff operation with strategy: {}, algorithm: {}",
log.info("Starting diff operation with strategy: {}, algorithm: {}",
config.getStrategyMode(), config.getAlgorithm());

long startTime = System.currentTimeMillis();

DiffLifecycle lifecycle = buildLifecycle(config);
DiffContext diffContext = buildDiffContext(config);
CliDiffResult result = null;
Exception failure = null;

try {
lifecycle.onDiffStart(diffContext);
CompareRuntime runtime = new DefaultCompareRuntime();
CompareRuntime runtime = createCompareRuntime();
DiffResult coreResult = runtime.execute(toCompareRequest(config));

publishDifferences(coreResult, lifecycle, diffContext);
lifecycle.onDiffComplete(coreResult, diffContext);

// Convert core result to CLI result
CliDiffResult result = convertToCLIResult(coreResult, config.getStrategyMode(), config);
result = convertToCLIResult(coreResult, config.getStrategyMode(), config);
if (result.getInfoTree() != null) {
log.info(formatInfoTree(result.getInfoTree()));
}
Expand All @@ -80,34 +83,45 @@ public CliDiffResult performDiff(CliConfiguration config) throws Exception {

log.info("Diff operation completed in {} ms with {} differences", duration, result.getTotalDifferences());

return result;

} catch (Exception e) {
log.error("Diff operation failed", e);
failure = new Exception("Diff operation failed: " + e.getMessage(), e);
try {
lifecycle.onDiffError(diffContext, e);
} catch (Exception lifecycleEx) {
log.warn("Lifecycle onDiffError failed", lifecycleEx);
failure.addSuppressed(lifecycleEx);
}
throw new Exception("Diff operation failed: " + e.getMessage(), e);
} finally {
try {
lifecycle.close();
} catch (Exception e) {
log.warn("Lifecycle close failed", e);
} catch (Exception closeEx) {
if (failure != null) {
failure.addSuppressed(closeEx);
} else {
failure = new Exception("Lifecycle close failed: " + closeEx.getMessage(), closeEx);
}
}
}

if (failure != null) {
throw failure;
}
return result;
}

protected CompareRuntime createCompareRuntime() {
return new DefaultCompareRuntime();
}

private DiffLifecycle buildLifecycle(CliConfiguration config) {
protected DiffLifecycle buildLifecycle(CliConfiguration config) {
ResultConfig resultConfig = config.getResult();
if (resultConfig == null || resultConfig.getSinks() == null || resultConfig.getSinks().isEmpty()) {
return new NoopDiffLifecycle();
}
return new DefaultDiffLifecycle(resultConfig);
}

private DiffContext buildDiffContext(CliConfiguration config) {
protected DiffContext buildDiffContext(CliConfiguration config) {
TablePath sourcePath = null;
TablePath targetPath = null;
List<String> sourceColumnNames = new ArrayList<>();
Expand Down Expand Up @@ -152,6 +166,13 @@ private DiffContext buildDiffContext(CliConfiguration config) {
.build();
}

private void publishDifferences(DiffResult coreResult, DiffLifecycle lifecycle, DiffContext diffContext) throws Exception {
if (coreResult == null || coreResult.getDifferences() == null || coreResult.getDifferences().isEmpty()) {
return;
}
lifecycle.onDifferencesFound(coreResult.getDifferences(), diffContext);
}

/**
* Convert core DiffResult to CLI DiffResult.
*/
Expand All @@ -178,7 +199,9 @@ private CliDiffResult convertToCLIResult(DiffResult coreResult, String strategy,
.targetRowCount((int) stats.getTargetRowCount())
.differences(convertDiffRows(coreResult.getDifferences()))
.tableMetadata(tableMetadata)
.infoTree(coreResult.getInfoTree().isPresent() ? coreResult.getInfoTree().orElse(null) : null)
.infoTree(coreResult.getInfoTree() != null && coreResult.getInfoTree().isPresent()
? coreResult.getInfoTree().orElse(null)
: null)
.build()
.withSortKeyColumns(sortKeyColumns);
}
Expand Down Expand Up @@ -433,14 +456,17 @@ private List<NormalizationRule> toNormalizationRules(String type, TypeNormalizat
result.add(normalizationRule(type, "format_number", params));
}

if (rule.getFormat() != null || rule.getTimezone() != null) {
if (rule.getFormat() != null || rule.getTimezone() != null || rule.getComparisonMode() != null) {
Map<String, Object> params = new LinkedHashMap<>();
if (rule.getFormat() != null) {
params.put("format", rule.getFormat());
}
if (rule.getTimezone() != null) {
params.put("timezone", rule.getTimezone());
}
if (rule.getComparisonMode() != null) {
params.put("comparisonMode", rule.getComparisonMode());
}
result.add(normalizationRule(type, "format_datetime", params));
}

Expand Down Expand Up @@ -564,7 +590,7 @@ private Integer integerValue(Object value) {
* Perform a dry run to validate configuration without executing diff.
*/
public CliDiffResult performDryRun(CliConfiguration config) throws Exception {
log.info("Performing dry run for diff operation with strategy: {}, algorithm: {}",
log.info("Performing dry run for diff operation with strategy: {}, algorithm: {}",
config.getStrategyMode(), config.getAlgorithm());
ConnectorProbeService probeService = new ConnectorProbeService();
ComparisonConfig comparison = config.getComparison();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
package com.consilens.cli.service;

import com.consilens.cli.model.CliConfiguration;
import com.consilens.cli.model.ComparisonConfig;
import com.consilens.cli.model.ConnectionConfig;
import com.consilens.cli.model.ListPairConfig;
import com.consilens.cli.model.LocalCompareConfig;
import com.consilens.cli.model.StrategyConfig;
import com.consilens.cli.model.StringPairConfig;
import com.consilens.cli.model.normalization.TypeNormalizationRule;
import com.consilens.connector.api.normalization.NormalizationRule;
import com.consilens.core.compare.CompareRuntime;
import com.consilens.core.diff.DiffResult;
import com.consilens.core.diff.DiffRow;
import com.consilens.core.lifecycle.DiffContext;
import com.consilens.core.lifecycle.DiffLifecycle;
import com.consilens.core.lifecycle.SegmentResult;
import org.junit.jupiter.api.Test;

import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

class DiffServiceTest {

@Test
void shouldPublishDifferencesBeforeCompletingLifecycle() throws Exception {
RecordingLifecycle lifecycle = new RecordingLifecycle(false);
DiffResult diffResult = DiffResult.of(
List.of(DiffRow.removed(List.of(1), List.of("Alice"), List.of("name"))),
com.consilens.connector.api.model.TablePath.of("source_table"),
com.consilens.connector.api.model.TablePath.of("target_table"));

TestableDiffService service = new TestableDiffService(lifecycle, request -> diffResult);

service.performDiff(createConfig());

assertEquals(List.of("start", "differences", "complete", "close"), lifecycle.events);
}

@Test
void shouldFailWhenLifecycleCloseFailsAfterSuccessfulDiff() {
RecordingLifecycle lifecycle = new RecordingLifecycle(true);
DiffResult diffResult = DiffResult.of(
List.of(DiffRow.removed(List.of(1), List.of("Alice"), List.of("name"))),
com.consilens.connector.api.model.TablePath.of("source_table"),
com.consilens.connector.api.model.TablePath.of("target_table"));

TestableDiffService service = new TestableDiffService(lifecycle, request -> diffResult);

Exception exception = assertThrows(Exception.class, () -> service.performDiff(createConfig()));
assertTrue(exception.getMessage().contains("Lifecycle close failed"));
assertEquals(List.of("start", "differences", "complete", "close"), lifecycle.events);
}

@Test
@SuppressWarnings("unchecked")
void shouldConvertTemporalComparisonModeToNormalizationRule() throws Exception {
DiffService service = new DiffService();
TypeNormalizationRule rule = new TypeNormalizationRule();
rule.setTimezone("UTC");
rule.setComparisonMode("DATE_ONLY");

Method method = DiffService.class.getDeclaredMethod("toNormalizationRules", String.class, TypeNormalizationRule.class);
method.setAccessible(true);

List<NormalizationRule> rules = (List<NormalizationRule>) method.invoke(service, "timestamp", rule);

assertEquals(1, rules.size());
assertEquals("format_datetime", rules.get(0).getOperation());
assertEquals(Map.of("timezone", "UTC", "comparisonMode", "DATE_ONLY"), rules.get(0).getParams());
}

private CliConfiguration createConfig() {
return CliConfiguration.builder()
.source(ConnectionConfig.builder()
.type("mysql")
.url("jdbc:mysql://localhost:3306/source_db")
.username("user")
.password("pwd")
.resource(ConnectionConfig.ResourceConfig.builder().type("table").name("source_table").build())
.build())
.target(ConnectionConfig.builder()
.type("mysql")
.url("jdbc:mysql://localhost:3306/target_db")
.username("user")
.password("pwd")
.resource(ConnectionConfig.ResourceConfig.builder().type("table").name("target_table").build())
.build())
.comparison(ComparisonConfig.builder()
.tables(StringPairConfig.builder().source("source_table").target("target_table").build())
.keys(ListPairConfig.builder().source(List.of("id")).target(List.of("id")).build())
.fields(ListPairConfig.builder().source(List.of("name")).target(List.of("name")).build())
.build())
.strategy(StrategyConfig.builder()
.mode("checksum")
.algorithm("concat")
.bisectionFactor(4)
.bisectionThreshold(1000L)
.batchSize(100)
.enableProfiling(false)
.localCompare(LocalCompareConfig.builder().mode("full").build())
.build())
.build();
}

private static class TestableDiffService extends DiffService {
private final DiffLifecycle lifecycle;
private final CompareRuntime runtime;

private TestableDiffService(DiffLifecycle lifecycle, CompareRuntime runtime) {
this.lifecycle = lifecycle;
this.runtime = runtime;
}

@Override
protected CompareRuntime createCompareRuntime() {
return runtime;
}

@Override
protected DiffLifecycle buildLifecycle(CliConfiguration config) {
return lifecycle;
}
}

private static class RecordingLifecycle implements DiffLifecycle {
private final boolean failOnClose;
private final List<String> events = new ArrayList<>();

private RecordingLifecycle(boolean failOnClose) {
this.failOnClose = failOnClose;
}

@Override
public void onDiffStart(DiffContext context) {
events.add("start");
}

@Override
public void onSegmentComplete(SegmentResult result) {
}

@Override
public void onDifferencesFound(List<com.consilens.core.diff.DiffRow> diffs, DiffContext context) {
events.add("differences");
}

@Override
public void onDiffComplete(DiffResult result, DiffContext context) {
events.add("complete");
}

@Override
public void onDiffError(DiffContext context, Throwable error) {
events.add("error");
}

@Override
public void close() throws Exception {
events.add("close");
if (failOnClose) {
throw new Exception("close failed");
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ private Map<String, NormalizationOperationDefinition> createDefinitions() {
setOf("precision", "rounding")));
result.put("format_datetime", definition("format_datetime",
setOf("date", "time", "datetime", "timestamp"),
setOf("format", "timezone")));
setOf("format", "timezone", "comparisonMode")));
result.put("encode", definition("encode",
setOf("binary"),
setOf("format", "encoding", "uppercase")));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ public class DefaultNormalizationSpecValidator implements NormalizationSpecValid
"datetime", "timestamp", "boolean", "binary", "json"));

private static final Set<String> SUPPORTED_BINARY_FORMATS = new HashSet<>(Arrays.asList("hex", "base64"));
private static final Set<String> SUPPORTED_TEMPORAL_COMPARISON_MODES = new HashSet<>(
Arrays.asList("EXACT", "DATE_ONLY", "TRUNCATE_TO_SECOND", "TRUNCATE_TO_DAY"));

private final NormalizationOperationRegistry operationRegistry;

Expand Down Expand Up @@ -102,6 +104,7 @@ private void validateParams(Map<String, Object> params,
} else if ("format_datetime".equals(operation)) {
validateString(params.get("format"), "format", scope, operation, false);
validateTimezone(params.get("timezone"), scope, operation);
validateTemporalComparisonMode(params.get("comparisonMode"), scope, operation);
} else if ("encode".equals(operation)) {
Object encoding = params.get("encoding") != null ? params.get("encoding") : params.get("format");
validateBinaryFormat(encoding, scope, operation);
Expand Down Expand Up @@ -161,6 +164,18 @@ private void validateTimezone(Object value, String scope, String operation) {
}
}

private void validateTemporalComparisonMode(Object value, String scope, String operation) {
if (value == null) {
return;
}
validateString(value, "comparisonMode", scope, operation, false);
String normalized = ((String) value).trim().toUpperCase();
if (!SUPPORTED_TEMPORAL_COMPARISON_MODES.contains(normalized)) {
throw new ConnectorException("Parameter 'comparisonMode' for operation '" + operation + "' in " + scope
+ " must be one of " + SUPPORTED_TEMPORAL_COMPARISON_MODES);
}
}

private void validateBinaryFormat(Object value, String scope, String operation) {
if (value == null) {
return;
Expand Down
Loading
Loading