Skip to content

feat: cdc checker#460

Open
loomts wants to merge 38 commits intomainfrom
feat/cdc-checker
Open

feat: cdc checker#460
loomts wants to merge 38 commits intomainfrom
feat/cdc-checker

Conversation

@loomts
Copy link
Copy Markdown
Contributor

@loomts loomts commented Jan 22, 2026

Summary

  • Supported matrix / constraints:
    • standalone snapshot check: mysql / pg / mongo
    • standalone struct check: mysql / pg only
    • inline snapshot check: write sink + mysql / pg / mongo
    • inline cdc check: write sink + mysql / pg only, requires parallel_type=rdb_check, pipeline_type=basic, and [resumer] resume_type=from_target|from_db
  • Breaking config changes:
    • inline check no longer accepts [checker].db_type/url/username/password; target now always comes from [sinker]
    • parallel_type=rdb_check now requires [checker]
    • legacy resumer keys are not just deprecated; they are rejected
  • Runtime semantics:
    • full checker queue applies backpressure instead of dropping check work
    • mismatches are logged only and do not stop the main write path
    • runtime checker errors are mode-dependent (inline fail-open vs standalone fail-close)
  • Operational impact:
    • inline CDC check now persists durable checker state in the resumer backend, including unresolved rows in apedts_unconsistent_rows
    • resume behavior is effectively “recheck unresolved rows first, then continue from the durable checkpoint”
    • this may require schema/table create privileges on the resumer store

Check flow


  source data / change stream
            |
            v
       [Pipeline]
            |
            +-----------------------------+-----------------------------+
            |                                                           |
            | standalone check                                          | inline check
            |                                                           |
            v                                                           v
      [DummySinker]                                      [RealSinker: WRITE first]
            |                                                           |
            +-----------------------------+-----------------------------+
                                          |
                                          v
                                   [CheckedSinker]
                                          |
                                          | enqueue checked rows
                                          | (bounded by queue_size)
                                          v
                              [DataCheckerHandle: queue/worker]
                                          |
                       +------------------+------------------+
                       |                                     |
                       | queue full                          | checker runtime error
                       v                                     v
                 BACKPRESSURE                       mode-dependent handling
            (main path waits here)                 - inline     => FAIL-OPEN
                                                   - standalone => FAIL-CLOSE
                                          |
                                          v
                                   [checker_engine]
                                          |
                      +-------------------+-------------------+-------------------+
                      |                                       |                   |
                      v                                       v                   v
               [MysqlChecker]                          [PgChecker]        [MongoChecker]
                      \                                       |                   /
                       \                                      |                  /
                        +-------------------------------------+-----------------+
                                                              |
                                                              v
                                              compare source vs final target state
                                                              |
                                                              +--> diff / miss
                                                                   |
                                                                   v
            [diff.log / miss.log / sql.log / summary.log / metrics / monitor]
            (mismatch is LOGGED; it does NOT stop the main path)


  CDC + CHECK PERSISTENCE / RECOVERY
  ==================================

                                [checker_engine]
                                       |
                                       v
                                    [cdc_state]
                                       |
                                       | checkpoint persist
                                       | - clean: checkpoint only
                                       | - dirty: checkpoint + unresolved rows (atomic)
                                       v
                             [CheckerStateStore]
                                       |
            +--------------------------+---------------------------+
            |                                                      |
            v                                                      v
  [last durable CDC checkpoint]                     [unresolved checker rows]
  [resumer].table_full_name                         <same schema>.apedts_unconsistent_rows
  e.g. apecloud_metadata.                           stores rows that still need recheck
       apedts_task_position

  stores the latest recoverable CDC checkpoint 
  aligned with durable checker state
            |                                                      |
            +--------------------------+---------------------------+
                                       |
                                       v
                                     RESUME
                                       |
               +-----------------------+------------------------+
               |                                                |
               v                                                v
  load CDC checkpoint                               load unresolved checker rows
  from apedts_task_position                         from apedts_unconsistent_rows
               \                                                /
                \                                              /
                 +--------------------------------------------+
                                      |
                                      v
                       if unresolved rows exist: RECHECK FIRST
                                      |
                                      v
                      then continue CDC from the durable checkpoint

