park-peter/dpo

Data Profiling Orchestrator (DPO)

A "Declare and Deploy" solution for automating Databricks Data Profiling at scale across Unity Catalog.

Overview

DPO eliminates manual UI-based configuration by providing:

  • CLI-First Workflow: dpo validate, dpo dry-run, dpo run, dpo coverage via a click-based CLI
  • Config-Driven Tables: Define tables explicitly in YAML with per-table settings
  • Optional Tag Discovery: Discover additional tables via Unity Catalog tags
  • Per-Table Configuration: Baseline tables, label columns, granularity, enrichment metadata per table
  • All Profile Types: Support for Snapshot, TimeSeries, and Inference profiles
  • Two Operation Modes: Full mode for unified observability, Bulk mode for quick onboarding
  • Monitor Groups: Per-team/department aggregation with group-scoped drift, profile, performance views, alerts, and dashboards
  • Schema Validation: Pre-flight column existence checks before creating monitors
  • Policy Governance: Configurable naming rules, required tags, and structural constraints
  • Coverage Governance: Separate governance workflow for unmonitored tables, stale monitors, and orphaned monitors
  • Enriched Alerting: Alerts carry owner, runbook URL, and lineage URL for actionable triage
  • Unified Metrics Views: Group-scoped management views for drift, profile, and model performance metrics

Installation

pip install databricks-dpo

For local development:

pip install -e .

Or from a git clone:

make install

For Databricks notebooks (Python API execution path):

%pip install databricks-dpo

Execution Contexts

  • Local shell / CI/CD runners: use the dpo CLI (validate, dry-run, run, coverage).
  • Databricks notebooks / notebook jobs: use the Python API (load_config, run_orchestration).

The notebook path is API-driven; running the CLI inside notebook cells is not the recommended operating model.

Quick Start

Option 1: CLI (Local + CI/CD Recommended)

The dpo CLI is safe by default - validate first, then preview, then execute:

# 1. Validate config syntax and policy rules
dpo validate configs/my_config.yaml

# 2. Preview what would change (no mutations)
dpo dry-run configs/my_config.yaml

# 3. Execute for real (requires explicit --confirm)
dpo run configs/my_config.yaml --confirm

# 4. Check governance coverage
dpo coverage configs/my_config.yaml

Option 2: Config-Driven (Python API for Notebooks/Jobs)

Define tables explicitly in YAML - version-controlled and reproducible:

# configs/my_config.yaml
catalog_name: "prod"
warehouse_id: "your-warehouse-id"
mode: "full"

profile_defaults:
  profile_type: "INFERENCE"
  output_schema_name: "monitoring_results"
  prediction_column: "prediction"
  label_column: "label"
  timestamp_column: "timestamp"

monitored_tables:
  prod.ml.churn_predictions:
    baseline_table_name: "prod.ml.churn_baseline"
    label_column: "churned"
    prediction_column: "churn_probability"
    
  prod.ml.fraud_detection:
    label_column: "is_fraud"
    granularity: "1 hour"

Then load and run the config from a notebook or job:

from dpo import run_orchestration, load_config

config = load_config("configs/my_config.yaml")
report = run_orchestration(config, dry_run=False)
print(f"Created {report.monitors_created} monitors")

Option 3: Tag-Based Discovery

Discover tables via Unity Catalog tags:

ALTER TABLE prod.ml.churn_predictions SET TAGS ('monitor_enabled' = 'true');

Then enable discovery in the config:

catalog_name: "prod"
warehouse_id: "your-warehouse-id"
include_tagged_tables: true

discovery:
  include_tags:
    monitor_enabled: "true"
  exclude_schemas:
    - "tmp_*"

profile_defaults:
  profile_type: "INFERENCE"
  output_schema_name: "monitoring_results"

monitored_tables: {}  # Can be empty when using discovery
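The discovery block filters tables roughly as shown in the pure-Python sketch below. It rests on two assumptions: every include_tags entry must match (AND semantics), and exclude_schemas entries are shell-style globs. DPO's real discovery runs against Unity Catalog and may behave differently. The `discover` function and its input shape are hypothetical.

```python
from fnmatch import fnmatchcase


def discover(tables, include_tags, exclude_schemas):
    """Hypothetical sketch of tag-based discovery.

    tables: mapping of "catalog.schema.table" -> {tag_name: tag_value}.
    Assumes all include_tags must match (AND) and exclude_schemas are globs.
    """
    selected = []
    for full_name, tags in tables.items():
        _catalog, schema, _table = full_name.split(".")
        # Skip whole schemas that match an exclusion glob such as "tmp_*".
        if any(fnmatchcase(schema, pattern) for pattern in exclude_schemas):
            continue
        # Keep the table only if every required tag has the expected value.
        if all(tags.get(key) == value for key, value in include_tags.items()):
            selected.append(full_name)
    return sorted(selected)


tables = {
    "prod.ml.churn_predictions": {"monitor_enabled": "true"},
    "prod.tmp_scratch.churn_copy": {"monitor_enabled": "true"},
    "prod.ml.untagged_table": {},
}
print(discover(tables, {"monitor_enabled": "true"}, ["tmp_*"]))
```

Here the tagged table in the tmp_scratch schema is dropped by the "tmp_*" exclusion, and the untagged table never matches.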

Option 4: Hybrid Mode

Use both: explicit YAML entries take precedence for tables that are also discovered via tags:

catalog_name: "prod"
warehouse_id: "your-warehouse-id"
include_tagged_tables: true  # Discover tagged tables

discovery:
  include_tags: {monitor_enabled: "true"}

monitored_tables:
  # These settings override any tags for this specific table
  prod.ml.churn_predictions:
    baseline_table_name: "prod.ml.churn_baseline"
    label_column: "churned"
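The precedence rule can be sketched as a dictionary merge. The README only says that YAML wins for tables that appear in both sources. This sketch assumes the override happens key by key, so a discovered setting the YAML entry doesn't mention (such as an owner from tags) survives. DPO might instead replace the whole entry. `merge_tables` is a hypothetical name.

```python
def merge_tables(discovered, explicit):
    """Hypothetical sketch: combine discovered and explicitly configured tables.

    discovered: table name -> settings derived from UC tags.
    explicit: the monitored_tables mapping from YAML.
    For tables in both, explicit YAML keys override discovered ones.
    """
    merged = {name: dict(settings) for name, settings in discovered.items()}
    for name, settings in explicit.items():
        # setdefault adds YAML-only tables; update lets YAML keys win.
        merged.setdefault(name, {}).update(settings)
    return merged


discovered = {
    "prod.ml.churn_predictions": {"owner": "ml_team", "label_column": "label"},
    "prod.ml.fraud_detection": {"owner": "risk_team"},
}
explicit = {
    "prod.ml.churn_predictions": {
        "baseline_table_name": "prod.ml.churn_baseline",
        "label_column": "churned",
    },
}
merged = merge_tables(discovered, explicit)
print(merged["prod.ml.churn_predictions"])
```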

Per-Table Configuration

Each table in monitored_tables can override global defaults:

monitored_tables:
  prod.ml.churn_model:
    baseline_table_name: "prod.ml.churn_baseline"
    label_column: "churned"
    prediction_column: "churn_probability"
    timestamp_column: "prediction_time"
    problem_type: "PROBLEM_TYPE_CLASSIFICATION"
    granularity: "1 day"
    slicing_exprs:
      - "region"
      - "customer_segment"

Available per-table settings (all optional, inherit from profile_defaults):

Setting | Description
--- | ---
baseline_table_name | Baseline table for drift comparison
label_column | Ground truth label column
prediction_column | Model prediction column
timestamp_column | Timestamp column
model_id_column | Model version/ID column
problem_type | PROBLEM_TYPE_CLASSIFICATION or PROBLEM_TYPE_REGRESSION
granularity | Aggregation window ("1 hour", "1 day", etc.)
slicing_exprs | Columns to slice metrics by
owner | Table owner for alert routing (falls back to UC tag owner)
runbook_url | Runbook URL included in alert payloads
lineage_url | Lineage URL included in alert payloads
drift_threshold | Per-table Jensen-Shannon (JS) divergence threshold (overrides global alerting.drift_threshold)
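Inheritance can be read as "start from profile_defaults, then apply the table's own keys." The sketch below encodes that reading, along with the drift_threshold fallback chain: per-table value, then alerting.drift_threshold, then 0.2. Treating an explicit None as "not set" is an assumption, and both function names are hypothetical. The sample values come from the Option 2 example.

```python
def resolve_table_settings(profile_defaults, table_overrides):
    """Hypothetical sketch: per-table keys override profile_defaults.

    Keys set to None in the table entry are treated as "not set" (an assumption).
    """
    effective = dict(profile_defaults)
    effective.update({k: v for k, v in table_overrides.items() if v is not None})
    return effective


def effective_drift_threshold(table_settings, alerting, default=0.2):
    """Per-table drift_threshold wins; else alerting.drift_threshold; else 0.2."""
    if table_settings.get("drift_threshold") is not None:
        return table_settings["drift_threshold"]
    return alerting.get("drift_threshold", default)


profile_defaults = {
    "profile_type": "INFERENCE",
    "output_schema_name": "monitoring_results",
    "prediction_column": "prediction",
    "label_column": "label",
    "timestamp_column": "timestamp",
}
fraud = resolve_table_settings(
    profile_defaults, {"label_column": "is_fraud", "granularity": "1 hour"}
)
print(fraud["prediction_column"], fraud["label_column"], fraud["granularity"])
print(effective_drift_threshold(fraud, alerting={}))
```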

Operation Modes

Full Mode (default)

Complete pipeline with unified observability:

mode: "full"

Creates:

  • Individual monitors per table
  • Group-scoped unified views for drift, profile, and performance metrics
  • SQL Alerts (per group)
  • Lakeview dashboards (per group) plus an optional executive rollup when multiple groups are active

Bulk Provisioning Mode

Fastest onboarding path: creates monitors only, without the unified views, alerts, and dashboards that full mode adds:

mode: "bulk_provision_only"

Use when:

  • You have 100+ tables and want the fastest setup
  • You prefer using Databricks' native per-table UI for results
  • You don't need cross-table unified observability yet

Monitor Groups

Segment monitoring by team, department, or use case:

ALTER TABLE prod.ml.churn_predictions SET TAGS (
    'monitor_enabled' = 'true',
    'monitor_group' = 'ml_team'
);

Per-Group Alert Routing

alerting:
  enable_aggregated_alerts: true
  default_notifications:
    - "mlops-alerts@company.com"
  group_notifications:
    ml_team:
      - "ml-team@company.com"
    data_eng:
      - "data-eng@company.com"
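The sketch below shows one plausible reading of the routing config: a group with its own entry in group_notifications uses those destinations, and any other group falls back to default_notifications. Whether a group entry replaces or extends the defaults isn't stated above. This sketch assumes replacement, and `notification_targets` is a hypothetical name.

```python
def notification_targets(group, alerting):
    """Hypothetical sketch: group-specific destinations, else the defaults.

    Assumes a group listed in group_notifications replaces (rather than
    extends) default_notifications.
    """
    group_targets = alerting.get("group_notifications", {}).get(group)
    if group_targets:
        return list(group_targets)
    return list(alerting.get("default_notifications", []))


alerting = {
    "enable_aggregated_alerts": True,
    "default_notifications": ["mlops-alerts@company.com"],
    "group_notifications": {
        "ml_team": ["ml-team@company.com"],
        "data_eng": ["data-eng@company.com"],
    },
}
print(notification_targets("ml_team", alerting))
print(notification_targets("finance", alerting))
```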

Validation rules for grouped deployments:

  • Monitored tables must have unique leaf table names across the run.
  • Group names must sanitize to unique SQL-safe identifiers.
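Both rules can be checked before anything is deployed. The sanitizer below is hypothetical: it lowercases, replaces characters outside [a-z0-9_] with "_", and avoids a leading digit. DPO's actual identity rules live in naming.py and may differ, but the collision check works the same way for any sanitizer.

```python
import re
from collections import Counter


def sanitize_group(name):
    """Hypothetical sanitizer: lowercase, non [a-z0-9_] -> '_', no leading digit."""
    slug = re.sub(r"[^a-z0-9_]", "_", name.strip().lower())
    if not slug or slug[0].isdigit():
        slug = "_" + slug
    return slug


def validate_groups(table_names, group_names):
    """Return a list of error strings for the two grouped-deployment rules."""
    errors = []
    # Rule 1: leaf table names (the part after the last dot) must be unique.
    leaf_counts = Counter(name.split(".")[-1] for name in table_names)
    errors += [f"duplicate leaf table name: {leaf}"
               for leaf, count in leaf_counts.items() if count > 1]
    # Rule 2: distinct group names must not collapse to the same identifier.
    by_slug = {}
    for group in sorted(set(group_names)):
        by_slug.setdefault(sanitize_group(group), []).append(group)
    errors += [f"groups {sorted(groups)} collide as '{slug}'"
               for slug, groups in by_slug.items() if len(groups) > 1]
    return errors


print(validate_groups(
    ["prod.ml.churn", "prod.finance.churn"],
    ["ml_team", "ML-Team", "data_eng"],
))
```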

Profile Types

Type | Use Case | Key Config
--- | --- | ---
SNAPSHOT | Static data quality checks | None (simplest)
TIMESERIES | Time-windowed data quality | timeseries_timestamp_column
INFERENCE | ML model monitoring with drift | prediction_column, label_column

Inference Profile

profile_defaults:
  profile_type: "INFERENCE"
  problem_type: "PROBLEM_TYPE_CLASSIFICATION"
  prediction_column: "prediction"
  label_column: "label"
  timestamp_column: "timestamp"

TimeSeries Profile

profile_defaults:
  profile_type: "TIMESERIES"
  timeseries_timestamp_column: "event_time"
  granularity: "1 day"

Snapshot Profile

profile_defaults:
  profile_type: "SNAPSHOT"

Schema Validation

DPO validates that configured columns exist before creating monitors:

ValueError: Column 'label_v1' (from label_column) not found in table 
'prod.ml.churn'. Available columns: ['id', 'prediction', 'actual_label', 'ts']

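This catches misconfigured column names before any monitor is created. Instead of an opaque API error, you get a message that lists the columns that actually exist.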
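The pre-flight check boils down to set membership plus a descriptive error. The sketch below reproduces the message format shown above. The column list is passed in directly. In a real workspace it would come from the table's schema (for example, the `columns` of `WorkspaceClient().tables.get(...)` in the Databricks SDK). `check_columns` is a hypothetical name.

```python
def check_columns(table_name, available_columns, configured):
    """Raise ValueError for the first configured column missing from the table.

    configured: mapping of setting name (e.g. "label_column") -> column name;
    None means the setting is unset and is skipped.
    """
    for setting, column in configured.items():
        if column is not None and column not in available_columns:
            raise ValueError(
                f"Column '{column}' (from {setting}) not found in table "
                f"'{table_name}'. Available columns: {available_columns}"
            )


try:
    check_columns(
        "prod.ml.churn",
        ["id", "prediction", "actual_label", "ts"],
        {"prediction_column": "prediction", "label_column": "label_v1"},
    )
except ValueError as err:
    print(err)
```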

CLI Reference

DPO ships a click-based CLI installed as the dpo console script. Use this CLI from a local shell or CI/CD runner. For Databricks notebooks/jobs, use the Python API shown above.

Command | Description | Safe by Default
--- | --- | ---
dpo validate <config> | Schema + policy validation (offline by default; --check-workspace adds workspace checks) | Yes
dpo dry-run <config> | Preview planned actions with a structured Impact Report | Yes
dpo run <config> --confirm | Execute orchestration (requires explicit --confirm flag) | Yes
dpo coverage <config> | Report unmonitored, stale, and orphaned monitors | Yes

All commands support --verbose for DEBUG logging and --format json for machine-readable output.

dpo dry-run always enforces dry-run mode and dpo run --confirm always enforces live mode, regardless of the YAML dry_run value.

Exit codes: 0 success, 1 validation failure, 2 usage error, 3 runtime error.
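A CI step can wrap the CLI and branch on these exit codes. In the sketch below, `run_dpo` shells out to `dpo` (assumed to be on PATH), and `ci_gate` runs validate and then dry-run, stopping at the first failure. The helper names are hypothetical. The runner is injectable so the gate logic can be tested without a workspace.

```python
import subprocess

# Exit codes documented for the dpo CLI.
EXIT_CODES = {0: "success", 1: "validation failure", 2: "usage error", 3: "runtime error"}


def describe_exit(code):
    """Human-readable meaning of a dpo exit code."""
    return EXIT_CODES.get(code, f"unexpected exit code {code}")


def run_dpo(*args):
    """Run the dpo CLI (assumed to be on PATH) and return its exit code."""
    result = subprocess.run(["dpo", *args], check=False)
    print(f"dpo {' '.join(args)} -> {describe_exit(result.returncode)}")
    return result.returncode


def ci_gate(config_path, runner=run_dpo):
    """Run validate, then dry-run; return the first non-zero exit code, else 0."""
    for step in (["validate", config_path], ["dry-run", config_path]):
        code = runner(*step)
        if code != 0:
            return code
    return 0
```

In a pipeline, `sys.exit(ci_gate("configs/my_config.yaml"))` fails the job with dpo's own exit code. `dpo run --confirm` would then belong in a separate, gated deploy step.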

Policy Governance

Define structural rules that all configurations must satisfy:

# In your config or a standalone policy file
policy:
  naming_patterns:
    - "^prod\\..*"                # Tables must be in prod catalog
  required_tags:
    - "owner"
    - "team"
  forbidden_patterns:
    - ".*_tmp$"                   # No temporary tables
    - ".*_test$"                  # No test tables
  require_baseline: true          # All inference monitors need a baseline
  require_slicing: true           # At least one slicing expression per table
  max_tables_per_config: 500      # Limit config scope

Policy templates are available in configs/policies/:

  • default_policy.yaml - Permissive defaults
  • strict_policy.yaml - Production-grade constraints
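The pattern-based rules can be checked locally, as in the hypothetical sketch below. It covers only naming_patterns, forbidden_patterns, and max_tables_per_config. It assumes a table must match at least one naming pattern and no forbidden pattern, using `re.search`. Note that the YAML string "^prod\\..*" becomes the regex `^prod\..*`, written here as a raw string.

```python
import re


def check_policy(table_names, policy):
    """Hypothetical sketch of pattern-based policy checks; returns violations."""
    violations = []
    max_tables = policy.get("max_tables_per_config")
    if max_tables is not None and len(table_names) > max_tables:
        violations.append(
            f"{len(table_names)} tables exceeds max_tables_per_config={max_tables}"
        )
    naming = policy.get("naming_patterns", [])
    forbidden = policy.get("forbidden_patterns", [])
    for name in table_names:
        # Must match at least one naming pattern (if any are configured).
        if naming and not any(re.search(p, name) for p in naming):
            violations.append(f"{name}: matches no naming pattern")
        # Must match none of the forbidden patterns.
        for pattern in forbidden:
            if re.search(pattern, name):
                violations.append(f"{name}: matches forbidden pattern {pattern!r}")
    return violations


policy = {
    "naming_patterns": [r"^prod\..*"],
    "forbidden_patterns": [r".*_tmp$", r".*_test$"],
    "max_tables_per_config": 500,
}
print(check_policy(["prod.ml.churn", "prod.ml.churn_tmp", "dev.ml.scores"], policy))
```

The tag and structural rules (required_tags, require_baseline, require_slicing) need table metadata, so they are left out of this sketch.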

Coverage Governance

The dpo coverage command analyzes monitoring gaps:

Metric | Description
--- | ---
Unmonitored tables | Tables in the catalog that have no profiling monitor
Stale monitors | Monitors that haven't refreshed within stale_monitor_days (default 30)
Orphaned monitors | Monitors whose source tables no longer exist

Example:

dpo coverage configs/production.yaml --format json

Coverage analysis is intentionally separate from dpo dry-run and dpo run. Use dpo coverage when you want a catalog-wide governance report, or call CoverageAnalyzer directly from the Python API.
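The three coverage buckets reduce to set differences plus a timestamp cutoff. The sketch below illustrates that logic on plain Python data. It is not CoverageAnalyzer's API: the function name and input shapes are hypothetical, and treating a never-refreshed monitor as stale is an assumption.

```python
from datetime import datetime, timedelta, timezone


def analyze_coverage(catalog_tables, monitors, now, stale_monitor_days=30):
    """Hypothetical sketch of the three coverage buckets.

    catalog_tables: set of fully qualified table names in the catalog.
    monitors: mapping of monitored table name -> last refresh (aware datetime or None).
    """
    monitored = set(monitors)
    cutoff = now - timedelta(days=stale_monitor_days)
    return {
        "unmonitored": sorted(catalog_tables - monitored),
        # Assumption: a monitor that has never refreshed counts as stale.
        "stale": sorted(
            name for name, last in monitors.items()
            if name in catalog_tables and (last is None or last < cutoff)
        ),
        "orphaned": sorted(monitored - catalog_tables),
    }


now = datetime(2025, 1, 31, tzinfo=timezone.utc)
report = analyze_coverage(
    catalog_tables={"prod.ml.a", "prod.ml.b", "prod.ml.c"},
    monitors={
        "prod.ml.a": now - timedelta(days=1),
        "prod.ml.b": now - timedelta(days=45),
        "prod.ml.dropped": now - timedelta(days=2),
    },
    now=now,
)
print(report)
```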

Configuration Reference

Top-Level Settings

Setting | Required | Description
--- | --- | ---
catalog_name | Yes | Target catalog; all tables must be in this catalog
warehouse_id | Yes | SQL Warehouse for statement execution
mode | No | full (default) or bulk_provision_only
include_tagged_tables | No | If true, discover tables via UC tags (default: false)
monitored_tables | Conditional | Required if include_tagged_tables is false
discovery | Conditional | Required if include_tagged_tables is true
policy | No | Policy governance rules (see Policy Governance section)
stale_monitor_days | No | Days without refresh before a monitor is considered stale (default: 30)

Profile Defaults

Setting | Description
--- | ---
profile_type | INFERENCE, SNAPSHOT, or TIMESERIES
output_schema_name | Schema where metric tables are created
granularity | Default aggregation window
prediction_column | Default prediction column (inference)
label_column | Default label column (inference)
timestamp_column | Default timestamp column
problem_type | Default problem type (inference)
slicing_exprs | Default slicing expressions
create_builtin_dashboard | Whether to create per-monitor dashboards

Alerting

Setting | Description
--- | ---
enable_aggregated_alerts | Create alerts on unified views
drift_threshold | Jensen-Shannon (JS) divergence threshold (default: 0.2)
null_rate_threshold | Alert if the null rate exceeds this threshold
row_count_min | Alert if the row count falls below this value
alert_cron_schedule | Quartz cron schedule for alert evaluation (default: hourly)
alert_timezone | Timezone for alert schedule evaluation (default: UTC)
default_notifications | Default notification destinations
group_notifications | Per-group notification routing
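To build intuition for drift_threshold, the sketch below computes Jensen-Shannon divergence (log base 2, so values fall in [0, 1]) between a baseline and a current categorical distribution. The helper names are hypothetical. Databricks' drift metrics table also reports a `js_distance` column, which is the square root of the divergence. Whether DPO compares its threshold against distance or divergence isn't stated here, so treat that as an open assumption when tuning.

```python
import math


def normalize(counts):
    """Turn category counts into probabilities."""
    total = sum(counts.values())
    return {key: value / total for key, value in counts.items()}


def js_divergence(p, q, base=2.0):
    """Jensen-Shannon divergence of two discrete distributions (dicts of
    category -> probability). With base 2 the result lies in [0, 1]."""
    keys = set(p) | set(q)
    m = {k: 0.5 * (p.get(k, 0.0) + q.get(k, 0.0)) for k in keys}

    def kl_to_m(dist):
        # KL(dist || m); categories with zero probability contribute nothing.
        return sum(dist[k] * math.log(dist[k] / m[k], base)
                   for k in keys if dist.get(k, 0.0) > 0)

    return 0.5 * kl_to_m(p) + 0.5 * kl_to_m(q)


baseline = normalize({"A": 50, "B": 50})
current = normalize({"A": 90, "B": 10})
divergence = js_divergence(baseline, current)
print(f"divergence={divergence:.3f}, distance={math.sqrt(divergence):.3f}")
print("alert" if divergence > 0.2 else "ok")
```

For this 50/50 to 90/10 shift, the divergence is about 0.147, which is under 0.2. The distance is about 0.383, which is over 0.2. That gap is why the choice of measure matters.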

Project Structure

DPO/
├── src/
│   └── dpo/
│       ├── __init__.py       # Main entry point + run_orchestration()
│       ├── __main__.py       # Module entry point (python -m dpo)
│       ├── cli.py            # click-based CLI (dpo validate/dry-run/run/coverage)
│       ├── config.py         # Pydantic config models (incl. PolicyConfig)
│       ├── coverage.py       # Coverage governance (unmonitored/stale/orphan)
│       ├── discovery.py      # UC table discovery + enrichment resolution
│       ├── naming.py         # Artifact naming, slugging, and identity rules
│       ├── planning.py       # Execution planning and upfront validation
│       ├── provisioning.py   # Monitor lifecycle + structured ImpactReport
│       ├── aggregator.py     # Group-scoped unified views with enrichment metadata
│       ├── alerting.py       # SQL Alerts with owner/runbook/lineage
│       ├── dashboard.py      # Lakeview dashboard deployment
│       └── utils.py          # Shared utilities
├── configs/
│   ├── default_profile.yaml
│   └── policies/
│       ├── default_policy.yaml
│       └── strict_policy.yaml
├── notebooks/
│   ├── 01_discovery_demo.py
│   ├── 02_full_orchestration.py
│   ├── 03_alerting_setup.py
│   ├── 04_profile_types_demo.py
│   ├── 05_bulk_provisioning.py
│   └── 06_monitor_groups_demo.py
├── tests/
├── pyproject.toml
└── README.md

API Reference

from dpo import (
    run_orchestration,
    run_bulk_provisioning,
    load_config,
    OrchestrationReport,
    ImpactReport,
    CoverageAnalyzer,
    CoverageReport,
    get_monitor_statuses,
    wait_for_monitors,
    print_monitor_statuses,
)

OrchestrationReport Fields

Field | Type | Description
--- | --- | ---
tables_discovered | int | Number of tables processed
monitors_created | int | New monitors created
monitors_updated | int | Existing monitors updated
monitors_skipped | int | Tables skipped (quota, validation)
monitors_failed | int | Failed provisioning attempts
monitor_statuses | List[MonitorStatus] | Final status of each monitor
unified_drift_views | Dict[str, str] | Group → drift view name
unified_profile_views | Dict[str, str] | Group → profile view name
drift_alert_ids | Dict[str, str] | Group → drift alert ID
dashboard_ids | Dict[str, str] | Group → dashboard ID
impact_report | Optional[ImpactReport] | Structured dry-run impact report

Requirements

  • Databricks workspace with Unity Catalog enabled
  • databricks-sdk>=0.77.0 (uses data_quality API)
  • click>=8.0 (CLI framework)
  • SQL Warehouse for statement execution
  • Appropriate permissions on target catalog/schema

Objective Function Registry

For simple metric expressions, use custom_metrics directly. For complex logic (joins, UDFs, sklearn), first create a Unity Catalog function (SQL or Python body), then reference it in the metric definition.

UC Function Pattern

-- Step 1: Create a UC function (Python body) for complex logic
CREATE FUNCTION catalog.schema.roc_auc_score_func(predictions ARRAY<DOUBLE>, labels ARRAY<INT>)
RETURNS DOUBLE
LANGUAGE PYTHON
AS $$
from sklearn.metrics import roc_auc_score
return float(roc_auc_score(labels, predictions))
$$;

# Step 2: Reference it in DPO config
objective_functions:
  churn_30d_rocauc:
    version: "1.2"
    owner: ml_team
    description: "ROC AUC for 30-day churn prediction"
    uc_function_name: catalog.schema.roc_auc_score_func
    metric:
      name: rocauc_30d
      metric_type: aggregate
      input_columns: [":table"]
      definition: "catalog.schema.roc_auc_score_func(collect_list(CAST({{prediction_col}} AS DOUBLE)), collect_list(CAST({{label_col}} AS INT)))"
      output_type: double
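If scikit-learn isn't available in the function's execution environment, the same metric can be computed without dependencies. The sketch below uses the pairwise-ranking (Mann-Whitney) formulation of ROC AUC, which agrees with sklearn's roc_auc_score for binary labels. Ties count as half, and a single-class input raises, as in sklearn. It is O(n_pos * n_neg), so it suits modest window sizes. `roc_auc` is a hypothetical name, and whether a dependency-free body is needed depends on your workspace.

```python
def roc_auc(predictions, labels):
    """ROC AUC as the probability that a random positive outranks a random
    negative (ties count as half). Labels must be 0 or 1."""
    pos = [p for p, y in zip(predictions, labels) if y == 1]
    neg = [p for p, y in zip(predictions, labels) if y == 0]
    if not pos or not neg:
        raise ValueError("ROC AUC needs at least one positive and one negative label")
    wins = sum(1.0 if p > n else 0.5 if p == n else 0.0 for p in pos for n in neg)
    return wins / (len(pos) * len(neg))


print(roc_auc([0.1, 0.4, 0.35, 0.8], [0, 0, 1, 1]))
```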

Custom Metric Expression Constraints

Custom metric definitions are Jinja-templated SQL expressions, not full SQL statements. They cannot contain joins or subqueries. For complex logic, wrap it in a UC function and reference that function in the definition.

Jinja Template Variable Reference

DPO passes metric definitions through to Databricks verbatim. Databricks renders the Jinja templates.

Variable | Available When | Description
--- | --- | ---
{{input_column}} | Single-column aggregate/derived metrics | The column being processed
{{prediction_col}} | InferenceLog monitors | Prediction column value
{{label_col}} | InferenceLog monitors | Label/ground truth column value
{{current_df}} | Drift metrics | Current time window data reference
{{base_df}} | Drift metrics | Baseline data reference

Additionally, derived metric definitions can reference previously computed aggregate metrics by name, both built-in and custom (e.g., {{null_count}}, {{count}}).
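Because Databricks does the real rendering, a local preview is only a convenience for catching misspelled variable names before deployment. The hypothetical sketch below handles plain `{{name}}` substitution only (no Jinja filters or control flow) and raises KeyError for unknown names. The accuracy-style definition is purely illustrative.

```python
import re

_PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def render_definition(template, variables):
    """Replace {{name}} placeholders with values; unknown names raise KeyError.

    Handles plain variable substitution only, not Jinja filters or control flow.
    """
    return _PLACEHOLDER.sub(lambda match: variables[match.group(1)], template)


definition = "avg(CAST({{prediction_col}} = {{label_col}} AS INT))"
print(render_definition(
    definition, {"prediction_col": "prediction", "label_col": "is_fraud"}
))
```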

Supported Granularities

DPO supports all Databricks Data Profiling aggregation granularities:

Granularity | Use Case
--- | ---
5 minutes | Real-time streaming inference
30 minutes | Near real-time monitoring
1 hour | Standard streaming workloads
1 day | Default; most batch workloads
1 week | Weekly batch processing
2 weeks | Bi-weekly reporting cycles
3 weeks | Custom sprint cycles
4 weeks | Monthly-adjacent reporting
1 month | Monthly batch/financial cycles
1 year | Annual trend analysis

Multi-Granularity

DPO supports multiple granularities per monitor. Use granularities (plural) for dual leading/lagging indicators:

profile_defaults:
  granularities: ["1 day", "1 month"]

The singular granularity field is still supported as a fallback. When granularities is set, it takes precedence.
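The precedence rule and the supported list combine into a small resolver. In this hypothetical sketch, a non-empty plural `granularities` wins, the singular `granularity` is the fallback, and "1 day" is used when neither is set. That default is an assumption based on the table above. Unsupported values raise, and duplicates are dropped in order.

```python
# Granularities listed in the table above.
SUPPORTED_GRANULARITIES = (
    "5 minutes", "30 minutes", "1 hour", "1 day", "1 week",
    "2 weeks", "3 weeks", "4 weeks", "1 month", "1 year",
)


def resolve_granularities(settings, default="1 day"):
    """Plural `granularities` wins; else singular `granularity`; else default.

    The "1 day" default is assumed from the table above.
    """
    if settings.get("granularities"):
        values = list(settings["granularities"])
    elif settings.get("granularity"):
        values = [settings["granularity"]]
    else:
        values = [default]
    unsupported = [g for g in values if g not in SUPPORTED_GRANULARITIES]
    if unsupported:
        raise ValueError(f"Unsupported granularities: {unsupported}")
    # Drop duplicates while keeping the configured order.
    return list(dict.fromkeys(values))


print(resolve_granularities(
    {"granularities": ["1 day", "1 month"], "granularity": "1 hour"}
))
```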

Windowing Strategy

30-Day Initial Lookback

When a monitor is first created, Databricks processes the last 30 days of data. This limits the initial backfill, not the evaluation windows available later. Subsequent refreshes process new and changed data, and enabling Change Data Feed (CDF) lets them do so incrementally.

Delayed Ground Truth Workflow

For models with delayed labels (30/60/90 days):

  1. Labels are backfilled into the monitored inference table as they become available
  2. On the next DPO refresh, Databricks reprocesses windows containing new label data
  3. Performance metrics update automatically for those windows

DPO does not orchestrate the label backfill itself -- that is an ETL concern. DPO picks up the updated data on refresh.
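The backfill ETL can log which windows a label backfill will affect on the next refresh, as in the sketch below. It finds the fixed-size windows that contain backfilled rows. Aligning windows to the Unix epoch in UTC is an assumption, since Databricks' exact window alignment isn't described here. Calendar granularities such as "1 month" would need calendar arithmetic instead. `windows_to_recompute` is a hypothetical name, and DPO itself doesn't need this step.

```python
from datetime import datetime, timezone


def windows_to_recompute(backfilled_timestamps, granularity_seconds=86400):
    """Start times of fixed-size, epoch-aligned UTC windows that contain
    rows whose labels were backfilled. Timestamps must be timezone-aware."""
    starts = set()
    for ts in backfilled_timestamps:
        if ts.tzinfo is None:
            raise ValueError("timestamps must be timezone-aware")
        epoch = int(ts.timestamp())
        starts.add(epoch - epoch % granularity_seconds)
    return sorted(datetime.fromtimestamp(s, tz=timezone.utc) for s in starts)


backfilled = [
    datetime(2025, 1, 3, 10, 0, tzinfo=timezone.utc),
    datetime(2025, 1, 3, 23, 59, tzinfo=timezone.utc),
    datetime(2025, 1, 10, 1, 0, tzinfo=timezone.utc),
]
print(windows_to_recompute(backfilled))
```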

Change Data Feed (CDF) Recommendation

Enable Change Data Feed on monitored tables for efficient incremental processing:

ALTER TABLE catalog.schema.inference_table SET TBLPROPERTIES (delta.enableChangeDataFeed = true);

This allows Databricks to process only changed rows on subsequent refreshes rather than re-scanning entire partitions.


License

GPL v3
