Add a canary test that freezes the default task_id for check_log configs so internal enum churn does not silently change external prefixes and orchestration contracts.
Inline the merge sink dispatch into MergeParallelizer::sink_dml and drop the extra helper layer to keep the hot path simpler.
```rust
}

#[async_trait]
pub trait CheckedSinkTarget: Sinker {
```
Why is it called checked_sinker 😶? As in, a sinker that has already been checked?
```rust
impl<S: CheckedSinkTarget + Send> Sinker for CheckedSinker<S> {
    async fn sink_dml(&mut self, mut data: Vec<RowData>, batch: bool) -> anyhow::Result<()> {
        self.inner.sink_dml_borrowed(&mut data, batch).await?;
        self.checker.enqueue_check(data).await?;
```
This looks problematic: inside enqueue_check, if the check queue is full when sending, the async task may be suspended. sink_dml has to await enqueue_check to completion, so it gets blocked too, and the whole CDC pipeline stalls. The goal should be that the check logic never affects the main path. When sending to the check queue, add degradation logic for a full queue and other failures so that enqueue_check can always return quickly, unconditionally.
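One way to realize the "enqueue_check must return immediately" principle is a fail-open, non-blocking send. The sketch below is illustrative only (the `CheckQueue` type and its shape are assumptions, not the PR's code); it uses the standard library's bounded channel, where `try_send` never blocks, and degrades by counting and dropping the check item when the queue is full:

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};

/// Hypothetical sketch: a check queue whose enqueue path can never stall
/// the sinker. `dropped` is a degradation counter worth exporting as a metric.
struct CheckQueue<T> {
    tx: SyncSender<T>,
    dropped: AtomicU64,
}

impl<T> CheckQueue<T> {
    fn new(capacity: usize) -> (Self, Receiver<T>) {
        let (tx, rx) = sync_channel(capacity);
        (
            Self {
                tx,
                dropped: AtomicU64::new(0),
            },
            rx,
        )
    }

    /// Never blocks: on a full queue or a gone receiver, drop the check
    /// item, count it, and let the main CDC path continue untouched.
    fn enqueue_check(&self, item: T) {
        if self.tx.try_send(item).is_err() {
            self.dropped.fetch_add(1, Ordering::Relaxed);
        }
    }
}
```

In an async codebase the same shape works with a bounded `tokio::sync::mpsc` channel, whose `try_send` has the same non-blocking semantics.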
```rust
use anyhow::Context;
use dt_common::meta::pg::pg_value_type::PgValueType;
use dt_common::monitor::{counter_type::CounterType, task_metrics::TaskMetricsType};
use std::collections::BTreeSet;
```
Minor nit: keep the imports tidier — external crates at the top, internal ones below, and merge the internal imports where possible.
```rust
}
// cdc+check also needs refreshed table metadata after sink ddl changes the target schema
if let Some(checker) = &self.checker {
    checker.refresh_meta(data.clone()).await?;
```
Same issue as in sink_dml: the principle is that verification must not affect the main path, and verification may sacrifice some accuracy where needed. The check's refresh here should be able to finish quickly, but the current implementation also contains mpsc publishing logic that can block. And since the Result is unwrapped with `?`, errors from the check refresh get propagated up into the main-path logic.
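The `?`-propagation half of this comment can be addressed with a small best-effort wrapper. The helper below is a hypothetical sketch (not from the PR): every checker call site swaps `?` for a wrapper that logs the error and reports success or failure, so the main path never sees a checker error:

```rust
use std::fmt::Display;

/// Hypothetical helper: run a checker step "best effort" — log failures
/// instead of propagating them, and report whether the step succeeded,
/// keeping the main path immune to checker errors.
fn best_effort<E: Display>(step: &str, result: Result<(), E>) -> bool {
    match result {
        Ok(()) => true,
        Err(e) => {
            // a real implementation would use the project's logger / metrics
            eprintln!("[checker] {step} failed, continuing: {e}");
            false
        }
    }
}
```

At the call site this would replace `checker.refresh_meta(data.clone()).await?;` with something like `best_effort("refresh_meta", checker.refresh_meta(data.clone()).await);`.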
```rust
let mut position_persisted_by_checker = false;
if !matches!(record_position, Position::None) {
    if let Some(checker) = &self.checker {
        checker.record_checkpoint(record_position).await?;
```
Same here: design the check so it cannot impact the main path. It would be worth walking through the whole flow again with that principle in mind.
```rust
.iter()
.map(|(key, value)| (key.clone(), value.clone()))
.collect::<BTreeMap<_, _>>();
serde_json::to_string(&serde_json::json!({
```
Try to avoid JSON serialization here; it can be fairly CPU-intensive. Please also review the other serde_json-related logic in this area.
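A cheaper alternative to per-row `serde_json` serialization, for cases where the output only needs to be deterministic rather than JSON, is direct string building. This is an illustrative sketch (the function name and format are assumptions, not the PR's code); length-prefixing each field keeps values that contain the separator characters unambiguous:

```rust
use std::collections::BTreeMap;
use std::fmt::Write;

/// Hypothetical sketch: encode ordered key/value pairs into a compact,
/// deterministic string with `write!`, avoiding serde_json's per-row
/// allocation and escaping cost. Each field is length-prefixed so values
/// containing `=` `:` `;` stay unambiguous.
fn encode_row(cols: &BTreeMap<String, String>) -> String {
    let mut out = String::new();
    for (k, v) in cols {
        // write! into a String cannot fail, so the Result is ignored
        let _ = write!(out, "{}:{}={}:{};", k.len(), k, v.len(), v);
    }
    out
}
```

Since `BTreeMap` iterates in key order, the encoding is stable across runs, which is what a comparison key needs.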
```rust
ColValue::Enum(v) => Self::Enum(*v),
ColValue::Set2(v) => Self::Set2(v.clone()),
ColValue::Enum2(v) => Self::Enum2(v.clone()),
ColValue::Json(v) => Self::Json(v.clone()),
```
It looks like PersistedColValue exists only to keep the record human-readable; json, mongodoc, and even blob and bit could all be left out. The contents can be quite large, so the clone cost is too high, and binary values like blob are unreadable anyway. It would also be best to cap the number of diff rows recorded.

That was mainly so the sinker's unconsistent_rows keeps the original data types for the next comparison against the target; recording everything is convenient, but it is indeed bloated. I'll change it to record only the primary keys of the inconsistent rows, and re-read the rows from source and target after a restart.
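The PK-only plan described above can be sketched as a projection of the row onto its primary-key columns. This is illustrative only (the function name, the `BTreeMap<String, String>` row shape, and the column names are assumptions); the full row would be re-read from source and target after a restart:

```rust
use std::collections::BTreeMap;

/// Hypothetical sketch: persist just the primary-key columns of an
/// inconsistent row instead of cloning every column value (blobs, json,
/// mongodoc, ...). Missing PK columns are simply skipped.
fn extract_pk(
    row: &BTreeMap<String, String>,
    pk_cols: &[&str],
) -> BTreeMap<String, String> {
    pk_cols
        .iter()
        .filter_map(|c| row.get(*c).map(|v| (c.to_string(), v.clone())))
        .collect()
}
```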
- keep `[sinker] sink_type=write`
- add `[checker]`
- add `[resumer] resume_type=from_target` or `from_db`
- use `[parallelizer] parallel_type=rdb_check`
It looks like enabling check for CDC requires setting parallelizer.parallel_type=rdb_check, but rdb_check is really just a wrapper around rdb_merge. The preferred configuration entry point would be something like:

```ini
[parallelizer]
parallel_type=rdb_merge/serial/...
xxx

[checker]
enable=true
xxx
```

Then, for example, since CDC check is currently only supported in the rdb_merge form and serial is not yet supported, task_config validation should treat the unsupported combination as an invalid config and panic. Using parallel_type=rdb_check does not extend well to other parallel types.
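The proposed config shape can be sketched as a fail-fast validation at config-load time. The types and names below are assumptions for illustration (not the real ape_dts task_config API): the checker becomes an orthogonal `enable=true` switch, and combinations that aren't implemented yet panic as invalid configuration instead of needing a dedicated `rdb_check` parallel type:

```rust
/// Hypothetical sketch of the proposed task_config validation.
#[derive(Debug, PartialEq)]
enum ParallelType {
    RdbMerge,
    Serial,
}

/// Panic on unsupported combinations so misconfiguration fails at startup,
/// not mid-stream. Today only the rdb_merge form supports CDC check.
fn validate_checker_config(parallel_type: &ParallelType, checker_enabled: bool) {
    if checker_enabled && *parallel_type != ParallelType::RdbMerge {
        panic!("invalid config: [checker] enable=true requires parallel_type=rdb_merge");
    }
}
```

This keeps `parallel_type` and the checker independent axes, so adding check support for another parallel type later only relaxes the validation instead of introducing a new wrapper variant.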
Summary
- `parallel_type=rdb_check`, `pipeline_type=basic`, and `[resumer] resume_type=from_target|from_db`
- `[checker]` db_type/url/username/password; target now always comes from `[sinker]`
- `parallel_type=rdb_check` now requires `[checker]`
- (`inline` fail-open vs `standalone` fail-close)
- `apedts_unconsistent_rows`
- Check flow