Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -171,4 +171,5 @@ cython_debug/
.pypirc

#vscode
.DS_Store
.DS_Store
.vscode/
102 changes: 83 additions & 19 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ run_comparison_job_from_dfs(
- `source_df`: Source DataFrame.
- `target_df`: Target DataFrame.
- `params`: An instance of `DatasetParams` specifying dataset name, primary keys, columns to select/drop, etc.
- `output_config`: An instance of `OutputConfig` specifying output directory, file format, Spark write options, etc.
- `output_config`: An instance of [`OutputConfig`](#outputconfig) specifying output directory, file format, Spark write options, etc.

#### Example

Expand Down Expand Up @@ -81,7 +81,7 @@ run_comparison_job(
#### Parameters

- `spark`: The active `SparkSession`.
- `config`: A dictionary or `ComparisonJobConfig` instance describing one or more datasets to compare, their source/target configs, and output config.
- `config`: A dictionary or [`ComparisonJobConfig`](#comparisonjobconfig) instance describing one or more datasets to compare, their source/target configs, and output config.

#### Example

Expand Down Expand Up @@ -116,23 +116,87 @@ run_comparison_job(spark, config)

---

## Example Configuration (Python dict)

Below is an example of how to create a configuration dictionary for `run_comparison_job` using the dataclass structure:

```python
config = {
"job_name": "sample_comparison_job",
"dataset_configs": [
{
"params": {
"dataset_name": "table1",
"primary_keys": ["id"],
"test_params": {"difference_tolerance": 0.1},
"select_cols": ["id", "name", "value"],
"drop_cols": []
},
"source_config": {
"path": "/data/source/table1",
"file_format": "parquet",
"spark_options": {}
},
"target_config": {
"path": "/data/target/table1",
"file_format": "parquet",
"spark_options": {}
}
},
{
"params": {
"dataset_name": "table2",
"primary_keys": ["key"],
"test_params": {"difference_tolerance": 0.0},
"select_cols": ["key", "amount"],
"drop_cols": ["extra_col"]
},
"source_config": {
"path": "/data/source/table2",
"file_format": "csv",
"spark_options": {"header": "true"}
},
"target_config": {
"path": "/data/target/table2",
"file_format": "csv",
"spark_options": {"header": "true"}
}
}
],
"output_config": {
"output_dir": "/tmp/comparison_results",
"output_file_format": "parquet",
"spark_options": {},
"no_of_partitions": -1
}
}
```

You can pass this config directly to `run_comparison_job(spark, config)`.

---

## Configuration Dataclasses

Below are the main dataclasses used for configuration in `spark-data-test`. You can use these directly in Python or as a reference for your JSON configs.

### DatasetParams

Defines parameters for a single dataset comparison.

```python
@dataclass
class TestParams:
difference_margin: float = 0.0 # Allowed numeric difference for matching numeric columns.
```
```python
from dataclasses import dataclass, field

@dataclass
class DatasetParams:
dataset_name: str # Name of the dataset/table
primary_keys: list # List of primary key column names
select_cols: list = field(default_factory=lambda: ["*"]) # Columns to select (default: all)
drop_cols: list = field(default_factory=list) # Columns to drop (default: none)
dataset_name: str # Name of the dataset/table
primary_keys: list # List of primary key column names
test_params: TestParams # Testing parameters for dataset (Optional)
select_cols: list # Columns to select (default: all) (Optional)
drop_cols: list # Columns to drop (default: none) (Optional)
```

### DataframeConfig
Expand All @@ -144,9 +208,9 @@ from dataclasses import dataclass, field

@dataclass
class DataframeConfig:
path: str # Path to the data (e.g., file or table)
file_format: str = "parquet" # File format (parquet, csv, etc.)
spark_options: dict = field(default_factory=dict) # Spark read options (e.g., {"header": "true"})
path: str # Path to the data (e.g., file or table)
file_format: str # File format (parquet, csv, etc.) (default:parquet) (Optional)
spark_options: dict # Spark read options (e.g., {"header": "true"}) (Optional)
```

### OutputConfig
Expand All @@ -158,10 +222,10 @@ from dataclasses import dataclass, field

@dataclass
class OutputConfig:
output_dir: str # Directory to write output files
output_file_format: str = "parquet" # Output file format
spark_options: dict = field(default_factory=dict) # Spark write options
no_of_partitions: int = -1 # Number of partitions for output (-1 for default)
output_dir: str # Directory to write output files
output_file_format: str # Output file format (default:parquet) (Optional)
spark_options: dict # Spark write options (Optional)
no_of_partitions: int = -1 # Number of partitions for output (-1 for default partitions) (Optional)
```

### DatasetConfig
Expand Down Expand Up @@ -200,7 +264,7 @@ After running a comparison job, the following files/directories are generated un

### **overall_test_report**

Summary DataFrame with row counts, matched counts, duplicate counts, missing rows, and test status for each dataset.
Summary DataFrame with row counts, matched counts, duplicate counts, missing rows, and test status for each dataset. Output will generate under `<output_dir>/<job_name>/overall_test_report`

| dataset_name | count | matched_count | duplicate_count | missing_rows | test_status |
|--------------|----------------------|---------------|------------------------|------------------------|-------------|
Expand All @@ -210,7 +274,7 @@ Summary DataFrame with row counts, matched counts, duplicate counts, missing row

### **col_lvl_test_report**

Column-level report showing the count of unmatched values for each non-key column.
Column-level report showing the count of unmatched values for each non-key column. Output will generate under `<output_dir>/<job_name>/col_lvl_test_report`

| dataset_name | column_name | unmatched_rows_count |
|--------------|-------------|---------------------|
Expand All @@ -221,7 +285,7 @@ Column-level report showing the count of unmatched values for each non-key colum

### **row_lvl_test_report**

Row-level report with primary keys, duplicate count, missing row status, and match status for each row.
Row-level report with primary keys, duplicate count, missing row status, and match status for each row. Output will generate under `<output_dir>/<job_name>/row_lvl_test_report`

| dataset_name | id | duplicate_count | missing_row_status | all_rows_matched |
|--------------|----|----------------|----------------------|------------------|
Expand All @@ -232,7 +296,7 @@ Row-level report with primary keys, duplicate count, missing row status, and mat

### **unmatched_rows/**

Directory containing one file per column with all rows where that column did not match between source and target.
Directory containing one file per column with all rows where that column did not match between source and target. Output will generate under `<output_dir>/<job_name>/unmatched_rows/<dataset_name>/<column_name>`

Example for `unmatched_rows/colA`:

Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
'pyspark==3.5.6'
],
extras_require={
"dev": ["pytest>=5", "pytest-cov"]
"dev": ["numpy==1.26.4","pytest>=5", "pytest-cov", "pandas==2.2.2", "pyarrow==14.0.2"]
},
python_requires='>=3.7',
)
2 changes: 2 additions & 0 deletions spark_data_test/constants/common_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
SRC_COL_SUFFIX = "{0}_src"
TGT_COL_SUFFIX = "{0}_target"
MATCHED_COL_SUFFIX = "{0}_matched"
MATCHED_SUFFIX = "_matched"

#common col names
CHK_SUM_COL = "_chk_sum"
Expand All @@ -38,6 +39,7 @@

#row level report
ALL_ROWS_MATCHED_COL = "all_rows_matched"
ALL_ROWS_MATCHED_AFTR_TOL_COL = "all_rows_matched_after_tolerance"
DUPLICATE_COUNT_COL = "duplicate_count"
MISSING_ROW_STATUS_COL = "missing_row_status"
MISSING_AT_SOURCE_STATUS = "MISSING_AT_SOURCE"
Expand Down
9 changes: 7 additions & 2 deletions spark_data_test/entities/config.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
from dataclasses import dataclass, field
from spark_data_test.constants.common_constants import PARQUET_FMT

@dataclass
class TestParams:
difference_tolerance: float = 0.0

@dataclass
class DatasetParams:
dataset_name: str
primary_keys: list
test_params: TestParams = field(default_factory=TestParams)
select_cols: list = field(default_factory=lambda: ["*"])
drop_cols: list = field(default_factory=list)


@dataclass
class OutputConfig:
Expand All @@ -21,14 +27,13 @@ class DataframeConfig:
file_format: str = PARQUET_FMT
spark_options: dict = field(default_factory=dict)


@dataclass
class DatasetConfig:
params: DatasetParams
source_config: DataframeConfig
target_config: DataframeConfig



@dataclass
class ComparisonJobConfig:
job_name: str
Expand Down
Loading