@loomts loomts marked this pull request as draft January 23, 2026 12:20
@loomts loomts force-pushed the feat/cdc-checker branch 2 times, most recently from fd2c9d2 to 5e85cf9 Compare February 9, 2026 13:51
@loomts loomts force-pushed the feat/cdc-checker branch from 3a1c850 to 2f1f040 Compare March 5, 2026 10:47
@loomts loomts force-pushed the feat/cdc-checker branch 2 times, most recently from fc4df7b to c3e4c41 Compare March 19, 2026 03:07
@loomts loomts force-pushed the feat/cdc-checker branch from ed7d1cf to 16425fc Compare March 24, 2026 07:29
@loomts loomts marked this pull request as ready for review March 26, 2026 03:46
}

#[async_trait]
pub trait CheckedSinkTarget: Sinker {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

为什么叫checked_sinker 😶,已检查的sinker?

impl<S: CheckedSinkTarget + Send> Sinker for CheckedSinker<S> {
async fn sink_dml(&mut self, mut data: Vec<RowData>, batch: bool) -> anyhow::Result<()> {
self.inner.sink_dml_borrowed(&mut data, batch).await?;
self.checker.enqueue_check(data).await?;
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里看起来有问题,enqueue_check里send时如果check queue满了异步task可能被挂起。sinker_dml要wait enqueue_check异步执行完所以也会被阻塞,整个cdc就被阻塞了。目标应该是check的逻辑不影响主干流程,send check queue时可以做一些queue满和其他异常时的降级逻辑,让enqueue_check方法可以无条件快速结束

use anyhow::Context;
use dt_common::meta::pg::pg_value_type::PgValueType;
use dt_common::monitor::{counter_type::CounterType, task_metrics::TaskMetricsType};
use std::collections::BTreeSet;
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

小问题:import尽量规范点,外部的在上,内部的在下,内部的尽量merge import一下

}
// cdc+check also needs refreshed table metadata after sink ddl changes the target schema
if let Some(checker) = &self.checker {
checker.refresh_meta(data.clone()).await?;
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

同sink_dml那里的问题一样,原则是校验不影响主干流程,校验可以适当牺牲准确性。这里check的refresh应该是可以快速结束,现在的实现里面又有pub mpsc的逻辑可能被阻塞。而且result通过?解包,check refresh的错误会上抛给主干逻辑了

let mut position_persisted_by_checker = false;
if !matches!(record_position, Position::None) {
if let Some(checker) = &self.checker {
checker.record_checkpoint(record_position).await?;
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里也是,要设计成check不影响主干。可以按这个原则再梳理下流程

.iter()
.map(|(key, value)| (key.clone(), value.clone()))
.collect::<BTreeMap<_, _>>();
serde_json::to_string(&serde_json::json!({
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里尽量不要用json序列化,可能会比较耗cpu。这里其他的serde_json相关逻辑也看一下

ColValue::Enum(v) => Self::Enum(*v),
ColValue::Set2(v) => Self::Set2(v.clone()),
ColValue::Enum2(v) => Self::Enum2(v.clone()),
ColValue::Json(v) => Self::Json(v.clone()),
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

看起来PersistedColValue只是为了记录可读,json、mongodoc,甚至blob,bit这些都可以不记录。内容可能比较大,clone的成本太高,而且blob这种二进制的也不可读。外面diff row的上限最好也控制下

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个主要是为了在sinker的unconsistent_rows记录最原始的数据类型,用于下一次和target的比对,全量记录可能方便点,不过确实会很冗肿。那我把它改为记录下不一致的pk,重启之后再读取一次source和target的rows吧

- keep `[sinker] sink_type=write`
- add `[checker]`
- add `[resumer] resume_type=from_target` or `from_db`
- use `[parallelizer] parallel_type=rdb_check`
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

看起来cdc要启用check,需要改parallelizer.parallel_type=rdb_check,但rdb_check实际上是rdb_merge的一种wrap。这里比较倾向的配置入口是类似:

[parallelizer]
parallel_type=rdb_merge/serial/其他
xxx

[checker]
enable=true
xxx

然后比如现在是支持的rdb_merge形态下的cdc check,serial暂时不支持,在task_config时校验不支持就以配置不合法panic

用parallel_type=rdb_check不好扩展支持其他parallel

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants