From 17db6cefe2bfeffb45968cc0c4b9e5acc8598ce7 Mon Sep 17 00:00:00 2001 From: xiaoyao0120 Date: Tue, 12 May 2026 15:17:21 +0800 Subject: [PATCH 1/2] improve: improve example --- .github/workflows/ci.yml | 4 +- .github/workflows/release.yml | 2 +- ...ExampleConfigurationCompatibilityTest.java | 39 ++++ .../consilens-sink-console/pom.xml | 7 + .../sink/console/ConsoleDiffRecordSink.java | 25 +-- .../sink/console/ConsoleOutputSupport.java | 166 ++++++++++++++++ .../sink/console/ConsoleResultSink.java | 19 +- .../sink/console/ConsoleSinkConfig.java | 3 + .../ConsoleSinkStructuredOutputTest.java | 182 ++++++++++++++++++ examples/detail-to-aggregate-custom-sql.yaml | 2 + .../mapped-mysql-to-postgres-checksum.yaml | 88 +++++++++ examples/same-db-mysql-comparison.yaml | 55 +++++- 12 files changed, 552 insertions(+), 40 deletions(-) create mode 100644 consilens-sink/consilens-sink-plugins/consilens-sink-console/src/main/java/com/consilens/sink/console/ConsoleOutputSupport.java create mode 100644 consilens-sink/consilens-sink-plugins/consilens-sink-console/src/test/java/com/consilens/sink/console/ConsoleSinkStructuredOutputTest.java create mode 100644 examples/mapped-mysql-to-postgres-checksum.yaml diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 115af6a..755d6b8 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -60,8 +60,8 @@ jobs: key: ${{ runner.os }}-maven - name: Build and Package on ${{ matrix.java }} run: | - chmod 777 ./mvnw - ./mvnw -B clean verify -Dspotless.skip=true + ./mvnw -B clean package -DskipTests -DskipITs \ + -Dspotless.skip=true \ result: name: Build diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index cfed833..4f45ba5 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -19,7 +19,7 @@ jobs: cache: maven - name: Build Release Package - run: ./mvnw clean verify -Prelease -B + run: mvn clean package -DskipTests -Prelease -B - name: Create GitHub Release uses: softprops/action-gh-release@v2 diff --git a/consilens-cli/src/test/java/com/consilens/cli/config/ExampleConfigurationCompatibilityTest.java b/consilens-cli/src/test/java/com/consilens/cli/config/ExampleConfigurationCompatibilityTest.java index 7f57043..79e966e 100644 --- a/consilens-cli/src/test/java/com/consilens/cli/config/ExampleConfigurationCompatibilityTest.java +++ b/consilens-cli/src/test/java/com/consilens/cli/config/ExampleConfigurationCompatibilityTest.java @@ -3,6 +3,7 @@ import com.consilens.cli.model.CliConfiguration; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.api.Test; import java.io.IOException; import java.nio.file.Files; @@ -12,9 +13,12 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.util.List; +import java.util.LinkedHashMap; +import java.util.function.Predicate; import java.util.stream.Stream; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; class ExampleConfigurationCompatibilityTest { @@ -34,6 +38,41 @@ void shouldLoadExampleConfigurations(Path configPath) throws Exception { assertNotNull(config.getComparison().getKeys()); } + @Test + void shouldCoverStructuredConfigurationOptionsAcrossExamples() throws Exception { + List exampleTexts = exampleConfigurationPaths() + .map(path -> { + try { + return Files.readString(path); + } catch (IOException e) { + throw new RuntimeException(e); + } + }) + .collect(Collectors.toList()); + + Map> requiredPatterns = new LinkedHashMap<>(); + requiredPatterns.put("comparison.mappings", text -> text.contains("mappings:") || text.contains("\"mappings\"")); + requiredPatterns.put("comparison.extraColumns", text -> text.contains("extraColumns:") || text.contains("\"extraColumns\"")); + requiredPatterns.put("source/target.readOptions", text -> text.contains("readOptions:") || text.contains("\"readOptions\"")); + requiredPatterns.put("strategy.mode=join", text -> text.contains("mode: join") || text.contains("\"mode\": \"join\"")); + requiredPatterns.put("strategy.algorithm=concat", text -> text.contains("algorithm: concat") + || text.contains("\"algorithm\": \"concat\"")); + requiredPatterns.put("strategy.localCompare.mode=row-hash", text -> text.contains("mode: row-hash") + || text.contains("\"mode\": \"row-hash\"")); + requiredPatterns.put("strategy.maxDifferences", text -> text.contains("maxDifferences:") + || text.contains("\"maxDifferences\"")); + requiredPatterns.put("normalization", text -> text.contains("normalization:") || text.contains("\"normalization\"")); + requiredPatterns.put("result.failOnSinkError", text -> text.contains("failOnSinkError:") + || text.contains("\"failOnSinkError\"")); + requiredPatterns.put("result.sinks[].enabled", text -> text.contains("enabled: false") + || text.contains("\"enabled\": false")); + + for (Map.Entry> entry : requiredPatterns.entrySet()) { + assertTrue(exampleTexts.stream().anyMatch(entry.getValue()), + "examples 目录缺少 " + entry.getKey() + " 的覆盖示例"); + } + } + private static Stream exampleConfigurationPaths() throws IOException { Path examplesDirectory = Paths.get("..", "examples").toAbsolutePath().normalize(); List paths; diff --git a/consilens-sink/consilens-sink-plugins/consilens-sink-console/pom.xml b/consilens-sink/consilens-sink-plugins/consilens-sink-console/pom.xml index 4820953..70edbb8 100644 --- a/consilens-sink/consilens-sink-plugins/consilens-sink-console/pom.xml +++ b/consilens-sink/consilens-sink-plugins/consilens-sink-console/pom.xml @@ -37,5 +37,12 @@ org.apache.logging.log4j log4j-api + + + org.junit.jupiter + junit-jupiter + ${junit.version} + test + diff --git a/consilens-sink/consilens-sink-plugins/consilens-sink-console/src/main/java/com/consilens/sink/console/ConsoleDiffRecordSink.java b/consilens-sink/consilens-sink-plugins/consilens-sink-console/src/main/java/com/consilens/sink/console/ConsoleDiffRecordSink.java index 4db39c8..dfae5cf 100644 --- a/consilens-sink/consilens-sink-plugins/consilens-sink-console/src/main/java/com/consilens/sink/console/ConsoleDiffRecordSink.java +++ b/consilens-sink/consilens-sink-plugins/consilens-sink-console/src/main/java/com/consilens/sink/console/ConsoleDiffRecordSink.java @@ -5,10 +5,8 @@ import com.consilens.core.lifecycle.SegmentResult; import com.consilens.sink.api.Sink; import com.consilens.sink.api.model.SinkConfig; -import com.fasterxml.jackson.databind.ObjectMapper; import lombok.extern.slf4j.Slf4j; -import java.io.IOException; import java.util.List; /** @@ -19,11 +17,11 @@ public class ConsoleDiffRecordSink implements Sink { private ConsoleSinkConfig sinkConfig; private long rowCount = 0; + private boolean truncationNoticePrinted = false; @Override - public void open(SinkConfig config, DiffContext context) throws IOException { - sinkConfig = parseConfig(config.getProperties()); - System.out.println("[ConsoleDiffRecordSink] taskId=" + context.getTaskId() + " opened"); + public void open(SinkConfig config, DiffContext context) throws Exception { + sinkConfig = ConsoleOutputSupport.parseConfig(config.getProperties()); } @Override @@ -31,24 +29,19 @@ public void onDiffRecords(List rows, DiffContext context) { int limit = sinkConfig.getMaxRows(); for (DiffRow row : rows) { if (limit >= 0 && rowCount >= limit) { - System.out.println("[ConsoleDiffRecordSink] maxRows=" + limit + " reached, further rows suppressed"); + if (!truncationNoticePrinted) { + ConsoleOutputSupport.printStdout(ConsoleOutputSupport.truncationPayload(context, limit), sinkConfig); + truncationNoticePrinted = true; + } return; } - System.out.println("[DIFF] " + row.getDescription()); + ConsoleOutputSupport.printStdout(ConsoleOutputSupport.diffRecordPayload(row, context), sinkConfig); rowCount++; } } @Override public void onSegmentComplete(SegmentResult segmentResult) { - System.out.println("[ConsoleDiffRecordSink] segment=" + segmentResult.getSegmentIndex() - + " differencesFound=" + segmentResult.getDifferencesFound()); - } - - private ConsoleSinkConfig parseConfig(String properties) throws IOException { - if (properties == null || properties.isBlank()) { - return new ConsoleSinkConfig(); - } - return new ObjectMapper().readValue(properties, ConsoleSinkConfig.class); + ConsoleOutputSupport.printStdout(ConsoleOutputSupport.segmentPayload(segmentResult), sinkConfig); } } diff --git a/consilens-sink/consilens-sink-plugins/consilens-sink-console/src/main/java/com/consilens/sink/console/ConsoleOutputSupport.java b/consilens-sink/consilens-sink-plugins/consilens-sink-console/src/main/java/com/consilens/sink/console/ConsoleOutputSupport.java new file mode 100644 index 0000000..980d788 --- /dev/null +++ b/consilens-sink/consilens-sink-plugins/consilens-sink-console/src/main/java/com/consilens/sink/console/ConsoleOutputSupport.java @@ -0,0 +1,166 @@ +package com.consilens.sink.console; + +import com.consilens.core.diff.DiffResult; +import com.consilens.core.diff.DiffRow; +import com.consilens.core.lifecycle.DiffContext; +import com.consilens.core.lifecycle.SegmentResult; +import com.fasterxml.jackson.databind.ObjectMapper; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.time.Instant; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +final class ConsoleOutputSupport { + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper().findAndRegisterModules(); + + private ConsoleOutputSupport() { + } + + static ConsoleSinkConfig parseConfig(String properties) throws IOException { + if (properties == null || properties.isBlank()) { + return new ConsoleSinkConfig(); + } + return OBJECT_MAPPER.readValue(properties, ConsoleSinkConfig.class); + } + + static void printStdout(Map payload, ConsoleSinkConfig config) { + System.out.println(toJson(payload, config)); + } + + static void printStderr(Map payload, ConsoleSinkConfig config) { + System.err.println(toJson(payload, config)); + } + + static Map diffRecordPayload(DiffRow row, DiffContext context) { + LinkedHashMap payload = basePayload("diff-record", context); + payload.put("operation", row.getOperation().getCode()); + payload.put("primaryKey", row.getPrimaryKey()); + payload.put("primaryKeyString", row.getPrimaryKeyString()); + payload.put("sourceValues", row.getAllSourceValues()); + payload.put("targetValues", row.getAllTargetValues()); + payload.put("columnNames1", row.getColumnNames1()); + payload.put("columnNames2", row.getColumnNames2()); + payload.put("changedColumns1", row.getChangedColumns1()); + payload.put("changedColumns2", row.getChangedColumns2()); + if (row.getMetadata() != null && !row.getMetadata().isEmpty()) { + payload.put("metadata", row.getMetadata()); + } + return payload; + } + + static Map resultPayload(DiffResult result, DiffContext context, boolean includeSummary) { + LinkedHashMap payload = basePayload("result", context); + payload.put("status", result != null && result.hasDifferences() ? "DIFF" : "EQUAL"); + payload.put("statistics", result != null ? result.getStatisticsMap() : Map.of()); + if (includeSummary) { + payload.put("summary", safeSummary(result)); + } + if (result != null && result.getMetadata() != null && !result.getMetadata().isEmpty()) { + payload.put("metadata", result.getMetadata()); + } + return payload; + } + + static Map segmentPayload(SegmentResult segmentResult) { + LinkedHashMap payload = basePayload("segment-complete", segmentResult.getContext()); + payload.put("segmentIndex", segmentResult.getSegmentIndex()); + payload.put("sourceRowsScanned", segmentResult.getSourceRowsScanned()); + payload.put("targetRowsScanned", segmentResult.getTargetRowsScanned()); + payload.put("differencesFound", segmentResult.getDifferencesFound()); + payload.put("totalDifferencesAccumulated", segmentResult.getTotalDifferencesAccumulated()); + payload.put("durationMs", segmentResult.getDuration() != null ? segmentResult.getDuration().toMillis() : null); + return payload; + } + + static Map truncationPayload(DiffContext context, long maxRows) { + LinkedHashMap payload = basePayload("diff-record-truncated", context); + payload.put("maxRows", maxRows); + payload.put("message", "maxRows reached, further rows suppressed"); + return payload; + } + + static Map errorPayload(DiffContext context, Throwable error) { + LinkedHashMap payload = basePayload("error", context); + payload.put("error", errorDetails(error)); + return payload; + } + + private static LinkedHashMap basePayload(String event, DiffContext context) { + LinkedHashMap payload = new LinkedHashMap<>(); + payload.put("event", event); + payload.put("timestamp", Instant.now().toString()); + if (context == null) { + return payload; + } + payload.put("taskId", context.getTaskId()); + payload.put("strategy", context.getStrategy()); + payload.put("algorithm", context.getAlgorithm()); + if (context.getSourceTablePath() != null) { + payload.put("sourceTable", context.getSourceTablePath().getFullPath()); + } + if (context.getTargetTablePath() != null) { + payload.put("targetTable", context.getTargetTablePath().getFullPath()); + } + return payload; + } + + private static Map errorDetails(Throwable error) { + LinkedHashMap detail = new LinkedHashMap<>(); + if (error == null) { + detail.put("type", "UnknownError"); + detail.put("message", null); + return detail; + } + detail.put("type", error.getClass().getName()); + detail.put("message", error.getMessage()); + Throwable cause = error.getCause(); + if (cause != null) { + List> causes = new ArrayList<>(); + while (cause != null) { + LinkedHashMap item = new LinkedHashMap<>(); + item.put("type", cause.getClass().getName()); + item.put("message", cause.getMessage()); + causes.add(item); + cause = cause.getCause(); + } + detail.put("causes", causes); + } + if (error.getSuppressed() != null && error.getSuppressed().length > 0) { + List> suppressed = new ArrayList<>(); + for (Throwable item : error.getSuppressed()) { + LinkedHashMap suppressedItem = new LinkedHashMap<>(); + suppressedItem.put("type", item.getClass().getName()); + suppressedItem.put("message", item.getMessage()); + suppressed.add(suppressedItem); + } + detail.put("suppressed", suppressed); + } + return detail; + } + + private static String safeSummary(DiffResult result) { + if (result == null) { + return "No result available"; + } + if (result.getSourceTablePath() == null || result.getTargetTablePath() == null) { + return "Source/target table metadata unavailable"; + } + return result.getSummary(); + } + + private static String toJson(Map payload, ConsoleSinkConfig config) { + try { + if (config != null && config.isPretty()) { + return OBJECT_MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(payload); + } + return OBJECT_MAPPER.writeValueAsString(payload); + } catch (IOException e) { + throw new UncheckedIOException("Failed to serialize console payload", e); + } + } +} diff --git a/consilens-sink/consilens-sink-plugins/consilens-sink-console/src/main/java/com/consilens/sink/console/ConsoleResultSink.java b/consilens-sink/consilens-sink-plugins/consilens-sink-console/src/main/java/com/consilens/sink/console/ConsoleResultSink.java index 8f2be16..83f4d63 100644 --- a/consilens-sink/consilens-sink-plugins/consilens-sink-console/src/main/java/com/consilens/sink/console/ConsoleResultSink.java +++ b/consilens-sink/consilens-sink-plugins/consilens-sink-console/src/main/java/com/consilens/sink/console/ConsoleResultSink.java @@ -4,9 +4,6 @@ import com.consilens.core.lifecycle.DiffContext; import com.consilens.sink.api.Sink; import com.consilens.sink.api.model.SinkConfig; -import com.fasterxml.jackson.databind.ObjectMapper; - -import java.io.IOException; /** * Outputs the final aggregated diff result to the console. @@ -16,27 +13,19 @@ public class ConsoleResultSink implements Sink { private ConsoleSinkConfig sinkConfig; @Override - public void open(SinkConfig config, DiffContext context) throws IOException { - sinkConfig = parseConfig(config.getProperties()); + public void open(SinkConfig config, DiffContext context) throws Exception { + sinkConfig = ConsoleOutputSupport.parseConfig(config.getProperties()); } @Override public void onResult(DiffResult result, DiffContext context) { if (sinkConfig.isShowStatistics()) { - String summary = result.getSummary(); - System.out.println("[DIFF RESULT] " + summary); + ConsoleOutputSupport.printStdout(ConsoleOutputSupport.resultPayload(result, context, true), sinkConfig); } } @Override public void onError(DiffContext context, Throwable error) { - System.err.println("[DIFF ERROR] taskId=" + context.getTaskId() + " error=" + error.getMessage()); - } - - private ConsoleSinkConfig parseConfig(String properties) throws IOException { - if (properties == null || properties.isBlank()) { - return new ConsoleSinkConfig(); - } - return new ObjectMapper().readValue(properties, ConsoleSinkConfig.class); + ConsoleOutputSupport.printStderr(ConsoleOutputSupport.errorPayload(context, error), sinkConfig); } } diff --git a/consilens-sink/consilens-sink-plugins/consilens-sink-console/src/main/java/com/consilens/sink/console/ConsoleSinkConfig.java b/consilens-sink/consilens-sink-plugins/consilens-sink-console/src/main/java/com/consilens/sink/console/ConsoleSinkConfig.java index 10bb424..e3281dd 100644 --- a/consilens-sink/consilens-sink-plugins/consilens-sink-console/src/main/java/com/consilens/sink/console/ConsoleSinkConfig.java +++ b/consilens-sink/consilens-sink-plugins/consilens-sink-console/src/main/java/com/consilens/sink/console/ConsoleSinkConfig.java @@ -25,4 +25,7 @@ public class ConsoleSinkConfig { /** Whether to print summary statistics; default true. */ private boolean showStatistics = true; + + /** Whether to pretty-print JSON payloads; default true. */ + private boolean pretty = true; } diff --git a/consilens-sink/consilens-sink-plugins/consilens-sink-console/src/test/java/com/consilens/sink/console/ConsoleSinkStructuredOutputTest.java b/consilens-sink/consilens-sink-plugins/consilens-sink-console/src/test/java/com/consilens/sink/console/ConsoleSinkStructuredOutputTest.java new file mode 100644 index 0000000..5a137ad --- /dev/null +++ b/consilens-sink/consilens-sink-plugins/consilens-sink-console/src/test/java/com/consilens/sink/console/ConsoleSinkStructuredOutputTest.java @@ -0,0 +1,182 @@ +package com.consilens.sink.console; + +import com.consilens.connector.api.model.TablePath; +import com.consilens.core.diff.DiffResult; +import com.consilens.core.diff.DiffRow; +import com.consilens.core.lifecycle.DiffContext; +import com.consilens.core.lifecycle.SegmentResult; +import com.consilens.sink.api.model.SinkConfig; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.MappingIterator; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.time.Instant; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class ConsoleSinkStructuredOutputTest { + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper().findAndRegisterModules(); + + private final ByteArrayOutputStream out = new ByteArrayOutputStream(); + private final ByteArrayOutputStream err = new ByteArrayOutputStream(); + private PrintStream originalOut; + private PrintStream originalErr; + + @BeforeEach + void setUpStreams() throws Exception { + originalOut = System.out; + originalErr = System.err; + System.setOut(new PrintStream(out, true, StandardCharsets.UTF_8.name())); + System.setErr(new PrintStream(err, true, StandardCharsets.UTF_8.name())); + } + + @AfterEach + void restoreStreams() { + System.setOut(originalOut); + System.setErr(originalErr); + } + + @Test + void shouldPrintStructuredResultAndErrorPayloads() throws Exception { + ConsoleResultSink sink = new ConsoleResultSink(); + DiffContext context = diffContext(); + + sink.open(resultSinkConfig(), context); + sink.onResult(diffResult(), context); + sink.onError(context, new IllegalStateException("console sink failed", new RuntimeException("root cause"))); + + JsonNode resultNode = readFirstJsonLine(out); + assertEquals("result", resultNode.get("event").asText()); + assertEquals("task-1", resultNode.get("taskId").asText()); + assertEquals("DIFF", resultNode.get("status").asText()); + assertEquals(1, resultNode.get("statistics").get("totalDifferences").asInt()); + assertTrue(resultNode.has("summary")); + assertFalse(resultNode.has("infoTree")); + assertTrue(out.toString(StandardCharsets.UTF_8).contains("\n \"event\"")); + + JsonNode errorNode = readFirstJsonLine(err); + assertEquals("error", errorNode.get("event").asText()); + assertEquals("java.lang.IllegalStateException", errorNode.get("error").get("type").asText()); + assertEquals("console sink failed", errorNode.get("error").get("message").asText()); + assertEquals("java.lang.RuntimeException", errorNode.get("error").get("causes").get(0).get("type").asText()); + } + + @Test + void shouldPrintStructuredDiffRecordsAndRespectMaxRows() throws Exception { + ConsoleDiffRecordSink sink = new ConsoleDiffRecordSink(); + DiffContext context = diffContext(); + SinkConfig config = new SinkConfig(); + config.setProperties("{\"maxRows\":1}"); + + sink.open(config, context); + sink.onDiffRecords(List.of(diffRow(), diffRow()), context); + sink.onSegmentComplete(SegmentResult.builder() + .segmentIndex(3) + .sourceRowsScanned(10) + .targetRowsScanned(10) + .differencesFound(1) + .totalDifferencesAccumulated(1) + .duration(Duration.ofMillis(25)) + .context(context) + .build()); + + List lines = readJsonLines(out); + assertEquals(3, lines.size()); + assertEquals("diff-record", lines.get(0).get("event").asText()); + assertEquals("mismatch", lines.get(0).get("operation").asText()); + assertEquals("42", lines.get(0).get("primaryKeyString").asText()); + assertTrue(lines.get(0).get("metadata").has("changedColumns1")); + + assertEquals("diff-record-truncated", lines.get(1).get("event").asText()); + assertEquals(1, lines.get(1).get("maxRows").asInt()); + + assertEquals("segment-complete", lines.get(2).get("event").asText()); + assertEquals(3, lines.get(2).get("segmentIndex").asInt()); + + String plainOutput = out.toString(StandardCharsets.UTF_8); + assertFalse(plainOutput.contains("[DIFF RESULT]")); + assertFalse(plainOutput.contains("[DIFF]")); + assertFalse(plainOutput.contains("[ConsoleDiffRecordSink]")); + } + + private DiffContext diffContext() { + return DiffContext.builder() + .taskId("task-1") + .startTime(Instant.parse("2026-05-12T08:00:00Z")) + .sourceTablePath(TablePath.fromString("mydb.users")) + .targetTablePath(TablePath.fromString("public.users")) + .strategy("checksum") + .algorithm("xor") + .build(); + } + + private SinkConfig resultSinkConfig() { + SinkConfig config = new SinkConfig(); + config.setProperties("{\"showStatistics\":true}"); + return config; + } + + private DiffRow diffRow() { + return DiffRow.modified( + List.of(42), + List.of("Alice", "active"), + List.of("Alice", "inactive"), + List.of("name", "status"), + List.of("name", "status"), + List.of("status"), + List.of("status")); + } + + private DiffResult diffResult() { + return DiffResult.builder() + .differences(List.of(diffRow())) + .statistics(DiffResult.DiffStatistics.builder() + .sourceRowCount(100) + .targetRowCount(100) + .sourceMissingCount(0) + .targetMissingCount(0) + .mismatchCount(1) + .unchangedCount(99) + .totalDifferences(1) + .differencePercentage(0.01d) + .processingTimeMs(12L) + .build()) + .infoTree(Optional.empty()) + .completedAt(Instant.parse("2026-05-12T08:00:12Z")) + .metadata(Map.of("jobId", "job-1")) + .sourceTablePath(TablePath.fromString("mydb.users")) + .targetTablePath(TablePath.fromString("public.users")) + .build(); + } + + private JsonNode readFirstJsonLine(ByteArrayOutputStream stream) throws Exception { + return readJsonLines(stream).get(0); + } + + private List readJsonLines(ByteArrayOutputStream stream) throws Exception { + byte[] bytes = stream.toByteArray(); + assertTrue(bytes.length > 0); + java.util.ArrayList result = new java.util.ArrayList<>(); + try (MappingIterator iterator = OBJECT_MAPPER.readerFor(JsonNode.class) + .readValues(new ByteArrayInputStream(bytes))) { + while (iterator.hasNext()) { + result.add(iterator.next()); + } + } + return result; + } +} diff --git a/examples/detail-to-aggregate-custom-sql.yaml b/examples/detail-to-aggregate-custom-sql.yaml index 448242d..f209f9d 100644 --- a/examples/detail-to-aggregate-custom-sql.yaml +++ b/examples/detail-to-aggregate-custom-sql.yaml @@ -66,3 +66,5 @@ result: sinks: - format: console type: result + - format: console + type: diff-record diff --git a/examples/mapped-mysql-to-postgres-checksum.yaml b/examples/mapped-mysql-to-postgres-checksum.yaml new file mode 100644 index 0000000..8941050 --- /dev/null +++ b/examples/mapped-mysql-to-postgres-checksum.yaml @@ -0,0 +1,88 @@ +# 字段映射比对配置 - MySQL users 到 PostgreSQL users +# 目标:覆盖 comparison.mappings,并演示 column / expression / literal 三种映射方式 + +source: + type: mysql + name: source-mysql-mapped + connection: + url: jdbc:mysql://localhost:3306/mydb + username: ${env.MYSQL_USER} + password: ${env.MYSQL_PASSWORD} + resource: + type: table + name: users + +target: + type: postgresql + name: target-postgresql-mapped + connection: + url: jdbc:postgresql://localhost:5432/postgres?currentSchema=public + username: ${env.PG_USER} + password: ${env.PG_PASSWORD} + resource: + type: table + name: users + +comparison: + keys: + source: + - id + target: + - id + + mappings: + - name: user_id + source: + column: id + target: + column: id + key: true + + - name: normalized_name + source: + expression: "UPPER(name)" + target: + expression: "UPPER(name)" + + - name: email + source: + column: email + target: + column: email + + - name: normalized_phone + source: + expression: "REPLACE(phone, '+86', '')" + target: + expression: "REPLACE(phone, '+86', '')" + + - name: source_system + source: + literal: "seed-users" + target: + literal: "seed-users" + + - name: debug_tag + source: + literal: "users-v1" + target: + literal: "users-v1" + compare: false + + filters: + source: "created_at >= '2026-01-01 00:00:00'" + target: "created_at >= '2026-01-01 00:00:00'" + +strategy: + mode: checksum + algorithm: xor + bisectionFactor: 4 + bisectionThreshold: 5000 + batchSize: 1000 + localCompare: + mode: row-hash + +result: + sinks: + - format: console + type: result diff --git a/examples/same-db-mysql-comparison.yaml b/examples/same-db-mysql-comparison.yaml index 5ba2827..6a832f6 100644 --- a/examples/same-db-mysql-comparison.yaml +++ b/examples/same-db-mysql-comparison.yaml @@ -1,5 +1,6 @@ -# 同库比对配置 - MySQL 内部两张表比对 -# 适用于数据迁移验证、分表一致性校验等场景 +# 同库 join 比对配置 - MySQL 内部两张表比对 +# 适用于同一 JDBC URL 下的迁移验证,也覆盖 readOptions / extraColumns / +# normalization / result.failOnSinkError / sink.enabled 等显式配置项 source: type: mysql @@ -11,6 +12,11 @@ source: resource: type: table name: orders + readOptions: + consistency: snapshot + batchSize: 500 + fetchSize: 1000 + useCursorFetch: true target: type: mysql @@ -22,6 +28,11 @@ target: resource: type: table name: orders_backup + readOptions: + consistency: snapshot + batchSize: 500 + fetchSize: 1000 + useCursorFetch: true comparison: keys: @@ -35,24 +46,56 @@ comparison: - customer_id - amount - status - - created_at target: - customer_id - amount - status - - created_at + + extraColumns: + - created_at filters: source: "created_at >= '2025-01-01'" target: "created_at >= '2025-01-01'" strategy: - mode: checksum - algorithm: xor + # join 要求 source / target 指向同一个 JDBC URL + mode: join + algorithm: concat bisectionFactor: 4 bisectionThreshold: 10000 + batchSize: 500 + maxDifferences: 5000 + +normalization: + global: + decimal: + precision: 2 + rounding: true + datetime: + format: yyyy-MM-dd HH:mm:ss + timezone: Asia/Shanghai + comparisonMode: instant + source: + varchar: + nullValue: "" + target: + varchar: + nullValue: "" result: + failOnSinkError: false sinks: - format: console type: result + - format: csv + type: diff-record + enabled: false + properties: + path: ./output/orders-diff.csv + delimiter: "," + includeHeader: true + mergeDefaults: true + columns: + - name: env + value: staging From 3a94051b7a5f33af5fc3143948664d10b7245e5d Mon Sep 17 00:00:00 2001 From: xiaoyao0120 Date: Wed, 13 May 2026 07:27:07 +0800 Subject: [PATCH 2/2] fix: resolve row-hash error --- .../core/algorithm/ChecksumDiffer.java | 134 ++++++++++--- .../core/algorithm/ChecksumDifferTest.java | 42 ++++ .../sink/table/TableDiffRecordSink.java | 71 ++++++- .../sink/table/TableDiffRecordSinkTest.java | 57 ++++++ .../mapped-mysql-to-postgres-checksum.yaml | 8 +- ...62\350\247\243\346\200\273\350\247\210.md" | 188 ++++++++++++++---- ...normalization \344\270\216 concurrency.md" | 2 +- ...15\347\275\256\351\200\237\346\237\245.md" | 16 +- 8 files changed, 436 insertions(+), 82 deletions(-) diff --git a/consilens-core/src/main/java/com/consilens/core/algorithm/ChecksumDiffer.java b/consilens-core/src/main/java/com/consilens/core/algorithm/ChecksumDiffer.java index a5e967f..f09c2a0 100644 --- a/consilens-core/src/main/java/com/consilens/core/algorithm/ChecksumDiffer.java +++ b/consilens-core/src/main/java/com/consilens/core/algorithm/ChecksumDiffer.java @@ -559,23 +559,25 @@ private CompletableFuture performRowHashBasedComparison( return CompletableFuture.supplyAsync(() -> { log.debug("Using row-hash based comparison for segment: {}", segmentId); - Map, String> hashes1 = table1.getRowHashes(); - Map, String> hashes2 = table2.getRowHashes(); + Map columnTypes1 = extractColumnTypes(table1); + Map columnTypes2 = extractColumnTypes(table2); + RowHashSide hashes1 = indexRowHashes(table1, table1.getRowHashes(), columnTypes1); + RowHashSide hashes2 = indexRowHashes(table2, table2.getRowHashes(), columnTypes2); return new RowHashSnapshot(hashes1, hashes2); }, executorProvider.getIoExecutor()) .thenApplyAsync(snapshot -> { - Set> keysOnlyInTable1 = new HashSet<>(snapshot.hashes1.keySet()); - keysOnlyInTable1.removeAll(snapshot.hashes2.keySet()); + Set> keysOnlyInTable1 = new HashSet<>(snapshot.table1.hashesByKey.keySet()); + keysOnlyInTable1.removeAll(snapshot.table2.hashesByKey.keySet()); - Set> keysOnlyInTable2 = new HashSet<>(snapshot.hashes2.keySet()); - keysOnlyInTable2.removeAll(snapshot.hashes1.keySet()); + Set> keysOnlyInTable2 = new HashSet<>(snapshot.table2.hashesByKey.keySet()); + keysOnlyInTable2.removeAll(snapshot.table1.hashesByKey.keySet()); Set> mismatchedKeys = new HashSet<>(); - for (Map.Entry, String> entry : snapshot.hashes1.entrySet()) { + for (Map.Entry, String> entry : snapshot.table1.hashesByKey.entrySet()) { List key = entry.getKey(); - if (snapshot.hashes2.containsKey(key)) { + if (snapshot.table2.hashesByKey.containsKey(key)) { String hash1 = entry.getValue(); - String hash2 = snapshot.hashes2.get(key); + String hash2 = snapshot.table2.hashesByKey.get(key); if (!hash1.equals(hash2)) { mismatchedKeys.add(key); } @@ -591,8 +593,8 @@ private CompletableFuture performRowHashBasedComparison( int debugCount = 0; for (List key : mismatchedKeys) { if (debugCount++ < 5) { - String hash1 = snapshot.hashes1.get(key); - String hash2 = snapshot.hashes2.get(key); + String hash1 = snapshot.table1.hashesByKey.get(key); + String hash2 = snapshot.table2.hashesByKey.get(key); log.debug("Hash mismatch for key {}: source_hash={}, target_hash={}", key, hash1, hash2); } else { @@ -604,14 +606,23 @@ private CompletableFuture performRowHashBasedComparison( return new RowChecksumDiffPlan(snapshot, keysOnlyInTable1, keysOnlyInTable2, mismatchedKeys); }, executorProvider.getCpuExecutor()) .thenApplyAsync(plan -> { + Map columnTypes1 = extractColumnTypes(table1); + Map columnTypes2 = extractColumnTypes(table2); if (plan.totalDifferences() == 0) { - return new RowChecksumDiffData(plan, Collections.emptyMap(), Collections.emptyMap()); + return new RowChecksumDiffData( + plan, + Collections.emptyMap(), + Collections.emptyMap(), + columnTypes1, + columnTypes2); } Map, Object[]> data1 = queryRowsByKeys(table1, - combineKeys(plan.keysOnlyInTable1, plan.mismatchedKeys)); + rawKeysFor(plan.snapshot.table1, plan.keysOnlyInTable1, plan.mismatchedKeys), + columnTypes1); Map, Object[]> data2 = queryRowsByKeys(table2, - combineKeys(plan.keysOnlyInTable2, plan.mismatchedKeys)); - return new RowChecksumDiffData(plan, data1, data2); + rawKeysFor(plan.snapshot.table2, plan.keysOnlyInTable2, plan.mismatchedKeys), + columnTypes2); + return new RowChecksumDiffData(plan, data1, data2, columnTypes1, columnTypes2); }, executorProvider.getIoExecutor()) .thenAcceptAsync(diffData -> { try { @@ -619,8 +630,8 @@ private CompletableFuture performRowHashBasedComparison( List differences = new ArrayList<>(); if (plan.totalDifferences() > 0) { - Map columnTypes1 = extractColumnTypes(table1); - Map columnTypes2 = extractColumnTypes(table2); + Map columnTypes1 = diffData.columnTypes1; + Map columnTypes2 = diffData.columnTypes2; for (List key : plan.keysOnlyInTable1) { Object[] row = diffData.data1.get(key); @@ -652,14 +663,17 @@ private CompletableFuture performRowHashBasedComparison( } performanceMonitor.recordLocalComparison(segmentId, - plan.snapshot.hashes1.size(), plan.snapshot.hashes2.size(), differences.size()); + plan.snapshot.table1.hashesByKey.size(), + plan.snapshot.table2.hashesByKey.size(), + differences.size()); storeDifferences(infoTreeRecorder, differences, segmentId); infoTreeRecorder.addRowsFetched(segmentId, - plan.snapshot.hashes1.size() + plan.snapshot.hashes2.size()); + plan.snapshot.table1.hashesByKey.size() + plan.snapshot.table2.hashesByKey.size()); progressReporter.segmentCompleted(segmentId, - plan.snapshot.hashes1.size() + plan.snapshot.hashes2.size(), differences.size()); + plan.snapshot.table1.hashesByKey.size() + plan.snapshot.table2.hashesByKey.size(), + differences.size()); log.info("Row-hash based comparison completed: {} differences for segment: {}", differences.size(), segmentId); @@ -716,7 +730,9 @@ private CompletableFuture performFullDataComparison( /** * Query rows by primary keys and return as a map. */ - private Map, Object[]> queryRowsByKeys(TableSegment segment, Set> keys) { + private Map, Object[]> queryRowsByKeys(TableSegment segment, + Set> keys, + Map columnTypes) { if (keys.isEmpty()) { return Collections.emptyMap(); } @@ -727,22 +743,74 @@ private Map, Object[]> queryRowsByKeys(TableSegment segment, Set
  • primaryKey = new ArrayList<>(keyColumnCount); - primaryKey.addAll(Arrays.asList(row).subList(0, keyColumnCount)); + List primaryKey = normalizeKeyValues( + Arrays.asList(row).subList(0, Math.min(keyColumnCount, row.length)), + segment.getKeyColumns(), + columnTypes); rowMap.put(primaryKey, row); } return rowMap; } + private RowHashSide indexRowHashes(TableSegment segment, + Map, String> rowHashes, + Map columnTypes) { + Map, String> hashesByKey = new LinkedHashMap<>(); + Map, List> rawKeysByKey = new LinkedHashMap<>(); + for (Map.Entry, String> entry : rowHashes.entrySet()) { + List normalizedKey = normalizeKeyValues(entry.getKey(), segment.getKeyColumns(), columnTypes); + hashesByKey.put(normalizedKey, entry.getValue()); + rawKeysByKey.put(normalizedKey, entry.getKey()); + } + return new RowHashSide(hashesByKey, rawKeysByKey); + } + + @SafeVarargs + private final Set> rawKeysFor(RowHashSide side, Set>... normalizedKeySets) { + Set> rawKeys = new LinkedHashSet<>(); + for (Set> normalizedKeySet : normalizedKeySets) { + for (List normalizedKey : normalizedKeySet) { + List rawKey = side.rawKeysByKey.get(normalizedKey); + if (rawKey != null) { + rawKeys.add(rawKey); + } + } + } + return rawKeys; + } + + private List normalizeKeyValues(List keyValues, + List keyColumns, + Map columnTypes) { + List normalized = new ArrayList<>(keyValues.size()); + for (int i = 0; i < keyValues.size(); i++) { + String columnName = i < keyColumns.size() ? keyColumns.get(i) : null; + DataType dataType = columnName != null + ? columnTypes.getOrDefault(columnName, DataType.UNKNOWN) + : DataType.UNKNOWN; + normalized.add(ValueNormalizer.normalizeValue(keyValues.get(i), dataType)); + } + return List.copyOf(normalized); + } + private static class RowHashSnapshot { - private final Map, String> hashes1; - private final Map, String> hashes2; + private final RowHashSide table1; + private final RowHashSide table2; + + private RowHashSnapshot(RowHashSide table1, RowHashSide table2) { + this.table1 = table1; + this.table2 = table2; + } + } + + private static class RowHashSide { + private final Map, String> hashesByKey; + private final Map, List> rawKeysByKey; - private RowHashSnapshot(Map, String> hashes1, Map, String> hashes2) { - this.hashes1 = hashes1; - this.hashes2 = hashes2; + private RowHashSide(Map, String> hashesByKey, Map, List> rawKeysByKey) { + this.hashesByKey = hashesByKey; + this.rawKeysByKey = rawKeysByKey; } } @@ -769,12 +837,18 @@ private static class RowChecksumDiffData { private final RowChecksumDiffPlan plan; private final Map, Object[]> data1; private final Map, Object[]> data2; + private final Map columnTypes1; + private final Map columnTypes2; private RowChecksumDiffData(RowChecksumDiffPlan plan, Map, Object[]> data1, - Map, Object[]> data2) { + Map, Object[]> data2, + Map columnTypes1, + Map columnTypes2) { this.plan = plan; this.data1 = data1; this.data2 = data2; + this.columnTypes1 = columnTypes1; + this.columnTypes2 = columnTypes2; } } diff --git a/consilens-core/src/test/java/com/consilens/core/algorithm/ChecksumDifferTest.java b/consilens-core/src/test/java/com/consilens/core/algorithm/ChecksumDifferTest.java index ba8944f..bbe2787 100644 --- a/consilens-core/src/test/java/com/consilens/core/algorithm/ChecksumDifferTest.java +++ b/consilens-core/src/test/java/com/consilens/core/algorithm/ChecksumDifferTest.java @@ -1,11 +1,13 @@ package com.consilens.core.algorithm; import com.consilens.common.enums.ChecksumAlgorithm; +import com.consilens.common.enums.LocalCompareMode; import com.consilens.core.database.adpter.DatabaseAdapter; import com.consilens.core.database.adpter.DatabaseAdapter.RowMapper; import com.consilens.core.database.connection.ConnectionPool; import com.consilens.core.diff.DiffResult; import com.consilens.core.diff.DiffResult.InfoTreeNode; +import com.consilens.core.thread.ConcurrencyConfig; import com.consilens.core.thread.ExecutorProvider; import com.consilens.connector.api.model.TablePath; import com.consilens.connector.api.model.PoolConfiguration; @@ -180,6 +182,46 @@ void testLargeTableChunkedProcessing() throws Exception { verify(mockAdapter1, atLeast(2)).countAndChecksum(any(TableSegment.class)); verify(mockAdapter2, atLeast(2)).countAndChecksum(any(TableSegment.class)); } + + @Test + @DisplayName("测试 row-hash 模式能够在键被标准化后仍然取回差异行") + void testRowHashComparisonShouldResolveRowsUsingNormalizedKeys() throws Exception { + differ.close(); + differ = new ChecksumDiffer(new TableDiffer.DifferConfig( + 4, + 1000, + false, + ChecksumAlgorithm.CONCAT, + LocalCompareMode.ROW_HASH, + ConcurrencyConfig.defaultConfig())); + + lenient().when(mockAdapter1.countAndBounds(any(TableSegment.class))) + .thenReturn(new ChecksumResult(1, null, Arrays.asList(0L), Arrays.asList(1L))); + lenient().when(mockAdapter2.countAndBounds(any(TableSegment.class))) + .thenReturn(new ChecksumResult(1, null, Arrays.asList(0L), Arrays.asList(1L))); + lenient().when(mockAdapter1.countAndChecksum(any(TableSegment.class))) + .thenReturn(new ChecksumResult(1, "source-checksum", Arrays.asList(0L), Arrays.asList(1L))); + lenient().when(mockAdapter2.countAndChecksum(any(TableSegment.class))) + .thenReturn(new ChecksumResult(1, "target-checksum", Arrays.asList(0L), Arrays.asList(1L))); + + when(mockAdapter1.querySegmentRowHashes(any(TableSegment.class))) + .thenReturn(Map.of(List.of(1), "source-row-hash")); + when(mockAdapter2.querySegmentRowHashes(any(TableSegment.class))) + .thenReturn(Map.of(List.of(1), "target-row-hash")); + + when(mockAdapter1.querySegmentByKeys(any(TableSegment.class), anySet())) + .thenReturn(Collections.singletonList( + new Object[] {"1", "user_00001", "1380000002"})); + when(mockAdapter2.querySegmentByKeys(any(TableSegment.class), anySet())) + .thenReturn(Collections.singletonList( + new Object[] {"1", "user_00001", "1380000001"})); + + DiffResult result = differ.diffTables(segment1, segment2).get(); + + assertNotNull(result); + assertEquals(1, result.getDifferences().size()); + assertEquals(List.of("value"), result.getDifferences().get(0).getChangedColumns1()); + } } @Nested diff --git a/consilens-sink/consilens-sink-plugins/consilens-sink-table/src/main/java/com/consilens/sink/table/TableDiffRecordSink.java b/consilens-sink/consilens-sink-plugins/consilens-sink-table/src/main/java/com/consilens/sink/table/TableDiffRecordSink.java index 566a723..7291ca7 100644 --- a/consilens-sink/consilens-sink-plugins/consilens-sink-table/src/main/java/com/consilens/sink/table/TableDiffRecordSink.java +++ b/consilens-sink/consilens-sink-plugins/consilens-sink-table/src/main/java/com/consilens/sink/table/TableDiffRecordSink.java @@ -26,6 +26,7 @@ import java.sql.PreparedStatement; import java.sql.SQLException; import java.util.ArrayList; +import java.util.LinkedHashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -69,17 +70,8 @@ public void open(SinkConfig config, DiffContext context) throws Exception { DatabaseDialect dialect = resolveDialect(sinkConfig); writeCompiler = dialect.getTableWriteCompiler(); - outputColumns = buildOutputColumns(dialect); - TableColumnNames.validateUniqueOutputColumns(outputColumns, "TableDiffRecordSink"); - writePlan = writeCompiler.compile(new TableWriteCompileRequest( - tableName, - sinkConfig.isCreateTable(), - sinkConfig.isDropIfExists(), - outputColumns - )); - - if (sinkConfig.isCreateTable() && !outputColumns.isEmpty()) { - createOutputTable(); + if (sinkConfig.hasCustomColumns() && !sinkConfig.isMergeMode()) { + initializeWritePlan(dialect); } log.info("TableDiffRecordSink opened, table={}, mode={}", tableName, sinkConfig.isMergeMode() ? "merge" : sinkConfig.hasCustomColumns() ? "custom" : "default"); @@ -90,6 +82,7 @@ public void onDiffRecords(List rows, DiffContext context) throws SQLExc if (dataSource == null || rows == null || rows.isEmpty()) { return; } + ensureWritePlanInitialized(rows); try (Connection connection = dataSource.getConnection(); PreparedStatement preparedStatement = connection.prepareStatement(writePlan.getInsertSql())) { connection.setAutoCommit(false); @@ -126,6 +119,48 @@ public void close() { } } + private void ensureWritePlanInitialized(List rows) throws SQLException { + if (writePlan != null) { + return; + } + mergeColumnsFromRows(rows); + initializeWritePlan(resolveDialect(sinkConfig)); + } + + private void mergeColumnsFromRows(List rows) { + if (rows == null || rows.isEmpty()) { + return; + } + sourceColumns = mergeColumns(sourceColumns, rows, true); + targetColumns = mergeColumns(targetColumns, rows, false); + rebuildOutputColumnMaps(); + } + + private List mergeColumns(List existing, List rows, boolean sourceSide) { + LinkedHashSet merged = new LinkedHashSet<>(); + if (existing != null) { + merged.addAll(existing); + } + for (DiffRow row : rows) { + List columnNames = sourceSide ? row.getColumnNames1() : row.getColumnNames2(); + if (columnNames != null) { + merged.addAll(columnNames); + } + } + return new ArrayList<>(merged); + } + + private void rebuildOutputColumnMaps() { + sourceOutputColumns = new LinkedHashMap<>(); + targetOutputColumns = new LinkedHashMap<>(); + for (String column : sourceColumns) { + sourceOutputColumns.put(TableColumnNames.sanitize(column) + "_1", column); + } + for (String column : targetColumns) { + targetOutputColumns.put(TableColumnNames.sanitize(column) + "_2", column); + } + } + private void createOutputTable() throws SQLException { try (Connection connection = dataSource.getConnection()) { if (sinkConfig.isDropIfExists()) { @@ -139,6 +174,20 @@ private void createOutputTable() throws SQLException { } } + private void initializeWritePlan(DatabaseDialect dialect) throws SQLException { + outputColumns = buildOutputColumns(dialect); + TableColumnNames.validateUniqueOutputColumns(outputColumns, "TableDiffRecordSink"); + writePlan = writeCompiler.compile(new TableWriteCompileRequest( + tableName, + sinkConfig.isCreateTable(), + sinkConfig.isDropIfExists(), + outputColumns + )); + if (sinkConfig.isCreateTable() && !outputColumns.isEmpty()) { + createOutputTable(); + } + } + private TypedOutputRow buildTypedOutputRow(DiffRow row, DiffContext context) { List values = new ArrayList<>(); Map overrideMap = sinkConfig.isMergeMode() ? buildOverrideMap() : Map.of(); diff --git a/consilens-sink/consilens-sink-plugins/consilens-sink-table/src/test/java/com/consilens/sink/table/TableDiffRecordSinkTest.java b/consilens-sink/consilens-sink-plugins/consilens-sink-table/src/test/java/com/consilens/sink/table/TableDiffRecordSinkTest.java index 52d2946..e5daa25 100644 --- a/consilens-sink/consilens-sink-plugins/consilens-sink-table/src/test/java/com/consilens/sink/table/TableDiffRecordSinkTest.java +++ b/consilens-sink/consilens-sink-plugins/consilens-sink-table/src/test/java/com/consilens/sink/table/TableDiffRecordSinkTest.java @@ -220,4 +220,61 @@ void shouldWriteDefaultJsonColumnsInPostgresMode() throws Exception { assertEquals("43", resultSet.getString("device_id_2")); } } + + @Test + void shouldExpandDefaultWideTableColumnsFromDiffRowsWhenContextOnlyHasKeys() throws Exception { + String url = "jdbc:h2:mem:diff_record_dynamic_columns;MODE=MySQL;DB_CLOSE_DELAY=-1"; + SinkConfig sinkConfig = new SinkConfig(); + sinkConfig.setFormat("table"); + sinkConfig.setType("diff-record"); + sinkConfig.setProperties("{" + + "\"type\":\"mysql\"," + + "\"url\":\"" + url + "\"," + + "\"username\":\"sa\"," + + "\"password\":\"\"," + + "\"driver\":\"org.h2.Driver\"," + + "\"tableName\":\"diff_record_dynamic_columns_test\"," + + "\"createTable\":true," + + "\"dropIfExists\":true," + + "\"batchSize\":100" + + "}"); + + DiffContext context = DiffContext.builder() + .taskId("task-dynamic-columns") + .sourceColumnNames(List.of("record_id")) + .targetColumnNames(List.of("record_id")) + .build(); + + DiffRow row = DiffRow.modified( + List.of("REC0001"), + List.of("REC0001", 42L, "active"), + List.of("REC0001", 43L, "inactive"), + List.of("record_id", "col_int", "status"), + List.of("record_id", "col_int", "status"), + List.of("col_int", "status"), + List.of("col_int", "status") + ); + + TableDiffRecordSink sink = new TableDiffRecordSink(); + sink.open(sinkConfig, context); + try { + sink.onDiffRecords(List.of(row), context); + } finally { + sink.close(); + } + + try (Connection connection = DriverManager.getConnection(url, "sa", ""); + Statement statement = connection.createStatement(); + ResultSet resultSet = statement.executeQuery( + "SELECT record_id_1, col_int_1, status_1, record_id_2, col_int_2, status_2 " + + "FROM diff_record_dynamic_columns_test")) { + assertTrue(resultSet.next()); + assertEquals("REC0001", resultSet.getString("record_id_1")); + assertEquals("42", resultSet.getString("col_int_1")); + assertEquals("active", resultSet.getString("status_1")); + assertEquals("REC0001", resultSet.getString("record_id_2")); + assertEquals("43", resultSet.getString("col_int_2")); + assertEquals("inactive", resultSet.getString("status_2")); + } + } } diff --git a/examples/mapped-mysql-to-postgres-checksum.yaml b/examples/mapped-mysql-to-postgres-checksum.yaml index 8941050..b8152c6 100644 --- a/examples/mapped-mysql-to-postgres-checksum.yaml +++ b/examples/mapped-mysql-to-postgres-checksum.yaml @@ -69,13 +69,13 @@ comparison: literal: "users-v1" compare: false - filters: - source: "created_at >= '2026-01-01 00:00:00'" - target: "created_at >= '2026-01-01 00:00:00'" +# filters: +# source: "created_at >= '2026-01-01 00:00:00'" +# target: "created_at >= '2026-01-01 00:00:00'" strategy: mode: checksum - algorithm: xor + algorithm: concat bisectionFactor: 4 bisectionThreshold: 5000 batchSize: 1000 diff --git "a/website/\351\205\215\347\275\256\350\256\262\350\247\243/00-\351\205\215\347\275\256\350\256\262\350\247\243\346\200\273\350\247\210.md" "b/website/\351\205\215\347\275\256\350\256\262\350\247\243/00-\351\205\215\347\275\256\350\256\262\350\247\243\346\200\273\350\247\210.md" index bdf1215..c0a9718 100644 --- "a/website/\351\205\215\347\275\256\350\256\262\350\247\243/00-\351\205\215\347\275\256\350\256\262\350\247\243\346\200\273\350\247\210.md" +++ "b/website/\351\205\215\347\275\256\350\256\262\350\247\243/00-\351\205\215\347\275\256\350\256\262\350\247\243\346\200\273\350\247\210.md" @@ -1,55 +1,175 @@ -# Consilens 配置实战系列 +# 00|Consilens 配置讲解总览:从第一次跑通到生产治理闭环 -很多工具的文档会把参数一项一项列出来。这样的文档当然完整,但对真正要落地的人来说,往往还差一步:**我手上这个数据校验需求,到底应该怎么配?为什么这么配?后面出了问题又该从哪里排查?** +> 导读: +> 本文是一篇面向实战的总览稿,用来回答一个最常见的问题:Consilens 的配置文档看起来并不复杂,但真正落到跨库、跨机房、大表一致性校验、结果落库和生产调优时,到底应该先看什么、后看什么、出了问题又该回到哪一层排查。全文会把整套“配置讲解”文章的阅读路径、核心问题和适用场景串起来,帮助你先建立全局地图,再进入具体配置细节。 +> +> Github: +> https://github.com/datavane/consilens +> 欢迎关注、Star、Fork,参与贡献 -这套文章就是为了解决这个问题。 +很多工具的文档,参数写得很全,但真正落地时,使用者脑子里最常见的问题其实不是“这个字段叫什么”,而是: -我会把 Consilens 的配置能力放回真实的数据一致性场景里来讲:从第一次跑通两张表的核对,到跨库字段对齐、类型标准化、结果落库审计,再到生产环境里的性能调优和场景化模板。你不需要先把所有字段背下来,只要顺着场景往下走,就能逐步形成自己的配置判断力。 +- 我手上这个校验需求,到底该从哪一层开始配? +- 为什么明明跑通了,结果却不准? +- 为什么规则看起来没问题,一上生产就慢、就抖、就不好排障? +- 一次 diff 跑完以后,结果到底应该怎么沉淀,才能真正进入治理闭环? -## 建议阅读顺序 +这篇总览,就是给这几个问题准备的。 + +它不是参数手册,也不是 API 索引,而是一张**配置地图**:帮你先看清 Consilens 的配置体系到底在解决什么问题,再决定应该进入哪一篇细看。 + +## 先记住:一份 Consilens 配置,本质上在回答五个问题 + +如果只用一句话理解 Consilens 配置,可以记住下面这条主线: + +1. **我要比较哪两份数据?** +2. **哪些记录在业务上算同一条?** +3. **哪些字段真正决定一致性?** +4. **用什么方式更稳、更快地完成比较?** +5. **差异结果给谁看、落到哪里、怎么追踪?** + +你会发现,后面所有配置项几乎都能被放回这五个问题里: + +- `source / target` 解决“从哪里取数”; +- `comparison` 解决“什么叫一致”; +- `strategy / normalization / concurrency` 解决“怎样跑得准、跑得稳、跑得动”; +- `result` 解决“结果怎样进入排障和治理链路”。 + +所以,与其把配置当成一堆字段,不如把它看成一次完整的数据一致性设计。 + +## 这套文章的阅读主线 ```mermaid flowchart TD - A["01 先跑通:建立整体心智模型"] --> B["02 comparison:定义业务一致性"] - B --> C["03 strategy / normalization / concurrency:把任务跑稳"] - C --> D["04 result:让结果进入审计闭环"] - D --> E["05 场景模板:直接按场景落配置"] - E --> F["cheatsheet:速查字段与边界"] + A["00 总览:先建立全局地图"] --> B["01 先跑通:理解一份最小可用配置"] + B --> C["02 comparison:定义业务一致性"] + C --> D["03 strategy / normalization / concurrency:把任务跑稳"] + D --> E["04 result:让结果进入审计闭环"] + E --> F["05 场景模板:直接按场景落配置"] + F --> G["06 配置速查:回查字段和边界"] ``` -1. **01-先跑通,再理解:从一份配置看懂 Consilens.md** - 先建立完整心智模型:两端数据从哪里来,比什么,怎么比,结果去哪里。 +这一套顺序不是按“参数字母表”排的,而是按真实项目最自然的落地路径排的: + +- 先能跑起来; +- 再把口径定义准确; +- 然后把任务跑稳; +- 再把结果沉淀下来; +- 最后把常见场景模板化、把字段边界速查化。 + +## 如果你时间不多,可以这样选读 + +| 你的当前问题 | 建议先看 | +| --- | --- | +| 第一次接触 Consilens,不知道一份配置怎么拼起来 | **01** | +| 任务跑出来很多误报,不知道是 keys、fields 还是 filters 的问题 | **02** | +| 大表任务太慢、跨库误报多、并发不好调 | **03** | +| 想把结果落库、进告警、进工单、进治理系统 | **04** | +| 已经理解原理,只想拿一份可改的模板快速开工 | **05** | +| 已经在落地中,只想随手查字段和边界 | **06** | + +如果你是第一次完整读这套文章,还是建议按 01 → 06 走一遍。这样后面遇到问题时,你能知道自己是在“数据集定义层”“业务口径层”“执行策略层”还是“结果输出层”出了偏差。 + +## 每一篇文章分别解决什么问题 + +### 01|先跑通,再理解:从一份配置看懂 Consilens + +这一篇解决的是**“我怎么先写出第一份能跑的配置”**。 + +它会从一份最小但完整的例子出发,把 `source / target`、`comparison`、`strategy`、`result` 四块拆开讲,让你先建立基本心智模型。对于第一次上手的人来说,这一篇最重要的价值不是记住参数,而是明白: + +> 一份配置不是在堆字段,而是在翻译一个真实的数据校验需求。 + +### 02|真正决定准不准的是 comparison + +这一篇解决的是**“为什么任务能跑,但结果不准”**。 + +大多数误报,根因不在算法,而在 `comparison` 没有把业务一致性说清楚。主键选错、字段口径不一致、过滤边界不一致、两边字段语义不对齐,都会把后面的执行链路带偏。 + +所以这一篇会重点讲清楚: -2. **02-真正决定准不准的是 comparison.md** - 主键、比较字段、过滤条件、字段映射、排障上下文都在这一层。这里讲清楚了,误报会少很多。 +- `keys` 怎么选; +- `fields` 和 `exclude` 怎么分工; +- `filters` 为什么不是随手加条件; +- `mappings` 什么时候该用; +- `extraColumns` 更适合承担什么角色。 -3. **03-让任务跑得稳:strategy、normalization 与 concurrency.md** - 讲 checksum 和 join 怎么选,跨库类型差异怎么消噪,大表任务如何逐步调优。 +### 03|让任务跑得稳:strategy、normalization 与 concurrency -4. **04-结果不是终点:result 与审计闭环.md** - 控制台、JSON、CSV、结果表、差异明细表各自适合什么场景,如何让对账结果进入治理流程。 +这一篇解决的是**“为什么任务一上生产就不稳”**。 -5. **05-七个常见场景,直接拿去改.md** - 把最常见的使用方式整理成模板。读完前五篇后,这一篇会变成你的日常配置起点。 +很多任务不是不会跑,而是: -6. **06-配置速查.md** - 一页速查。适合已经理解整体思路后,快速确认字段和边界。 +- 跨库精度和格式不一致,误报很多; +- 大表 checksum 虽然快,但分段、批量和并发调不好; +- 网络、数据库、CPU 的瓶颈混在一起,看起来哪都像问题。 + +这一篇会把 `strategy`、`normalization`、`concurrency` 放回真实运行场景里解释,告诉你什么情况下该选什么路径、什么情况下先调标准化、什么情况下应该先收敛范围而不是先加并发。 + +### 04|结果不是终点:result 与审计闭环 + +这一篇解决的是**“任务跑完之后,结果怎么真正有用”**。 + +如果对账结果只停留在控制台里,那它最多只是一次性检查;只有当结果能进入 JSON、结果表、差异明细表,进一步进入告警、工单、治理看板和回放链路时,它才真正变成生产体系里的一个节点。 + +所以这一篇会重点回答: + +- console / json / table sink 分别适合什么场景; +- 结果摘要和差异明细该怎么配合; +- 默认结构和自定义结构的边界在哪里; +- 怎样从试跑阶段逐步演进到生产化结果沉淀。 + +### 05|七个常见场景,直接拿去改 + +这一篇解决的是**“我已经理解了,现在给我一份能改的模板”**。 + +它把最常见的配置场景直接整理成模板,比如: + +- 同构表全量核对; +- 忽略审计列; +- 字段映射对齐; +- SQL 资源比对; +- 按业务切片过滤; +- 按时间窗滚动检查; +- 结果落库与治理接入。 + +但它不是让你机械复制,而是让你快速找到最接近业务现状的起点,再去替换连接、主键、字段和结果链路。 + +### 06|配置速查 + +这一篇解决的是**“我知道整体思路了,但现场需要快速确认边界”**。 + +它适合在你已经跑过任务、也理解配置分层之后使用。相比前面几篇,它更像一本口袋手册:帮你快速回忆某一类字段该放在哪一层、某个参数更适合承担什么职责、出现某类问题应该优先回查哪里。 ## 这套文章适合谁 -- 第一次接触 Consilens,希望尽快跑通一个对账任务的人; -- 正在做数据迁移、数仓同步、湖仓建设,需要校验两端数据一致性的人; -- 已经能写配置,但经常被字段差异、时间精度、布尔值、结果落库、性能问题困扰的人; -- 想把 Consilens 接入生产治理、审计、告警链路的人。 +如果你属于下面任意一种情况,这套内容都会比较对路: + +- 第一次接触 Consilens,希望尽快跑通一个对账任务; +- 正在做数据迁移、数仓同步、湖仓建设,需要校验两端数据一致性; +- 已经能写基础配置,但经常被误报、类型差异、结果落库、性能调优卡住; +- 想把 Consilens 接进生产环境里的治理、审计、告警链路。 + +## 读完这套文章,你应该得到什么 + +理想状态下,读完这套系列,你不只是“知道字段怎么写”,而是会形成下面这套判断顺序: + +1. 先定义数据集,而不是先纠结数据库类型; +2. 先定义业务一致性,再谈执行优化; +3. 先判断问题在哪一层,再决定调什么参数; +4. 先让结果可排障,再让结果可治理; +5. 先从稳定模板起步,再逐步抽象成团队规范。 + +这套判断顺序,比记住某一个字段默认值更重要。 + +## 建议你从哪里开始 + +如果你现在还没写出第一份配置,就从 **01** 开始。 -## 一句话理解 Consilens 配置 +如果你已经跑过任务,但总觉得结果“不太对”,直接去看 **02**。 -一份好的 Consilens 配置,本质上是在回答五个问题: +如果你已经在生产里跑 checksum 或 join 任务,最值得反复回看的通常是 **03** 和 **04**。 -1. 我要比较哪两份数据? -2. 哪些记录在业务上算同一条? -3. 哪些字段真正决定一致性? -4. 用什么方式更稳、更快地完成比较? -5. 差异结果要给谁看、落到哪里、如何追踪? +如果你只想尽快把今天手头的需求落地,先去 **05** 找最接近的模板,再回头补读前面的原理篇。 -后面的所有参数,都是围绕这五个问题展开的。 +这也是这篇总览真正想做的事:不是替代后面的文章,而是帮你在进入细节之前,先知道**该往哪里走**。 diff --git "a/website/\351\205\215\347\275\256\350\256\262\350\247\243/03-\350\256\251\344\273\273\345\212\241\350\267\221\345\276\227\347\250\263\357\274\232strategy\343\200\201normalization \344\270\216 concurrency.md" "b/website/\351\205\215\347\275\256\350\256\262\350\247\243/03-\350\256\251\344\273\273\345\212\241\350\267\221\345\276\227\347\250\263\357\274\232strategy\343\200\201normalization \344\270\216 concurrency.md" index d12fea7..489b4fd 100644 --- "a/website/\351\205\215\347\275\256\350\256\262\350\247\243/03-\350\256\251\344\273\273\345\212\241\350\267\221\345\276\227\347\250\263\357\274\232strategy\343\200\201normalization \344\270\216 concurrency.md" +++ "b/website/\351\205\215\347\275\256\350\256\262\350\247\243/03-\350\256\251\344\273\273\345\212\241\350\267\221\345\276\227\347\250\263\357\274\232strategy\343\200\201normalization \344\270\216 concurrency.md" @@ -292,7 +292,7 @@ source: 当一个任务“跑不稳”时,可以按这个顺序看: ```mermaid -flowchart TD +flowchart LR A["任务跑不稳"] --> B{"先判断问题类型"} B -->|误报多| C["先查 comparison / normalization"] B -->|慢且差异段多| D["查 strategy 分段和批量参数"] diff --git "a/website/\351\205\215\347\275\256\350\256\262\350\247\243/06-\351\205\215\347\275\256\351\200\237\346\237\245.md" "b/website/\351\205\215\347\275\256\350\256\262\350\247\243/06-\351\205\215\347\275\256\351\200\237\346\237\245.md" index 2fac2b7..fa6d2c1 100644 --- "a/website/\351\205\215\347\275\256\350\256\262\350\247\243/06-\351\205\215\347\275\256\351\200\237\346\237\245.md" +++ "b/website/\351\205\215\347\275\256\350\256\262\350\247\243/06-\351\205\215\347\275\256\351\200\237\346\237\245.md" @@ -1,6 +1,18 @@ -# Consilens 配置速查表 +# 06|Consilens 配置速查:理解之后,用来回查 -这份速查表适合在已经理解整体配置思路后使用。第一次学习建议先读系列文章,不要直接从字段表开始。 +> 导读: +> 本文不是一篇从零入门的教程,而是一份适合放在手边随时回查的配置速查稿。它会把 Consilens 常用配置项按 `source / target`、`comparison`、`strategy`、`normalization`、`concurrency`、`result` 这些层次重新整理,帮助你在已经理解整体思路之后,快速确认字段职责、配置边界和排查入口。 +> +> Github: +> https://github.com/datavane/consilens +> 欢迎关注、Star、Fork,参与贡献 + +这份速查更适合下面两类场景: + +- 你已经跑过任务,现在需要快速回忆某个字段该放在哪一层; +- 你已经读过前面的文章,现在想在现场排障或改配置时少来回翻全文。 + +如果你是第一次接触 Consilens,建议先从 `00` 到 `05` 建立整体心智模型,再回到这篇做查阅。 ## 一份配置的主线