feat(reconcile): materialize intermediate DataFrames during reconcile#2535
feat(reconcile): materialize intermediate DataFrames during reconcile#2535BesikiML wants to merge 5 commits into
Conversation
Add write_and_read_df_with_volumes checkpoints for threshold comparison, aggregate per-rule outputs, and mismatch capture to improve backpressure. Includes unit tests verifying persistence is invoked in each path. Fixes #2257
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #2535 +/- ##
==========================================
+ Coverage 69.10% 69.18% +0.07%
==========================================
Files 105 105
Lines 9482 9503 +21
Branches 1050 1052 +2
==========================================
+ Hits 6553 6575 +22
+ Misses 2735 2731 -4
- Partials 194 197 +3 ☔ View full report in Codecov by Harness. 🚀 New features to boost your workflow:
|
Move imports to module scope, mock target.read_data via constructor, and satisfy mypy/pylint for the threshold comparison test.
Remove the private-method threshold test instead of suppressing protected-access; aggregate and capture tests already verify the intermediate persist wiring pattern.
|
✅ 169/169 passed, 9 flaky, 2 skipped, 52m4s total Flaky tests:
Running from acceptance #4946 |
| source: DataFrame, | ||
| target: DataFrame, | ||
| key_columns: list[str], | ||
| persistence: AbstractReconIntermediatePersist | None = None, |
There was a problem hiding this comment.
persistence is never None so no needs to handle that or allow that in the first place
| @@ -0,0 +1,106 @@ | |||
| import tempfile | |||
There was a problem hiding this comment.
we dont really need to unit test this. this is a performance optimization addition that testing it goes beyond simple unit tests. we have e2e tests for that.
can you also attempt a test run and post a screenshot of the result
Make persistence a required parameter in capture_mismatch_data_and_columns since callers always provide it. Remove the dedicated unit test file in favor of existing integration/e2e coverage.
Summary
write_and_read_df_with_volumescheckpoints in reconcile computation paths that were still missing materialization (marked with TODOs in [FEATURE]: Tighten writes to delta during reconcile #2257).count()/ sampling work.Motivation
Large Spark plans in reconcile can retain long lineages and cause backpressure. The reconcile runtime already materializes some intermediate results (e.g. after source/target join); this PR fills the remaining gaps called out in #2257.
Changes
reconciliation._compute_threshold_comparisonmismatched_dfbeforecount()compare.reconcile_agg_data_per_rulepersistenceand materializemismatch,missing_in_src,missing_in_tgtcompare.capture_mismatch_data_and_columnspersistence; wired fromReconciliation._get_mismatch_dataFixed issue: #2257
Note: These are temporary intermediate writes via
ReconIntermediatePersist(UC volume / parquet), not changes to finalReconCaptureMAIN/METRICS/DETAILS output tables. Temp data is still cleaned up intrigger_recon/trigger_recon_aggregates.Test plan
test_compare.py,test_aggregates_reconcile.py,test_execute.py) — covers aggregate per-rule, mismatch capture, and threshold paths via existing e2e coveragemetadata_config.volume