diff --git a/consilens-core/src/main/java/com/consilens/core/algorithm/ChecksumDiffer.java b/consilens-core/src/main/java/com/consilens/core/algorithm/ChecksumDiffer.java index a5e967f..f09c2a0 100644 --- a/consilens-core/src/main/java/com/consilens/core/algorithm/ChecksumDiffer.java +++ b/consilens-core/src/main/java/com/consilens/core/algorithm/ChecksumDiffer.java @@ -559,23 +559,25 @@ private CompletableFuture performRowHashBasedComparison( return CompletableFuture.supplyAsync(() -> { log.debug("Using row-hash based comparison for segment: {}", segmentId); - Map, String> hashes1 = table1.getRowHashes(); - Map, String> hashes2 = table2.getRowHashes(); + Map columnTypes1 = extractColumnTypes(table1); + Map columnTypes2 = extractColumnTypes(table2); + RowHashSide hashes1 = indexRowHashes(table1, table1.getRowHashes(), columnTypes1); + RowHashSide hashes2 = indexRowHashes(table2, table2.getRowHashes(), columnTypes2); return new RowHashSnapshot(hashes1, hashes2); }, executorProvider.getIoExecutor()) .thenApplyAsync(snapshot -> { - Set> keysOnlyInTable1 = new HashSet<>(snapshot.hashes1.keySet()); - keysOnlyInTable1.removeAll(snapshot.hashes2.keySet()); + Set> keysOnlyInTable1 = new HashSet<>(snapshot.table1.hashesByKey.keySet()); + keysOnlyInTable1.removeAll(snapshot.table2.hashesByKey.keySet()); - Set> keysOnlyInTable2 = new HashSet<>(snapshot.hashes2.keySet()); - keysOnlyInTable2.removeAll(snapshot.hashes1.keySet()); + Set> keysOnlyInTable2 = new HashSet<>(snapshot.table2.hashesByKey.keySet()); + keysOnlyInTable2.removeAll(snapshot.table1.hashesByKey.keySet()); Set> mismatchedKeys = new HashSet<>(); - for (Map.Entry, String> entry : snapshot.hashes1.entrySet()) { + for (Map.Entry, String> entry : snapshot.table1.hashesByKey.entrySet()) { List key = entry.getKey(); - if (snapshot.hashes2.containsKey(key)) { + if (snapshot.table2.hashesByKey.containsKey(key)) { String hash1 = entry.getValue(); - String hash2 = snapshot.hashes2.get(key); + String hash2 = snapshot.table2.hashesByKey.get(key); if (!hash1.equals(hash2)) { mismatchedKeys.add(key); } @@ -591,8 +593,8 @@ private CompletableFuture performRowHashBasedComparison( int debugCount = 0; for (List key : mismatchedKeys) { if (debugCount++ < 5) { - String hash1 = snapshot.hashes1.get(key); - String hash2 = snapshot.hashes2.get(key); + String hash1 = snapshot.table1.hashesByKey.get(key); + String hash2 = snapshot.table2.hashesByKey.get(key); log.debug("Hash mismatch for key {}: source_hash={}, target_hash={}", key, hash1, hash2); } else { @@ -604,14 +606,23 @@ private CompletableFuture performRowHashBasedComparison( return new RowChecksumDiffPlan(snapshot, keysOnlyInTable1, keysOnlyInTable2, mismatchedKeys); }, executorProvider.getCpuExecutor()) .thenApplyAsync(plan -> { + Map columnTypes1 = extractColumnTypes(table1); + Map columnTypes2 = extractColumnTypes(table2); if (plan.totalDifferences() == 0) { - return new RowChecksumDiffData(plan, Collections.emptyMap(), Collections.emptyMap()); + return new RowChecksumDiffData( + plan, + Collections.emptyMap(), + Collections.emptyMap(), + columnTypes1, + columnTypes2); } Map, Object[]> data1 = queryRowsByKeys(table1, - combineKeys(plan.keysOnlyInTable1, plan.mismatchedKeys)); + rawKeysFor(plan.snapshot.table1, plan.keysOnlyInTable1, plan.mismatchedKeys), + columnTypes1); Map, Object[]> data2 = queryRowsByKeys(table2, - combineKeys(plan.keysOnlyInTable2, plan.mismatchedKeys)); - return new RowChecksumDiffData(plan, data1, data2); + rawKeysFor(plan.snapshot.table2, plan.keysOnlyInTable2, plan.mismatchedKeys), + columnTypes2); + return new RowChecksumDiffData(plan, data1, data2, columnTypes1, columnTypes2); }, executorProvider.getIoExecutor()) .thenAcceptAsync(diffData -> { try { @@ -619,8 +630,8 @@ private CompletableFuture performRowHashBasedComparison( List differences = new ArrayList<>(); if (plan.totalDifferences() > 0) { - Map columnTypes1 = extractColumnTypes(table1); - Map columnTypes2 = extractColumnTypes(table2); + Map columnTypes1 = diffData.columnTypes1; + Map columnTypes2 = diffData.columnTypes2; for (List key : plan.keysOnlyInTable1) { Object[] row = diffData.data1.get(key); @@ -652,14 +663,17 @@ private CompletableFuture performRowHashBasedComparison( } performanceMonitor.recordLocalComparison(segmentId, - plan.snapshot.hashes1.size(), plan.snapshot.hashes2.size(), differences.size()); + plan.snapshot.table1.hashesByKey.size(), + plan.snapshot.table2.hashesByKey.size(), + differences.size()); storeDifferences(infoTreeRecorder, differences, segmentId); infoTreeRecorder.addRowsFetched(segmentId, - plan.snapshot.hashes1.size() + plan.snapshot.hashes2.size()); + plan.snapshot.table1.hashesByKey.size() + plan.snapshot.table2.hashesByKey.size()); progressReporter.segmentCompleted(segmentId, - plan.snapshot.hashes1.size() + plan.snapshot.hashes2.size(), differences.size()); + plan.snapshot.table1.hashesByKey.size() + plan.snapshot.table2.hashesByKey.size(), + differences.size()); log.info("Row-hash based comparison completed: {} differences for segment: {}", differences.size(), segmentId); @@ -716,7 +730,9 @@ private CompletableFuture performFullDataComparison( /** * Query rows by primary keys and return as a map. */ - private Map, Object[]> queryRowsByKeys(TableSegment segment, Set> keys) { + private Map, Object[]> queryRowsByKeys(TableSegment segment, + Set> keys, + Map columnTypes) { if (keys.isEmpty()) { return Collections.emptyMap(); } @@ -727,22 +743,74 @@ private Map, Object[]> queryRowsByKeys(TableSegment segment, Set
  • primaryKey = new ArrayList<>(keyColumnCount); - primaryKey.addAll(Arrays.asList(row).subList(0, keyColumnCount)); + List primaryKey = normalizeKeyValues( + Arrays.asList(row).subList(0, Math.min(keyColumnCount, row.length)), + segment.getKeyColumns(), + columnTypes); rowMap.put(primaryKey, row); } return rowMap; } + private RowHashSide indexRowHashes(TableSegment segment, + Map, String> rowHashes, + Map columnTypes) { + Map, String> hashesByKey = new LinkedHashMap<>(); + Map, List> rawKeysByKey = new LinkedHashMap<>(); + for (Map.Entry, String> entry : rowHashes.entrySet()) { + List normalizedKey = normalizeKeyValues(entry.getKey(), segment.getKeyColumns(), columnTypes); + hashesByKey.put(normalizedKey, entry.getValue()); + rawKeysByKey.put(normalizedKey, entry.getKey()); + } + return new RowHashSide(hashesByKey, rawKeysByKey); + } + + @SafeVarargs + private final Set> rawKeysFor(RowHashSide side, Set>... normalizedKeySets) { + Set> rawKeys = new LinkedHashSet<>(); + for (Set> normalizedKeySet : normalizedKeySets) { + for (List normalizedKey : normalizedKeySet) { + List rawKey = side.rawKeysByKey.get(normalizedKey); + if (rawKey != null) { + rawKeys.add(rawKey); + } + } + } + return rawKeys; + } + + private List normalizeKeyValues(List keyValues, + List keyColumns, + Map columnTypes) { + List normalized = new ArrayList<>(keyValues.size()); + for (int i = 0; i < keyValues.size(); i++) { + String columnName = i < keyColumns.size() ? keyColumns.get(i) : null; + DataType dataType = columnName != null + ? columnTypes.getOrDefault(columnName, DataType.UNKNOWN) + : DataType.UNKNOWN; + normalized.add(ValueNormalizer.normalizeValue(keyValues.get(i), dataType)); + } + return List.copyOf(normalized); + } + private static class RowHashSnapshot { - private final Map, String> hashes1; - private final Map, String> hashes2; + private final RowHashSide table1; + private final RowHashSide table2; + + private RowHashSnapshot(RowHashSide table1, RowHashSide table2) { + this.table1 = table1; + this.table2 = table2; + } + } + + private static class RowHashSide { + private final Map, String> hashesByKey; + private final Map, List> rawKeysByKey; - private RowHashSnapshot(Map, String> hashes1, Map, String> hashes2) { - this.hashes1 = hashes1; - this.hashes2 = hashes2; + private RowHashSide(Map, String> hashesByKey, Map, List> rawKeysByKey) { + this.hashesByKey = hashesByKey; + this.rawKeysByKey = rawKeysByKey; } } @@ -769,12 +837,18 @@ private static class RowChecksumDiffData { private final RowChecksumDiffPlan plan; private final Map, Object[]> data1; private final Map, Object[]> data2; + private final Map columnTypes1; + private final Map columnTypes2; private RowChecksumDiffData(RowChecksumDiffPlan plan, Map, Object[]> data1, - Map, Object[]> data2) { + Map, Object[]> data2, + Map columnTypes1, + Map columnTypes2) { this.plan = plan; this.data1 = data1; this.data2 = data2; + this.columnTypes1 = columnTypes1; + this.columnTypes2 = columnTypes2; } } diff --git a/consilens-core/src/test/java/com/consilens/core/algorithm/ChecksumDifferTest.java b/consilens-core/src/test/java/com/consilens/core/algorithm/ChecksumDifferTest.java index ba8944f..bbe2787 100644 --- a/consilens-core/src/test/java/com/consilens/core/algorithm/ChecksumDifferTest.java +++ b/consilens-core/src/test/java/com/consilens/core/algorithm/ChecksumDifferTest.java @@ -1,11 +1,13 @@ package com.consilens.core.algorithm; import com.consilens.common.enums.ChecksumAlgorithm; +import com.consilens.common.enums.LocalCompareMode; import com.consilens.core.database.adpter.DatabaseAdapter; import com.consilens.core.database.adpter.DatabaseAdapter.RowMapper; import com.consilens.core.database.connection.ConnectionPool; import com.consilens.core.diff.DiffResult; import com.consilens.core.diff.DiffResult.InfoTreeNode; +import com.consilens.core.thread.ConcurrencyConfig; import com.consilens.core.thread.ExecutorProvider; import com.consilens.connector.api.model.TablePath; import com.consilens.connector.api.model.PoolConfiguration; @@ -180,6 +182,46 @@ void testLargeTableChunkedProcessing() throws Exception { verify(mockAdapter1, atLeast(2)).countAndChecksum(any(TableSegment.class)); verify(mockAdapter2, atLeast(2)).countAndChecksum(any(TableSegment.class)); } + + @Test + @DisplayName("测试 row-hash 模式能够在键被标准化后仍然取回差异行") + void testRowHashComparisonShouldResolveRowsUsingNormalizedKeys() throws Exception { + differ.close(); + differ = new ChecksumDiffer(new TableDiffer.DifferConfig( + 4, + 1000, + false, + ChecksumAlgorithm.CONCAT, + LocalCompareMode.ROW_HASH, + ConcurrencyConfig.defaultConfig())); + + lenient().when(mockAdapter1.countAndBounds(any(TableSegment.class))) + .thenReturn(new ChecksumResult(1, null, Arrays.asList(0L), Arrays.asList(1L))); + lenient().when(mockAdapter2.countAndBounds(any(TableSegment.class))) + .thenReturn(new ChecksumResult(1, null, Arrays.asList(0L), Arrays.asList(1L))); + lenient().when(mockAdapter1.countAndChecksum(any(TableSegment.class))) + .thenReturn(new ChecksumResult(1, "source-checksum", Arrays.asList(0L), Arrays.asList(1L))); + lenient().when(mockAdapter2.countAndChecksum(any(TableSegment.class))) + .thenReturn(new ChecksumResult(1, "target-checksum", Arrays.asList(0L), Arrays.asList(1L))); + + when(mockAdapter1.querySegmentRowHashes(any(TableSegment.class))) + .thenReturn(Map.of(List.of(1), "source-row-hash")); + when(mockAdapter2.querySegmentRowHashes(any(TableSegment.class))) + .thenReturn(Map.of(List.of(1), "target-row-hash")); + + when(mockAdapter1.querySegmentByKeys(any(TableSegment.class), anySet())) + .thenReturn(Collections.singletonList( + new Object[] {"1", "user_00001", "1380000002"})); + when(mockAdapter2.querySegmentByKeys(any(TableSegment.class), anySet())) + .thenReturn(Collections.singletonList( + new Object[] {"1", "user_00001", "1380000001"})); + + DiffResult result = differ.diffTables(segment1, segment2).get(); + + assertNotNull(result); + assertEquals(1, result.getDifferences().size()); + assertEquals(List.of("value"), result.getDifferences().get(0).getChangedColumns1()); + } } @Nested diff --git a/consilens-sink/consilens-sink-plugins/consilens-sink-table/src/main/java/com/consilens/sink/table/TableDiffRecordSink.java b/consilens-sink/consilens-sink-plugins/consilens-sink-table/src/main/java/com/consilens/sink/table/TableDiffRecordSink.java index 566a723..7291ca7 100644 --- a/consilens-sink/consilens-sink-plugins/consilens-sink-table/src/main/java/com/consilens/sink/table/TableDiffRecordSink.java +++ b/consilens-sink/consilens-sink-plugins/consilens-sink-table/src/main/java/com/consilens/sink/table/TableDiffRecordSink.java @@ -26,6 +26,7 @@ import java.sql.PreparedStatement; import java.sql.SQLException; import java.util.ArrayList; +import java.util.LinkedHashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -69,17 +70,8 @@ public void open(SinkConfig config, DiffContext context) throws Exception { DatabaseDialect dialect = resolveDialect(sinkConfig); writeCompiler = dialect.getTableWriteCompiler(); - outputColumns = buildOutputColumns(dialect); - TableColumnNames.validateUniqueOutputColumns(outputColumns, "TableDiffRecordSink"); - writePlan = writeCompiler.compile(new TableWriteCompileRequest( - tableName, - sinkConfig.isCreateTable(), - sinkConfig.isDropIfExists(), - outputColumns - )); - - if (sinkConfig.isCreateTable() && !outputColumns.isEmpty()) { - createOutputTable(); + if (sinkConfig.hasCustomColumns() && !sinkConfig.isMergeMode()) { + initializeWritePlan(dialect); } log.info("TableDiffRecordSink opened, table={}, mode={}", tableName, sinkConfig.isMergeMode() ? "merge" : sinkConfig.hasCustomColumns() ? "custom" : "default"); @@ -90,6 +82,7 @@ public void onDiffRecords(List rows, DiffContext context) throws SQLExc if (dataSource == null || rows == null || rows.isEmpty()) { return; } + ensureWritePlanInitialized(rows); try (Connection connection = dataSource.getConnection(); PreparedStatement preparedStatement = connection.prepareStatement(writePlan.getInsertSql())) { connection.setAutoCommit(false); @@ -126,6 +119,48 @@ public void close() { } } + private void ensureWritePlanInitialized(List rows) throws SQLException { + if (writePlan != null) { + return; + } + mergeColumnsFromRows(rows); + initializeWritePlan(resolveDialect(sinkConfig)); + } + + private void mergeColumnsFromRows(List rows) { + if (rows == null || rows.isEmpty()) { + return; + } + sourceColumns = mergeColumns(sourceColumns, rows, true); + targetColumns = mergeColumns(targetColumns, rows, false); + rebuildOutputColumnMaps(); + } + + private List mergeColumns(List existing, List rows, boolean sourceSide) { + LinkedHashSet merged = new LinkedHashSet<>(); + if (existing != null) { + merged.addAll(existing); + } + for (DiffRow row : rows) { + List columnNames = sourceSide ? row.getColumnNames1() : row.getColumnNames2(); + if (columnNames != null) { + merged.addAll(columnNames); + } + } + return new ArrayList<>(merged); + } + + private void rebuildOutputColumnMaps() { + sourceOutputColumns = new LinkedHashMap<>(); + targetOutputColumns = new LinkedHashMap<>(); + for (String column : sourceColumns) { + sourceOutputColumns.put(TableColumnNames.sanitize(column) + "_1", column); + } + for (String column : targetColumns) { + targetOutputColumns.put(TableColumnNames.sanitize(column) + "_2", column); + } + } + private void createOutputTable() throws SQLException { try (Connection connection = dataSource.getConnection()) { if (sinkConfig.isDropIfExists()) { @@ -139,6 +174,20 @@ private void createOutputTable() throws SQLException { } } + private void initializeWritePlan(DatabaseDialect dialect) throws SQLException { + outputColumns = buildOutputColumns(dialect); + TableColumnNames.validateUniqueOutputColumns(outputColumns, "TableDiffRecordSink"); + writePlan = writeCompiler.compile(new TableWriteCompileRequest( + tableName, + sinkConfig.isCreateTable(), + sinkConfig.isDropIfExists(), + outputColumns + )); + if (sinkConfig.isCreateTable() && !outputColumns.isEmpty()) { + createOutputTable(); + } + } + private TypedOutputRow buildTypedOutputRow(DiffRow row, DiffContext context) { List values = new ArrayList<>(); Map overrideMap = sinkConfig.isMergeMode() ? buildOverrideMap() : Map.of(); diff --git a/consilens-sink/consilens-sink-plugins/consilens-sink-table/src/test/java/com/consilens/sink/table/TableDiffRecordSinkTest.java b/consilens-sink/consilens-sink-plugins/consilens-sink-table/src/test/java/com/consilens/sink/table/TableDiffRecordSinkTest.java index 52d2946..e5daa25 100644 --- a/consilens-sink/consilens-sink-plugins/consilens-sink-table/src/test/java/com/consilens/sink/table/TableDiffRecordSinkTest.java +++ b/consilens-sink/consilens-sink-plugins/consilens-sink-table/src/test/java/com/consilens/sink/table/TableDiffRecordSinkTest.java @@ -220,4 +220,61 @@ void shouldWriteDefaultJsonColumnsInPostgresMode() throws Exception { assertEquals("43", resultSet.getString("device_id_2")); } } + + @Test + void shouldExpandDefaultWideTableColumnsFromDiffRowsWhenContextOnlyHasKeys() throws Exception { + String url = "jdbc:h2:mem:diff_record_dynamic_columns;MODE=MySQL;DB_CLOSE_DELAY=-1"; + SinkConfig sinkConfig = new SinkConfig(); + sinkConfig.setFormat("table"); + sinkConfig.setType("diff-record"); + sinkConfig.setProperties("{" + + "\"type\":\"mysql\"," + + "\"url\":\"" + url + "\"," + + "\"username\":\"sa\"," + + "\"password\":\"\"," + + "\"driver\":\"org.h2.Driver\"," + + "\"tableName\":\"diff_record_dynamic_columns_test\"," + + "\"createTable\":true," + + "\"dropIfExists\":true," + + "\"batchSize\":100" + + "}"); + + DiffContext context = DiffContext.builder() + .taskId("task-dynamic-columns") + .sourceColumnNames(List.of("record_id")) + .targetColumnNames(List.of("record_id")) + .build(); + + DiffRow row = DiffRow.modified( + List.of("REC0001"), + List.of("REC0001", 42L, "active"), + List.of("REC0001", 43L, "inactive"), + List.of("record_id", "col_int", "status"), + List.of("record_id", "col_int", "status"), + List.of("col_int", "status"), + List.of("col_int", "status") + ); + + TableDiffRecordSink sink = new TableDiffRecordSink(); + sink.open(sinkConfig, context); + try { + sink.onDiffRecords(List.of(row), context); + } finally { + sink.close(); + } + + try (Connection connection = DriverManager.getConnection(url, "sa", ""); + Statement statement = connection.createStatement(); + ResultSet resultSet = statement.executeQuery( + "SELECT record_id_1, col_int_1, status_1, record_id_2, col_int_2, status_2 " + + "FROM diff_record_dynamic_columns_test")) { + assertTrue(resultSet.next()); + assertEquals("REC0001", resultSet.getString("record_id_1")); + assertEquals("42", resultSet.getString("col_int_1")); + assertEquals("active", resultSet.getString("status_1")); + assertEquals("REC0001", resultSet.getString("record_id_2")); + assertEquals("43", resultSet.getString("col_int_2")); + assertEquals("inactive", resultSet.getString("status_2")); + } + } } diff --git "a/website/\351\205\215\347\275\256\350\256\262\350\247\243/00-\351\205\215\347\275\256\350\256\262\350\247\243\346\200\273\350\247\210.md" "b/website/\351\205\215\347\275\256\350\256\262\350\247\243/00-\351\205\215\347\275\256\350\256\262\350\247\243\346\200\273\350\247\210.md" index bdf1215..c0a9718 100644 --- "a/website/\351\205\215\347\275\256\350\256\262\350\247\243/00-\351\205\215\347\275\256\350\256\262\350\247\243\346\200\273\350\247\210.md" +++ "b/website/\351\205\215\347\275\256\350\256\262\350\247\243/00-\351\205\215\347\275\256\350\256\262\350\247\243\346\200\273\350\247\210.md" @@ -1,55 +1,175 @@ -# Consilens 配置实战系列 +# 00|Consilens 配置讲解总览:从第一次跑通到生产治理闭环 -很多工具的文档会把参数一项一项列出来。这样的文档当然完整,但对真正要落地的人来说,往往还差一步:**我手上这个数据校验需求,到底应该怎么配?为什么这么配?后面出了问题又该从哪里排查?** +> 导读: +> 本文是一篇面向实战的总览稿,用来回答一个最常见的问题:Consilens 的配置文档看起来并不复杂,但真正落到跨库、跨机房、大表一致性校验、结果落库和生产调优时,到底应该先看什么、后看什么、出了问题又该回到哪一层排查。全文会把整套“配置讲解”文章的阅读路径、核心问题和适用场景串起来,帮助你先建立全局地图,再进入具体配置细节。 +> +> Github: +> https://github.com/datavane/consilens +> 欢迎关注、Star、Fork,参与贡献 -这套文章就是为了解决这个问题。 +很多工具的文档,参数写得很全,但真正落地时,使用者脑子里最常见的问题其实不是“这个字段叫什么”,而是: -我会把 Consilens 的配置能力放回真实的数据一致性场景里来讲:从第一次跑通两张表的核对,到跨库字段对齐、类型标准化、结果落库审计,再到生产环境里的性能调优和场景化模板。你不需要先把所有字段背下来,只要顺着场景往下走,就能逐步形成自己的配置判断力。 +- 我手上这个校验需求,到底该从哪一层开始配? +- 为什么明明跑通了,结果却不准? +- 为什么规则看起来没问题,一上生产就慢、就抖、就不好排障? +- 一次 diff 跑完以后,结果到底应该怎么沉淀,才能真正进入治理闭环? -## 建议阅读顺序 +这篇总览,就是给这几个问题准备的。 + +它不是参数手册,也不是 API 索引,而是一张**配置地图**:帮你先看清 Consilens 的配置体系到底在解决什么问题,再决定应该进入哪一篇细看。 + +## 先记住:一份 Consilens 配置,本质上在回答五个问题 + +如果只用一句话理解 Consilens 配置,可以记住下面这条主线: + +1. **我要比较哪两份数据?** +2. **哪些记录在业务上算同一条?** +3. **哪些字段真正决定一致性?** +4. **用什么方式更稳、更快地完成比较?** +5. **差异结果给谁看、落到哪里、怎么追踪?** + +你会发现,后面所有配置项几乎都能被放回这五个问题里: + +- `source / target` 解决“从哪里取数”; +- `comparison` 解决“什么叫一致”; +- `strategy / normalization / concurrency` 解决“怎样跑得准、跑得稳、跑得动”; +- `result` 解决“结果怎样进入排障和治理链路”。 + +所以,与其把配置当成一堆字段,不如把它看成一次完整的数据一致性设计。 + +## 这套文章的阅读主线 ```mermaid flowchart TD - A["01 先跑通:建立整体心智模型"] --> B["02 comparison:定义业务一致性"] - B --> C["03 strategy / normalization / concurrency:把任务跑稳"] - C --> D["04 result:让结果进入审计闭环"] - D --> E["05 场景模板:直接按场景落配置"] - E --> F["cheatsheet:速查字段与边界"] + A["00 总览:先建立全局地图"] --> B["01 先跑通:理解一份最小可用配置"] + B --> C["02 comparison:定义业务一致性"] + C --> D["03 strategy / normalization / concurrency:把任务跑稳"] + D --> E["04 result:让结果进入审计闭环"] + E --> F["05 场景模板:直接按场景落配置"] + F --> G["06 配置速查:回查字段和边界"] ``` -1. **01-先跑通,再理解:从一份配置看懂 Consilens.md** - 先建立完整心智模型:两端数据从哪里来,比什么,怎么比,结果去哪里。 +这一套顺序不是按“参数字母表”排的,而是按真实项目最自然的落地路径排的: + +- 先能跑起来; +- 再把口径定义准确; +- 然后把任务跑稳; +- 再把结果沉淀下来; +- 最后把常见场景模板化、把字段边界速查化。 + +## 如果你时间不多,可以这样选读 + +| 你的当前问题 | 建议先看 | +| --- | --- | +| 第一次接触 Consilens,不知道一份配置怎么拼起来 | **01** | +| 任务跑出来很多误报,不知道是 keys、fields 还是 filters 的问题 | **02** | +| 大表任务太慢、跨库误报多、并发不好调 | **03** | +| 想把结果落库、进告警、进工单、进治理系统 | **04** | +| 已经理解原理,只想拿一份可改的模板快速开工 | **05** | +| 已经在落地中,只想随手查字段和边界 | **06** | + +如果你是第一次完整读这套文章,还是建议按 01 → 06 走一遍。这样后面遇到问题时,你能知道自己是在“数据集定义层”“业务口径层”“执行策略层”还是“结果输出层”出了偏差。 + +## 每一篇文章分别解决什么问题 + +### 01|先跑通,再理解:从一份配置看懂 Consilens + +这一篇解决的是**“我怎么先写出第一份能跑的配置”**。 + +它会从一份最小但完整的例子出发,把 `source / target`、`comparison`、`strategy`、`result` 四块拆开讲,让你先建立基本心智模型。对于第一次上手的人来说,这一篇最重要的价值不是记住参数,而是明白: + +> 一份配置不是在堆字段,而是在翻译一个真实的数据校验需求。 + +### 02|真正决定准不准的是 comparison + +这一篇解决的是**“为什么任务能跑,但结果不准”**。 + +大多数误报,根因不在算法,而在 `comparison` 没有把业务一致性说清楚。主键选错、字段口径不一致、过滤边界不一致、两边字段语义不对齐,都会把后面的执行链路带偏。 + +所以这一篇会重点讲清楚: -2. **02-真正决定准不准的是 comparison.md** - 主键、比较字段、过滤条件、字段映射、排障上下文都在这一层。这里讲清楚了,误报会少很多。 +- `keys` 怎么选; +- `fields` 和 `exclude` 怎么分工; +- `filters` 为什么不是随手加条件; +- `mappings` 什么时候该用; +- `extraColumns` 更适合承担什么角色。 -3. **03-让任务跑得稳:strategy、normalization 与 concurrency.md** - 讲 checksum 和 join 怎么选,跨库类型差异怎么消噪,大表任务如何逐步调优。 +### 03|让任务跑得稳:strategy、normalization 与 concurrency -4. **04-结果不是终点:result 与审计闭环.md** - 控制台、JSON、CSV、结果表、差异明细表各自适合什么场景,如何让对账结果进入治理流程。 +这一篇解决的是**“为什么任务一上生产就不稳”**。 -5. **05-七个常见场景,直接拿去改.md** - 把最常见的使用方式整理成模板。读完前五篇后,这一篇会变成你的日常配置起点。 +很多任务不是不会跑,而是: -6. **06-配置速查.md** - 一页速查。适合已经理解整体思路后,快速确认字段和边界。 +- 跨库精度和格式不一致,误报很多; +- 大表 checksum 虽然快,但分段、批量和并发调不好; +- 网络、数据库、CPU 的瓶颈混在一起,看起来哪都像问题。 + +这一篇会把 `strategy`、`normalization`、`concurrency` 放回真实运行场景里解释,告诉你什么情况下该选什么路径、什么情况下先调标准化、什么情况下应该先收敛范围而不是先加并发。 + +### 04|结果不是终点:result 与审计闭环 + +这一篇解决的是**“任务跑完之后,结果怎么真正有用”**。 + +如果对账结果只停留在控制台里,那它最多只是一次性检查;只有当结果能进入 JSON、结果表、差异明细表,进一步进入告警、工单、治理看板和回放链路时,它才真正变成生产体系里的一个节点。 + +所以这一篇会重点回答: + +- console / json / table sink 分别适合什么场景; +- 结果摘要和差异明细该怎么配合; +- 默认结构和自定义结构的边界在哪里; +- 怎样从试跑阶段逐步演进到生产化结果沉淀。 + +### 05|七个常见场景,直接拿去改 + +这一篇解决的是**“我已经理解了,现在给我一份能改的模板”**。 + +它把最常见的配置场景直接整理成模板,比如: + +- 同构表全量核对; +- 忽略审计列; +- 字段映射对齐; +- SQL 资源比对; +- 按业务切片过滤; +- 按时间窗滚动检查; +- 结果落库与治理接入。 + +但它不是让你机械复制,而是让你快速找到最接近业务现状的起点,再去替换连接、主键、字段和结果链路。 + +### 06|配置速查 + +这一篇解决的是**“我知道整体思路了,但现场需要快速确认边界”**。 + +它适合在你已经跑过任务、也理解配置分层之后使用。相比前面几篇,它更像一本口袋手册:帮你快速回忆某一类字段该放在哪一层、某个参数更适合承担什么职责、出现某类问题应该优先回查哪里。 ## 这套文章适合谁 -- 第一次接触 Consilens,希望尽快跑通一个对账任务的人; -- 正在做数据迁移、数仓同步、湖仓建设,需要校验两端数据一致性的人; -- 已经能写配置,但经常被字段差异、时间精度、布尔值、结果落库、性能问题困扰的人; -- 想把 Consilens 接入生产治理、审计、告警链路的人。 +如果你属于下面任意一种情况,这套内容都会比较对路: + +- 第一次接触 Consilens,希望尽快跑通一个对账任务; +- 正在做数据迁移、数仓同步、湖仓建设,需要校验两端数据一致性; +- 已经能写基础配置,但经常被误报、类型差异、结果落库、性能调优卡住; +- 想把 Consilens 接进生产环境里的治理、审计、告警链路。 + +## 读完这套文章,你应该得到什么 + +理想状态下,读完这套系列,你不只是“知道字段怎么写”,而是会形成下面这套判断顺序: + +1. 先定义数据集,而不是先纠结数据库类型; +2. 先定义业务一致性,再谈执行优化; +3. 先判断问题在哪一层,再决定调什么参数; +4. 先让结果可排障,再让结果可治理; +5. 先从稳定模板起步,再逐步抽象成团队规范。 + +这套判断顺序,比记住某一个字段默认值更重要。 + +## 建议你从哪里开始 + +如果你现在还没写出第一份配置,就从 **01** 开始。 -## 一句话理解 Consilens 配置 +如果你已经跑过任务,但总觉得结果“不太对”,直接去看 **02**。 -一份好的 Consilens 配置,本质上是在回答五个问题: +如果你已经在生产里跑 checksum 或 join 任务,最值得反复回看的通常是 **03** 和 **04**。 -1. 我要比较哪两份数据? -2. 哪些记录在业务上算同一条? -3. 哪些字段真正决定一致性? -4. 用什么方式更稳、更快地完成比较? -5. 差异结果要给谁看、落到哪里、如何追踪? +如果你只想尽快把今天手头的需求落地,先去 **05** 找最接近的模板,再回头补读前面的原理篇。 -后面的所有参数,都是围绕这五个问题展开的。 +这也是这篇总览真正想做的事:不是替代后面的文章,而是帮你在进入细节之前,先知道**该往哪里走**。 diff --git "a/website/\351\205\215\347\275\256\350\256\262\350\247\243/03-\350\256\251\344\273\273\345\212\241\350\267\221\345\276\227\347\250\263\357\274\232strategy\343\200\201normalization \344\270\216 concurrency.md" "b/website/\351\205\215\347\275\256\350\256\262\350\247\243/03-\350\256\251\344\273\273\345\212\241\350\267\221\345\276\227\347\250\263\357\274\232strategy\343\200\201normalization \344\270\216 concurrency.md" index d12fea7..489b4fd 100644 --- "a/website/\351\205\215\347\275\256\350\256\262\350\247\243/03-\350\256\251\344\273\273\345\212\241\350\267\221\345\276\227\347\250\263\357\274\232strategy\343\200\201normalization \344\270\216 concurrency.md" +++ "b/website/\351\205\215\347\275\256\350\256\262\350\247\243/03-\350\256\251\344\273\273\345\212\241\350\267\221\345\276\227\347\250\263\357\274\232strategy\343\200\201normalization \344\270\216 concurrency.md" @@ -292,7 +292,7 @@ source: 当一个任务“跑不稳”时,可以按这个顺序看: ```mermaid -flowchart TD +flowchart LR A["任务跑不稳"] --> B{"先判断问题类型"} B -->|误报多| C["先查 comparison / normalization"] B -->|慢且差异段多| D["查 strategy 分段和批量参数"] diff --git "a/website/\351\205\215\347\275\256\350\256\262\350\247\243/06-\351\205\215\347\275\256\351\200\237\346\237\245.md" "b/website/\351\205\215\347\275\256\350\256\262\350\247\243/06-\351\205\215\347\275\256\351\200\237\346\237\245.md" index 2fac2b7..fa6d2c1 100644 --- "a/website/\351\205\215\347\275\256\350\256\262\350\247\243/06-\351\205\215\347\275\256\351\200\237\346\237\245.md" +++ "b/website/\351\205\215\347\275\256\350\256\262\350\247\243/06-\351\205\215\347\275\256\351\200\237\346\237\245.md" @@ -1,6 +1,18 @@ -# Consilens 配置速查表 +# 06|Consilens 配置速查:理解之后,用来回查 -这份速查表适合在已经理解整体配置思路后使用。第一次学习建议先读系列文章,不要直接从字段表开始。 +> 导读: +> 本文不是一篇从零入门的教程,而是一份适合放在手边随时回查的配置速查稿。它会把 Consilens 常用配置项按 `source / target`、`comparison`、`strategy`、`normalization`、`concurrency`、`result` 这些层次重新整理,帮助你在已经理解整体思路之后,快速确认字段职责、配置边界和排查入口。 +> +> Github: +> https://github.com/datavane/consilens +> 欢迎关注、Star、Fork,参与贡献 + +这份速查更适合下面两类场景: + +- 你已经跑过任务,现在需要快速回忆某个字段该放在哪一层; +- 你已经读过前面的文章,现在想在现场排障或改配置时少来回翻全文。 + +如果你是第一次接触 Consilens,建议先从 `00` 到 `05` 建立整体心智模型,再回到这篇做查阅。 ## 一份配置的主线