From 61741a414705638afc80e58c3c9bb402b02c1ce6 Mon Sep 17 00:00:00 2001 From: Jafeer Ali Date: Mon, 21 Jul 2025 20:34:42 +0530 Subject: [PATCH] Code refactoring, Comments added. --- spark_data_test/jobs/comparison_job.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/spark_data_test/jobs/comparison_job.py b/spark_data_test/jobs/comparison_job.py index d4dd6bf..fd5f15a 100644 --- a/spark_data_test/jobs/comparison_job.py +++ b/spark_data_test/jobs/comparison_job.py @@ -186,6 +186,8 @@ def __write_results(comparison_result, job_name, output_config): def compare_dataframes(spark, source_df, target_df, params): non_key_cols = list(set(source_df.columns) - set(params.primary_keys)) target_df_schema = target_df.select(*params.select_cols).drop(*params.drop_cols).schema + + # Apply transformations (select, drop columns), add checksum & append _src/_target for all columns source_df = __apply_source_target_transformations( source_df, SRC_COL_SUFFIX, @@ -199,6 +201,7 @@ def compare_dataframes(spark, source_df, target_df, params): params ).cache() + # Join Source and Target DF and match _checksum of Source and Target joined_df = source_df.join( target_df, params.primary_keys, FULL_OUTER_JOIN ).cache() @@ -210,9 +213,10 @@ def compare_dataframes(spark, source_df, target_df, params): MATCHED_COL_SUFFIX.format(CHK_SUM_COL), ALL_ROWS_MATCHED_COL ).cache() + # For records with unmatched _checksum values, perform a column-by-column comparison. For numeric fields, apply a difference_tolerance to determine if values are acceptably close. 
unmatched_rows_aftr_tolerance = __process_unmatched_records( joined_df.filter( - (f.col(ALL_ROWS_MATCHED_COL) == False) + (~f.col(ALL_ROWS_MATCHED_COL)) & (f.col(SRC_COL_SUFFIX.format(CHK_SUM_COL)).isNotNull()) & (f.col(TGT_COL_SUFFIX.format(CHK_SUM_COL)).isNotNull()) ), @@ -220,16 +224,18 @@ def compare_dataframes(spark, source_df, target_df, params): params.primary_keys, params.test_params ) - unmatched_records = unmatched_rows_aftr_tolerance.filter(f.col(ALL_ROWS_MATCHED_AFTR_TOL_COL) == False) - + + # Update the all_rows_matched column in joined_df for records that are considered matched after applying the difference_tolerance to numeric fields. joined_df = joined_df.join( unmatched_rows_aftr_tolerance.filter(f.col(ALL_ROWS_MATCHED_AFTR_TOL_COL)).select(*(params.primary_keys+[ALL_ROWS_MATCHED_AFTR_TOL_COL])), params.primary_keys, LEFT_JOIN ).withColumn(ALL_ROWS_MATCHED_COL, f.col(ALL_ROWS_MATCHED_COL) | f.coalesce(f.col(ALL_ROWS_MATCHED_AFTR_TOL_COL), f.lit(False).cast(BooleanType()))).cache() + unmatched_records = unmatched_rows_aftr_tolerance.filter(~f.col(ALL_ROWS_MATCHED_AFTR_TOL_COL)) matched_records = joined_df.filter(f.col(ALL_ROWS_MATCHED_COL) == True).dropDuplicates(params.primary_keys) + # Generate reports col_lvl_report = __get_column_level_test_report( params.dataset_name, unmatched_records, non_key_cols )