Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ jobs:
key: ${{ runner.os }}-maven
- name: Build and Package on ${{ matrix.java }}
run: |
chmod 777 ./mvnw
./mvnw -B clean verify -Dspotless.skip=true
./mvnw -B clean package -DskipTests -DskipITs \
-Dspotless.skip=true \

result:
name: Build
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ jobs:
cache: maven

- name: Build Release Package
run: ./mvnw clean verify -Prelease -B
run: mvn clean package -DskipTests -Prelease -B

- name: Create GitHub Release
uses: softprops/action-gh-release@v2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.consilens.cli.model.CliConfiguration;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.nio.file.Files;
Expand All @@ -12,9 +13,12 @@
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.LinkedHashMap;
import java.util.function.Predicate;
import java.util.stream.Stream;

import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

class ExampleConfigurationCompatibilityTest {

Expand All @@ -34,6 +38,41 @@ void shouldLoadExampleConfigurations(Path configPath) throws Exception {
assertNotNull(config.getComparison().getKeys());
}

@Test
void shouldCoverStructuredConfigurationOptionsAcrossExamples() throws Exception {
List<String> exampleTexts = exampleConfigurationPaths()
.map(path -> {
try {
return Files.readString(path);
} catch (IOException e) {
throw new RuntimeException(e);
}
})
.collect(Collectors.toList());

Map<String, Predicate<String>> requiredPatterns = new LinkedHashMap<>();
requiredPatterns.put("comparison.mappings", text -> text.contains("mappings:") || text.contains("\"mappings\""));
requiredPatterns.put("comparison.extraColumns", text -> text.contains("extraColumns:") || text.contains("\"extraColumns\""));
requiredPatterns.put("source/target.readOptions", text -> text.contains("readOptions:") || text.contains("\"readOptions\""));
requiredPatterns.put("strategy.mode=join", text -> text.contains("mode: join") || text.contains("\"mode\": \"join\""));
requiredPatterns.put("strategy.algorithm=concat", text -> text.contains("algorithm: concat")
|| text.contains("\"algorithm\": \"concat\""));
requiredPatterns.put("strategy.localCompare.mode=row-hash", text -> text.contains("mode: row-hash")
|| text.contains("\"mode\": \"row-hash\""));
requiredPatterns.put("strategy.maxDifferences", text -> text.contains("maxDifferences:")
|| text.contains("\"maxDifferences\""));
requiredPatterns.put("normalization", text -> text.contains("normalization:") || text.contains("\"normalization\""));
requiredPatterns.put("result.failOnSinkError", text -> text.contains("failOnSinkError:")
|| text.contains("\"failOnSinkError\""));
requiredPatterns.put("result.sinks[].enabled", text -> text.contains("enabled: false")
|| text.contains("\"enabled\": false"));

for (Map.Entry<String, Predicate<String>> entry : requiredPatterns.entrySet()) {
assertTrue(exampleTexts.stream().anyMatch(entry.getValue()),
"examples 目录缺少 " + entry.getKey() + " 的覆盖示例");
}
}

private static Stream<Path> exampleConfigurationPaths() throws IOException {
Path examplesDirectory = Paths.get("..", "examples").toAbsolutePath().normalize();
List<Path> paths;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,5 +37,12 @@
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
</dependency>

<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,8 @@
import com.consilens.core.lifecycle.SegmentResult;
import com.consilens.sink.api.Sink;
import com.consilens.sink.api.model.SinkConfig;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.util.List;

/**
Expand All @@ -19,36 +17,31 @@ public class ConsoleDiffRecordSink implements Sink {

private ConsoleSinkConfig sinkConfig;
private long rowCount = 0;
private boolean truncationNoticePrinted = false;

@Override
public void open(SinkConfig config, DiffContext context) throws IOException {
sinkConfig = parseConfig(config.getProperties());
System.out.println("[ConsoleDiffRecordSink] taskId=" + context.getTaskId() + " opened");
public void open(SinkConfig config, DiffContext context) throws Exception {
sinkConfig = ConsoleOutputSupport.parseConfig(config.getProperties());
}

@Override
public void onDiffRecords(List<DiffRow> rows, DiffContext context) {
int limit = sinkConfig.getMaxRows();
for (DiffRow row : rows) {
if (limit >= 0 && rowCount >= limit) {
System.out.println("[ConsoleDiffRecordSink] maxRows=" + limit + " reached, further rows suppressed");
if (!truncationNoticePrinted) {
ConsoleOutputSupport.printStdout(ConsoleOutputSupport.truncationPayload(context, limit), sinkConfig);
truncationNoticePrinted = true;
}
return;
}
System.out.println("[DIFF] " + row.getDescription());
ConsoleOutputSupport.printStdout(ConsoleOutputSupport.diffRecordPayload(row, context), sinkConfig);
rowCount++;
}
}

@Override
public void onSegmentComplete(SegmentResult segmentResult) {
System.out.println("[ConsoleDiffRecordSink] segment=" + segmentResult.getSegmentIndex()
+ " differencesFound=" + segmentResult.getDifferencesFound());
}

private ConsoleSinkConfig parseConfig(String properties) throws IOException {
if (properties == null || properties.isBlank()) {
return new ConsoleSinkConfig();
}
return new ObjectMapper().readValue(properties, ConsoleSinkConfig.class);
ConsoleOutputSupport.printStdout(ConsoleOutputSupport.segmentPayload(segmentResult), sinkConfig);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
package com.consilens.sink.console;

import com.consilens.core.diff.DiffResult;
import com.consilens.core.diff.DiffRow;
import com.consilens.core.lifecycle.DiffContext;
import com.consilens.core.lifecycle.SegmentResult;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class ConsoleOutputSupport {

private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper().findAndRegisterModules();

private ConsoleOutputSupport() {
}

static ConsoleSinkConfig parseConfig(String properties) throws IOException {
if (properties == null || properties.isBlank()) {
return new ConsoleSinkConfig();
}
return OBJECT_MAPPER.readValue(properties, ConsoleSinkConfig.class);
}

static void printStdout(Map<String, Object> payload, ConsoleSinkConfig config) {
System.out.println(toJson(payload, config));
}

static void printStderr(Map<String, Object> payload, ConsoleSinkConfig config) {
System.err.println(toJson(payload, config));
}

static Map<String, Object> diffRecordPayload(DiffRow row, DiffContext context) {
LinkedHashMap<String, Object> payload = basePayload("diff-record", context);
payload.put("operation", row.getOperation().getCode());
payload.put("primaryKey", row.getPrimaryKey());
payload.put("primaryKeyString", row.getPrimaryKeyString());
payload.put("sourceValues", row.getAllSourceValues());
payload.put("targetValues", row.getAllTargetValues());
payload.put("columnNames1", row.getColumnNames1());
payload.put("columnNames2", row.getColumnNames2());
payload.put("changedColumns1", row.getChangedColumns1());
payload.put("changedColumns2", row.getChangedColumns2());
if (row.getMetadata() != null && !row.getMetadata().isEmpty()) {
payload.put("metadata", row.getMetadata());
}
return payload;
}

static Map<String, Object> resultPayload(DiffResult result, DiffContext context, boolean includeSummary) {
LinkedHashMap<String, Object> payload = basePayload("result", context);
payload.put("status", result != null && result.hasDifferences() ? "DIFF" : "EQUAL");
payload.put("statistics", result != null ? result.getStatisticsMap() : Map.of());
if (includeSummary) {
payload.put("summary", safeSummary(result));
}
if (result != null && result.getMetadata() != null && !result.getMetadata().isEmpty()) {
payload.put("metadata", result.getMetadata());
}
return payload;
}

static Map<String, Object> segmentPayload(SegmentResult segmentResult) {
LinkedHashMap<String, Object> payload = basePayload("segment-complete", segmentResult.getContext());
payload.put("segmentIndex", segmentResult.getSegmentIndex());
payload.put("sourceRowsScanned", segmentResult.getSourceRowsScanned());
payload.put("targetRowsScanned", segmentResult.getTargetRowsScanned());
payload.put("differencesFound", segmentResult.getDifferencesFound());
payload.put("totalDifferencesAccumulated", segmentResult.getTotalDifferencesAccumulated());
payload.put("durationMs", segmentResult.getDuration() != null ? segmentResult.getDuration().toMillis() : null);
return payload;
}

static Map<String, Object> truncationPayload(DiffContext context, long maxRows) {
LinkedHashMap<String, Object> payload = basePayload("diff-record-truncated", context);
payload.put("maxRows", maxRows);
payload.put("message", "maxRows reached, further rows suppressed");
return payload;
}

static Map<String, Object> errorPayload(DiffContext context, Throwable error) {
LinkedHashMap<String, Object> payload = basePayload("error", context);
payload.put("error", errorDetails(error));
return payload;
}

private static LinkedHashMap<String, Object> basePayload(String event, DiffContext context) {
LinkedHashMap<String, Object> payload = new LinkedHashMap<>();
payload.put("event", event);
payload.put("timestamp", Instant.now().toString());
if (context == null) {
return payload;
}
payload.put("taskId", context.getTaskId());
payload.put("strategy", context.getStrategy());
payload.put("algorithm", context.getAlgorithm());
if (context.getSourceTablePath() != null) {
payload.put("sourceTable", context.getSourceTablePath().getFullPath());
}
if (context.getTargetTablePath() != null) {
payload.put("targetTable", context.getTargetTablePath().getFullPath());
}
return payload;
}

private static Map<String, Object> errorDetails(Throwable error) {
LinkedHashMap<String, Object> detail = new LinkedHashMap<>();
if (error == null) {
detail.put("type", "UnknownError");
detail.put("message", null);
return detail;
}
detail.put("type", error.getClass().getName());
detail.put("message", error.getMessage());
Throwable cause = error.getCause();
if (cause != null) {
List<Map<String, Object>> causes = new ArrayList<>();
while (cause != null) {
LinkedHashMap<String, Object> item = new LinkedHashMap<>();
item.put("type", cause.getClass().getName());
item.put("message", cause.getMessage());
causes.add(item);
cause = cause.getCause();
}
detail.put("causes", causes);
}
if (error.getSuppressed() != null && error.getSuppressed().length > 0) {
List<Map<String, Object>> suppressed = new ArrayList<>();
for (Throwable item : error.getSuppressed()) {
LinkedHashMap<String, Object> suppressedItem = new LinkedHashMap<>();
suppressedItem.put("type", item.getClass().getName());
suppressedItem.put("message", item.getMessage());
suppressed.add(suppressedItem);
}
detail.put("suppressed", suppressed);
}
return detail;
}

private static String safeSummary(DiffResult result) {
if (result == null) {
return "No result available";
}
if (result.getSourceTablePath() == null || result.getTargetTablePath() == null) {
return "Source/target table metadata unavailable";
}
return result.getSummary();
}

private static String toJson(Map<String, Object> payload, ConsoleSinkConfig config) {
try {
if (config != null && config.isPretty()) {
return OBJECT_MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(payload);
}
return OBJECT_MAPPER.writeValueAsString(payload);
} catch (IOException e) {
throw new UncheckedIOException("Failed to serialize console payload", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,6 @@
import com.consilens.core.lifecycle.DiffContext;
import com.consilens.sink.api.Sink;
import com.consilens.sink.api.model.SinkConfig;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.io.IOException;

/**
* Outputs the final aggregated diff result to the console.
Expand All @@ -16,27 +13,19 @@ public class ConsoleResultSink implements Sink {
private ConsoleSinkConfig sinkConfig;

@Override
public void open(SinkConfig config, DiffContext context) throws IOException {
sinkConfig = parseConfig(config.getProperties());
public void open(SinkConfig config, DiffContext context) throws Exception {
sinkConfig = ConsoleOutputSupport.parseConfig(config.getProperties());
}

@Override
public void onResult(DiffResult result, DiffContext context) {
if (sinkConfig.isShowStatistics()) {
String summary = result.getSummary();
System.out.println("[DIFF RESULT] " + summary);
ConsoleOutputSupport.printStdout(ConsoleOutputSupport.resultPayload(result, context, true), sinkConfig);
}
}

@Override
public void onError(DiffContext context, Throwable error) {
System.err.println("[DIFF ERROR] taskId=" + context.getTaskId() + " error=" + error.getMessage());
}

private ConsoleSinkConfig parseConfig(String properties) throws IOException {
if (properties == null || properties.isBlank()) {
return new ConsoleSinkConfig();
}
return new ObjectMapper().readValue(properties, ConsoleSinkConfig.class);
ConsoleOutputSupport.printStderr(ConsoleOutputSupport.errorPayload(context, error), sinkConfig);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,7 @@ public class ConsoleSinkConfig {

/** Whether to print summary statistics; default true. */
private boolean showStatistics = true;

/** Whether to pretty-print JSON payloads; default true. */
private boolean pretty = true;
}
Loading
Loading