Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions spark_data_test/jobs/comparison_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,8 @@ def __write_results(comparison_result, job_name, output_config):
def compare_dataframes(spark, source_df, target_df, params):
non_key_cols = list(set(source_df.columns) - set(params.primary_keys))
target_df_schema = target_df.select(*params.select_cols).drop(*params.drop_cols).schema

# Apply transformations (select, drop columns), add checksum & append _src/_target for all columns
source_df = __apply_source_target_transformations(
source_df,
SRC_COL_SUFFIX,
Expand All @@ -199,6 +201,7 @@ def compare_dataframes(spark, source_df, target_df, params):
params
).cache()

# Full-outer join Source and Target DFs on the primary keys and match the _checksum of Source and Target
joined_df = source_df.join(
target_df, params.primary_keys, FULL_OUTER_JOIN
).cache()
Expand All @@ -210,26 +213,29 @@ def compare_dataframes(spark, source_df, target_df, params):
MATCHED_COL_SUFFIX.format(CHK_SUM_COL), ALL_ROWS_MATCHED_COL
).cache()

# For records with unmatched _checksum values, perform a column-by-column comparison. For numeric fields, apply a difference_tolerance to determine if values are acceptably close.
unmatched_rows_aftr_tolerance = __process_unmatched_records(
joined_df.filter(
(f.col(ALL_ROWS_MATCHED_COL) == False)
(~f.col(ALL_ROWS_MATCHED_COL))
& (f.col(SRC_COL_SUFFIX.format(CHK_SUM_COL)).isNotNull())
& (f.col(TGT_COL_SUFFIX.format(CHK_SUM_COL)).isNotNull())
),
target_df_schema,
params.primary_keys,
params.test_params
)
unmatched_records = unmatched_rows_aftr_tolerance.filter(f.col(ALL_ROWS_MATCHED_AFTR_TOL_COL) == False)


# Update the all_rows_matched column in joined_df for records that are considered matched after applying the difference_tolerance to numeric fields.
joined_df = joined_df.join(
unmatched_rows_aftr_tolerance.filter(f.col(ALL_ROWS_MATCHED_AFTR_TOL_COL)).select(*(params.primary_keys+[ALL_ROWS_MATCHED_AFTR_TOL_COL])),
params.primary_keys,
LEFT_JOIN
).withColumn(ALL_ROWS_MATCHED_COL, f.col(ALL_ROWS_MATCHED_COL) | f.coalesce(f.col(ALL_ROWS_MATCHED_AFTR_TOL_COL), f.lit(False).cast(BooleanType()))).cache()

unmatched_records = unmatched_rows_aftr_tolerance.filter(~f.col(ALL_ROWS_MATCHED_AFTR_TOL_COL))
matched_records = joined_df.filter(f.col(ALL_ROWS_MATCHED_COL) == True).dropDuplicates(params.primary_keys)

# Generate reports
col_lvl_report = __get_column_level_test_report(
params.dataset_name, unmatched_records, non_key_cols
)
Expand Down