diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 115af6a..755d6b8 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -60,8 +60,8 @@ jobs: key: ${{ runner.os }}-maven - name: Build and Package on ${{ matrix.java }} run: | - chmod 777 ./mvnw - ./mvnw -B clean verify -Dspotless.skip=true + ./mvnw -B clean package -DskipTests -DskipITs \ + -Dspotless.skip=true \ result: name: Build diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index cfed833..4f45ba5 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -19,7 +19,7 @@ jobs: cache: maven - name: Build Release Package - run: ./mvnw clean verify -Prelease -B + run: mvn clean package -DskipTests -Prelease -B - name: Create GitHub Release uses: softprops/action-gh-release@v2 diff --git a/consilens-cli/src/test/java/com/consilens/cli/config/ExampleConfigurationCompatibilityTest.java b/consilens-cli/src/test/java/com/consilens/cli/config/ExampleConfigurationCompatibilityTest.java index 7f57043..79e966e 100644 --- a/consilens-cli/src/test/java/com/consilens/cli/config/ExampleConfigurationCompatibilityTest.java +++ b/consilens-cli/src/test/java/com/consilens/cli/config/ExampleConfigurationCompatibilityTest.java @@ -3,6 +3,7 @@ import com.consilens.cli.model.CliConfiguration; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.api.Test; import java.io.IOException; import java.nio.file.Files; @@ -12,9 +13,12 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.util.List; +import java.util.LinkedHashMap; +import java.util.function.Predicate; import java.util.stream.Stream; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; class ExampleConfigurationCompatibilityTest { @@ -34,6 +38,41 @@ void shouldLoadExampleConfigurations(Path configPath) throws Exception { assertNotNull(config.getComparison().getKeys()); } + @Test + void shouldCoverStructuredConfigurationOptionsAcrossExamples() throws Exception { + List exampleTexts = exampleConfigurationPaths() + .map(path -> { + try { + return Files.readString(path); + } catch (IOException e) { + throw new RuntimeException(e); + } + }) + .collect(Collectors.toList()); + + Map> requiredPatterns = new LinkedHashMap<>(); + requiredPatterns.put("comparison.mappings", text -> text.contains("mappings:") || text.contains("\"mappings\"")); + requiredPatterns.put("comparison.extraColumns", text -> text.contains("extraColumns:") || text.contains("\"extraColumns\"")); + requiredPatterns.put("source/target.readOptions", text -> text.contains("readOptions:") || text.contains("\"readOptions\"")); + requiredPatterns.put("strategy.mode=join", text -> text.contains("mode: join") || text.contains("\"mode\": \"join\"")); + requiredPatterns.put("strategy.algorithm=concat", text -> text.contains("algorithm: concat") + || text.contains("\"algorithm\": \"concat\"")); + requiredPatterns.put("strategy.localCompare.mode=row-hash", text -> text.contains("mode: row-hash") + || text.contains("\"mode\": \"row-hash\"")); + requiredPatterns.put("strategy.maxDifferences", text -> text.contains("maxDifferences:") + || text.contains("\"maxDifferences\"")); + requiredPatterns.put("normalization", text -> text.contains("normalization:") || text.contains("\"normalization\"")); + requiredPatterns.put("result.failOnSinkError", text -> text.contains("failOnSinkError:") + || text.contains("\"failOnSinkError\"")); + requiredPatterns.put("result.sinks[].enabled", text -> text.contains("enabled: false") + || text.contains("\"enabled\": false")); + + for (Map.Entry> entry : requiredPatterns.entrySet()) { + assertTrue(exampleTexts.stream().anyMatch(entry.getValue()), + "examples 目录缺少 " + entry.getKey() + " 的覆盖示例"); + } + } + private static Stream exampleConfigurationPaths() throws IOException { Path examplesDirectory = Paths.get("..", "examples").toAbsolutePath().normalize(); List paths; diff --git a/consilens-sink/consilens-sink-plugins/consilens-sink-console/pom.xml b/consilens-sink/consilens-sink-plugins/consilens-sink-console/pom.xml index 4820953..70edbb8 100644 --- a/consilens-sink/consilens-sink-plugins/consilens-sink-console/pom.xml +++ b/consilens-sink/consilens-sink-plugins/consilens-sink-console/pom.xml @@ -37,5 +37,12 @@ org.apache.logging.log4j log4j-api + + + org.junit.jupiter + junit-jupiter + ${junit.version} + test + diff --git a/consilens-sink/consilens-sink-plugins/consilens-sink-console/src/main/java/com/consilens/sink/console/ConsoleDiffRecordSink.java b/consilens-sink/consilens-sink-plugins/consilens-sink-console/src/main/java/com/consilens/sink/console/ConsoleDiffRecordSink.java index 4db39c8..dfae5cf 100644 --- a/consilens-sink/consilens-sink-plugins/consilens-sink-console/src/main/java/com/consilens/sink/console/ConsoleDiffRecordSink.java +++ b/consilens-sink/consilens-sink-plugins/consilens-sink-console/src/main/java/com/consilens/sink/console/ConsoleDiffRecordSink.java @@ -5,10 +5,8 @@ import com.consilens.core.lifecycle.SegmentResult; import com.consilens.sink.api.Sink; import com.consilens.sink.api.model.SinkConfig; -import com.fasterxml.jackson.databind.ObjectMapper; import lombok.extern.slf4j.Slf4j; -import java.io.IOException; import java.util.List; /** @@ -19,11 +17,11 @@ public class ConsoleDiffRecordSink implements Sink { private ConsoleSinkConfig sinkConfig; private long rowCount = 0; + private boolean truncationNoticePrinted = false; @Override - public void open(SinkConfig config, DiffContext context) throws IOException { - sinkConfig = parseConfig(config.getProperties()); - System.out.println("[ConsoleDiffRecordSink] taskId=" + context.getTaskId() + " opened"); + public void open(SinkConfig config, DiffContext context) throws Exception { + sinkConfig = ConsoleOutputSupport.parseConfig(config.getProperties()); } @Override @@ -31,24 +29,19 @@ public void onDiffRecords(List rows, DiffContext context) { int limit = sinkConfig.getMaxRows(); for (DiffRow row : rows) { if (limit >= 0 && rowCount >= limit) { - System.out.println("[ConsoleDiffRecordSink] maxRows=" + limit + " reached, further rows suppressed"); + if (!truncationNoticePrinted) { + ConsoleOutputSupport.printStdout(ConsoleOutputSupport.truncationPayload(context, limit), sinkConfig); + truncationNoticePrinted = true; + } return; } - System.out.println("[DIFF] " + row.getDescription()); + ConsoleOutputSupport.printStdout(ConsoleOutputSupport.diffRecordPayload(row, context), sinkConfig); rowCount++; } } @Override public void onSegmentComplete(SegmentResult segmentResult) { - System.out.println("[ConsoleDiffRecordSink] segment=" + segmentResult.getSegmentIndex() - + " differencesFound=" + segmentResult.getDifferencesFound()); - } - - private ConsoleSinkConfig parseConfig(String properties) throws IOException { - if (properties == null || properties.isBlank()) { - return new ConsoleSinkConfig(); - } - return new ObjectMapper().readValue(properties, ConsoleSinkConfig.class); + ConsoleOutputSupport.printStdout(ConsoleOutputSupport.segmentPayload(segmentResult), sinkConfig); } } diff --git a/consilens-sink/consilens-sink-plugins/consilens-sink-console/src/main/java/com/consilens/sink/console/ConsoleOutputSupport.java b/consilens-sink/consilens-sink-plugins/consilens-sink-console/src/main/java/com/consilens/sink/console/ConsoleOutputSupport.java new file mode 100644 index 0000000..980d788 --- /dev/null +++ b/consilens-sink/consilens-sink-plugins/consilens-sink-console/src/main/java/com/consilens/sink/console/ConsoleOutputSupport.java @@ -0,0 +1,166 @@ +package com.consilens.sink.console; + +import com.consilens.core.diff.DiffResult; +import com.consilens.core.diff.DiffRow; +import com.consilens.core.lifecycle.DiffContext; +import com.consilens.core.lifecycle.SegmentResult; +import com.fasterxml.jackson.databind.ObjectMapper; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.time.Instant; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +final class ConsoleOutputSupport { + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper().findAndRegisterModules(); + + private ConsoleOutputSupport() { + } + + static ConsoleSinkConfig parseConfig(String properties) throws IOException { + if (properties == null || properties.isBlank()) { + return new ConsoleSinkConfig(); + } + return OBJECT_MAPPER.readValue(properties, ConsoleSinkConfig.class); + } + + static void printStdout(Map payload, ConsoleSinkConfig config) { + System.out.println(toJson(payload, config)); + } + + static void printStderr(Map payload, ConsoleSinkConfig config) { + System.err.println(toJson(payload, config)); + } + + static Map diffRecordPayload(DiffRow row, DiffContext context) { + LinkedHashMap payload = basePayload("diff-record", context); + payload.put("operation", row.getOperation().getCode()); + payload.put("primaryKey", row.getPrimaryKey()); + payload.put("primaryKeyString", row.getPrimaryKeyString()); + payload.put("sourceValues", row.getAllSourceValues()); + payload.put("targetValues", row.getAllTargetValues()); + payload.put("columnNames1", row.getColumnNames1()); + payload.put("columnNames2", row.getColumnNames2()); + payload.put("changedColumns1", row.getChangedColumns1()); + payload.put("changedColumns2", row.getChangedColumns2()); + if (row.getMetadata() != null && !row.getMetadata().isEmpty()) { + payload.put("metadata", row.getMetadata()); + } + return payload; + } + + static Map resultPayload(DiffResult result, DiffContext context, boolean includeSummary) { + LinkedHashMap payload = basePayload("result", context); + payload.put("status", result != null && result.hasDifferences() ? "DIFF" : "EQUAL"); + payload.put("statistics", result != null ? result.getStatisticsMap() : Map.of()); + if (includeSummary) { + payload.put("summary", safeSummary(result)); + } + if (result != null && result.getMetadata() != null && !result.getMetadata().isEmpty()) { + payload.put("metadata", result.getMetadata()); + } + return payload; + } + + static Map segmentPayload(SegmentResult segmentResult) { + LinkedHashMap payload = basePayload("segment-complete", segmentResult.getContext()); + payload.put("segmentIndex", segmentResult.getSegmentIndex()); + payload.put("sourceRowsScanned", segmentResult.getSourceRowsScanned()); + payload.put("targetRowsScanned", segmentResult.getTargetRowsScanned()); + payload.put("differencesFound", segmentResult.getDifferencesFound()); + payload.put("totalDifferencesAccumulated", segmentResult.getTotalDifferencesAccumulated()); + payload.put("durationMs", segmentResult.getDuration() != null ? segmentResult.getDuration().toMillis() : null); + return payload; + } + + static Map truncationPayload(DiffContext context, long maxRows) { + LinkedHashMap payload = basePayload("diff-record-truncated", context); + payload.put("maxRows", maxRows); + payload.put("message", "maxRows reached, further rows suppressed"); + return payload; + } + + static Map errorPayload(DiffContext context, Throwable error) { + LinkedHashMap payload = basePayload("error", context); + payload.put("error", errorDetails(error)); + return payload; + } + + private static LinkedHashMap basePayload(String event, DiffContext context) { + LinkedHashMap payload = new LinkedHashMap<>(); + payload.put("event", event); + payload.put("timestamp", Instant.now().toString()); + if (context == null) { + return payload; + } + payload.put("taskId", context.getTaskId()); + payload.put("strategy", context.getStrategy()); + payload.put("algorithm", context.getAlgorithm()); + if (context.getSourceTablePath() != null) { + payload.put("sourceTable", context.getSourceTablePath().getFullPath()); + } + if (context.getTargetTablePath() != null) { + payload.put("targetTable", context.getTargetTablePath().getFullPath()); + } + return payload; + } + + private static Map errorDetails(Throwable error) { + LinkedHashMap detail = new LinkedHashMap<>(); + if (error == null) { + detail.put("type", "UnknownError"); + detail.put("message", null); + return detail; + } + detail.put("type", error.getClass().getName()); + detail.put("message", error.getMessage()); + Throwable cause = error.getCause(); + if (cause != null) { + List> causes = new ArrayList<>(); + while (cause != null) { + LinkedHashMap item = new LinkedHashMap<>(); + item.put("type", cause.getClass().getName()); + item.put("message", cause.getMessage()); + causes.add(item); + cause = cause.getCause(); + } + detail.put("causes", causes); + } + if (error.getSuppressed() != null && error.getSuppressed().length > 0) { + List> suppressed = new ArrayList<>(); + for (Throwable item : error.getSuppressed()) { + LinkedHashMap suppressedItem = new LinkedHashMap<>(); + suppressedItem.put("type", item.getClass().getName()); + suppressedItem.put("message", item.getMessage()); + suppressed.add(suppressedItem); + } + detail.put("suppressed", suppressed); + } + return detail; + } + + private static String safeSummary(DiffResult result) { + if (result == null) { + return "No result available"; + } + if (result.getSourceTablePath() == null || result.getTargetTablePath() == null) { + return "Source/target table metadata unavailable"; + } + return result.getSummary(); + } + + private static String toJson(Map payload, ConsoleSinkConfig config) { + try { + if (config != null && config.isPretty()) { + return OBJECT_MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(payload); + } + return OBJECT_MAPPER.writeValueAsString(payload); + } catch (IOException e) { + throw new UncheckedIOException("Failed to serialize console payload", e); + } + } +} diff --git a/consilens-sink/consilens-sink-plugins/consilens-sink-console/src/main/java/com/consilens/sink/console/ConsoleResultSink.java b/consilens-sink/consilens-sink-plugins/consilens-sink-console/src/main/java/com/consilens/sink/console/ConsoleResultSink.java index 8f2be16..83f4d63 100644 --- a/consilens-sink/consilens-sink-plugins/consilens-sink-console/src/main/java/com/consilens/sink/console/ConsoleResultSink.java +++ b/consilens-sink/consilens-sink-plugins/consilens-sink-console/src/main/java/com/consilens/sink/console/ConsoleResultSink.java @@ -4,9 +4,6 @@ import com.consilens.core.lifecycle.DiffContext; import com.consilens.sink.api.Sink; import com.consilens.sink.api.model.SinkConfig; -import com.fasterxml.jackson.databind.ObjectMapper; - -import java.io.IOException; /** * Outputs the final aggregated diff result to the console. @@ -16,27 +13,19 @@ public class ConsoleResultSink implements Sink { private ConsoleSinkConfig sinkConfig; @Override - public void open(SinkConfig config, DiffContext context) throws IOException { - sinkConfig = parseConfig(config.getProperties()); + public void open(SinkConfig config, DiffContext context) throws Exception { + sinkConfig = ConsoleOutputSupport.parseConfig(config.getProperties()); } @Override public void onResult(DiffResult result, DiffContext context) { if (sinkConfig.isShowStatistics()) { - String summary = result.getSummary(); - System.out.println("[DIFF RESULT] " + summary); + ConsoleOutputSupport.printStdout(ConsoleOutputSupport.resultPayload(result, context, true), sinkConfig); } } @Override public void onError(DiffContext context, Throwable error) { - System.err.println("[DIFF ERROR] taskId=" + context.getTaskId() + " error=" + error.getMessage()); - } - - private ConsoleSinkConfig parseConfig(String properties) throws IOException { - if (properties == null || properties.isBlank()) { - return new ConsoleSinkConfig(); - } - return new ObjectMapper().readValue(properties, ConsoleSinkConfig.class); + ConsoleOutputSupport.printStderr(ConsoleOutputSupport.errorPayload(context, error), sinkConfig); } } diff --git a/consilens-sink/consilens-sink-plugins/consilens-sink-console/src/main/java/com/consilens/sink/console/ConsoleSinkConfig.java b/consilens-sink/consilens-sink-plugins/consilens-sink-console/src/main/java/com/consilens/sink/console/ConsoleSinkConfig.java index 10bb424..e3281dd 100644 --- a/consilens-sink/consilens-sink-plugins/consilens-sink-console/src/main/java/com/consilens/sink/console/ConsoleSinkConfig.java +++ b/consilens-sink/consilens-sink-plugins/consilens-sink-console/src/main/java/com/consilens/sink/console/ConsoleSinkConfig.java @@ -25,4 +25,7 @@ public class ConsoleSinkConfig { /** Whether to print summary statistics; default true. */ private boolean showStatistics = true; + + /** Whether to pretty-print JSON payloads; default true. */ + private boolean pretty = true; } diff --git a/consilens-sink/consilens-sink-plugins/consilens-sink-console/src/test/java/com/consilens/sink/console/ConsoleSinkStructuredOutputTest.java b/consilens-sink/consilens-sink-plugins/consilens-sink-console/src/test/java/com/consilens/sink/console/ConsoleSinkStructuredOutputTest.java new file mode 100644 index 0000000..5a137ad --- /dev/null +++ b/consilens-sink/consilens-sink-plugins/consilens-sink-console/src/test/java/com/consilens/sink/console/ConsoleSinkStructuredOutputTest.java @@ -0,0 +1,182 @@ +package com.consilens.sink.console; + +import com.consilens.connector.api.model.TablePath; +import com.consilens.core.diff.DiffResult; +import com.consilens.core.diff.DiffRow; +import com.consilens.core.lifecycle.DiffContext; +import com.consilens.core.lifecycle.SegmentResult; +import com.consilens.sink.api.model.SinkConfig; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.MappingIterator; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.time.Instant; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class ConsoleSinkStructuredOutputTest { + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper().findAndRegisterModules(); + + private final ByteArrayOutputStream out = new ByteArrayOutputStream(); + private final ByteArrayOutputStream err = new ByteArrayOutputStream(); + private PrintStream originalOut; + private PrintStream originalErr; + + @BeforeEach + void setUpStreams() throws Exception { + originalOut = System.out; + originalErr = System.err; + System.setOut(new PrintStream(out, true, StandardCharsets.UTF_8.name())); + System.setErr(new PrintStream(err, true, StandardCharsets.UTF_8.name())); + } + + @AfterEach + void restoreStreams() { + System.setOut(originalOut); + System.setErr(originalErr); + } + + @Test + void shouldPrintStructuredResultAndErrorPayloads() throws Exception { + ConsoleResultSink sink = new ConsoleResultSink(); + DiffContext context = diffContext(); + + sink.open(resultSinkConfig(), context); + sink.onResult(diffResult(), context); + sink.onError(context, new IllegalStateException("console sink failed", new RuntimeException("root cause"))); + + JsonNode resultNode = readFirstJsonLine(out); + assertEquals("result", resultNode.get("event").asText()); + assertEquals("task-1", resultNode.get("taskId").asText()); + assertEquals("DIFF", resultNode.get("status").asText()); + assertEquals(1, resultNode.get("statistics").get("totalDifferences").asInt()); + assertTrue(resultNode.has("summary")); + assertFalse(resultNode.has("infoTree")); + assertTrue(out.toString(StandardCharsets.UTF_8).contains("\n \"event\"")); + + JsonNode errorNode = readFirstJsonLine(err); + assertEquals("error", errorNode.get("event").asText()); + assertEquals("java.lang.IllegalStateException", errorNode.get("error").get("type").asText()); + assertEquals("console sink failed", errorNode.get("error").get("message").asText()); + assertEquals("java.lang.RuntimeException", errorNode.get("error").get("causes").get(0).get("type").asText()); + } + + @Test + void shouldPrintStructuredDiffRecordsAndRespectMaxRows() throws Exception { + ConsoleDiffRecordSink sink = new ConsoleDiffRecordSink(); + DiffContext context = diffContext(); + SinkConfig config = new SinkConfig(); + config.setProperties("{\"maxRows\":1}"); + + sink.open(config, context); + sink.onDiffRecords(List.of(diffRow(), diffRow()), context); + sink.onSegmentComplete(SegmentResult.builder() + .segmentIndex(3) + .sourceRowsScanned(10) + .targetRowsScanned(10) + .differencesFound(1) + .totalDifferencesAccumulated(1) + .duration(Duration.ofMillis(25)) + .context(context) + .build()); + + List lines = readJsonLines(out); + assertEquals(3, lines.size()); + assertEquals("diff-record", lines.get(0).get("event").asText()); + assertEquals("mismatch", lines.get(0).get("operation").asText()); + assertEquals("42", lines.get(0).get("primaryKeyString").asText()); + assertTrue(lines.get(0).get("metadata").has("changedColumns1")); + + assertEquals("diff-record-truncated", lines.get(1).get("event").asText()); + assertEquals(1, lines.get(1).get("maxRows").asInt()); + + assertEquals("segment-complete", lines.get(2).get("event").asText()); + assertEquals(3, lines.get(2).get("segmentIndex").asInt()); + + String plainOutput = out.toString(StandardCharsets.UTF_8); + assertFalse(plainOutput.contains("[DIFF RESULT]")); + assertFalse(plainOutput.contains("[DIFF]")); + assertFalse(plainOutput.contains("[ConsoleDiffRecordSink]")); + } + + private DiffContext diffContext() { + return DiffContext.builder() + .taskId("task-1") + .startTime(Instant.parse("2026-05-12T08:00:00Z")) + .sourceTablePath(TablePath.fromString("mydb.users")) + .targetTablePath(TablePath.fromString("public.users")) + .strategy("checksum") + .algorithm("xor") + .build(); + } + + private SinkConfig resultSinkConfig() { + SinkConfig config = new SinkConfig(); + config.setProperties("{\"showStatistics\":true}"); + return config; + } + + private DiffRow diffRow() { + return DiffRow.modified( + List.of(42), + List.of("Alice", "active"), + List.of("Alice", "inactive"), + List.of("name", "status"), + List.of("name", "status"), + List.of("status"), + List.of("status")); + } + + private DiffResult diffResult() { + return DiffResult.builder() + .differences(List.of(diffRow())) + .statistics(DiffResult.DiffStatistics.builder() + .sourceRowCount(100) + .targetRowCount(100) + .sourceMissingCount(0) + .targetMissingCount(0) + .mismatchCount(1) + .unchangedCount(99) + .totalDifferences(1) + .differencePercentage(0.01d) + .processingTimeMs(12L) + .build()) + .infoTree(Optional.empty()) + .completedAt(Instant.parse("2026-05-12T08:00:12Z")) + .metadata(Map.of("jobId", "job-1")) + .sourceTablePath(TablePath.fromString("mydb.users")) + .targetTablePath(TablePath.fromString("public.users")) + .build(); + } + + private JsonNode readFirstJsonLine(ByteArrayOutputStream stream) throws Exception { + return readJsonLines(stream).get(0); + } + + private List readJsonLines(ByteArrayOutputStream stream) throws Exception { + byte[] bytes = stream.toByteArray(); + assertTrue(bytes.length > 0); + java.util.ArrayList result = new java.util.ArrayList<>(); + try (MappingIterator iterator = OBJECT_MAPPER.readerFor(JsonNode.class) + .readValues(new ByteArrayInputStream(bytes))) { + while (iterator.hasNext()) { + result.add(iterator.next()); + } + } + return result; + } +} diff --git a/examples/detail-to-aggregate-custom-sql.yaml b/examples/detail-to-aggregate-custom-sql.yaml index 448242d..f209f9d 100644 --- a/examples/detail-to-aggregate-custom-sql.yaml +++ b/examples/detail-to-aggregate-custom-sql.yaml @@ -66,3 +66,5 @@ result: sinks: - format: console type: result + - format: console + type: diff-record diff --git a/examples/mapped-mysql-to-postgres-checksum.yaml b/examples/mapped-mysql-to-postgres-checksum.yaml new file mode 100644 index 0000000..8941050 --- /dev/null +++ b/examples/mapped-mysql-to-postgres-checksum.yaml @@ -0,0 +1,88 @@ +# 字段映射比对配置 - MySQL users 到 PostgreSQL users +# 目标:覆盖 comparison.mappings,并演示 column / expression / literal 三种映射方式 + +source: + type: mysql + name: source-mysql-mapped + connection: + url: jdbc:mysql://localhost:3306/mydb + username: ${env.MYSQL_USER} + password: ${env.MYSQL_PASSWORD} + resource: + type: table + name: users + +target: + type: postgresql + name: target-postgresql-mapped + connection: + url: jdbc:postgresql://localhost:5432/postgres?currentSchema=public + username: ${env.PG_USER} + password: ${env.PG_PASSWORD} + resource: + type: table + name: users + +comparison: + keys: + source: + - id + target: + - id + + mappings: + - name: user_id + source: + column: id + target: + column: id + key: true + + - name: normalized_name + source: + expression: "UPPER(name)" + target: + expression: "UPPER(name)" + + - name: email + source: + column: email + target: + column: email + + - name: normalized_phone + source: + expression: "REPLACE(phone, '+86', '')" + target: + expression: "REPLACE(phone, '+86', '')" + + - name: source_system + source: + literal: "seed-users" + target: + literal: "seed-users" + + - name: debug_tag + source: + literal: "users-v1" + target: + literal: "users-v1" + compare: false + + filters: + source: "created_at >= '2026-01-01 00:00:00'" + target: "created_at >= '2026-01-01 00:00:00'" + +strategy: + mode: checksum + algorithm: xor + bisectionFactor: 4 + bisectionThreshold: 5000 + batchSize: 1000 + localCompare: + mode: row-hash + +result: + sinks: + - format: console + type: result diff --git a/examples/same-db-mysql-comparison.yaml b/examples/same-db-mysql-comparison.yaml index 5ba2827..6a832f6 100644 --- a/examples/same-db-mysql-comparison.yaml +++ b/examples/same-db-mysql-comparison.yaml @@ -1,5 +1,6 @@ -# 同库比对配置 - MySQL 内部两张表比对 -# 适用于数据迁移验证、分表一致性校验等场景 +# 同库 join 比对配置 - MySQL 内部两张表比对 +# 适用于同一 JDBC URL 下的迁移验证,也覆盖 readOptions / extraColumns / +# normalization / result.failOnSinkError / sink.enabled 等显式配置项 source: type: mysql @@ -11,6 +12,11 @@ source: resource: type: table name: orders + readOptions: + consistency: snapshot + batchSize: 500 + fetchSize: 1000 + useCursorFetch: true target: type: mysql @@ -22,6 +28,11 @@ target: resource: type: table name: orders_backup + readOptions: + consistency: snapshot + batchSize: 500 + fetchSize: 1000 + useCursorFetch: true comparison: keys: @@ -35,24 +46,56 @@ comparison: - customer_id - amount - status - - created_at target: - customer_id - amount - status - - created_at + + extraColumns: + - created_at filters: source: "created_at >= '2025-01-01'" target: "created_at >= '2025-01-01'" strategy: - mode: checksum - algorithm: xor + # join 要求 source / target 指向同一个 JDBC URL + mode: join + algorithm: concat bisectionFactor: 4 bisectionThreshold: 10000 + batchSize: 500 + maxDifferences: 5000 + +normalization: + global: + decimal: + precision: 2 + rounding: true + datetime: + format: yyyy-MM-dd HH:mm:ss + timezone: Asia/Shanghai + comparisonMode: instant + source: + varchar: + nullValue: "" + target: + varchar: + nullValue: "" result: + failOnSinkError: false sinks: - format: console type: result + - format: csv + type: diff-record + enabled: false + properties: + path: ./output/orders-diff.csv + delimiter: "," + includeHeader: true + mergeDefaults: true + columns: + - name: env + value: staging