From f6374a85ad87aeb9b28ad706f02cb300ea24f539 Mon Sep 17 00:00:00 2001
From: simtsc-db
Date: Wed, 8 Apr 2026 17:15:06 +0800
Subject: [PATCH] Add databricks-dqx skill for data quality framework

New skill covering DQX (Databricks Labs) data quality framework: profiling, rule generation (AI-assisted and profiler-based), quality checks, quarantining, metrics, dashboards, and best practices.

Co-authored-by: Isaac
---
 databricks-skills/README.md | 1 +
 databricks-skills/databricks-dqx/SKILL.md | 167 +++++++++
 databricks-skills/databricks-dqx/profiling.md | 205 +++++++++++
 .../quality-checks-reference.md | 190 ++++++++++
 .../databricks-dqx/storage-and-config.md | 325 ++++++++++++++++++
 5 files changed, 888 insertions(+)
 create mode 100644 databricks-skills/databricks-dqx/SKILL.md
 create mode 100644 databricks-skills/databricks-dqx/profiling.md
 create mode 100644 databricks-skills/databricks-dqx/quality-checks-reference.md
 create mode 100644 databricks-skills/databricks-dqx/storage-and-config.md

diff --git a/databricks-skills/README.md b/databricks-skills/README.md
index 46d52dc2..6bf5a705 100644
--- a/databricks-skills/README.md
+++ b/databricks-skills/README.md
@@ -94,6 +94,7 @@ cp -r ai-dev-kit/databricks-skills/databricks-agent-bricks .claude/skills/
 - **databricks-unity-catalog** - System tables for lineage, audit, billing
 
 ### 🔧 Data Engineering
+- **databricks-dqx** - Data quality framework (DQX): profiling, rule generation (AI-assisted), quality checks, quarantining, dashboards
 - **databricks-iceberg** - Apache Iceberg tables (Managed/Foreign), UniForm, Iceberg REST Catalog, Iceberg Clients Interoperability
 - **databricks-spark-declarative-pipelines** - SDP (formerly DLT) in SQL/Python
 - **databricks-jobs** - Multi-task workflows, triggers, schedules

diff --git a/databricks-skills/databricks-dqx/SKILL.md b/databricks-skills/databricks-dqx/SKILL.md
new file mode 100644
index 00000000..999b9ac2
--- /dev/null
+++ b/databricks-skills/databricks-dqx/SKILL.md
@@ -0,0 +1,167 @@
---
name: databricks-dqx
description: "Data quality framework for PySpark pipelines using DQX (Databricks Labs). Use when building data quality checks, profiling data, generating quality rules (including AI-assisted), applying validation checks, quarantining bad data, creating quality dashboards, or integrating quality checks into Lakeflow Pipelines (DLT), streaming, or batch workflows. Triggers: 'data quality', 'DQX', 'profiling', 'quality checks', 'data validation', 'quarantine', 'quality rules', 'data contract'."
---

# Databricks DQX — Data Quality Framework

DQX is a Databricks Labs data quality framework for Apache Spark. Define, monitor, and address data quality issues in Python-based data pipelines — batch, streaming, and Lakeflow Pipelines (DLT).
+ +| Capability | Description | +|-----------|-------------| +| **Profiling** | Auto-profile DataFrames/tables and generate rule candidates | +| **AI-Assisted Rules** | Generate checks from natural language using LLMs | +| **Row & Dataset Rules** | Row-level (per-row) and dataset-level (aggregates, uniqueness) checks | +| **Batch & Streaming** | Spark batch, Structured Streaming, and Lakeflow Pipelines | +| **Quarantine** | Split valid/invalid data or annotate rows with error/warning columns | +| **Quality Dashboard** | Track metrics over time with a built-in Lakeview dashboard | +| **Data Contracts** | Generate rules from Open Data Contract Standard (ODCS) YAML | +| **Storage Backends** | YAML, JSON, Delta tables, Workspace files, Volumes, Lakebase | + +## Installation + +```bash +pip install databricks-labs-dqx # Core +pip install 'databricks-labs-dqx[llm]' # AI-assisted rule generation +pip install 'databricks-labs-dqx[pii]' # PII detection +pip install 'databricks-labs-dqx[datacontract]' # Data contract (ODCS) support +databricks labs install dqx # Workspace tool (CLI workflows) +``` + +## Quick Start — Profile → Generate → Apply + +```python +from databricks.sdk import WorkspaceClient +from databricks.labs.dqx.profiler.profiler import DQProfiler +from databricks.labs.dqx.profiler.generator import DQGenerator +from databricks.labs.dqx.engine import DQEngine + +ws = WorkspaceClient() +input_df = spark.read.table("catalog.schema.my_table") + +# Step 1: Profile +profiler = DQProfiler(ws) +summary_stats, profiles = profiler.profile(input_df) + +# Step 2: Generate rules from profiles +generator = DQGenerator(ws) +checks = generator.generate_dq_rules(profiles) + +# Step 3: Apply checks — split valid and invalid rows +dq_engine = DQEngine(ws) +valid_df, invalid_df = dq_engine.apply_checks_by_metadata_and_split(input_df, checks) + +# Write results +valid_df.write.mode("overwrite").saveAsTable("catalog.schema.silver") +invalid_df.write.mode("overwrite").saveAsTable("catalog.schema.quarantine") +``` + +## Common Patterns + +### AI-Assisted Rule Generation + +```python +generator = DQGenerator(workspace_client=ws, spark=spark) +checks = generator.generate_dq_rules_ai_assisted( + user_input="Username must not start with 's' if age < 18. All users need valid email. Age 0-120.", + input_config=InputConfig(location="catalog.schema.customers") +) +``` + +Combine profiler + AI rules: `all_checks = profiler_checks + ai_checks`. See [profiling.md](profiling.md) for full options including custom models, summary stats context, and custom check functions. + +### Lakeflow Pipelines (DLT) Integration + +```python +import dlt +from databricks.labs.dqx.engine import DQEngine + +dq_engine = DQEngine(WorkspaceClient()) +checks = dq_engine.load_checks(config=...) 
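# For example, loading from a governed Unity Catalog table (one of the storage
# backends covered in storage-and-config.md); the location below is illustrative:
# from databricks.labs.dqx.config import TableChecksStorageConfig
# checks = dq_engine.load_checks(
#     config=TableChecksStorageConfig(location="catalog.schema.checks_table"))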
+ +@dlt.view +def bronze_dq_check(): + return dq_engine.apply_checks_by_metadata(dlt.read_stream("bronze"), checks) + +@dlt.table +def silver(): + return dq_engine.get_valid(dlt.read_stream("bronze_dq_check")) + +@dlt.table +def quarantine(): + return dq_engine.get_invalid(dlt.read_stream("bronze_dq_check")) +``` + +### Structured Streaming (foreachBatch) + +```python +# checks defined as DQX classes (DQRowRule, DQDatasetRule) +def validate_and_write(batch_df, batch_id): + valid_df, invalid_df = dq_engine.apply_checks_and_split(batch_df, checks) + valid_df.write.format("delta").mode("append").saveAsTable("catalog.schema.output") + invalid_df.write.format("delta").mode("append").saveAsTable("catalog.schema.quarantine") + +(spark.readStream.format("delta").table("catalog.schema.bronze") + .writeStream.foreachBatch(validate_and_write).start()) +``` + +### End-to-End with Config + +```python +dq_engine.apply_checks_by_metadata_and_save_in_table( + checks=checks, + input_config=InputConfig(location="catalog.schema.bronze", format="delta"), + output_config=OutputConfig(location="catalog.schema.silver", format="delta", mode="append"), + quarantine_config=OutputConfig(location="catalog.schema.quarantine", format="delta", mode="append"), + metrics_config=OutputConfig(location="catalog.schema.dq_metrics", format="delta", mode="append") +) +``` + +### Multi-Table Checks + +```python +# By explicit table list +dq_engine.apply_checks_and_save_in_tables(run_configs=[RunConfig(...), RunConfig(...)]) + +# By pattern matching +dq_engine.apply_checks_and_save_in_tables_for_patterns( + patterns=["catalog.schema.*"], checks_location="catalog.schema.checks_table" +) +``` + +## Reference Files + +| Topic | File | Description | +|-------|------|-------------| +| Quality Checks | [quality-checks-reference.md](quality-checks-reference.md) | Built-in row/dataset check functions, YAML format, custom checks | +| Profiling | [profiling.md](profiling.md) | Profiler options, AI-assisted generation, data contracts, primary key detection | +| Storage & Config | [storage-and-config.md](storage-and-config.md) | Storage backends, config.yml, metrics, dashboard, CLI reference, best practices | + +## Common Issues + +| Issue | Solution | +|-------|----------| +| `ModuleNotFoundError: databricks.labs.dqx` | `pip install databricks-labs-dqx` or add to job libraries | +| AI generation fails | `pip install 'databricks-labs-dqx[llm]'` and enable serverless clusters | +| PII detection not available | `pip install 'databricks-labs-dqx[pii]'` | +| Data contract rules missing | `pip install 'databricks-labs-dqx[datacontract]'` | +| Checks validation errors | `dq_engine.validate_checks(checks)` or `databricks labs dqx validate-checks` | +| Custom column name conflicts | `ExtraParams(result_column_names={"errors": "custom_errors", "warnings": "custom_warnings"})` | +| Streaming metrics missing | Use `get_streaming_metrics_listener` or end-to-end methods | +| Dashboard shows no tables | Tables must have `_errors`/`_warnings` columns (or custom names) | +| Profiler too slow | Adjust `sample_fraction` (default 0.3), `limit` (default 1000), or use `filter` | + +## Related Skills + +- **[databricks-spark-declarative-pipelines](../databricks-spark-declarative-pipelines/SKILL.md)** — Lakeflow Pipelines with DQX quality checks +- **[databricks-spark-structured-streaming](../databricks-spark-structured-streaming/SKILL.md)** — Streaming with foreachBatch quality validation +- **[databricks-aibi-dashboards](../databricks-aibi-dashboards/SKILL.md)** 
— Custom dashboards for quality metrics +- **[databricks-jobs](../databricks-jobs/SKILL.md)** — Schedule DQX workflows as Databricks Jobs +- **[databricks-asset-bundles](../databricks-asset-bundles/SKILL.md)** — Deploy DQX pipelines via Asset Bundles +- **[databricks-unity-catalog](../databricks-unity-catalog/SKILL.md)** — Manage catalogs, schemas, and tables for quality checks + +## Resources + +- [DQX Documentation](https://databrickslabs.github.io/dqx/) +- [DQX GitHub Repository](https://github.com/databrickslabs/dqx) +- [DQX Best Practices](https://databrickslabs.github.io/dqx/docs/guide/best_practices/) diff --git a/databricks-skills/databricks-dqx/profiling.md b/databricks-skills/databricks-dqx/profiling.md new file mode 100644 index 00000000..822d272c --- /dev/null +++ b/databricks-skills/databricks-dqx/profiling.md @@ -0,0 +1,205 @@ +# DQX Profiling & Rule Generation + +## Profile a DataFrame + +```python +from databricks.labs.dqx.profiler.profiler import DQProfiler +from databricks.labs.dqx.profiler.generator import DQGenerator + +ws = WorkspaceClient() +profiler = DQProfiler(ws) +summary_stats, profiles = profiler.profile(spark.read.table("catalog.schema.my_table")) +``` + +## Profile a Table Directly + +```python +from databricks.labs.dqx.config import InputConfig + +summary_stats, profiles = profiler.profile_table( + input_config=InputConfig(location="catalog.schema.my_table"), + columns=["col1", "col2", "col3"] # optional +) +``` + +## Profile Multiple Tables + +```python +results = profiler.profile_tables_for_patterns(patterns=["main.data.*"]) +results = profiler.profile_tables_for_patterns( + patterns=["main.*"], exclude_patterns=["*_dq_output", "*_quarantine"] +) +``` + +## Profiling Options + +| Option | Default | Description | +|--------|---------|-------------| +| `sample_fraction` | 0.3 | Fraction of data to sample | +| `sample_seed` | None | Seed for reproducible sampling | +| `limit` | 1000 | Max records to analyze | +| `remove_outliers` | True | Remove outliers before min/max | +| `num_sigmas` | 3 | Std devs for outlier detection | +| `max_null_ratio` | 0.05 | Null ratio threshold for `is_not_null` | +| `trim_strings` | True | Trim whitespace before analysis | +| `max_empty_ratio` | 0.02 | Empty ratio threshold for `is_not_null_or_empty` | +| `distinct_ratio` | 0.01 | Distinct value ratio for `is_in` rule | +| `max_in_count` | 20 | Max items in `is_in_list` rules | +| `round` | True | Round min/max values | +| `filter` | None | SQL filter before profiling | +| `llm_primary_key_detection` | True | Use LLM to detect primary keys | + +## Summary Statistics Fields + +| Field | Meaning | +|-------|---------| +| `count` | Rows profiled (after sampling/limit) | +| `mean` / `stddev` | Average and standard deviation | +| `min` / `max` | Smallest/largest non-null value | +| `25` / `50` / `75` | Approximate percentiles | +| `count_non_null` / `count_null` | Non-null and null counts | + +## Profiler → Check Function Mapping + +| Profile Type | Check Function | Column Types | Trigger | +|-------------|---------------|-------------|---------| +| `is_not_null` | `is_not_null` | All | Null ratio <= `max_null_ratio` | +| `is_not_null_or_empty` | `is_not_null_and_not_empty` | String | Null+empty <= thresholds | +| `is_in` | `is_in_list` | String, Int, Long | Distinct ratio <= threshold, count <= max | +| `min_max` | `is_in_range` | Numeric, Date, Timestamp | With outlier removal and rounding | +| `is_unique` | `is_unique` | All | Requires LLM primary key detection | + +## Generate 
Rules from Profiles + +```python +generator = DQGenerator(ws) +checks = generator.generate_dq_rules(profiles) +``` + +## AI-Assisted Rule Generation + +Generate quality rules from natural language. Requires `pip install 'databricks-labs-dqx[llm]'`. + +```python +generator = DQGenerator(workspace_client=ws, spark=spark) + +# Basic +checks = generator.generate_dq_rules_ai_assisted( + user_input="Username must not start with 's' if age < 18. Valid email required. Age 0-120." +) + +# With table schema context +checks = generator.generate_dq_rules_ai_assisted( + user_input=user_input, input_config=InputConfig(location="catalog.schema.customers") +) + +# With profiler summary stats +checks = generator.generate_dq_rules_ai_assisted( + user_input="Validate sales data for anomalies", summary_stats=summary_stats +) +``` + +### With Custom Check Functions + +```python +@register_rule("row") +def not_ends_with_suffix(column: str, suffix: str): + return make_condition(F.col(column).endswith(suffix), ...) + +generator = DQGenerator(ws, spark=spark, custom_check_functions={"ends_with_suffix": not_ends_with_suffix}) +checks = generator.generate_dq_rules_ai_assisted(user_input=user_input) +``` + +### Custom Model + +```python +from databricks.labs.dqx.config import LLMModelConfig + +generator = DQGenerator(ws, spark=spark, + llm_model_config=LLMModelConfig(model_name="databricks/databricks-claude-sonnet-4-5")) +``` + +### Combine Profiler + AI Rules + +```python +all_checks = generator.generate_dq_rules(profiles) + generator.generate_dq_rules_ai_assisted( + user_input="GDPR compliance: no PII in public fields", + input_config=InputConfig(location="catalog.schema.customers") +) +``` + +## Primary Key Detection + +```python +result = profiler.detect_primary_keys_with_llm( + input_config=InputConfig(location="catalog.schema.users") +) +# result: {"primary_key_columns": [...], "confidence": ..., "reasoning": ...} +``` + +## Data Contract Rules (ODCS) + +Requires `pip install 'databricks-labs-dqx[datacontract]'`. + +```python +rules = generator.generate_rules_from_contract( + contract_file="/Workspace/Shared/contracts/customers.yaml" +) +``` + +### Constraint Mapping + +| ODCS Constraint | DQX Rule | Example | +|----------------|----------|---------| +| `required: true` | `is_not_null` | Mandatory fields | +| `unique: true` | `is_unique` | Primary keys | +| `pattern` | `regex_match` | Email validation | +| `minimum` / `maximum` | `is_in_range` or `sql_expression` | Amount limits | +| `minLength` / `maxLength` | `sql_expression` | Length constraints | + +### Explicit DQX Rules in Contract + +```yaml +quality: + - type: custom + engine: dqx + implementation: + name: rule_name + criticality: error + check: + function: check_function_name + arguments: + column: field_name +``` + +### Text-Based Rules (LLM-Processed) + +```yaml +quality: + - type: text + description: | + Email addresses must be valid and from approved corporate domains only. +``` + +Requires `pip install 'databricks-labs-dqx[datacontract,llm]'`. 
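Whichever path produced them (profiler, AI-assisted, or data contract), generated checks can be validated and persisted with the engine before they are wired into a pipeline. A minimal sketch using calls shown in SKILL.md and storage backends from storage-and-config.md; it assumes contract-generated rules use the same check representation as the other generators, and the table location is illustrative:

```python
from databricks.labs.dqx.engine import DQEngine
from databricks.labs.dqx.config import TableChecksStorageConfig

dq_engine = DQEngine(ws)

# Fail fast on malformed or unsupported check definitions
status = dq_engine.validate_checks(rules)
assert not status.has_errors

# Persist the rules for governed reuse across pipelines (location illustrative)
dq_engine.save_checks(rules, config=TableChecksStorageConfig(
    location="catalog.schema.checks_table", mode="append"
))
```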
+ +## Lakeflow DLT Expectations Generation + +Generate DLT expectations from profiled data: + +```python +from databricks.labs.dqx.profiler.dlt_generator import DQDltGenerator + +dlt_generator = DQDltGenerator(ws) + +sql_rules = dlt_generator.generate_dlt_rules(profiles, language="SQL") +# CONSTRAINT user_id_is_null EXPECT (user_id is not null) + +sql_drop = dlt_generator.generate_dlt_rules(profiles, language="SQL", action="drop") +# CONSTRAINT user_id_is_null EXPECT (user_id is not null) ON VIOLATION DROP ROW + +python_rules = dlt_generator.generate_dlt_rules(profiles, language="Python") +# @dlt.expect_all({"user_id_is_null": "user_id is not null"}) + +dict_rules = dlt_generator.generate_dlt_rules(profiles, language="Python_Dict") +``` diff --git a/databricks-skills/databricks-dqx/quality-checks-reference.md b/databricks-skills/databricks-dqx/quality-checks-reference.md new file mode 100644 index 00000000..c26f2ae3 --- /dev/null +++ b/databricks-skills/databricks-dqx/quality-checks-reference.md @@ -0,0 +1,190 @@ +# DQX Quality Checks Reference + +## Row-Level Checks + +Row-level checks validate individual rows, adding error/warning annotations per row. + +### Null and Empty + +| Function | Description | Arguments | +|----------|-------------|-----------| +| `is_not_null` | Not null | `column` | +| `is_null` | Is null | `column` | +| `is_not_empty` | String not empty | `column`, `trim_strings` (opt) | +| `is_empty` | String is empty | `column`, `trim_strings` (opt) | +| `is_not_null_and_not_empty` | Not null and not empty | `column`, `trim_strings` (opt) | +| `is_null_or_empty` | Null or empty | `column`, `trim_strings` (opt) | +| `is_not_null_and_not_empty_array` | Array not null/empty | `column` | + +### Value + +| Function | Description | Arguments | +|----------|-------------|-----------| +| `is_in_list` | In allowed list | `column`, `allowed`, `case_sensitive` (opt) | +| `is_not_in_list` | Not in forbidden list | `column`, `forbidden`, `case_sensitive` (opt) | +| `is_not_null_and_is_in_list` | Not null and in list | `column`, `allowed`, `case_sensitive` (opt) | +| `is_in_range` | Within range (inclusive) | `column`, `min_limit`, `max_limit` | +| `is_not_in_range` | Outside range | `column`, `min_limit`, `max_limit` | +| `is_equal_to` | Equals expected | `column`, `value`, `abs_tolerance` (opt), `rel_tolerance` (opt) | +| `is_not_equal_to` | Not equal to expected | `column`, `value`, `abs_tolerance` (opt), `rel_tolerance` (opt) | +| `is_not_less_than` | >= threshold | `column`, `limit` | +| `is_not_greater_than` | <= threshold | `column`, `limit` | + +### Format + +| Function | Description | Arguments | +|----------|-------------|-----------| +| `is_valid_date` | Valid date string | `column`, `date_format` (opt) | +| `is_valid_timestamp` | Valid timestamp | `column`, `timestamp_format` (opt) | +| `is_valid_json` | Valid JSON | `column` | +| `has_json_keys` | JSON has required keys | `column`, `keys`, `require_all` (opt) | +| `has_valid_json_schema` | JSON matches schema | `column`, `schema` (DDL string or StructType) | +| `is_valid_ipv4_address` | Valid IPv4 | `column` | +| `is_ipv4_address_in_cidr` | IPv4 in CIDR | `column`, `cidr_block` | +| `is_valid_ipv6_address` | Valid IPv6 | `column` | +| `is_ipv6_address_in_cidr` | IPv6 in CIDR | `column`, `cidr_block` | +| `regex_match` | Matches regex | `column`, `regex`, `negate` (opt) | + +### Temporal + +| Function | Description | Arguments | +|----------|-------------|-----------| +| `is_not_in_future` | Not in future | `column`, `offset` 
(opt), `curr_timestamp` (opt) | +| `is_not_in_near_future` | Not in near future | `column`, `offset` (opt), `curr_timestamp` (opt) | +| `is_older_than_n_days` | Older than N days | `column`, `days`, `curr_date` (opt), `negate` (opt) | +| `is_older_than_col2_for_n_days` | col1 older than col2 by N days | `column1`, `column2`, `days`, `negate` (opt) | +| `is_data_fresh` | Data freshness | `column`, `max_age_minutes`, `base_timestamp` (opt) | + +### Geographic + +| Function | Description | +|----------|-------------| +| `is_latitude` / `is_longitude` | Valid lat (-90..90) / lon (-180..180) | +| `is_geometry` / `is_geography` | Valid geometry/geography type | +| `is_point` / `is_linestring` / `is_polygon` | Specific geometry types | +| `is_multipoint` / `is_multilinestring` / `is_multipolygon` | Multi-geometry types | +| `is_geometrycollection` | Geometry collection | +| `is_ogc_valid` | OGC-valid geometry | +| `is_non_empty_geometry` | Non-empty geometry | +| `is_not_null_island` | Not NULL island (POINT(0 0)) | +| `has_dimension` | Expected dimensionality | +| `has_x_coordinate_between` / `has_y_coordinate_between` | Coordinate in range | +| `is_area_not_less_than` / `is_area_not_greater_than` | Area constraints | +| `is_area_equal_to` / `is_area_not_equal_to` | Area equality | +| `is_num_points_not_less_than` / `is_num_points_not_greater_than` | Point count bounds | +| `is_num_points_equal_to` / `is_num_points_not_equal_to` | Point count equality | + +### Security + +| Function | Description | Arguments | +|----------|-------------|-----------| +| `does_not_contain_pii` | No PII detected (requires `[pii]` extra) | `column`, `threshold` (opt), `language` (opt), `entities` (opt) | + +### Custom SQL + +| Function | Description | Arguments | +|----------|-------------|-----------| +| `sql_expression` | Custom SQL boolean expression | `expression`, `msg` (opt), `name` (opt), `negate` (opt), `columns` (opt) | + +```python +DQRowRule(criticality="error", check_func=check_funcs.sql_expression, + check_func_kwargs={"expression": "amount > 0 AND currency IN ('USD', 'EUR')"}) +``` + +## Dataset-Level Checks + +Validate the entire dataset — aggregates, uniqueness, schema, cross-dataset comparisons. 
+ +| Function | Description | Key Arguments | +|----------|-------------|---------------| +| `is_unique` | Unique column(s) | `columns`, `nulls_distinct` (opt, default True) | +| `is_aggr_not_greater_than` | Aggregate <= limit | `column`, `aggr_type`, `limit`, `group_by` (opt), `aggr_params` (opt) | +| `is_aggr_not_less_than` | Aggregate >= limit | `column`, `aggr_type`, `limit`, `group_by` (opt), `aggr_params` (opt) | +| `is_aggr_equal` | Aggregate equals value | `column`, `aggr_type`, `limit`, `group_by` (opt), `abs_tolerance` (opt) | +| `is_aggr_not_equal` | Aggregate != value | `column`, `aggr_type`, `limit`, `group_by` (opt), `abs_tolerance` (opt) | +| `foreign_key` | Values exist in reference | `columns`, `ref_columns`, `ref_df_name` or `ref_table`, `negate` (opt) | +| `sql_query` | Custom SQL returns no violations | `query`, `merge_columns` (opt), `condition_column` (opt) | +| `compare_datasets` | Compare with reference | `columns`, `ref_columns`, `ref_df_name` or `ref_table` | +| `is_data_fresh_per_time_window` | Freshness in time window | `column`, `window_minutes`, `min_records_per_window`, `lookback_windows` (opt) | +| `has_valid_schema` | Schema matches expected | `expected_schema` or `ref_df_name`/`ref_table` | +| `has_no_outliers` | No statistical outliers (MAD) | `column` | + +Supported `aggr_type` values: `count`, `count_distinct`, `approx_count_distinct`, `sum`, `avg`, `min`, `max`, `stddev`, `stddev_pop`, `variance`, `var_pop`, `median`, `mode`, `skewness`, `kurtosis`, `percentile`, `approx_percentile` + +Use `aggr_params` for percentile/accuracy settings (e.g., `{"accuracy": 100}` for `percentile`). + +```python +from databricks.labs.dqx.rule import DQDatasetRule +from databricks.labs.dqx import check_funcs + +# Composite uniqueness +DQDatasetRule(criticality="error", check_func=check_funcs.is_unique, + columns=["order_id", "line_item_id"]) + +# Aggregate with group by +DQDatasetRule(criticality="error", check_func=check_funcs.is_aggr_not_greater_than, + column="amount", check_func_kwargs={"aggr_type": "count", "group_by": ["customer_id"], "limit": 100}) + +# Foreign key +DQDatasetRule(criticality="error", check_func=check_funcs.foreign_key, + columns=["product_id"], check_func_kwargs={"ref_df_name": "products_ref", "ref_columns": ["id"]}) +``` + +## Multi-Column Shorthand + +```python +from databricks.labs.dqx.rule import DQForEachColRule +from databricks.labs.dqx import check_funcs + +checks = DQForEachColRule( + columns=["col1", "col2", "col3"], criticality="error", check_func=check_funcs.is_not_null +).get_rules() +``` + +## YAML/JSON Check Format + +```yaml +- criticality: error # "error" or "warn" (default: "error") + check: + function: is_not_null # Built-in or custom function name + arguments: + column: customer_id + name: custom_name # Optional: override auto-generated name + filter: "status = 'active'" # Optional: SQL filter for conditional checks + user_metadata: # Optional: custom metadata + check_category: completeness + +# Multi-column shorthand +- criticality: error + check: + function: is_not_null + for_each_column: [col1, col2, col3] +``` + +Note: In metadata format (YAML/JSON/dict), use `__decimal__: "0.01"` for decimal values. 
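Checks written in this format are typically parsed and applied with the metadata-based engine methods. A short sketch, assuming a local `checks.yml` and an illustrative table name; `apply_checks_by_metadata` is the same call used in SKILL.md:

```python
import yaml
from databricks.sdk import WorkspaceClient
from databricks.labs.dqx.engine import DQEngine

# Parse checks authored in the YAML format above (path illustrative)
with open("checks.yml") as f:
    checks = yaml.safe_load(f)

dq_engine = DQEngine(WorkspaceClient())

# Annotate each row with error/warning result columns
df = spark.read.table("catalog.schema.my_table")  # table illustrative
checked_df = dq_engine.apply_checks_by_metadata(df, checks)
```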
+ +## Complex Types + +```python +# Struct field (dot notation) +DQRowRule(check_func=check_funcs.is_not_null, column="address.zip_code") +# Map key +DQRowRule(check_func=check_funcs.is_not_null, column=F.try_element_at("metadata", F.lit("key1"))) +# Array element +DQRowRule(check_func=check_funcs.is_not_null, column=F.try_element_at("tags", F.lit(1))) +``` + +## Custom Check Functions + +```python +from databricks.labs.dqx.rule import register_rule, make_condition +import pyspark.sql.functions as F + +@register_rule("row") +def not_ends_with_suffix(column: str, suffix: str): + return make_condition( + F.col(column).endswith(suffix), + f"{column} should not end with '{suffix}'", + f"{column}_ends_with_{suffix}" + ) +``` diff --git a/databricks-skills/databricks-dqx/storage-and-config.md b/databricks-skills/databricks-dqx/storage-and-config.md new file mode 100644 index 00000000..171243e5 --- /dev/null +++ b/databricks-skills/databricks-dqx/storage-and-config.md @@ -0,0 +1,325 @@ +# DQX Storage, Configuration, Metrics & Dashboard + +## Checks Storage Backends + +| Backend | Config Class | Location Format | Use Case | +|---------|-------------|----------------|----------| +| **Local File** | `FileChecksStorageConfig` | `checks.yml` or `checks.json` | Development, testing | +| **Workspace File** | `WorkspaceFileChecksStorageConfig` | `/Shared/App1/checks.yml` | Shared workspace checks | +| **Unity Catalog Table** | `TableChecksStorageConfig` | `catalog.schema.checks_table` | Production, governed | +| **UC Volume** | `VolumeFileChecksStorageConfig` | `/Volumes/cat/schema/vol/checks.yml` | File-based with UC governance | +| **Lakebase** | `LakebaseChecksStorageConfig` | `dqx.config.checks` | Lakebase-hosted | +| **Installation** | `InstallationChecksStorageConfig` | Auto-managed | CLI workflow default | + +### Save and Load + +```python +from databricks.labs.dqx.engine import DQEngine +from databricks.labs.dqx.config import ( + FileChecksStorageConfig, WorkspaceFileChecksStorageConfig, + TableChecksStorageConfig, VolumeFileChecksStorageConfig, +) + +dq_engine = DQEngine(WorkspaceClient()) + +# Local YAML +dq_engine.save_checks(checks, config=FileChecksStorageConfig(location="checks.yml")) + +# Workspace file +dq_engine.save_checks(checks, config=WorkspaceFileChecksStorageConfig(location="/Shared/App1/checks.yml")) + +# Unity Catalog table (supports append mode and run_config_name) +dq_engine.save_checks(checks, config=TableChecksStorageConfig( + location="catalog.schema.checks_table", mode="append" +)) + +# UC Volume +dq_engine.save_checks(checks, config=VolumeFileChecksStorageConfig( + location="/Volumes/dq/config/checks_volume/App1/checks.yml" +)) + +# Load from table with run_config_name filter +checks = dq_engine.load_checks(config=TableChecksStorageConfig( + location="catalog.schema.checks_table", run_config_name="main.default.input_table" +)) + +# Validate +status = dq_engine.validate_checks(checks) +assert not status.has_errors +``` + +## Configuration File (config.yml) + +Created during `databricks labs install dqx`. Used by CLI workflows. 
+ +```yaml +log_level: INFO +version: 1 +serverless_clusters: true + +run_configs: + - name: default + input_config: + location: catalog.schema.bronze + format: delta + is_streaming: false + output_config: + location: catalog.schema.silver + format: delta + mode: append + quarantine_config: + location: catalog.schema.quarantine + format: delta + mode: append + metrics_config: + format: delta + location: catalog.schema.dq_metrics + mode: append + checks_location: checks.yml + profiler_config: + sample_fraction: 0.3 + limit: 1000 + llm_primary_key_detection: false + warehouse_id: your-warehouse-id +``` + +### Streaming Configuration + +```yaml +run_configs: + - name: streaming_config + input_config: + location: catalog.schema.bronze + format: delta + is_streaming: true + output_config: + location: catalog.schema.silver + format: delta + mode: append + checkpointLocation: /Volumes/catalog/schema/checkpoint/output + trigger: + availableNow: true + quarantine_config: + location: catalog.schema.quarantine + format: delta + mode: append + checkpointLocation: /Volumes/catalog/schema/checkpoint/quarantine + trigger: + availableNow: true +``` + +### Custom Check Functions in Config + +```yaml +run_configs: + - name: default + custom_check_functions: + my_func: custom_checks/my_funcs.py + my_other: /Workspace/Shared/MyApp/my_funcs.py + email_mask: /Volumes/main/dqx_utils/custom/email.py +``` + +### Reference Tables + +```yaml +run_configs: + - name: default + reference_tables: + reference_table_1: + input_config: + format: delta + location: catalog.schema.reference_data +``` + +## Summary Metrics + +DQX auto-computes: `input_row_count`, `error_row_count`, `warning_row_count`, `valid_row_count`. + +### Batch Collection + +```python +from databricks.labs.dqx.metrics_observer import DQMetricsObserver + +observer = DQMetricsObserver(name="dq_metrics") +engine = DQEngine(WorkspaceClient(), observer=observer) +checked_df, observation = engine.apply_checks_by_metadata(df, checks) +checked_df.count() # trigger Spark action +metrics = observation.get +``` + +### Write Metrics to Table + +```python +engine.apply_checks_and_save_in_table( + checks=checks, + input_config=InputConfig(location="catalog.schema.bronze"), + output_config=OutputConfig(location="catalog.schema.silver"), + quarantine_config=OutputConfig(location="catalog.schema.quarantine"), + metrics_config=OutputConfig(location="catalog.schema.dq_metrics", format="delta", mode="append") +) +``` + +### Streaming Metrics + +Use `get_streaming_metrics_listener` or end-to-end methods for automatic streaming metric collection. 
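For example, the end-to-end path needs no extra wiring. A sketch assuming `InputConfig` accepts an `is_streaming` flag that mirrors the `is_streaming` key in the config.yml example above (checkpoint location and trigger options omitted for brevity):

```python
from databricks.labs.dqx.config import InputConfig, OutputConfig

dq_engine.apply_checks_by_metadata_and_save_in_table(
    checks=checks,
    input_config=InputConfig(location="catalog.schema.bronze", format="delta", is_streaming=True),
    output_config=OutputConfig(location="catalog.schema.silver", format="delta", mode="append"),
    metrics_config=OutputConfig(location="catalog.schema.dq_metrics", format="delta", mode="append"),
)
```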
+ +### Custom Metrics + +```yaml +# In config.yml +custom_metrics: + - "avg(amount) as average_transaction_amount" + - "sum(array_size(_errors)) as total_errors" +``` + +### Metrics Table Schema + +| Column | Description | +|--------|-------------| +| `run_id` | Unique run identifier | +| `run_name` | Run configuration name | +| `input_location` / `output_location` / `quarantine_location` | Source, target, quarantine paths | +| `checks_location` | Checks definition location | +| `metric_name` / `metric_value` | Metric name and value (stored as STRING) | +| `run_time` | Timestamp of the run | +| `error_column_name` / `warning_column_name` | Names of result columns | +| `user_metadata` | Custom metadata map | + +## Additional Configuration + +### Custom Result Column Names + +```python +dq_engine = DQEngine(ws, extra_params=ExtraParams( + result_column_names={"errors": "dq_errors", "warnings": "dq_warnings"} +)) +``` + +### User Metadata + +```python +# Engine-level (applies to all checks) +dq_engine = DQEngine(ws, extra_params=ExtraParams( + user_metadata={"pipeline": "orders", "team": "data-eng"} +)) + +# Per-check (overrides engine-level for same keys) +DQRowRule(criticality="error", check_func=check_funcs.is_not_null, column="customer_id", + user_metadata={"check_category": "completeness", "owner": "data-team"}) +``` + +## Quality Dashboard + +Built-in Lakeview dashboard with three tabs: **Data Quality Summary**, **Quality by Table (Time Series)**, **Quality by Table (Full Snapshot)**. + +```bash +databricks labs dqx open-dashboards +``` + +Tables must have `_errors`/`_warnings` columns (or custom names). Dashboard is not auto-scheduled; set up a schedule via a Databricks Job. Manual import: `DQX_Dashboard.lvdash.json` from the [DQX GitHub repo](https://github.com/databrickslabs/dqx). 
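For trend views beyond the built-in dashboard, the metrics table can also be queried directly. A sketch assuming metrics were written to `catalog.schema.dq_metrics` as in the earlier examples; note that `metric_value` is stored as STRING and needs a cast:

```python
# Daily error-row trend per input table, from the auto-computed error_row_count metric
spark.sql("""
    SELECT date_trunc('day', run_time) AS run_day,
           input_location,
           SUM(CAST(metric_value AS BIGINT)) AS error_rows
    FROM catalog.schema.dq_metrics
    WHERE metric_name = 'error_row_count'
    GROUP BY run_day, input_location
    ORDER BY run_day
""").show()
```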
+ +## CLI Reference + +```bash +# Installation +databricks labs install dqx # Install +databricks labs install dqx@v0.9.3 # Specific version +databricks labs upgrade dqx # Upgrade +databricks labs uninstall dqx # Uninstall + +# Configuration +databricks labs dqx open-remote-config # Open config.yml +databricks labs dqx workflows # List workflows + +# Profiling +databricks labs dqx profile --run-config "default" +databricks labs dqx profile --patterns "main.data.*" +databricks labs dqx profile --exclude-patterns "*_tmp" + +# Quality Checking +databricks labs dqx apply-checks --run-config "default" +databricks labs dqx apply-checks --patterns "main.*" +databricks labs dqx apply-checks --output-table-suffix "_clean" +databricks labs dqx apply-checks --quarantine-table-suffix "_quarantine" + +# End-to-End +databricks labs dqx e2e --run-config "default" + +# Validation +databricks labs dqx validate-checks --run-config "default" + +# Logs & Dashboard +databricks labs dqx logs --workflow profiler +databricks labs dqx logs --workflow quality-checker +databricks labs dqx open-dashboards +``` + +## Best Practices + +### Rule Management + +- **Store checks in Delta tables** for production — centralizes management, enables governance and versioning +- **Govern checks storage strictly** — rules directly impact pipeline behavior: + - Read-only access for business users/data engineers; write restricted to service principals + - Modify only through CI/CD workflows, not ad-hoc SQL or direct DML + - Separate tables by domain/team/environment to prevent cross-boundary changes +- **Use `run_config_name`** to share common checks across table groups, layered with table-specific checks +- **Prioritize high-impact fields** — business keys, timestamps, join/aggregation columns, compliance columns, columns with known quality issues, clustering/partition columns +- **Match rule type to purpose:** + +| Type | When to Use | +|------|-------------| +| Rule-based | Deterministic validation (default choice) | +| AI-assisted | Translate natural language requirements to rules | +| Anomaly detection | Catch outliers not covered by explicit rules | +| Profiler-generated | Bootstrap initial rule sets from data | + +- **Prefer row-level checks** for performance and granularity; use dataset-level to supplement +- **Encapsulate reusable logic** in custom check functions instead of duplicating `sql_expression` +- **Define consumer-specific rules** when downstream applications (reporting, ML, data science) have different quality expectations for the same table + +### Profile and Tune Over Time + +- Bootstrap rules with the **profiler** for new datasets; re-profile periodically to detect distribution shifts +- Use **AI-assisted generation** to translate business/compliance requirements into rules +- Monitor **pass/fail rates and trends** to reduce false positives while maintaining coverage +- **Add/tune checks immediately** after production incidents to prevent recurrence +- **Retire obsolete rules** as business logic evolves; version rules for traceability + +### Pipeline Integration + +- **Workflows** (no-code) for post-factum monitoring on persisted data; **embedded** (programmatic) for in-transit validation +- **Apply all checks in one pass** — group row-level + dataset-level + anomaly checks to minimize scans +- **Quarantine** critical bad records (`apply_checks_and_split`); **flag** non-critical records (`apply_checks`) +- **Scale across tables** with `apply_checks_and_save_in_tables` or 
`apply_checks_and_save_in_tables_for_patterns` +- **Set up SLAs and alerting** for critical quality metrics; use the dashboard and metrics table for trend tracking + +### Deployment + +- **Version rules in Git**, deploy as Delta tables aligned with pipeline releases (e.g., via Databricks Asset Bundles) +- Use **environment-specific configs** when promoting dev → qa → prod +- Tag rules with version info via `user_metadata` or `run_config_name`: + +```yaml +# Per-rule versioning via user_metadata +- criticality: error + check: + function: is_not_null + arguments: + column: col1 + user_metadata: + version: v1 + location: catalog.schema.checks_table +``` + +```python +# Rule-set versioning via run_config_name +dq_engine.save_checks(checks, config=TableChecksStorageConfig( + location="catalog.schema.checks_table", + run_config_name="main.default.input_table_v1", mode="overwrite" +)) +``` + +- **Pin DQX version** (`pip install databricks-labs-dqx==0.9.3` / `databricks labs install dqx@v0.9.3`); review breaking changes before upgrading +- **Test in lower environments** with sample/synthetic data before production; automate regression tests for critical checks +- **Use custom installation folders** to isolate DQX dependencies per team