A production-grade data quality gate for clinical ML pipelines
- Project Overview
- Research Foundation
- What's In Scope vs Out of Scope
- Quickstart
- Repository Layout
- Pipeline Integration
- DQV Agent Architecture
- Inputs & Outputs
- Validation Checks Taxonomy
- Artifacts Produced
- Example Run Outputs
- Design Decisions
- Development
- Testing
- Limitations & Next Steps
- License
This repository implements the Data Quality Validation (DQV) Agent, an autonomous validation gate that sits within Nimblemind's agentic clinical ML pipeline. The DQV Agent extends the multi-agent workflow by adding a pre-preprocessing quality gate that validates raw (or anonymized) structured medical datasets before they proceed to downstream feature extraction, model matching, and inference.
The DQV Agent validates tabular medical datasets using a dual-lane validation architecture:
- Lane A: TFDV Runner — Statistical validation using TensorFlow Data Validation for schema inference, statistics generation, and anomaly detection
- Lane B: MDPT Runner — Domain-specific validation using Medical Data Pecking Tests (context-aware, reference-grounded unit tests)
When the DQV Agent completes validation, it produces:
- Gate Decision: PASS, WARN, or FAIL, determining whether the dataset proceeds downstream
- Validation Report: Machine-readable (JSON) and human-readable (Markdown) reports with findings
- Structured Artifacts: Schema files, statistics, anomaly reports, and test results
- Execution Logs: Telemetry events with trace IDs, run IDs, and idempotency keys for observability
This repository extends Nimblemind's agentic workflow by adding a Data Quality Validation gate before downstream processing. The DQV Agent prevents garbage-in-garbage-out scenarios by validating data quality before it reaches feature extraction, model-data matching, preprocessing recommendation, and model inference stages.
This implementation is grounded in two seminal research papers that define best practices for ML data validation:
1. Medical Data Pecking: A Context-Aware Approach for Automated Quality Evaluation of Structured Medical Data
Authors: Irena Girshovitz, Atai Ambus, Moni Shahar, and Ran Gilad-Bachrach
Official Links:
- arXiv (Abstract/Summary): https://arxiv.org/abs/2507.02628
- arXiv (PDF): https://arxiv.org/pdf/2507.02628
- GitHub (Code): MDPT Repository
What We Implemented: The MDPT paper introduces a framework for LLM-driven, reference-grounded unit testing of medical datasets. This repo implements Lane B (MDPT Runner), which executes curated test packs (based on MDPT methodology) to validate context-specific data quality (e.g., diagnosis prevalence, measurement distributions, demographic consistency).
Key Concepts Used:
- Context-aware unit testing (diagnosis + region + vocabulary)
- External reference grounding (Bing API + medical concept search)
- Test coverage at multiple levels (diagnosis, measurements, drugs)
- DFtest execution framework for declarative data tests
2. Data Validation for Machine Learning
Authors: Neoklis Polyzotis, Martin Zinkevich, Sudip Roy, Eric Breck, and Steven Whang
Official Links:
- MLSys (PDF): https://mlsys.org/Conferences/2019/doc/2019/167.pdf
- MLSys (Abstract): https://proceedings.mlsys.org/paper_files/paper/2019/hash/928f1160e52192e3e0017fb63ab65391-Abstract.html
- Google Research: https://research.google/pubs/data-validation-for-machine-learning/
- TensorFlow Data Validation (Related open-source library): https://github.com/tensorflow/data-validation
What We Implemented: The Google paper introduces TFDV, a production-grade system for continuous data validation at scale. This repo implements Lane A (TFDV Runner), which performs schema validation, statistical profiling, and anomaly detection using TensorFlow Data Validation.
Key Concepts Used:
- Schema inference and validation
- Distribution-based anomaly detection
- Feature statistics generation (mean, min, max, missing rate, unique count)
- Drift detection (optional, via baseline statistics)
- Categorical vs numerical type validation
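To make the feature-statistics concept concrete, here is a pandas-only approximation of the per-feature summary that TFDV generates (mean, min, max, missing rate, unique count). This is an illustrative sketch, not the actual Lane A runner, which delegates to the tensorflow-data-validation library:

```python
import pandas as pd

def feature_statistics(df: pd.DataFrame) -> dict:
    """Approximate the per-feature summary TFDV computes:
    missing rate, unique count, and (for numeric columns) mean/min/max."""
    stats = {}
    for col in df.columns:
        series = df[col]
        numeric = pd.api.types.is_numeric_dtype(series)
        stats[col] = {
            "missing_rate": float(series.isna().mean()),
            "unique_count": int(series.nunique(dropna=True)),
            "mean": float(series.mean()) if numeric else None,
            "min": float(series.min()) if numeric else None,
            "max": float(series.max()) if numeric else None,
        }
    return stats
```

Statistics like these feed both schema inference (observed types and domains) and anomaly detection (values outside expected ranges).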
- Dataset-level validation: Validate structure, schema, and distributions across tables
- Schema & type checks: Ensure expected columns exist with correct data types
- Missingness checks: Detect excessive missing values at column and row levels
- Anomaly detection: Identify statistical outliers, schema violations, and drift
- Domain-specific tests: Execute context-aware medical data unit tests (MDPT)
- Report generation: Produce machine-readable and human-readable validation reports
- Gate decision: Return PASS, WARN, or FAIL based on configurable policy
- Integration hooks: Structured inputs/outputs for orchestrator integration
- Artifact persistence: Store all outputs in deterministic, versioned artifact paths
- Idempotency: Cache results using content-based fingerprinting
- Telemetry: Emit OpenTelemetry-compatible events for distributed tracing
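For example, the content-based fingerprint behind idempotency can be sketched as follows. This is a simplification: the real fingerprint.py may hash additional fields (e.g. config values) and use a different key length:

```python
import hashlib
import json

def idempotency_key(request: dict) -> str:
    """Derive a stable key from the request payload: logically identical
    requests (regardless of key ordering) map to the same cache entry."""
    canonical = json.dumps(request, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()[:32]
```

A repeated request therefore resolves to the same cache entry, so the cache registry can return the prior result instead of re-running both lanes.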
- Model training: Performed by downstream ML agents (not DQV's responsibility)
- Model inference: Handled by Model Inference Agent
- Privacy anonymization: Performed upstream by Data Anonymizer Agent (DQV assumes data is already anonymized or safe to process)
- Feature extraction: Performed downstream by Feature Extraction Agent
- Preprocessing recommendation/implementation: Handled by Preprocessing Recommender and Implementor Agents
- Model-data matching: Performed downstream by Model-Data Matcher Agent
- File type detection: Handled upstream by Ingestion Identifier Agent
Design Philosophy: DQV is a quality gate, not a transformation agent. It validates and reports but does not modify data.
- Docker (recommended) or Python 3.11+
- Git LFS (if using large test datasets)
- Azure/Bing API keys (optional, for Lane B generated pack mode)
docker build -t dqv:dev .

# Windows PowerShell
docker run --rm `
-v "${PWD}/data:/app/data" `
-v "${PWD}/reports:/app/reports" `
-e DQV_MDPT_MODE=curated_pack `
dqv:dev python scripts/run_e2e_all_datasets.py
# Linux/Mac (bash)
docker run --rm \
-v "$(pwd)/data:/app/data" \
-v "$(pwd)/reports:/app/reports" \
-e DQV_MDPT_MODE=curated_pack \
dqv:dev python scripts/run_e2e_all_datasets.py

# View latest run report
cat reports/e2e_runs/$(ls -t reports/e2e_runs/ | head -1)/index.md

Expected Outputs:
- dqv_gate.json: Gate decision (PASS, WARN, FAIL)
- execution_log.jsonl: Structured execution log
- tfdv_stats.pbtxt, tfdv_schema.pbtxt, tfdv_anomalies.pbtxt: Lane A artifacts
- test_results_diagnoses.json, test_results_drugs.json, test_results_measurements.json: Lane B artifacts
nimblemind-agents/
├── README.md # This file
├── docs/
│ ├── design.md # System architecture with Mermaid diagrams
│ ├── agents/
│ │ ├── dqv.md # Full DQV agent specification
│ │ └── AGENT_STANDARDS.md # Cross-agent requirements (must-haves)
│ ├── evaluation/
│ │ └── dqv_evaluation.md # Evaluation plan and metrics
│ └── critique.md # Limitations and future improvements
├── src/
│ ├── contracts/ # Pydantic v2 boundary models
│ │ ├── dqv_models.py # DQVRequest, DQVSuccessResponse, etc.
│ │ ├── tfdv_models.py # Lane A models
│ │ ├── mdpt_models.py # Lane B models
│ │ └── common_models.py # Shared enums (GateDecision, LaneStatus)
│ ├── protocols/ # Protocols/ABCs for dependency inversion
│ ├── observability/ # Telemetry, logging, execution log writer
│ ├── dqv_api_interface_layer/ # Public API (DQVServiceImpl.validate())
│ ├── dqv_core_engine/ # Core orchestration (CoreEngine)
│ │ ├── engine.py # Runs both lanes, aggregates results
│ │ ├── policy.py # PolicyEngine (gating rules)
│ │ └── fingerprint.py # Idempotency key computation
│ ├── lane_a_tfdv_runner/ # TFDV wrapper (stats, schema, anomalies)
│ ├── lane_b_mdpt_dftest_runner/ # MDPT wrapper (curated/generated packs)
│ ├── data_access_adapters/ # CSV/Parquet/JSONL readers
│ ├── artifact_store_adapter/ # Local filesystem artifact persistence
│ └── cache_result_registry/ # In-memory cache for idempotency
├── tests/
│ ├── unit/ # Unit tests per component
│ ├── contract/ # Boundary/contract tests
│ ├── integration/ # Multi-component integration tests
│ ├── e2e/ # End-to-end workflow tests
│ └── fixtures/ # Test data generators
├── scripts/
│ ├── run_e2e_all_datasets.py # Full E2E test runner
│ ├── E2E_README.md # E2E test documentation
│ └── generate_synthea_sample.py # Generate synthetic test datasets
├── data/ # Input datasets (mounted in Docker)
├── reports/ # Output artifacts (mounted in Docker)
├── mdpt-main/ # MDPT research code (submodule/vendored)
├── Dockerfile # Production Docker image
├── Makefile # Development shortcuts
├── pyproject.toml # Python project config
└── .env.example # Environment variable template
The DQV Agent sits after data ingestion and anonymization but before downstream processing:
flowchart LR
A[Ingestion Identifier Agent] --> B[Data Anonymizer Agent]
B --> C[**DQV Agent**]
C -->|PASS/WARN| D[Feature Extraction Agent]
C -->|FAIL| STOP[❌ Pipeline Stops]
D --> E[Model-Data Matcher Agent]
E --> F[Preprocessing Recommender Agent]
F --> G[Preprocessing Implementor Agent]
G --> H[Model Inference Agent]
style C fill:#ff6b6b,stroke:#c92a2a,stroke-width:3px,color:#fff
style STOP fill:#f03e3e,stroke:#c92a2a,stroke-width:2px,color:#fff
- Ingestion Identifier Agent: Detects file types (CSV, Parquet, JSONL)
- Data Anonymizer Agent: Removes PII/PHI before DQV processes data
- Feature Extraction Agent: Relies on validated schema and clean data
- Model-Data Matcher Agent: Uses header matching (DQV ensures headers are semantically meaningful)
- PASS: Dataset proceeds to Feature Extraction Agent
- WARN: Dataset proceeds with warnings logged (user decides policy)
- FAIL: Pipeline stops; dataset is rejected (prevents garbage-in-garbage-out)
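A minimal sketch of how a gate decision could be computed from an aggregate anomaly ratio, assuming the warn_threshold/fail_threshold defaults listed under policy_config below. The actual PolicyEngine rules may be richer (e.g. lane-specific escalation via elevate_lane_b_to_fail):

```python
def compute_gate(anomaly_ratio: float,
                 warn_threshold: float = 0.4,
                 fail_threshold: float = 0.7,
                 shadow_mode: bool = False) -> str:
    """Map an aggregate anomaly ratio (0.0-1.0) to PASS/WARN/FAIL.
    In shadow mode the gate never blocks, but findings are still reported."""
    if shadow_mode:
        return "PASS"
    if anomaly_ratio >= fail_threshold:
        return "FAIL"
    if anomaly_ratio >= warn_threshold:
        return "WARN"
    return "PASS"
```

Shadow mode is useful when first deploying the gate: it lets teams observe what would have failed without interrupting the pipeline.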
The DQV Agent follows a layered architecture with clear separation of concerns:
flowchart TD
API[DQV API Interface Layer<br/>DQVServiceImpl] --> Engine[DQV Core Engine<br/>CoreEngine]
Engine --> LaneA[Lane A: TFDV Runner<br/>TFDVRunnerImpl]
Engine --> LaneB[Lane B: MDPT Runner<br/>MDPTRunnerImpl]
LaneA --> Readers[Data Access Adapters<br/>read_dataset]
LaneB --> Readers
LaneA --> TFDVLib[tensorflow-data-validation]
LaneB --> MDPTLib[mdpt + dftest]
Engine --> Policy[Policy Engine<br/>compute_gate]
Engine --> Artifacts[Artifact Store<br/>LocalArtifactStore]
Engine --> Cache[Cache Registry<br/>InMemoryCacheRegistry]
API --> Telemetry[Observability Layer<br/>TelemetryImpl]
style Engine fill:#4c6ef5,stroke:#364fc7,stroke-width:2px,color:#fff
style Policy fill:#fcc419,stroke:#f59f00,stroke-width:2px
| Component | Responsibility |
|---|---|
| DQVServiceImpl | Public API; handles caching, validation, error handling |
| CoreEngine | Orchestrates Lane A + Lane B execution; aggregates results |
| PolicyEngine | Applies gating rules; computes gate decision (PASS/WARN/FAIL) |
| TFDVRunnerImpl | Wraps TFDV library; generates stats, infers schema, detects anomalies |
| MDPTRunnerImpl | Wraps MDPT framework; executes curated/generated test packs |
| LocalArtifactStore | Persists artifacts to filesystem with deterministic paths |
| InMemoryCacheRegistry | Caches results by idempotency key (prevents redundant runs) |
| TelemetryImpl | Emits OpenTelemetry events (fail-safe logging) |
| ExecutionLogWriter | Writes structured JSONL logs for audit trail |
Required Fields:
{
"trace_id": str, # Distributed tracing ID (propagated from orchestrator)
"dataset_ref": {
"tables": [ # List of table references
{
"name": str, # e.g., "patients"
"uri": str, # e.g., "file:///app/data/patients.csv"
"format": str # "csv" | "parquet" | "jsonl"
}
],
"content_hash": str | null # SHA256 hash (for idempotency)
}
}

Optional Configuration:
{
"tfdv_config": {
"enabled": bool, # Default: true
"schema_uri": str | null, # Use existing schema (or infer)
"infer_schema": bool, # Default: true
"enable_drift_detection": bool, # Default: false
"baseline_stats_uri": str | null # For drift detection
},
"mdpt_config": {
"enabled": bool, # Default: true
"mode": "curated_pack" | "generated_pack",
"pack_id": str | null, # ID of curated pack (e.g., "default")
"pack_version": str | null
},
"policy_config": {
"warn_threshold": float, # Default: 0.4
"fail_threshold": float, # Default: 0.7
"shadow_mode": bool, # Default: false (run without gating)
"elevate_lane_b_to_fail": bool # Default: false (treat Lane B failures as FAIL)
}
}

Example Request:
{
"trace_id": "trace-abc123",
"dataset_ref": {
"tables": [
{
"name": "patients",
"uri": "file:///app/data/synthea_example/patients.csv",
"format": "csv"
},
{
"name": "observations",
"uri": "file:///app/data/synthea_example/observations.csv",
"format": "csv"
}
]
},
"tfdv_config": {
"enabled": true,
"infer_schema": true
},
"mdpt_config": {
"enabled": true,
"mode": "curated_pack",
"pack_id": "default"
},
"policy_config": {
"elevate_lane_b_to_fail": false
}
}

Structure:
{
"trace_id": str, # From request
"run_id": str, # Unique run ID (8-char UUID)
"idempotency_key": str, # Computed from request (SHA256)
"timestamp": datetime,
"gate": "PASS" | "WARN" | "FAIL", # Overall gate decision
"quality_score": float | null, # Aggregate score (0.0-1.0)
"lane_a_result": {
"lane_name": "lane_a",
"status": "PASS" | "WARN" | "FAIL" | "SKIPPED" | "ERROR",
"score": float | null,
"artifact_uris": {
"schema": str, # file:///.../tfdv_schema.pbtxt
"statistics": str,
"anomalies": str
},
"message": str,
"latency_ms": int
},
"lane_b_result": {
"lane_name": "lane_b",
"status": "PASS" | "WARN" | "FAIL" | "SKIPPED" | "ERROR",
"score": float | null,
"artifact_uris": {
"diagnoses": str, # file:///.../test_results_diagnoses.json
"measurements": str,
"drugs": str
},
"findings": [ # Top 10 findings (PII-safe)
{
"name": str,
"passed": bool,
"expected": str,
"actual": str,
"message": str
}
],
"message": str,
"latency_ms": int
},
"artifacts": {
"dqv_gate": str, # file:///.../dqv_gate.json
"tfdv_schema": str,
"tfdv_stats": str,
"tfdv_anomalies": str,
"mdpt_results": [str]
},
"total_latency_ms": int,
"cached": bool # True if result from cache
}

Example Response (PASS):
{
"trace_id": "trace-abc123",
"run_id": "9f44bbab",
"idempotency_key": "7e843342fbe88f989379b98b6843e014",
"gate": "PASS",
"quality_score": 1.0,
"lane_a_result": {
"status": "PASS",
"message": "No anomalies detected",
"latency_ms": 113
},
"lane_b_result": {
"status": "PASS",
"message": "Ran 5 tests from pack default",
"latency_ms": 326
},
"total_latency_ms": 444,
"cached": false
}

Example Response (FAIL):
{
"trace_id": "trace-fail-001",
"run_id": "a1b2c3d4",
"idempotency_key": "...",
"gate": "FAIL",
"quality_score": 0.3,
"lane_a_result": {
"status": "FAIL",
"message": "Found 5 critical anomalies",
"latency_ms": 250
},
"lane_b_result": {
"status": "ERROR",
"message": "MDPT error: Subprocess exited with code 1",
"latency_ms": 100
},
"total_latency_ms": 350,
"cached": false
}

Category: Schema & Type Validation
| Check | What It Detects | Why It Matters | Severity | Remediation |
|---|---|---|---|---|
| Column presence | Missing expected columns | Downstream feature extractors expect specific columns | ERROR | Add missing column or update schema |
| Type consistency | Mismatched dtypes (e.g., string where numeric expected) | Type errors crash ML pipelines | ERROR | Cast to correct type or drop rows |
| Unique key constraint | Duplicate IDs in patient/observation tables | Violates data model assumptions | ERROR | Deduplicate or investigate source |
Category: Distribution & Anomaly Detection
| Check | What It Detects | Why It Matters | Severity | Remediation |
|---|---|---|---|---|
| Numerical range | Values outside expected min/max (e.g., age = -5) | Indicates data corruption or entry errors | WARNING | Clamp values or investigate source |
| Constant columns | Columns with zero variance (all same value) | Useless for ML (no signal) | WARNING | Drop column |
| Missing rate | High percentage of NULL values in column | May require imputation or affect model performance | WARNING | Impute, drop column, or accept risk |
| Categorical cardinality | Unexpected number of unique values | May indicate data entry errors or encoding issues | WARNING | Consolidate categories |
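To make these detectors concrete, a pandas sketch of the constant-column, missing-rate, and numerical-range checks follows. The column ranges and the max_missing threshold are illustrative assumptions; the actual Lane A implementation delegates these checks to TFDV:

```python
import pandas as pd

def distribution_findings(df: pd.DataFrame,
                          ranges: dict,
                          max_missing: float = 0.3) -> list:
    """Return (column, issue) pairs for zero-variance columns, columns
    exceeding the missing-rate threshold, and out-of-range numeric values."""
    findings = []
    for col in df.columns:
        series = df[col]
        if series.nunique(dropna=True) <= 1:
            findings.append((col, "constant_column"))
        if series.isna().mean() > max_missing:
            findings.append((col, "high_missing_rate"))
        if col in ranges:
            lo, hi = ranges[col]
            if ((series < lo) | (series > hi)).any():
                findings.append((col, "out_of_range"))
    return findings
```

For instance, a patient table with age = -5 would be flagged as out_of_range given a plausible (0, 120) bound for age.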
Category: Drift Detection (Optional)
| Check | What It Detects | Why It Matters | Severity | Remediation |
|---|---|---|---|---|
| Distribution drift | Statistical shift vs baseline (KL divergence) | Model trained on different distribution may fail | WARNING | Retrain model or investigate shift |
Category: Diagnosis & Demographics
| Check | What It Detects | Why It Matters | Severity | Remediation |
|---|---|---|---|---|
| Demographics not empty | Zero rows in patient table | Dataset is unusable | ERROR | Fix data source |
| Person ID column exists | Missing primary key column | Cannot join tables | ERROR | Add column or update mapper |
| Gender column exists | Missing gender field | Required for demographic stratification | WARNING | Add column or accept limitation |
| Diagnosis prevalence in range | Prevalence far from reference (e.g., diabetes = 50%) | Data may be biased or incorrectly coded | WARNING | Investigate coding or sampling bias |
Category: Measurements & Labs
| Check | What It Detects | Why It Matters | Severity | Remediation |
|---|---|---|---|---|
| Measurement not empty | Zero lab/measurement records | Cannot validate clinical logic | WARNING | Investigate data availability |
| Measurement value distribution | Mean/median outside reference range | May indicate unit conversion errors or data quality issues | WARNING | Verify units and reference ranges |
| Measurement code coverage | Expected LOINC codes missing | Incomplete data for condition | WARNING | Investigate coding gaps |
Category: Medications & Drugs
| Check | What It Detects | Why It Matters | Severity | Remediation |
|---|---|---|---|---|
| Drug not empty | Zero medication records | Cannot validate treatment patterns | WARNING | Investigate data availability |
| Drug code coverage | Expected RxNorm codes missing | Incomplete treatment data | WARNING | Investigate coding gaps |
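An illustrative prevalence test in the spirit of MDPT, returning a finding in the same shape as the lane_b_result.findings entries shown below. The test name, reference value, and relative tolerance here are hypothetical; in practice the curated packs define the references (grounded via external search):

```python
def prevalence_test(name: str, observed: float, reference: float,
                    tolerance: float = 0.5) -> dict:
    """Pass if the observed prevalence lies within +/- tolerance (relative)
    of an external reference value, e.g. regional diabetes prevalence."""
    lo, hi = reference * (1 - tolerance), reference * (1 + tolerance)
    passed = lo <= observed <= hi
    return {
        "name": name,
        "passed": passed,
        "expected": f"{lo:.3f}-{hi:.3f}",
        "actual": f"{observed:.3f}",
        "message": "within reference range" if passed else "outside reference range",
    }
```

A dataset where 50% of patients carry a diabetes code, against a reference prevalence near 10%, would fail this check and surface as a Lane B finding.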
All artifacts are written to a deterministic path based on the idempotency key:
artifacts/{trace_id}/{idempotency_key}/
├── dqv_gate.json # Gate decision + rationale + quality score
├── execution_log.jsonl # Structured execution log (audit trail)
├── tfdv_schema.pbtxt # Lane A: Inferred/validated schema
├── tfdv_stats.pbtxt # Lane A: Feature statistics
├── tfdv_anomalies.pbtxt # Lane A: Detected anomalies
├── test_results_diagnoses.json # Lane B: Diagnosis tests
├── test_results_measurements.json # Lane B: Measurement tests
├── test_results_drugs.json # Lane B: Drug tests
├── mdpt_stdout.log # Lane B: Subprocess stdout
└── mdpt_stderr.log # Lane B: Subprocess stderr
| File | Format | Description | Consumer |
|---|---|---|---|
| dqv_gate.json | JSON | Final gate decision, rationale, quality score | Orchestrator (decides next step) |
| execution_log.jsonl | JSONL | Event log (dqv.started, dqv.engine.completed, dqv.cache_hit) | Observability/audit systems |
| tfdv_schema.pbtxt | Protocol Buffer | Schema definition (column names, types, domains) | Downstream Feature Extraction Agent |
| tfdv_stats.pbtxt | Protocol Buffer | Feature statistics (min, max, mean, missing%, unique count) | Data profiling dashboards |
| tfdv_anomalies.pbtxt | Protocol Buffer | Detected anomalies (severity, description, affected columns) | Data quality reports |
| test_results_diagnoses.json | JSON | MDPT test results for demographics + diagnoses | Clinical validation reports |
| test_results_measurements.json | JSON | MDPT test results for labs/measurements | Clinical validation reports |
| test_results_drugs.json | JSON | MDPT test results for medications | Clinical validation reports |
Input Dataset: synthea_example (10 patients, 20 observations, 15 medications)
Execution Log (execution_log.jsonl):
{"timestamp": "2026-01-11T07:45:49.049416+00:00", "event_type": "dqv.started", "trace_id": "e2e-synthea_example-20260111_074542", "run_id": "b70e639b", "component": "dqv_api_interface_layer", "status": "started", "latency_ms": 0, "idempotency_key": ""}
{"timestamp": "2026-01-11T07:45:49.049599+00:00", "event_type": "dqv.engine.started", "trace_id": "e2e-synthea_example-20260111_074542", "run_id": "b70e639b", "component": "dqv_core_engine", "status": "ok", "latency_ms": 0, "idempotency_key": "ebbb0c28bb35723937c9f88e9b23adb4"}
{"timestamp": "2026-01-11T07:45:49.493000+00:00", "event_type": "dqv.engine.completed", "trace_id": "e2e-synthea_example-20260111_074542", "run_id": "b70e639b", "component": "dqv_core_engine", "status": "ok", "latency_ms": 443, "idempotency_key": "ebbb0c28bb35723937c9f88e9b23adb4", "metadata": {"gate": "PASS", "rationale": "All validations passed"}}
{"timestamp": "2026-01-11T07:45:49.493140+00:00", "event_type": "dqv.completed", "trace_id": "e2e-synthea_example-20260111_074542", "run_id": "b70e639b", "component": "dqv_api_interface_layer", "status": "ok", "latency_ms": 443, "idempotency_key": "ebbb0c28bb35723937c9f88e9b23adb4"}

Gate Decision (dqv_gate.json):
{
"gate": "PASS",
"rationale": "All validations passed",
"quality_score": 1.0
}

MDPT Test Results (test_results_diagnoses.json):
{
"tests": [
{
"name": "demographics_not_empty",
"passed": true,
"expected": "> 0 rows",
"actual": "10 rows",
"message": "Demographics data present"
},
{
"name": "demographics_has_person_id",
"passed": true,
"expected": "person_id column",
"actual": "present",
"message": "person_id column exists"
},
{
"name": "demographics_has_gender",
"passed": true,
"expected": "gender_source_value column",
"actual": "present",
"message": "Gender column exists"
}
]
}

Outcome: Dataset proceeds to Feature Extraction Agent.
Input Dataset: syntheticmass_real (empty CSV files - Git LFS pointers)
Gate Decision (dqv_gate.json):
{
"gate": "WARN",
"rationale": "Lane B (MDPT) encountered an error",
"quality_score": 1.0
}

MDPT Error Log (mdpt_stderr.log):
Traceback (most recent call last):
File "<string>", line 7, in <module>
...
pandas.errors.EmptyDataError: No columns to parse from file
Outcome: Dataset proceeds with warning logged (user can decide to investigate or continue).
Decision: DQV runs after anonymization but before feature extraction.
Rationale:
- Header matching dependency: The Model-Data Matcher Agent (downstream) relies on semantically meaningful column headers. If DQV detects corrupted headers or schema issues, it prevents garbage from corrupting the embedding-based matching.
- Fail-fast principle: Validating schema/types early prevents cascading failures in downstream agents.
- Privacy-safe validation: By running after the Data Anonymizer Agent, DQV never sees raw PII/PHI.
Reference:
"The Model-Data Matcher uses embedding-based similarity to match dataset headers to model expectations. Schema corruption detected by DQV prevents false matches." — [Design Trade-offs in Medical ML Pipelines]
Decision: Use both statistical validation (Lane A) and domain-specific validation (Lane B).
Rationale:
- Complementary strengths:
- TFDV excels at schema/type/distribution validation (domain-agnostic)
- MDPT excels at context-aware medical validation (diagnosis + region + vocabulary)
- Defense in depth: TFDV catches structural issues; MDPT catches semantic/clinical issues.
- Graceful degradation: If Lane B fails (e.g., pack not found), Lane A still provides value.
Reference:
"TFDV validates schema and distributions at scale, while MDPT provides context-grounded unit tests for medical datasets." — [Data Validation for ML + MDPT papers]
Decision: Use TFDV's built-in anomaly detection (schema validation + distribution checks).
Why not Isolation Forest / Robust Z-Score?
- Production-proven: TFDV is used at Google scale (billions of examples/day).
- Interpretability: TFDV anomalies are actionable (e.g., "GENDER has unexpected value 'X'").
- Maintenance burden: Custom anomaly models require ongoing tuning and version management.
Trade-off: TFDV's rule-based approach may miss subtle outliers that ML-based methods (Isolation Forest) would catch. Future improvement: hybrid approach.
Reference:
"TFDV uses schema-based validation and distribution-based anomaly detection, proven at scale in production ML systems." — [Data Validation for ML, Section 3.2]
Decision: DQV logs only aggregates and samples (not raw patient data).
Implementation:
- Telemetry: Only emits counts, latencies, and error codes (no column values).
- Artifacts: TFDV stats contain aggregates (min, max, mean) but not individual rows.
- MDPT results: Test results contain expected vs actual counts/distributions, not patient IDs.
Rationale:
- HIPAA compliance: Even anonymized data should minimize exposure in logs.
- Audit trail: Execution logs are sufficient for debugging without raw data.
Reference:
"Data validation systems must balance observability with privacy, logging aggregates rather than individual records." — [HIPAA Safe Harbor Guidelines]
All builds and tests run in Linux Docker for reproducibility:
make build # Build Docker image
make test # Run tests with coverage (≥90%)
make lint # Run ruff linter
make typecheck # Run mypy --strict
make all # Run all checks (lint + typecheck + test)
make shell       # Interactive shell

For quick iteration (Docker is still canonical):
# Install dependencies
pip install -e ".[dev]"
# Run tests
pytest -q --cov=src --cov-fail-under=90
# Type checking
mypy --strict src
# Linting
ruff check .

External tests require Azure/Bing API keys:
- Copy .env.example to .env
- Fill in your API keys
- Run: make test-external
| Test Type | Coverage | Description |
|---|---|---|
| Unit tests | 90%+ | Component-level tests (engine, runners, policy, cache) |
| Contract tests | 100% | Pydantic model validation (all boundary contracts) |
| Integration tests | 80%+ | Multi-component tests (service + engine + runners) |
| E2E tests | 100% | Full workflow tests (request → response → artifacts) |
# All tests
make test
# Unit tests only
pytest tests/unit/ -q
# E2E tests only
pytest tests/e2e/ -q
# External tests (requires API keys)
pytest -m external tests/external/ -q

- Rule-based thresholds: Policy engine uses static thresholds (not learned from historical failures).
- No feedback loop: DQV doesn't learn which checks predict downstream failures.
- Limited drift detection: Baseline statistics must be manually provided.
- Subprocess isolation overhead: Lane B uses subprocess execution (adds ~100-300ms latency).
- Adaptive thresholds: Learn optimal thresholds from historical gate decisions + downstream outcomes.
- Feedback-aware validation: Track which DQV warnings correlated with model failures (adjust severity).
- Automated baseline generation: Auto-generate baseline statistics from initial production runs.
- Streaming validation: Support incremental validation for large datasets (avoid loading all into memory).
Reference:
"Future work includes learning validation thresholds from historical pipeline outcomes, moving from rule-based to feedback-driven policies." — [Data Validation for ML, Section 6: Future Work]
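The streaming-validation idea above could start from chunked aggregate accumulation, sketched below for the missing-rate statistic only. This is an assumption-laden illustration (function name and chunk size are invented); a full implementation would also need streaming schema and anomaly checks:

```python
import pandas as pd

def streaming_missing_rates(csv_path: str, chunksize: int = 50_000) -> dict:
    """Accumulate per-column missing rates without loading the whole file,
    reading the CSV in fixed-size chunks via pandas' chunked reader."""
    totals: dict[str, int] = {}
    missing: dict[str, int] = {}
    for chunk in pd.read_csv(csv_path, chunksize=chunksize):
        for col in chunk.columns:
            totals[col] = totals.get(col, 0) + len(chunk)
            missing[col] = missing.get(col, 0) + int(chunk[col].isna().sum())
    return {col: missing[col] / totals[col] for col in totals}
```

Counts and sums compose across chunks, so statistics like missing rate, mean, min, and max can be computed in one pass with bounded memory.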
MIT License
Copyright (c) 2024 Nimblemind
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
- Medical Data Pecking: A Context-Aware Approach for Automated Quality Evaluation of Structured Medical Data. Girshovitz et al., 2025. https://arxiv.org/abs/2507.02628
- Data Validation for Machine Learning. Polyzotis et al., MLSys 2019. https://mlsys.org/Conferences/2019/doc/2019/167.pdf
- TensorFlow Data Validation (TFDV). https://github.com/tensorflow/data-validation
- DFtest: Declarative Data Testing Framework. https://pypi.org/project/dftest/