diff --git a/docs/Config_and_Macros.md b/docs/Config_and_Macros.md index 42608a4..88e7757 100644 --- a/docs/Config_and_Macros.md +++ b/docs/Config_and_Macros.md @@ -56,9 +56,8 @@ FastFlowTransform discovers models under `/models/` with two primary fl create or replace table users as select id, email from {{ source('crm', 'users') }}; -```` +``` - ### 1.2 Python models (`*.ff.py`) Use the `@model` decorator from `fastflowtransform.core` to register a callable. The decorator accepts: @@ -144,7 +143,7 @@ Allowed values are case-insensitive strings or tuples. If the engine does not ma ```yaml # sources.yml -version: 2 +version: 1 sources: - name: crm diff --git a/docs/Contracts.md b/docs/Contracts.md new file mode 100644 index 0000000..d2df549 --- /dev/null +++ b/docs/Contracts.md @@ -0,0 +1,621 @@ +# Contracts + +FastFlowTransform supports **data contracts**: declarative expectations about your +tables and columns. Contracts are stored in YAML files and are compiled into +normal `fft test` checks. + +You get: + +- A place to describe the **intended schema** (types, nullability, enums, etc.) +- Automatic **data-quality tests** derived from those contracts +- Optional checks for the **physical DB data type** (per engine) + +Contracts live in two places: + +- Per-table: `models/**/.contracts.yml` +- Project-level defaults: `contracts.yml` at the project root + + +--- + +## Per-table contracts (`*.contracts.yml`) + +For each logical table you can create a `*.contracts.yml` file under `models/`. 
+ +**Convention** + +- File name: ends with `.contracts.yml` +- Location: anywhere under `models/` +- Each file describes **exactly one table** + +Example: + +```yaml +# models/staging/customers.contracts.yml +version: 1 +table: customers + +columns: + customer_id: + type: integer + physical: + duckdb: BIGINT + postgres: integer + bigquery: INT64 + snowflake_snowpark: NUMBER + databricks_spark: BIGINT + nullable: false + unique: true + + name: + type: string + nullable: false + + status: + type: string + nullable: false + enum: + - active + - inactive + + created_at: + type: timestamp + nullable: false +```` + +The `table` name should match the logical relation name you use in your models +(e.g. `relation_for("customers")`). + +--- + +## Column attributes + +Each entry under `columns:` is a **column contract**. + +Supported attributes: + +```yaml +columns: + some_column: + type: string # optional semantic type + physical: # optional physical DB type(s) + duckdb: VARCHAR + postgres: text + nullable: false # nullability contract + unique: true # uniqueness contract + enum: [a, b, c] # allowed values + regex: "^[A-Z]{2}[0-9]{4}$" # regex pattern + min: 0 # numeric min (inclusive) + max: 100 # numeric max (inclusive) + description: "Human note" # free-form description +``` + +### `type` (semantic type) + +Free-form semantic type hint, things like: + +* `integer` +* `string` +* `timestamp` +* `boolean` +* … + +Right now this is **documentation / intent only**; it does not generate tests by itself. +Use it to communicate intent and align with your physical types. + +--- + +### `physical` (engine-specific physical DB type) + +`physical` describes the **actual DB type** of the column, per engine. 
+ +There are two forms: + +**1) Shorthand string** + +```yaml +physical: BIGINT +``` + +This is interpreted as: + +```yaml +physical: + default: BIGINT +``` + +**2) Per-engine mapping** + +```yaml +physical: + default: BIGINT # fallback if no engine-specific key is set + duckdb: BIGINT + postgres: integer + bigquery: INT64 + snowflake_snowpark: NUMBER + databricks_spark: BIGINT +``` + +Supported keys: + +| Key | Engine / executor | +| -------------------- | --------------------------- | +| `default` | Fallback for all engines | +| `duckdb` | DuckDB executor | +| `postgres` | Postgres executor | +| `bigquery` | BigQuery executors | +| `snowflake_snowpark` | Snowflake Snowpark executor | +| `databricks_spark` | Databricks / Spark executor | + +> **Important** +> +> The value here must match what your warehouse reports in its catalog / +> information schema for that column (e.g. `INT64` in BigQuery, `NUMBER` in +> Snowflake, etc.). + +Each `physical` contract is turned into a `column_physical_type` test. +If the engine does not yet support physical type introspection, the test will +fail with a clear “engine not yet supported” message instead of silently +passing. + +### Engine-canonical type names + +Physical type comparisons use the **canonical type strings reported by the engine**. + +That means: + +* Some engines expose aliases as canonical names in their catalogs. + + * Example (Postgres): + + * `timestamp` is an alias for `timestamp without time zone` + * `timestamptz` is an alias for `timestamp with time zone` +* FFT compares types after **engine-specific canonicalization**, so contracts can use common names like `timestamp`/`timestamptz` while still matching what Postgres reports. + +If you see a mismatch like: + +> expected `timestamp`, got `timestamp without time zone` + +it means your Postgres executor/runtime is not canonicalizing types yet (or you’re using raw `information_schema.data_type`). 
In that case, update Postgres type introspection to use `pg_catalog.format_type(...)` so comparisons are consistent. + +--- + +### `nullable` + +```yaml +nullable: false +``` + +* `nullable: false` → generates a `not_null` test for this column. +* `nullable: true` or omitted → no nullability test. + +--- + +### `unique` + +```yaml +unique: true +``` + +* `unique: true` → generates a `unique` test for this column. +* `unique: false` or omitted → no uniqueness test. + +--- + +### `enum` + +```yaml +enum: + - active + - inactive + - pending +``` + +`enum` defines a finite set of allowed values and generates an +`accepted_values` test. + +You can also use a single scalar: + +```yaml +enum: active +``` + +which is treated as `["active"]`. + +--- + +### `regex` + +```yaml +regex: "^[^@]+@[^@]+$" +``` + +`regex` defines a pattern that all non-null values must match. It generates a +`regex_match` test. + +--- + +### `min` / `max` + +```yaml +min: 0 +max: 100 +``` + +`min` and `max` define an inclusive numeric range and generate a `between` test. + +You can specify just one side: + +```yaml +min: 0 # only lower bound +# or +max: 100 # only upper bound +``` + +--- + +### `description` + +```yaml +description: "Customer signup timestamp in UTC" +``` + +Free-form description field. This does not generate tests; it’s for docs / +tooling. + +--- + +## Project-level contracts (`contracts.yml`) + +You can define **project-wide defaults** in a single `contracts.yml` file at +the project root. + +This file only defines **defaults**, not concrete tables. 
+ +Example: + +```yaml +# contracts.yml +version: 1 + +defaults: + columns: + # All *_id columns are non-null integers with engine-specific types + - match: + name: ".*_id$" + type: integer + nullable: false + physical: + duckdb: BIGINT + postgres: integer + bigquery: INT64 + + # created_at should always be a non-null timestamp + - match: + name: "^created_at$" + type: timestamp + nullable: false +``` + +### `contracts.yml` enforcement configuration + +Example: + +```yaml +version: 1 + +defaults: + columns: + - match: + name: ".*_id$" + type: integer + nullable: false + +enforcement: + # Modes: off | verify | cast + default_mode: off + + # If true, contract enforcement only cares about declared columns. + # Extra columns produced by the model are allowed. + allow_extra_columns: true + + # Optional per-table overrides (by logical relation name) + tables: + mart_users_by_domain: + mode: verify + allow_extra_columns: true + + mart_latest_signup: + mode: cast + allow_extra_columns: true +``` + +Rules: + +* `enforcement.default_mode` applies to all tables unless overridden. +* `enforcement.tables.
<table>.mode` overrides the default for a single table. +* `allow_extra_columns` controls whether the model output may contain columns not listed in the contract: + + * `true`: extra columns are ignored by enforcement (but still exist in the table) + * `false`: extra columns fail enforcement + + +### Column match rules + +Each entry under `defaults.columns` is a **column default rule**: + +```yaml +- match: + name: "regex on column name" # required + table: "regex on table name" # optional + type: ... + physical: ... + nullable: ... + unique: ... + enum: ... + regex: ... + min: ... + max: ... + description: ... +``` + +* `match.name` + Required **regex** applied to the column name. + +* `match.table` + Optional **regex** applied to the table name. + +All the other fields are the same as in `*.contracts.yml`. They act as +**defaults**. + +### How defaults are applied + +For each column contract from a per-table file: + +1. All `defaults.columns` rules are evaluated **in file order**. +2. A rule applies if both: + + * `match.name` matches the column name, and + * `match.table` is empty or matches the table name. +3. For every applicable rule: + + * Fields that are currently `null` / unset on the column are **filled** from + the rule. + * Fields that are already set on the column are **not overridden**. + +**Per-table contracts always win.** +Defaults only fill in missing values. 
+ +Example: + +```yaml +# contracts.yml +defaults: + columns: + - match: + name: ".*_id$" + nullable: false + physical: BIGINT +``` + +```yaml +# models/orders.contracts.yml +version: 1 +table: orders +columns: + customer_id: + # nullable unspecified → inherited as false from defaults + physical: + duckdb: BIGINT + postgres: integer # overrides default +``` + +Effective contract for `orders.customer_id`: + +```yaml +type: null +nullable: false # from defaults +physical: + duckdb: BIGINT # from per-table + postgres: integer # from per-table + default: BIGINT # from defaults.physical (other engines) +unique: null +... +``` + +--- + +## How contracts become tests + +Contracts are turned into regular `TestSpec` entries used by `fft test`. + +For each column: + +| Contract field | Generated test type | Notes | +| ----------------- | ---------------------- | ---------------------------- | +| `physical` | `column_physical_type` | Uses engine-specific mapping | +| `nullable: false` | `not_null` | | +| `unique: true` | `unique` | | +| `enum` | `accepted_values` | | +| `min` / `max` | `between` | inclusive range | +| `regex` | `regex_match` | Python regex | + +All contract-derived tests: + +* Use **severity** `error` by default (today) +* Receive the tag `contract` (so you can filter on them later) + +Example for `customers`: + +```yaml +# models/staging/customers.contracts.yml +version: 1 +table: customers +columns: + customer_id: + nullable: false + unique: true + physical: + duckdb: BIGINT + status: + enum: [active, inactive] +``` + +This yields tests roughly equivalent to: + +```text +customers.customer_id not_null (tags: contract) +customers.customer_id unique (tags: contract) +customers.customer_id column_physical_type (tags: contract) +customers.status accepted_values (tags: contract) +``` + +You don’t need to write those tests yourself; they’re derived automatically +from the contract files. 
+ +### Runtime enforcement (optional) + +In addition to turning contracts into `fft test` checks, FastFlowTransform can **enforce** contracts **at runtime** while building models. + +Runtime enforcement means: + +* FFT can **verify** that the materialized table matches the contract schema, and fail the run if not. +* FFT can **cast** the model output into the declared physical types before creating the table. + +This is configured in **project-level `contracts.yml`** under `enforcement`. + +#### Enforcement modes + +Contracts enforcement supports three modes: + +* `off` + Do not enforce at build time. (Contracts may still generate tests.) + +* `verify` + Build the table normally, then verify the physical schema matches the contract. + +* `cast` + Build the table by selecting from your model and **casting** contract columns into their declared physical types, then verify. + +> `cast` is useful when your warehouse would infer “close but not exact” types (e.g. `COUNT(*)` becoming a sized numeric type) and you want stable physical types across engines. + +### Failure messages + +If enforcement fails, FFT raises an error like: + +* Missing/extra columns +* Type mismatch (expected vs actual physical type) +* Non-null/unique contract failures (if those are enforced at runtime in your setup) + +The error includes the table name and a list of mismatches. + +### Enforcement with incremental models + +When a model is materialized as `incremental`, FFT applies enforcement to the **incremental write path**, not only full refresh. + +Typical behavior: + +* On the first run, the model creates the target relation (full refresh behavior) and enforcement is applied. +* On subsequent runs, FFT computes a delta dataset and writes it using the engine’s incremental strategy (insert/merge/delete+insert, etc.). +* Enforcement is applied so the target table remains compatible with the contract. 
Practical recommendations: + +* If the incremental model relies on `unique_key`, make sure your source change simulation does not introduce duplicated keys in the delta. +* For “update simulation” in demos, prefer a **second full seed file** that represents the entire source after the update (not just appended rows), then rerun incremental. This produces a realistic “source changed” scenario without creating duplicates. + +### Tests vs runtime enforcement + +Contracts can be used in two independent ways: + +1. **Tests** (`fft test`) + Contracts generate test specs like `not_null`, `unique`, `accepted_values`, `regex_match`, and `column_physical_type`. + +2. **Runtime enforcement** (`fft run`) + Enforcement runs during model materialization and can fail the run early. + +You can use either one alone, or both together. + +### Enforcement for SQL models + +When enforcing contracts for a SQL model: + +* `verify` mode: + + 1. FFT creates the table/view normally from the model SQL + 2. FFT introspects the created object and compares the physical schema to the contract + +* `cast` mode: + + 1. FFT wraps the model SQL in a projection that casts the declared columns: + + ```sql + select + cast(col_a as <physical type>) as col_a, + cast(col_b as <physical type>) as col_b, + ... + -- optionally include extra columns if allow_extra_columns=true + from (<model SQL>) as src + ``` + 2. FFT creates the table from that casted SELECT + 3. FFT verifies the resulting physical schema + +Notes: + +* Enforcement is best-effort: if a contract has no physical types for the current engine, `cast` mode cannot enforce and will fail with a clear error. +* `allow_extra_columns=true` means non-contracted columns are carried through unchanged. + +### Enforcement for Python models + +For Python models (pandas / Spark / Snowpark / BigFrames): + +* FFT first materializes the DataFrame result according to the executor. 
+* If enforcement is enabled, the runtime contracts layer may: + + * Stage the DataFrame into a temporary table (engine-specific) + * Re-create the target table using casts (`cast` mode) + * Or only verify the schema (`verify` mode) + +This allows a consistent enforcement mechanism even when the model result is not expressed as SQL. + +--- + +## Using contracts with `fft test` + +The high-level flow: + +1. You define `*.contracts.yml` under `models/` and, optionally, a root + `contracts.yml` with defaults. +2. `fft test` loads: + + * all per-table contracts + * project-level defaults +3. Contracts are expanded into test specs. +4. Tests are executed like any other `fft test` checks. + +If a contract file is malformed (YAML, duplicate keys, or schema), FFT raises a +friendly `ContractsConfigError` with a hint. The test run will fail until the +file is fixed, rather than silently skipping it. + +--- + +## Current limitations + +A few things contracts **do not** do yet: + +* Contracts **do not change DDL**: tables are still created with the types + inferred by the warehouse from your `SELECT`. +* `type` (semantic type) is not used to alter the schema; it is for intent / + documentation. +* Physical type checks require engine support: + + * Currently, only engines that can introspect their `INFORMATION_SCHEMA` + and expose that to FFT can fully enforce `column_physical_type`. + * Other engines may reject such tests with a clear “engine not supported” + message. + +### Current limitations + +* Enforcement behavior can differ by engine depending on what the executor can introspect and how it stages/casts data. +* `cast` mode requires explicit `physical` types for the current engine. +* Some warehouses expose “decorated” physical types (e.g. `VARCHAR(16777216)`, `NUMBER(18,0)`) rather than a short base type name. Contracts should match the canonical/normalized representation used by the engine implementation. 
diff --git a/docs/Quickstart.md b/docs/Quickstart.md index a06407f..6c61a43 100644 --- a/docs/Quickstart.md +++ b/docs/Quickstart.md @@ -18,7 +18,7 @@ The command is non-interactive, refuses to overwrite existing directories, and l python3 -m venv .venv . .venv/bin/activate # or source .venv/bin/activate pip install --upgrade pip -pip install -e . # run from the repo root; use `uv pip install --editable .` if you prefer uv +pip install fastflowtransform fft --help ``` @@ -26,20 +26,20 @@ Choose extras if you target other engines (combine as needed): ```bash # Postgres -pip install -e .[postgres] +pip install "fastflowtransform[postgres]" # BigQuery (pandas) or BigFrames -pip install -e .[bigquery] # pandas -pip install -e .[bigquery_bf] # BigFrames +pip install "fastflowtransform[bigquery]" # pandas +pip install "fastflowtransform[bigquery_bf]" # BigFrames # Databricks/Spark + Delta -pip install -e .[spark] +pip install "fastflowtransform[spark]" # Snowflake Snowpark -pip install -e .[snowflake] +pip install "fastflowtransform[snowflake]" # Everything -pip install -e .[full] +pip install "fastflowtransform[full]" ``` ## 2. Create project layout @@ -47,7 +47,7 @@ pip install -e .[full] ```bash mkdir -p demo/{models,seeds} cat <<'YAML' > demo/sources.yml -version: 2 +version: 1 sources: - name: raw @@ -106,7 +106,6 @@ You should see log lines similar to `✓ L01 [DUCK] users.ff`. The resulting tab ## 7. 
Next steps - Add `project.yml` for reusable `vars:` and metadata -- Explore `fft docs` to generate HTML documentation - Use engine profiles under `profiles.yml` to target Postgres, BigQuery, or Databricks (path-based sources supported via `format` + `location` overrides) - Render the DAG site for this project: `fft dag demo --env dev --html` (find it under `demo/site/dag/index.html`) diff --git a/docs/Source_Freshness.md b/docs/Source_Freshness.md index f441edc..baeb426 100644 --- a/docs/Source_Freshness.md +++ b/docs/Source_Freshness.md @@ -33,7 +33,7 @@ Freshness rules are attached to source tables in your metadata (conceptually alo A minimal example: ```yaml -version: 2 +version: 1 sources: - name: crm schema: raw diff --git a/docs/Sources.md b/docs/Sources.md index c67d827..f2433a2 100644 --- a/docs/Sources.md +++ b/docs/Sources.md @@ -11,14 +11,14 @@ project/ ├── models/ ├── sources.yml └── seeds/ -```` +``` ## YAML Schema (Version 2) FastFlowTransform expects a dbt-style structure: ```yaml -version: 2 +version: 1 sources: - name: raw schema: staging # default schema for this source group @@ -82,7 +82,7 @@ Engine-specific overrides follow this merge order: A typical analytics project mixes **seeded reference data**, **database tables**, and **lakehouse paths**. 
A single `sources.yml` might look like this: ```yaml -version: 2 +version: 1 sources: # Seeded reference data (CSV → tables) - name: ref diff --git a/docs/Technical_Overview.md b/docs/Technical_Overview.md index 0eba393..6fd9968 100644 --- a/docs/Technical_Overview.md +++ b/docs/Technical_Overview.md @@ -142,7 +142,7 @@ CLI (Typer) ├── Executors (executors/*) │ ├── BaseExecutor (SQL rendering, dependency loading, materialization, requires guard) │ ├── DuckExecutor (DuckDB) -│ ├── PostgresExecutor (SQLAlchemy, shims) +│ ├── PostgresExecutor (SQLAlchemy) │ ├── BigQueryExecutor (pandas) │ ├── BigQueryBFExecutor (BigQuery DataFrames / bigframes) │ ├── DatabricksSparkExecutor (PySpark, without pandas) @@ -256,7 +256,6 @@ class BaseExecutor(ABC): **Postgres (`postgres.py`)** -- `_SAConnShim` (compatible with `testing._exec`). - `run_sql` renders SQL and rewrites `CREATE OR REPLACE TABLE` to `DROP + CREATE AS`. - `_read_relation` uses pandas, handles schemas, and provides clear guidance. - `_materialize_relation` writes via `to_sql(if_exists="replace")`. 
diff --git a/docs/YAML_Tests.md b/docs/YAML_Tests.md index 7eba47f..36c34c8 100644 --- a/docs/YAML_Tests.md +++ b/docs/YAML_Tests.md @@ -6,7 +6,7 @@ Schema-bound tests live in `models/*.yml` or `models/**/schema.yml` and compleme ```yaml # examples/r1_demo/models/users_enriched.yml -version: 2 +version: 1 models: - name: users_enriched description: "Adds gmail flag" diff --git a/docs/examples/DQ_Demo.md b/docs/examples/DQ_Demo.md index 28fcbe3..23719b0 100644 --- a/docs/examples/DQ_Demo.md +++ b/docs/examples/DQ_Demo.md @@ -1,159 +1,296 @@ # Data Quality Demo Project -The **Data Quality Demo** shows how to use **all built-in FFT data quality tests** plus **custom DQ tests (Python & SQL)** on a small, understandable model: - -* Column checks: - - * `not_null` - * `unique` - * `accepted_values` - * `greater_equal` - * `non_negative_sum` - * `row_count_between` - * `freshness` -* Cross-table reconciliations: - - * `reconcile_equal` - * `reconcile_ratio_within` - * `reconcile_diff_within` - * `reconcile_coverage` - -* Custom tests (demo): - - * `min_positive_share` (Python-based) - * `no_future_orders` (SQL-based) - -It uses a simple **customers / orders / mart** setup so you can see exactly what each test does and how it fails when something goes wrong. 
+The **Data Quality Demo** shows how to combine: + +- **Built-in FFT data quality tests** +- **Tests generated from data contracts (`*.contracts.yml` + `contracts.yml`)** +- **Custom DQ tests (Python & SQL)** +- **Multiple engines** (DuckDB, Postgres, Databricks Spark, BigQuery, Snowflake Snowpark) + +on a small, understandable model: + +- **Column checks (from contracts + project.yml):** + - `column_physical_type` + - `not_null` + - `unique` + - `accepted_values` + - `between` + - `regex_match` + - `greater_equal` + - `non_negative_sum` + - `row_count_between` + - `freshness` + - `relationships` + +- **Cross-table reconciliations:** + - `reconcile_equal` + - `reconcile_ratio_within` + - `reconcile_diff_within` + - `reconcile_coverage` + +- **Custom tests (demo):** + - `min_positive_share` (Python-based) + - `no_future_orders` (SQL-based) + +It uses a simple **customers / orders / mart** setup so you can see exactly what +each test does and how it fails when something goes wrong. --- ## What this example demonstrates -1. **Basic column checks** on staging tables - Ensure IDs are present and unique, amounts are non-negative, and status values are valid. +1. **Basic column checks** on staging tables + - Enforced via **contracts** (`*.contracts.yml` + `contracts.yml`) and + project tests: + - IDs are present / non-null, status values are constrained, numeric ranges + are respected, physical types match the warehouse. + +2. **Freshness** on a timestamp column + - Table-level `freshness` test on `orders.order_ts`. + - Source-level freshness via `sources.yml` for `crm.customers` / `crm.orders`. -2. **Freshness** on a timestamp column - Check that the most recent order in your mart is not “too old”, using `last_order_ts`. +3. **Row count sanity checks** + - Guard against empty tables and unexpectedly large row counts. -3. **Row count sanity checks** - Guard against empty tables and unexpectedly large row counts. +4. 
**Cross-table reconciliations** between staging and mart + - Verify that sums and counts match between `orders` and the aggregated + `mart_orders_agg`, and that every order has a matching customer. -4. **Cross-table reconciliations** between staging and mart - Verify that sums and counts match between `orders` and the aggregated `mart_orders_agg`, and that every customer has a corresponding mart row. +5. **Tagged tests and selective execution** + - All tests are tagged (e.g. `example:dq_demo`, `reconcile`, `fk`, + `contract`) so you can run exactly the subset you care about. -5. **Tagged tests and selective execution** - All tests are tagged (e.g. `example:dq_demo`, `reconcile`) so you can run exactly the subset you care about. +6. **Contracts-driven tests** + - Per-table contracts plus project-wide defaults generate DQ tests + automatically (including `column_physical_type`). --- -## Project layout (example) +## Project layout ```text examples/dq_demo/ - .env + .env.dev_bigquery_bigframes + .env.dev_bigquery_pandas + .env.dev_databricks .env.dev_duckdb .env.dev_postgres - .env.dev_databricks - .env.dev_bigquery_pandas - .env.dev_bigquery_bigframes .env.dev_snowflake - Makefile # optional, convenience wrapper around fft commands + Makefile + README.md + contracts.yml profiles.yml project.yml sources.yml - seeds/ - customers.csv - orders.csv - models/ + README.md + marts/ + mart_orders_agg.contracts.yml + mart_orders_agg.ff.sql staging/ + customers.contracts.yml customers.ff.sql + orders.contracts.yml orders.ff.sql - marts/ - mart_orders_agg.ff.sql + + seeds/ + README.md + schema.yml + seed_customers.csv + seed_orders.csv tests/ dq/ min_positive_share.ff.py no_future_orders.ff.sql + unit/ + README.md +```` + +High level: + +* **`.env.dev_*`** — engine-specific environment examples +* **`Makefile`** — convenience wrapper for seeding, running models, DAG HTML and tests +* **`profiles.yml`** — connection profiles for all engines +* **`project.yml`** — central place for 
**tests** (including reconciliations & custom DQ tests) +* **`contracts.yml`** — project-level **contract defaults** +* **`models/**.contracts.yml`** — per-table contracts +* **`sources.yml`** — source definitions + freshness on raw seeds +* **`seeds/`** — demo CSVs and seed schema +* **`tests/dq/`** — custom DQ tests (Python + SQL) + +--- + +## Seeds + +### `seeds/seed_customers.csv` + +Simple customer dimension with a creation timestamp: + +```csv +customer_id,name,status,created_at +1,Alice,active,2025-01-01T10:00:00 +2,Bob,active,2025-01-02T11:00:00 +3,Carol,inactive,2025-01-03T12:00:00 ``` -### Seeds +Columns: -* `seeds/customers.csv` - Simple customer dimension with a creation timestamp: - `customer_id`, `name`, `status`, `created_at` (ISO-8601, e.g. `2025-01-01T10:00:00`). - The demo ships with three rows (Alice, Bob, Carol) so it’s easy to reason about failures. +* `customer_id` – integer +* `name` – string +* `status` – string (`active` / `inactive`) +* `created_at` – ISO-8601 timestamp -* `seeds/orders.csv` - Order fact data with per-order timestamps: - `order_id`, `customer_id`, `amount`, `order_ts` (ISO-8601, e.g. `2025-01-10T09:00:00`). - One order has `amount = 0.00` so the custom `min_positive_share` test has something to complain about. +### `seeds/seed_orders.csv` -### Models +Order fact data with per-order timestamps: -**1. Staging: `customers.ff.sql`** +```csv +order_id,customer_id,amount,order_ts +100,1,50.00,2025-01-10T09:00:00 +101,1,20.00,2025-01-11T09:00:00 +102,2,30.00,2025-01-11T10:00:00 +103,3,0.00,2025-01-12T10:00:00 +``` -* Materialized as a table. -* Casts IDs and other fields into proper types. -* Used as the “clean” customer dimension for downstream checks. 
+Columns: - ```sql - {{ config( - materialized='table', - tags=[ - 'example:dq_demo', - 'scope:staging', - 'engine:duckdb', - 'engine:postgres', - 'engine:databricks_spark', - 'engine:bigquery', - 'engine:snowflake_snowpark' - ], - ) }} +* `order_id` – integer +* `customer_id` – integer +* `amount` – double +* `order_ts` – ISO-8601 timestamp - select - cast(customer_id as int) as customer_id, - name, - status, - cast(created_at as timestamp) as created_at - from {{ source('crm', 'customers') }}; - ``` - -**2. Staging: `orders.ff.sql`** - -* Materialized as a table. -* Casts fields to proper types so DQ tests work reliably: - - ```sql - {{ config( - materialized='table', - tags=[ - 'example:dq_demo', - 'scope:staging', - 'engine:duckdb', - 'engine:postgres', - 'engine:databricks_spark', - 'engine:bigquery', - 'engine:snowflake_snowpark' - ], - ) }} +**One order has `amount = 0.00`** so the custom +`min_positive_share` test has something to complain about. - select - cast(order_id as int) as order_id, - cast(customer_id as int) as customer_id, - cast(amount as double) as amount, - cast(order_ts as timestamp) as order_ts - from {{ source('crm', 'orders') }}; - ``` +### Seed schema and sources + +`seeds/schema.yml` defines target placement and types: + +```yaml +targets: + seed_customers: + schema: dq_demo + seed_orders: + schema: dq_demo + +columns: + seed_customers: + customer_id: integer + name: string + status: string + created_at: + type: timestamp + seed_orders: + order_id: integer + customer_id: integer + amount: double + order_ts: + type: timestamp +``` + +`sources.yml` exposes them as `crm.customers` and `crm.orders` with **source +freshness**: + +```yaml +version: 1 + +sources: + - name: crm + schema: dq_demo + tables: + - name: customers + identifier: seed_customers + description: "Seeded customers table" + freshness: + loaded_at_field: _ff_loaded_at + warn_after: + count: 60 + period: minute + error_after: + count: 240 + period: minute + - name: orders + 
identifier: seed_orders + description: "Seeded orders table" + freshness: + loaded_at_field: _ff_loaded_at + warn_after: + count: 60 + period: minute + error_after: + count: 240 + period: minute +``` + +--- + +## Models + +### 1. Staging: `models/staging/customers.ff.sql` + +Materialized as a table; casts IDs and timestamps into proper types and +prepares the customer dimension: + +```sql +{{ config( + materialized='table', + tags=[ + 'example:dq_demo', + 'scope:staging', + 'engine:duckdb', + 'engine:postgres', + 'engine:databricks_spark', + 'engine:bigquery', + 'engine:snowflake_snowpark' + ], +) }} + +-- Staging table for customers +select + cast(customer_id as int) as customer_id, + name, + status, + cast(created_at as timestamp) as created_at +from {{ source('crm', 'customers') }}; +``` + +### 2. Staging: `models/staging/orders.ff.sql` + +Materialized as a table; ensures types are suitable for numeric and freshness +checks: + +```sql +{{ config( + materialized='table', + tags=[ + 'example:dq_demo', + 'scope:staging', + 'engine:duckdb', + 'engine:postgres', + 'engine:databricks_spark', + 'engine:bigquery', + 'engine:snowflake_snowpark' + ], +) }} - This is important for: +-- Staging table for orders with proper types for DQ checks +select + cast(order_id as int) as order_id, + cast(customer_id as int) as customer_id, + cast(amount as numeric) as amount, + cast(order_ts as timestamp) as order_ts +from {{ source('crm', 'orders') }}; +``` - * numeric checks (`greater_equal`, `non_negative_sum`) - * timestamp-based `freshness` checks +This is important for: -**3. Mart: `mart_orders_agg.ff.sql`** +* Numeric checks (`greater_equal`, `non_negative_sum`) +* Timestamp-based `freshness` checks on `order_ts` +* Relationships on `customer_id` + +### 3. 
Mart: `models/marts/mart_orders_agg.ff.sql` Aggregates orders per customer and prepares data for reconciliation + freshness: @@ -171,13 +308,13 @@ Aggregates orders per customer and prepares data for reconciliation + freshness: ], ) }} --- Aggregate orders per customer for DQ & reconciliation tests +-- Aggregate orders per customer for reconciliation & freshness tests with base as ( select o.order_id, o.customer_id, -- Ensure numeric and timestamp types for downstream DQ checks - cast(o.amount as double) as amount, + cast(o.amount as numeric) as amount, cast(o.order_ts as timestamp) as order_ts, c.name as customer_name, c.status as customer_status @@ -197,93 +334,184 @@ from base group by customer_id, customer_name, customer_status; ``` -The important columns for DQ tests are: +Key columns: + +* `status` → used by contracts (`enum`) +* `order_count` and `total_amount` → used by reconciliation tests +* `first_order_ts` / `last_order_ts` → available for freshness & diagnostics + +--- + +## Contracts in the demo + +The demo uses contracts for: + +* **Per-table contracts** in `models/**.contracts.yml` +* **Project-wide defaults** in `contracts.yml` + +See `docs/Contracts.md` for the full specification; below is how the demo uses +it. + +### Project-level defaults: `contracts.yml` + +```yaml +version: 1 + +defaults: + columns: + - match: + name: ".*_id$" + type: integer + nullable: false + + - match: + name: "created_at" + type: timestamp + nullable: false + + - match: + name: ".*_ts$" + type: timestamp + nullable: true + description: "Timestamp-like but allowed to be null in some pipelines" +``` + +These rules say: + +* Any column ending with `_id` is an integer and not nullable. +* Any `created_at` is a non-null timestamp. +* Any `*_ts` column is a (possibly nullable) timestamp with a description. + +Defaults are *merged into* per-table contracts, but never override explicit +settings. 
+ +### Example: `models/staging/customers.contracts.yml` + +```yaml +version: 1 +table: customers + +columns: + customer_id: + type: integer + nullable: false + physical: + duckdb: integer + postgres: integer + bigquery: INT64 + snowflake_snowpark: NUMBER + databricks_spark: INT + + name: + type: string + nullable: false + physical: + duckdb: VARCHAR + postgres: text + bigquery: STRING + snowflake_snowpark: TEXT + databricks_spark: STRING + + status: + type: string + nullable: false + enum: + - active + - inactive + physical: + duckdb: VARCHAR + postgres: text + bigquery: STRING + snowflake_snowpark: TEXT + databricks_spark: STRING + + created_at: + type: timestamp + nullable: false + physical: + duckdb: TIMESTAMP + postgres: timestamp without time zone + bigquery: TIMESTAMP + snowflake_snowpark: TIMESTAMP_NTZ + databricks_spark: TIMESTAMP +``` + +At runtime, these contracts get turned into tests: + +* `column_physical_type` for each column with `physical` +* `not_null` for columns with `nullable: false` +* `accepted_values` for `status` via `enum` +* Plus any inherited defaults from `contracts.yml` -* `status` → used for `accepted_values` -* `order_count` and `total_amount` → used for numeric and reconciliation tests -* `last_order_ts` → used for `freshness` +Similarly, the mart has a contract at +`models/marts/mart_orders_agg.contracts.yml` specifying types, nullability, +and enums. --- ## Data quality configuration (`project.yml`) -All tests live under `project.yml → tests:`. -This example uses the tag `example:dq_demo` for easy selection. +All **explicit** tests live under `project.yml → tests:`. +Contracts produce additional tests with the tag `contract`. -### Column-level checks +The demo uses the tag `example:dq_demo` for easy selection. 
+ +### Single-table & relationships tests ```yaml tests: - # 1) IDs must be present and unique - - type: not_null - table: customers - column: customer_id - tags: [example:dq_demo, batch] + # --- Single-table checks ---------------------------------------------------- - - type: unique + - type: row_count_between table: customers - column: customer_id + min_rows: 1 + max_rows: 100 tags: [example:dq_demo, batch] - # 2) Order amounts must be >= 0 - type: greater_equal table: orders column: amount threshold: 0 tags: [example:dq_demo, batch] - # 3) Total sum of amounts must not be negative - type: non_negative_sum table: orders column: amount tags: [example:dq_demo, batch] - # 4) Customer status values must be within a known set - - type: accepted_values - table: mart_orders_agg - column: status - values: ["active", "inactive", "prospect"] - severity: warn # show as warning, not hard failure - tags: [example:dq_demo, batch] - - # 5) Row count sanity check on mart - - type: row_count_between - table: mart_orders_agg - min_rows: 1 - max_rows: 100000 - tags: [example:dq_demo, batch] - - # 6) Freshness: last order in the mart must not be "too old" - - type: freshness - table: mart_orders_agg - column: last_order_ts - max_delay_minutes: 100000000 - tags: [example:dq_demo, batch] - - # 7) Custom Python test: ensure at least a given share of positive amounts - - type: min_positive_share + - type: relationships table: orders - column: amount - params: - min_share: 0.75 - where: "amount <> 0" - tags: [example:dq_demo, batch] + column: customer_id + to: "ref('customers.ff')" + to_field: customer_id + tags: [example:dq_demo, fk] - # 8) Custom SQL test: no future orders allowed - - type: no_future_orders + # Large max_delay_minutes so the example typically passes; + # adjust down in real projects to enforce freshness SLAs. 
+ - type: freshness table: orders column: order_ts - params: - where: "amount <> 0" + max_delay_minutes: 100000000 tags: [example:dq_demo, batch] ``` -### Cross-table reconciliations +What these do: + +* `row_count_between` — ensure `customers` is not empty and not unexpectedly + large. +* `greater_equal` / `non_negative_sum` — protect against negative `amount` and + weird aggregates. +* `relationships` — enforces referential integrity: + every `orders.customer_id` must exist in `customers.customer_id`. +* `freshness` — checks that the latest `order_ts` is recent enough. + +### Reconciliation tests ```yaml - # 7) Reconcile total revenue between orders and mart + # --- Reconciliation checks -------------------------------------------------- + - type: reconcile_equal - name: total_amount_orders_vs_mart + name: orders_total_matches_mart tags: [example:dq_demo, reconcile] left: table: orders @@ -291,103 +519,140 @@ tests: right: table: mart_orders_agg expr: "sum(total_amount)" - abs_tolerance: 0.01 + abs_tolerance: 0.0 - # 8) Ratio of sums should be ~1 (within tight bounds) - type: reconcile_ratio_within - name: total_amount_ratio + name: order_counts_match tags: [example:dq_demo, reconcile] left: - table: orders - expr: "sum(amount)" - right: table: mart_orders_agg - expr: "sum(total_amount)" + expr: "sum(order_count)" + right: + table: orders + expr: "count(*)" min_ratio: 0.999 max_ratio: 1.001 - # 9) Row count diff between orders and mart should be bounded - type: reconcile_diff_within - name: order_count_diff + name: customers_vs_orders_volume tags: [example:dq_demo, reconcile] left: table: orders expr: "count(*)" right: - table: mart_orders_agg - expr: "sum(order_count)" - max_abs_diff: 0 + table: customers + expr: "count(*)" + max_abs_diff: 10 - # 10) Coverage: every customer should appear in the mart - type: reconcile_coverage - name: customers_covered_in_mart + name: all_orders_have_customers tags: [example:dq_demo, reconcile] source: - table: customers + 
table: orders key: "customer_id" target: - table: mart_orders_agg + table: customers key: "customer_id" ``` -This set of tests touches **all available test types** and ties directly back to the simple data model. +These checks ensure: + +* Sums match between raw `orders.amount` and `mart_orders_agg.total_amount`. +* The number of rows in `orders` matches the sum of `order_count` in the mart. +* Overall orders vs customers volume stays within a reasonable bound. +* All orders reference an existing customer (coverage). + +### Custom tests + +```yaml + # --- Custom tests -------------------------------------------------- + - type: no_future_orders + table: orders + column: order_ts + where: "order_ts is not null" + tags: [example:dq_demo, batch] + + - type: min_positive_share + table: orders + column: amount + params: + min_share: 0.75 + where: "amount <> 0" + tags: [example:dq_demo, batch] +``` + +* `no_future_orders` — SQL-based test that fails if any order has a timestamp + in the future. +* `min_positive_share` — Python-based test that requires a minimum share of + positive values in `amount`. --- ## Custom DQ tests (Python & SQL) -The demo also shows how to define **custom data quality tests** that integrate with: +The demo shows how to define **custom data quality tests** that integrate with: -* the `project.yml → tests:` block, -* the `fft test` CLI, -* and the same summary output as built-in tests. +* `project.yml → tests:` +* `fft test` +* The same summary output as built-in tests. 
### Python-based test: `min_positive_share` File: `examples/dq_demo/tests/dq/min_positive_share.ff.py` ```python +# examples/dq_demo/tests/dq/min_positive_share.ff.py from __future__ import annotations from typing import Any +from pydantic import BaseModel, ConfigDict + from fastflowtransform.decorators import dq_test from fastflowtransform.testing import base as testing -@dq_test("min_positive_share") +class MinPositiveShareParams(BaseModel): + """ + Params for the min_positive_share test. + + - min_share: required minimum share of positive values in [0, 1] + - where: optional WHERE predicate to filter rows + """ + + model_config = ConfigDict(extra="forbid") + + min_share: float = 0.5 + where: str | None = None + + +@dq_test("min_positive_share", params_model=MinPositiveShareParams) def min_positive_share( - con: Any, + executor: Any, table: str, column: str | None, params: dict[str, Any], ) -> tuple[bool, str | None, str | None]: """ - Custom DQ test: require that at least `min_share` of rows have column > 0. - - Parameters (from project.yml → tests → params): - - min_share: float in [0,1], e.g. 0.75 - - where: optional filter (string) to restrict the population + Require that at least `min_share` of rows have column > 0. 
""" if column is None: example = f"select count(*) from {table} where > 0" return False, "min_positive_share requires a 'column' parameter", example - # Params come from project.yml under `params:` - cfg: dict[str, Any] = params.get("params") or params # project.yml wrapper - min_share: float = cfg["min_share"] - where: str | None = cfg.get("where") + min_share: float = params["min_share"] + where: str | None = params.get("where") where_clause = f" where {where}" if where else "" total_sql = f"select count(*) from {table}{where_clause}" if where: - pos_sql = f"select count(*) from {table}{where_clause} and {column} > 0" + pos_sql = f"{total_sql} and {column} > 0" else: pos_sql = f"select count(*) from {table} where {column} > 0" - total = testing._scalar(con, total_sql) - positives = testing._scalar(con, pos_sql) + total = testing._scalar(executor, total_sql) + positives = testing._scalar(executor, pos_sql) example_sql = f"{pos_sql}; -- positives\n{total_sql}; -- total" @@ -399,14 +664,16 @@ def min_positive_share( msg = ( f"min_positive_share failed: positive share {share:.4f} " f"< required {min_share:.4f} " - f"({positives} of {total} rows have {column} > 0)" + f"({positives} of {total} rows have {column} > 0" + + (f" where {where}" if where else "") + + ")" ) return False, msg, example_sql return True, None, example_sql -```` +``` -This test is wired up from `project.yml` like this: +Wiring in `project.yml`: ```yaml - type: min_positive_share @@ -430,10 +697,11 @@ File: `examples/dq_demo/tests/dq/no_future_orders.ff.sql` -- Custom DQ test: fail if any row has a timestamp in the future. -- --- Conventions: --- - {{ table }} : table name (e.g. "orders") --- - {{ column }} : timestamp column (e.g. "order_ts") --- - {{ where }} : optional filter, passed via params["where"] +-- Context variables injected by the runner: +-- {{ table }} : table name (e.g. "orders") +-- {{ column }} : timestamp column (e.g. 
"order_ts") +-- {{ where }} : optional filter (string), from params["where"] +-- {{ params }} : full params dict (validated), if you ever need it select count(*) as failures from {{ table }} @@ -441,144 +709,111 @@ where {{ column }} > current_timestamp {%- if where %} and ({{ where }}){%- endif %} ``` -And the corresponding `project.yml` test: +And the corresponding entry in `project.yml`: ```yaml - type: no_future_orders table: orders column: order_ts - params: - where: "amount <> 0" + where: "order_ts is not null" tags: [example:dq_demo, batch] ``` At runtime: -* The SQL file is discovered under `tests/**/*.ff.sql`. -* `{{ config(...) }}` tells FFT the logical `type` and allowed `params`. -* `fft test` validates your `params:` from `project.yml` against this schema and - then executes the rendered SQL as a “violation count” query (`0` = pass, `>0` = fail). +* FFT discovers `*.ff.sql` test files under `tests/dq/`. +* `{{ config(...) }}` declares the test `type` and valid `params`. +* `fft test` validates and injects params, then executes the query as a + “violation count” (`0` = pass, `>0` = fail). --- ## Running the demo -Assuming you are in the repo root and using DuckDB as a starting point: +From `examples/dq_demo/`, you can either: + +* Use the **Makefile** (recommended), or +* Run `fft` commands manually. + +### Using the Makefile -### 1. Seed the data +Pick an engine: ```bash -fft seed examples/dq_demo --env dev_duckdb -``` +# DuckDB +make demo ENGINE=duckdb -This reads `seeds/customers.csv` and `seeds/orders.csv` and materializes them as tables referenced by `sources.yml`. +# Postgres +make demo ENGINE=postgres -### 2. 
Run the models +# Databricks Spark +make demo ENGINE=databricks_spark -```bash -fft run examples/dq_demo --env dev_duckdb +# BigQuery (pandas or BigFrames) +make demo ENGINE=bigquery BQ_FRAME=pandas +make demo ENGINE=bigquery BQ_FRAME=bigframes + +# Snowflake Snowpark +make demo ENGINE=snowflake_snowpark ``` -This builds: +The `demo` target runs: + +1. `fft seed` (load seeds) +2. `fft source freshness` +3. `fft run` (build models) +4. `fft dag` (generate DAG HTML) +5. `fft test` (run DQ tests) +6. Prints locations of artifacts (manifest, run_results, catalog, DAG HTML) -* `customers` (staging) -* `orders` (staging) -* `mart_orders_agg` (mart) +### Running manually (DuckDB example) -### 3. Run all DQ tests +From the repo root: ```bash -fft test examples/dq_demo --env dev_duckdb --select tag:example:dq_demo -``` +# 1) Seed +fft seed examples/dq_demo --env dev_duckdb -You should see a summary like: +# 2) Build models +fft run examples/dq_demo --env dev_duckdb -```text -Data Quality Summary -──────────────────── -✅ not_null customers.customer_id -✅ unique customers.customer_id -✅ greater_equal orders.amount -✅ non_negative_sum orders.amount -❕ accepted_values mart_orders_agg.status -✅ row_count_between mart_orders_agg -✅ freshness mart_orders_agg.last_order_ts -✅ reconcile_equal total_amount_orders_vs_mart -✅ reconcile_ratio_within total_amount_ratio -✅ reconcile_diff_within order_count_diff -✅ reconcile_coverage customers_covered_in_mart - -Totals -────── -✓ passed: 10 -! warnings: 1 +# 3) Run all DQ tests +fft test examples/dq_demo --env dev_duckdb --select tag:example:dq_demo ``` -(Exact output will differ, but you’ll see pass/failed/warned checks listed.) +You’ll see a summary of: -### 4. 
Run only reconciliation tests +* Tests derived from **contracts** (tag: `contract`) +* Explicit tests from `project.yml` (tags: `batch`, `reconcile`, `fk`, …) + +You can also run just reconciliations, just FK tests, etc.: ```bash +# Only reconciliation tests fft test examples/dq_demo --env dev_duckdb --select tag:reconcile -``` -This executes just the cross-table checks, which is handy when you’re iterating on a mart. +# Only FK-style relationship tests +fft test examples/dq_demo --env dev_duckdb --select tag:fk +``` --- -## BigQuery variant (pandas or BigFrames) - -To run the same demo on BigQuery: - -1. Copy `.env.dev_bigquery_pandas` or `.env.dev_bigquery_bigframes` to `.env` and fill in: - ```bash - FF_BQ_PROJECT= - FF_BQ_DATASET=dq_demo - FF_BQ_LOCATION= # e.g., EU or US - GOOGLE_APPLICATION_CREDENTIALS=../secrets/.json # or rely on gcloud / WIF - ``` -2. Run via the Makefile from `examples/dq_demo`: - ```bash - make demo ENGINE=bigquery BQ_FRAME=pandas # or bigframes - ``` - -Both profiles accept `allow_create_dataset` in `profiles.yml` if you want the example to create the dataset automatically. - -## Snowflake Snowpark variant - -To run on Snowflake: - -1. Copy `.env.dev_snowflake` to `.env` and populate: - ```bash - FF_SF_ACCOUNT= - FF_SF_USER= - FF_SF_PASSWORD= - FF_SF_WAREHOUSE=COMPUTE_WH - FF_SF_DATABASE=DQ_DEMO - FF_SF_SCHEMA=DQ_DEMO - FF_SF_ROLE= - ``` -2. Install the Snowflake extra if needed: - ```bash - pip install "fastflowtransform[snowflake]" - ``` -3. Run via the Makefile: - ```bash - make demo ENGINE=snowflake_snowpark - ``` - -The Snowflake profile enables `allow_create_schema`, so the schema is created automatically on first run when permitted. - ## Things to experiment with -To understand the tests better, intentionally break the data and re-run `fft test`: +To understand the tests better, intentionally break the data and re-run +`fft test`: -* Set one `customers.customer_id` to `NULL` → watch `not_null` fail. 
-* Duplicate a `customer_id` → watch `unique` fail. -* Put a negative `amount` in `orders.csv` → `greater_equal` and `non_negative_sum` fail. -* Add a new `status` value (e.g. `"paused"`) → `accepted_values` warns. -* Drop a customer from `mart_orders_agg` manually (or filter it out in SQL) → `reconcile_coverage` fails. +* Set one `customers.customer_id` to `NULL` → `not_null` (from contracts) fails. +* Duplicate a `customer_id` → `unique` (from contracts) fails. +* Put a negative `amount` in `seed_orders.csv` → `greater_equal` and + `non_negative_sum` fail. +* Change `status` to a value not in the enum → `accepted_values` fails. +* Drop a customer from `customers` or change an ID → `relationships` and + reconciliation tests complain. * Change an amount in the mart only → reconciliation tests fail. +* Push an order timestamp into the future → `no_future_orders` fails. +* Change a physical column type in the warehouse to disagree with the + contract → `column_physical_type` fails. This makes it very clear what each test guards against. @@ -589,24 +824,38 @@ This makes it very clear what each test guards against. The Data Quality Demo is designed to be: * **Small and readable** – customers, orders, and a single mart. -* **Complete** – exercises every built-in FFT DQ test type. +* **Complete** – exercises: + + * Built-in FFT DQ tests, + * Tests generated from contracts, + * Custom Python & SQL tests. * **Practical** – real-world patterns like: - * typing in staging models, - * testing freshness on a mart timestamp, - * reconciling sums and row counts across tables. + * Typing in staging models, + * Testing freshness on staging tables and sources, + * Reconciling sums and row counts across tables, + * Enforcing physical types per engine. 
+ +Once you’re comfortable with this example, you can copy the patterns into your +real projects: -Once you’re comfortable with this example, you can copy the patterns into your real project: start with staging-level checks, then layer in reconciliations and freshness on your most important marts. +1. Start with **contracts** and simple column tests on staging. +2. Add **freshness** on key timestamps and sources. +3. Layer in **reconciliations** across marts and fact tables. +4. Add **custom tests** when built-ins aren’t enough. > **Tip – Source vs. table freshness** -> -> The demo uses the `freshness` test type on the mart (`mart_orders_agg.last_order_ts`). -> For *source-level freshness* (e.g. “when was `crm.orders` last loaded?”), define -> freshness rules on your sources and run: -> +> +> The demo uses: +> +> * `freshness` tests on tables (`orders.order_ts`), and +> * `freshness` in `sources.yml` (via `_ff_loaded_at`). +> +> Run source freshness with: +> > ```bash > fft source freshness examples/dq_demo --env dev_duckdb > ``` -> -> This complements table-level DQ tests by checking whether your inputs are recent enough -> *before* you even build marts. +> +> This complements table-level DQ tests by checking whether your inputs are +> recent enough *before* you even build marts. diff --git a/docs/examples/Local_Engine_Setup.md b/docs/examples/Local_Engine_Setup.md index 6ef2fdb..04ce832 100644 --- a/docs/examples/Local_Engine_Setup.md +++ b/docs/examples/Local_Engine_Setup.md @@ -172,7 +172,7 @@ The BigQuery client in `fastflowtransform` will pick this up automatically **as make ENGINE=bigquery test ``` - `fft test` uses the BigQuery shim to run checks like `not_null`, `unique`, + `fft test` uses the BigQuery to run checks like `not_null`, `unique`, `row_count_between`, `greater_equal`, etc. against `${FF_BQ_PROJECT}.${FF_BQ_DATASET}.
`. diff --git a/examples/api_demo/sources.yml b/examples/api_demo/sources.yml index 84e04d5..4048b3c 100644 --- a/examples/api_demo/sources.yml +++ b/examples/api_demo/sources.yml @@ -1,4 +1,4 @@ -version: 2 +version: 1 sources: - name: crm diff --git a/examples/basic_demo/contracts.yml b/examples/basic_demo/contracts.yml new file mode 100644 index 0000000..5ed90eb --- /dev/null +++ b/examples/basic_demo/contracts.yml @@ -0,0 +1,37 @@ +# Project-level contracts for the basic_demo. + +defaults: + # Reusable column-level rules applied by regex. + columns: + # All *_id columns are non-null integers + - match: + name: ".*_id$" + type: integer + nullable: false + + # All *_date columns are non-null dates + - match: + name: ".*_date$" + type: date + nullable: false + + # email_domain is non-null everywhere + - match: + name: "^email_domain$" + type: string + nullable: false + description: "Normalized email domain (lowercased)." + +enforcement: + # Modes: off | verify | cast + default_mode: off + allow_extra_columns: true + + tables: + mart_users_by_domain: + mode: verify # only check schema, don't cast + allow_extra_columns: true + + mart_latest_signup: + mode: cast # cast into physical types, then verify + allow_extra_columns: true diff --git a/examples/basic_demo/models/marts/mart_latest_signup.contracts.yml b/examples/basic_demo/models/marts/mart_latest_signup.contracts.yml new file mode 100644 index 0000000..9200579 --- /dev/null +++ b/examples/basic_demo/models/marts/mart_latest_signup.contracts.yml @@ -0,0 +1,52 @@ +table: mart_latest_signup + +columns: + email_domain: + type: string + nullable: false + unique: true + description: "Email domain, one row per domain." + physical: + duckdb: VARCHAR + postgres: text + bigquery: STRING + databricks_spark: STRING + snowflake_snowpark: VARCHAR + default: STRING + + latest_user_id: + type: integer + nullable: false + description: "User ID of the most recent signup on this domain." 
+ physical: + duckdb: INTEGER + postgres: integer + bigquery: INT64 + databricks_spark: INT + snowflake_snowpark: NUMBER + default: INTEGER + + latest_email: + type: string + nullable: false + regex: "^.+@.+$" + description: "Email address of the most recent signup." + physical: + duckdb: VARCHAR + postgres: text + bigquery: STRING + databricks_spark: STRING + snowflake_snowpark: VARCHAR + default: STRING + + latest_signup_date: + type: date + nullable: false + description: "Timestamp / date of the most recent signup." + physical: + duckdb: DATE + postgres: date + bigquery: DATE + databricks_spark: DATE + snowflake_snowpark: DATE + default: DATE diff --git a/examples/basic_demo/models/marts/mart_users_by_domain.contracts.yml b/examples/basic_demo/models/marts/mart_users_by_domain.contracts.yml new file mode 100644 index 0000000..2f40725 --- /dev/null +++ b/examples/basic_demo/models/marts/mart_users_by_domain.contracts.yml @@ -0,0 +1,53 @@ +table: mart_users_by_domain + +columns: + email_domain: + type: string + nullable: false + unique: true + description: "Email domain (example.com, example.net, …)." + physical: + duckdb: VARCHAR + postgres: text + bigquery: STRING + databricks_spark: STRING + snowflake_snowpark: VARCHAR + default: STRING + + user_count: + type: integer + nullable: false + min: 0 + description: "Number of users per domain (COUNT(*))" + physical: + # COUNT(*) result types + duckdb: BIGINT + postgres: bigint + bigquery: INT64 + databricks_spark: BIGINT + snowflake_snowpark: NUMBER + default: BIGINT + + first_signup: + type: date + nullable: false + description: "Earliest signup date for this domain." + physical: + duckdb: DATE + postgres: date + bigquery: DATE + databricks_spark: DATE + snowflake_snowpark: DATE + default: DATE + + last_signup: + type: date + nullable: false + description: "Latest signup date for this domain." 
+ physical: + duckdb: DATE + postgres: date + bigquery: DATE + databricks_spark: DATE + snowflake_snowpark: DATE + default: DATE diff --git a/examples/basic_demo/sources.yml b/examples/basic_demo/sources.yml index d48deca..2b8db8a 100644 --- a/examples/basic_demo/sources.yml +++ b/examples/basic_demo/sources.yml @@ -1,4 +1,4 @@ -version: 2 +version: 1 sources: - name: crm diff --git a/examples/cache_demo/sources.yml b/examples/cache_demo/sources.yml index 0490edc..174bb70 100644 --- a/examples/cache_demo/sources.yml +++ b/examples/cache_demo/sources.yml @@ -1,4 +1,4 @@ -version: 2 +version: 1 sources: - name: crm schema: cache_demo diff --git a/examples/ci_demo/sources.yml b/examples/ci_demo/sources.yml index 83436dc..fa207c5 100644 --- a/examples/ci_demo/sources.yml +++ b/examples/ci_demo/sources.yml @@ -1,5 +1,5 @@ # Source declarations describe external tables. See docs/Sources.md for details. -version: 2 +version: 1 # sources: # Example: # - name: raw diff --git a/examples/dq_demo/contracts.yml b/examples/dq_demo/contracts.yml new file mode 100644 index 0000000..8834b05 --- /dev/null +++ b/examples/dq_demo/contracts.yml @@ -0,0 +1,19 @@ +version: 1 + +defaults: + columns: + - match: + name: ".*_id$" + type: integer + nullable: false + + - match: + name: "created_at" + type: timestamp + nullable: false + + - match: + name: ".*_ts$" + type: timestamp + nullable: true + description: "Timestamp-like but allowed to be null in some pipelines" diff --git a/examples/dq_demo/models/marts/mart_orders_agg.contracts.yml b/examples/dq_demo/models/marts/mart_orders_agg.contracts.yml new file mode 100644 index 0000000..5fa2cba --- /dev/null +++ b/examples/dq_demo/models/marts/mart_orders_agg.contracts.yml @@ -0,0 +1,36 @@ +version: 1 +table: mart_orders_agg + +columns: + customer_id: + type: integer + nullable: false + + customer_name: + type: string + nullable: false + + status: + type: string + nullable: false + enum: + - active + - inactive # keep in sync with 
customers.status + + order_count: + type: integer + nullable: false + min: 0 + + total_amount: + type: double + nullable: false + min: 0 + + first_order_ts: + type: timestamp + nullable: false + + last_order_ts: + type: timestamp + nullable: false diff --git a/examples/dq_demo/models/staging/customers.contracts.yml b/examples/dq_demo/models/staging/customers.contracts.yml new file mode 100644 index 0000000..63885c6 --- /dev/null +++ b/examples/dq_demo/models/staging/customers.contracts.yml @@ -0,0 +1,46 @@ +version: 1 +table: customers + +columns: + customer_id: + type: integer + nullable: false + physical: + duckdb: integer + postgres: integer + bigquery: INT64 + snowflake_snowpark: NUMBER + databricks_spark: INT + + name: + type: string + nullable: false + physical: + duckdb: VARCHAR + postgres: text + bigquery: STRING + snowflake_snowpark: VARCHAR + databricks_spark: STRING + + status: + type: string + nullable: false + enum: + - active + - inactive + physical: + duckdb: VARCHAR + postgres: text + bigquery: STRING + snowflake_snowpark: VARCHAR + databricks_spark: STRING + + created_at: + type: timestamp + nullable: false + physical: + duckdb: TIMESTAMP + postgres: timestamp without time zone + bigquery: TIMESTAMP + snowflake_snowpark: TIMESTAMP_NTZ + databricks_spark: TIMESTAMP diff --git a/examples/dq_demo/models/staging/orders.contracts.yml b/examples/dq_demo/models/staging/orders.contracts.yml new file mode 100644 index 0000000..45d7a7c --- /dev/null +++ b/examples/dq_demo/models/staging/orders.contracts.yml @@ -0,0 +1,12 @@ +version: 1 +table: orders + +columns: + order_id: + type: integer + + customer_id: + type: integer + + order_ts: + type: timestamp diff --git a/examples/dq_demo/project.yml b/examples/dq_demo/project.yml index 49b6f85..8ea9e70 100644 --- a/examples/dq_demo/project.yml +++ b/examples/dq_demo/project.yml @@ -10,21 +10,10 @@ seeds: {} tests: # --- Single-table checks ---------------------------------------------------- - - type: not_null - 
table: customers - column: customer_id - tags: [example:dq_demo, batch] - - - type: unique - table: customers - column: customer_id - tags: [example:dq_demo, batch] - - - type: accepted_values + - type: row_count_between table: customers - column: status - values: [active, inactive] - severity: warn # demo of warn vs error + min_rows: 1 + max_rows: 100 tags: [example:dq_demo, batch] - type: greater_equal @@ -38,12 +27,6 @@ tests: column: amount tags: [example:dq_demo, batch] - - type: row_count_between - table: customers - min_rows: 1 - max_rows: 100 - tags: [example:dq_demo, batch] - - type: relationships table: orders column: customer_id diff --git a/examples/dq_demo/sources.yml b/examples/dq_demo/sources.yml index 57bc26a..93d49e4 100644 --- a/examples/dq_demo/sources.yml +++ b/examples/dq_demo/sources.yml @@ -1,4 +1,4 @@ -version: 2 +version: 1 sources: - name: crm diff --git a/examples/dq_demo/tests/dq/min_positive_share.ff.py b/examples/dq_demo/tests/dq/min_positive_share.ff.py index 47c054d..67da3d1 100644 --- a/examples/dq_demo/tests/dq/min_positive_share.ff.py +++ b/examples/dq_demo/tests/dq/min_positive_share.ff.py @@ -25,7 +25,7 @@ class MinPositiveShareParams(BaseModel): @dq_test("min_positive_share", params_model=MinPositiveShareParams) def min_positive_share( - con: Any, + executor: Any, table: str, column: str | None, params: dict[str, Any], @@ -48,8 +48,8 @@ def min_positive_share( else: pos_sql = f"select count(*) from {table} where {column} > 0" - total = testing._scalar(con, total_sql) - positives = testing._scalar(con, pos_sql) + total = testing._scalar(executor, total_sql) + positives = testing._scalar(executor, pos_sql) example_sql = f"{pos_sql}; -- positives\n{total_sql}; -- total" diff --git a/examples/env_matrix/sources.yml b/examples/env_matrix/sources.yml index cac9e94..16f3749 100644 --- a/examples/env_matrix/sources.yml +++ b/examples/env_matrix/sources.yml @@ -1,4 +1,4 @@ -version: 2 +version: 1 sources: - name: raw diff --git 
a/examples/hooks_demo/sources.yml b/examples/hooks_demo/sources.yml index 4386923..293a2c5 100644 --- a/examples/hooks_demo/sources.yml +++ b/examples/hooks_demo/sources.yml @@ -1,4 +1,4 @@ -version: 2 +version: 1 sources: - name: raw diff --git a/examples/incremental_demo/Makefile b/examples/incremental_demo/Makefile index 09629af..73a8cf0 100644 --- a/examples/incremental_demo/Makefile +++ b/examples/incremental_demo/Makefile @@ -1,10 +1,11 @@ -.PHONY: seed run_full run_incr dag test artifacts clean demo demo-open +.PHONY: seed seed_step2 run_full run_incr dag test artifacts clean demo demo-open # --- Config ------------------------------------------------------------------- DB ?= .local/incremental_demo.duckdb PROJECT ?= . UV ?= uv +LOCAL_SEEDS_DIR = $(PROJECT)/.local/seeds # Engine selector (duckdb|postgres|databricks_spark|bigquery|snowflake_snowpark) ENGINE ?= duckdb @@ -87,16 +88,21 @@ endif # --- Targets ------------------------------------------------------------------ -seed: - env $(BASE_ENV) $(UV) run fft seed "$(PROJECT)" --env $(PROFILE_ENV) +seed_v1: + @mkdir -p "$(LOCAL_SEEDS_DIR)" + @cp "$(PROJECT)/seeds/seed_events_v1.csv" "$(LOCAL_SEEDS_DIR)/seed_events.csv" + env $(BASE_ENV) FFT_SEEDS_DIR="$(LOCAL_SEEDS_DIR)" $(UV) run fft seed "$(PROJECT)" --env $(PROFILE_ENV) + +seed_v2: + @mkdir -p "$(LOCAL_SEEDS_DIR)" + @cp "$(PROJECT)/seeds/seed_events_v2.csv" "$(LOCAL_SEEDS_DIR)/seed_events.csv" + env $(BASE_ENV) FFT_SEEDS_DIR="$(LOCAL_SEEDS_DIR)" $(UV) run fft seed "$(PROJECT)" --env $(PROFILE_ENV) -# Full refresh (first run) run_full: - env $(RUN_ENV) $(UV) run fft run "$(PROJECT)" --env $(PROFILE_ENV) $(SELECT_FLAGS) --cache rw + env $(RUN_ENV) $(UV) run fft run "$(PROJECT)" --env $(PROFILE_ENV) $(SELECT_FLAGS) --cache=rw -# second/subsequent run: shows incremental/delta behaviour run_incr: - env $(RUN_ENV) $(UV) run fft run "$(PROJECT)" --env $(PROFILE_ENV) $(SELECT_FLAGS) --cache rw + env $(RUN_ENV) $(UV) run fft run "$(PROJECT)" --env $(PROFILE_ENV) 
$(SELECT_FLAGS) --cache=rw dag: env $(RUN_ENV) $(UV) run fft dag "$(PROJECT)" --env $(PROFILE_ENV) $(SELECT_FLAGS) --html @@ -122,13 +128,20 @@ demo-open: demo: clean @echo "== 🚀 Incremental Demo ($(ENGINE)) ==" - @echo "Profile=$(PROFILE_ENV) DB=$(DB) PROJECT=$(PROJECT) DBR_TABLE_FORMAT=$(DBR_TABLE_FORMAT)" - +$(MAKE) seed + @echo "== 1) First run (seed v1 → initial build) ==" + +$(MAKE) seed_v1 +$(MAKE) run_full @echo - @echo "== 🔁 Second run (Incremental/Delta) ==" + @echo "== 2) No-op run (same seed v1; should be mostly skipped) ==" + +$(MAKE) run_incr + @echo + @echo "== 3) Change seed data (seed v2 snapshot: update + new row) ==" + +$(MAKE) seed_v2 +$(MAKE) run_incr + @echo + @echo "== 4) DAG & artifacts ==" +$(MAKE) dag +$(MAKE) test +$(MAKE) artifacts - @echo "✅ Demo done. Open DAG here: $(PROJECT)/site/dag/index.html" + @echo + @echo "✅ Demo done. Open DAG at: $(PROJECT)/site/dag/index.html" diff --git a/examples/incremental_demo/contracts.yml b/examples/incremental_demo/contracts.yml new file mode 100644 index 0000000..a72cca2 --- /dev/null +++ b/examples/incremental_demo/contracts.yml @@ -0,0 +1,33 @@ +# Project-level contracts for incremental_demo. 
+ +defaults: + columns: + - match: + name: "^event_id$" + type: integer + nullable: false + + - match: + name: "^updated_at$" + type: timestamp + nullable: false + +enforcement: + # Modes: off | verify | cast + default_mode: off + allow_extra_columns: true + + tables: + # incremental SQL examples + fct_events_sql_inline: + mode: verify + allow_extra_columns: true + + fct_events_sql_yaml: + mode: verify + allow_extra_columns: true + + # python incremental example + fct_events_py_incremental: + mode: verify + allow_extra_columns: true diff --git a/examples/incremental_demo/models/common/fct_events_py_incremental.contracts.yml b/examples/incremental_demo/models/common/fct_events_py_incremental.contracts.yml new file mode 100644 index 0000000..d047005 --- /dev/null +++ b/examples/incremental_demo/models/common/fct_events_py_incremental.contracts.yml @@ -0,0 +1,47 @@ +table: fct_events_py_incremental + +columns: + event_id: + type: integer + nullable: false + unique: true + physical: + duckdb: BIGINT + postgres: bigint + bigquery: INT64 + databricks_spark: BIGINT + snowflake_snowpark: NUMBER + default: BIGINT + + updated_at: + type: timestamp + nullable: false + physical: + duckdb: TIMESTAMP + postgres: timestamp + bigquery: TIMESTAMP + databricks_spark: TIMESTAMP + snowflake_snowpark: TIMESTAMP_NTZ + default: TIMESTAMP + + value: + type: integer + nullable: false + physical: + duckdb: BIGINT + postgres: bigint + bigquery: INT64 + databricks_spark: BIGINT + snowflake_snowpark: NUMBER + default: BIGINT + + value_x10: + type: integer + nullable: false + physical: + duckdb: BIGINT + postgres: bigint + bigquery: INT64 + databricks_spark: BIGINT + snowflake_snowpark: NUMBER + default: BIGINT diff --git a/examples/incremental_demo/models/common/fct_events_sql_inline.contracts.yml b/examples/incremental_demo/models/common/fct_events_sql_inline.contracts.yml new file mode 100644 index 0000000..51791d4 --- /dev/null +++ 
b/examples/incremental_demo/models/common/fct_events_sql_inline.contracts.yml @@ -0,0 +1,36 @@ +table: fct_events_sql_inline + +columns: + event_id: + type: integer + nullable: false + unique: true + physical: + duckdb: BIGINT + postgres: bigint + bigquery: INT64 + databricks_spark: BIGINT + snowflake_snowpark: NUMBER + default: BIGINT + + updated_at: + type: timestamp + nullable: false + physical: + duckdb: TIMESTAMP + postgres: timestamp + bigquery: TIMESTAMP + databricks_spark: TIMESTAMP + snowflake_snowpark: TIMESTAMP_NTZ + default: TIMESTAMP + + value: + type: integer + nullable: false + physical: + duckdb: BIGINT + postgres: bigint + bigquery: INT64 + databricks_spark: BIGINT + snowflake_snowpark: NUMBER + default: BIGINT diff --git a/examples/incremental_demo/models/common/fct_events_sql_yaml.contracts.yml b/examples/incremental_demo/models/common/fct_events_sql_yaml.contracts.yml new file mode 100644 index 0000000..8ca6245 --- /dev/null +++ b/examples/incremental_demo/models/common/fct_events_sql_yaml.contracts.yml @@ -0,0 +1,36 @@ +table: fct_events_sql_yaml + +columns: + event_id: + type: integer + nullable: false + unique: true + physical: + duckdb: BIGINT + postgres: bigint + bigquery: INT64 + databricks_spark: BIGINT + snowflake_snowpark: NUMBER + default: BIGINT + + updated_at: + type: timestamp + nullable: false + physical: + duckdb: TIMESTAMP + postgres: timestamp + bigquery: TIMESTAMP + databricks_spark: TIMESTAMP + snowflake_snowpark: TIMESTAMP_NTZ + default: TIMESTAMP + + value: + type: integer + nullable: false + physical: + duckdb: BIGINT + postgres: bigint + bigquery: INT64 + databricks_spark: BIGINT + snowflake_snowpark: NUMBER + default: BIGINT diff --git a/examples/incremental_demo/project.yml b/examples/incremental_demo/project.yml index e29c791..dba5f44 100644 --- a/examples/incremental_demo/project.yml +++ b/examples/incremental_demo/project.yml @@ -8,9 +8,6 @@ models: fct_events_sql_inline.ff: unique_key: "event_id" - 
fct_events_sql_yaml.ff: - unique_key: "event_id" - fct_events_sql_yaml.ff: unique_key: "event_id" # top-level shortcut incremental: @@ -20,6 +17,12 @@ models: fct_events_sql_inline_delta.ff: unique_key: "event_id" + fct_events_py_incremental: + unique_key: "event_id" + incremental: + enabled: true + updated_at_column: "updated_at" + seeds: {} tests: diff --git a/examples/incremental_demo/seeds/seed_events.csv b/examples/incremental_demo/seeds/seed_events_v1.csv similarity index 100% rename from examples/incremental_demo/seeds/seed_events.csv rename to examples/incremental_demo/seeds/seed_events_v1.csv diff --git a/examples/incremental_demo/seeds/seed_events_v2.csv b/examples/incremental_demo/seeds/seed_events_v2.csv new file mode 100644 index 0000000..d670163 --- /dev/null +++ b/examples/incremental_demo/seeds/seed_events_v2.csv @@ -0,0 +1,5 @@ +event_id,updated_at,value +1,2024-01-01T00:00:00,10 +2,2024-01-05T00:00:00,999 +3,2024-01-03T00:00:00,30 +4,2024-01-06T00:00:00,40 diff --git a/examples/incremental_demo/sources.yml b/examples/incremental_demo/sources.yml index 6715df2..014c7b6 100644 --- a/examples/incremental_demo/sources.yml +++ b/examples/incremental_demo/sources.yml @@ -1,4 +1,4 @@ -version: 2 +version: 1 sources: - name: raw diff --git a/examples/macros_demo/sources.yml b/examples/macros_demo/sources.yml index d451751..1d0f8af 100644 --- a/examples/macros_demo/sources.yml +++ b/examples/macros_demo/sources.yml @@ -1,4 +1,4 @@ -version: 2 +version: 1 sources: - name: crm diff --git a/examples/materializations_demo/sources.yml b/examples/materializations_demo/sources.yml index 1fa3552..44dbb1d 100644 --- a/examples/materializations_demo/sources.yml +++ b/examples/materializations_demo/sources.yml @@ -1,4 +1,4 @@ -version: 2 +version: 1 sources: - name: demo diff --git a/examples/packages_demo/main_project/sources.yml b/examples/packages_demo/main_project/sources.yml index 7866e1e..61d3cca 100644 --- a/examples/packages_demo/main_project/sources.yml +++ 
b/examples/packages_demo/main_project/sources.yml @@ -1,4 +1,4 @@ -version: 2 +version: 1 sources: - name: crm diff --git a/examples/snapshot_demo/sources.yml b/examples/snapshot_demo/sources.yml index d48deca..2b8db8a 100644 --- a/examples/snapshot_demo/sources.yml +++ b/examples/snapshot_demo/sources.yml @@ -1,4 +1,4 @@ -version: 2 +version: 1 sources: - name: crm diff --git a/examples_article/building_locally_demo/sources.yml b/examples_article/building_locally_demo/sources.yml index 7ef2d9f..1a7cfe3 100644 --- a/examples_article/building_locally_demo/sources.yml +++ b/examples_article/building_locally_demo/sources.yml @@ -1,4 +1,4 @@ -version: 2 +version: 1 sources: - name: raw diff --git a/examples_article/http_cache_demo/sources.yml b/examples_article/http_cache_demo/sources.yml index 83436dc..fa207c5 100644 --- a/examples_article/http_cache_demo/sources.yml +++ b/examples_article/http_cache_demo/sources.yml @@ -1,5 +1,5 @@ # Source declarations describe external tables. See docs/Sources.md for details. 
-version: 2 +version: 1 # sources: # Example: # - name: raw diff --git a/mkdocs.yml b/mkdocs.yml index 8cb8f0d..030f440 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -24,6 +24,7 @@ nav: - Technical Overview: Technical_Overview.md - API & Models: Api_Models.md - Configuration & Macros: Config_and_Macros.md + - Contracts: Contracts.md - Cache & Parallelism: Cache_and_Parallelism.md - Incremental Processing: Incremental.md - Profiles & Environments: Profiles.md @@ -66,7 +67,7 @@ markdown_extensions: permalink: true - pymdownx.superfences - pymdownx.details - - pymdownx.highlight # <— ersetzt codehilite + - pymdownx.highlight - pymdownx.inlinehilite - pymdownx.snippets: base_path: diff --git a/pyproject.toml b/pyproject.toml index cdf6cbf..74cfd22 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "fastflowtransform" -version = "0.6.11" +version = "0.6.13" description = "Python framework for SQL & Python data transformation, ETL pipelines, and dbt-style data modeling" readme = "README.md" license = { text = "Apache-2.0" } diff --git a/src/fastflowtransform/cli/__init__.py b/src/fastflowtransform/cli/__init__.py index 7f22b7f..88fff42 100644 --- a/src/fastflowtransform/cli/__init__.py +++ b/src/fastflowtransform/cli/__init__.py @@ -8,7 +8,6 @@ from fastflowtransform.cli.bootstrap import ( CLIContext, _die, - _get_test_con, _load_project_and_env, _make_executor, _parse_cli_vars, @@ -170,7 +169,6 @@ def main( "_build_predicates", "_compile_selector", "_die", - "_get_test_con", "_infer_sql_ref_aliases", "_load_project_and_env", "_make_executor", diff --git a/src/fastflowtransform/cli/bootstrap.py b/src/fastflowtransform/cli/bootstrap.py index e7ee61d..d0e276f 100644 --- a/src/fastflowtransform/cli/bootstrap.py +++ b/src/fastflowtransform/cli/bootstrap.py @@ -4,6 +4,7 @@ import importlib import os from collections.abc import Callable +from contextlib import suppress from dataclasses import dataclass from pathlib 
import Path from typing import Any, NoReturn, cast @@ -14,11 +15,11 @@ from jinja2 import Environment from fastflowtransform.config.budgets import BudgetsConfig, load_budgets_config +from fastflowtransform.contracts.core import _load_project_contracts, load_contracts from fastflowtransform.core import REGISTRY from fastflowtransform.errors import DependencyNotFoundError -from fastflowtransform.executors._shims import BigQueryConnShim, SAConnShim from fastflowtransform.executors.base import BaseExecutor -from fastflowtransform.logging import echo +from fastflowtransform.logging import echo, warn from fastflowtransform.settings import ( EngineType, EnvSettings, @@ -36,7 +37,7 @@ class CLIContext: profile: Profile budgets_cfg: BudgetsConfig | None = None - def make_executor(self) -> tuple[Any, Callable, Callable]: + def make_executor(self) -> tuple[BaseExecutor, Callable, Callable]: executor, run_sql, run_py = _make_executor(self.profile, self.jinja_env) self._configure_budget_limit(executor) return executor, run_sql, run_py @@ -151,6 +152,29 @@ def _merge(p: Path) -> None: os.environ.setdefault(key, value) +def configure_executor_contracts(project_dir: Path, executor: BaseExecutor | None) -> None: + """ + Load contracts from project_dir and attach them to the executor (if supported). + + Mirrors the behaviour in `fft run`: parse per-table contracts and the + project-level contracts.yml; on parse errors, log a warning and continue + without contracts. 
+ """ + if executor is None or not hasattr(executor, "configure_contracts"): + return + + try: + contracts_by_table = load_contracts(project_dir) + project_contracts = _load_project_contracts(project_dir) + except Exception as exc: + warn(f"[contracts] Failed to load contracts from {project_dir}: {exc}") + contracts_by_table = {} + project_contracts = None + + with suppress(Exception): + executor.configure_contracts(contracts_by_table, project_contracts) + + def _resolve_profile( env_name: str, engine: EngineType | None, proj: Path ) -> tuple[EnvSettings, Profile]: @@ -316,30 +340,7 @@ def _parse_cli_vars(pairs: list[str]) -> dict[str, object]: return out -def _get_test_con(executor: Any) -> Any: - """ - Return a connection with .execute(...) that understands sequences and (sql, params). - Reuse shims on the executor or build an appropriate one when needed. - """ - if hasattr(executor, "engine"): - try: - return SAConnShim(executor.engine, schema=getattr(executor, "schema", None)) - except Exception: - pass - if hasattr(executor, "client") and hasattr(executor, "dataset"): - try: - return BigQueryConnShim(executor.client, executor.dataset, executor.location) - except Exception: - try: - return BigQueryConnShim(executor.client, getattr(executor, "location", None)) - except Exception: - pass - if hasattr(executor, "con") and hasattr(executor.con, "execute"): - return executor.con - return executor - - -def _make_executor(prof: Profile, jenv: Environment) -> tuple[Any, Callable, Callable]: +def _make_executor(prof: Profile, jenv: Environment) -> tuple[BaseExecutor, Callable, Callable]: ex: BaseExecutor if prof.engine == "duckdb": DuckExecutor = _import_optional( @@ -366,6 +367,16 @@ def _make_executor(prof: Profile, jenv: Environment) -> tuple[Any, Callable, Cal if prof.bigquery.dataset is None: raise RuntimeError("BigQuery dataset must be set") + # Validate env-provided frame selector early (used by examples/Makefiles) + frame_env = os.getenv("FF_ENGINE_VARIANT") or 
os.getenv("BQ_FRAME") + if frame_env: + frame_normalized = frame_env.lower() + if frame_normalized not in {"pandas", "bigframes"}: + raise RuntimeError( + f"Unsupported BigQuery frame '{frame_env}'. " + "Set FF_ENGINE_VARIANT/BQ_FRAME to 'pandas' or 'bigframes'." + ) + if prof.bigquery.use_bigframes: BigQueryBFExecutor = _import_optional( "fastflowtransform.executors.bigquery.bigframes", diff --git a/src/fastflowtransform/cli/init_cmd.py b/src/fastflowtransform/cli/init_cmd.py index e8e3c6d..1cd50f2 100644 --- a/src/fastflowtransform/cli/init_cmd.py +++ b/src/fastflowtransform/cli/init_cmd.py @@ -143,7 +143,7 @@ def _build_sources_yaml() -> str: return "\n".join( [ "# Source declarations describe external tables. See docs/Sources.md for details.", - "version: 2", + "version: 1", "# sources:", " # Example:", " # - name: raw", diff --git a/src/fastflowtransform/cli/run.py b/src/fastflowtransform/cli/run.py index d573c3b..120ff07 100644 --- a/src/fastflowtransform/cli/run.py +++ b/src/fastflowtransform/cli/run.py @@ -27,7 +27,11 @@ compute_affected_models, get_changed_models, ) -from fastflowtransform.cli.bootstrap import CLIContext, _prepare_context +from fastflowtransform.cli.bootstrap import ( + CLIContext, + _prepare_context, + configure_executor_contracts, +) from fastflowtransform.cli.options import ( CacheMode, CacheOpt, @@ -1810,6 +1814,17 @@ def run( engine_.invocation_id = uuid4().hex engine_.run_started_at = datetime.now(UTC).isoformat(timespec="seconds") + # ---------- Runtime contracts: load + configure executor ---------- + project_dir = Path(ctx.project) + + # engine_.shared is (executor, run_sql_fn, run_py_fn) + try: + executor, _, _ = engine_.shared + except Exception: + executor = None + + configure_executor_contracts(project_dir, executor) + bind_context( engine=ctx.profile.engine, env=env_name, diff --git a/src/fastflowtransform/cli/source_cmd.py b/src/fastflowtransform/cli/source_cmd.py index c626cff..4ac3080 100644 --- 
a/src/fastflowtransform/cli/source_cmd.py +++ b/src/fastflowtransform/cli/source_cmd.py @@ -3,7 +3,7 @@ import typer -from fastflowtransform.cli.bootstrap import _get_test_con, _prepare_context +from fastflowtransform.cli.bootstrap import _prepare_context from fastflowtransform.cli.options import EngineOpt, EnvOpt, ProjectArg, VarsOpt from fastflowtransform.logging import bind_context, clear_context, echo from fastflowtransform.source_freshness import SourceFreshnessResult, run_source_freshness @@ -31,12 +31,10 @@ def freshness( # Get a live connection / executor from the context execu, _run_sql, _run_py = ctx.make_executor() - con = _get_test_con(execu) # Run freshness checks over all sources with a configured freshness block results: list[SourceFreshnessResult] = run_source_freshness( execu, - con=con, engine=ctx.profile.engine, ) diff --git a/src/fastflowtransform/cli/test_cmd.py b/src/fastflowtransform/cli/test_cmd.py index 3261d22..de48573 100644 --- a/src/fastflowtransform/cli/test_cmd.py +++ b/src/fastflowtransform/cli/test_cmd.py @@ -1,7 +1,6 @@ # fastflowtransform/cli/test_cmd.py from __future__ import annotations -import os import re import time from collections.abc import Callable, Iterable, Mapping @@ -11,7 +10,7 @@ import typer -from fastflowtransform.cli.bootstrap import _get_test_con, _prepare_context +from fastflowtransform.cli.bootstrap import _prepare_context, configure_executor_contracts from fastflowtransform.cli.options import ( EngineOpt, EnvOpt, @@ -25,9 +24,11 @@ BaseProjectTestConfig, parse_project_yaml_config, ) +from fastflowtransform.contracts.core import load_contract_tests from fastflowtransform.core import REGISTRY from fastflowtransform.dag import topo_sort from fastflowtransform.errors import ModelExecutionError +from fastflowtransform.executors.base import BaseExecutor from fastflowtransform.logging import echo from fastflowtransform.schema_loader import Severity, TestSpec, load_schema_tests from fastflowtransform.testing.discovery 
import ( @@ -111,11 +112,6 @@ def _execute_models( on_error(name, node, exc) -def _maybe_print_marker(con: Any) -> None: - if os.getenv("FFT_SQL_DEBUG") == "1": - echo(getattr(con, "marker", "NO_SHIM")) - - def _run_models( pred: Callable[[Any], bool], run_sql: Callable[[Any], Any], @@ -342,7 +338,7 @@ def _prepare_test( return _prepare_test_from_mapping(raw_test, executor) -def _run_dq_tests(con: Any, tests: Iterable[Any], executor: Any) -> list[DQResult]: +def _run_dq_tests(executor: BaseExecutor, tests: Iterable[Any]) -> list[DQResult]: results: list[DQResult] = [] for raw_test in tests: @@ -381,7 +377,7 @@ def _run_dq_tests(con: Any, tests: Iterable[Any], executor: Any) -> list[DQResul ) continue - ok, msg, example = runner(con, table_for_exec, col, params) + ok, msg, example = runner(executor, table_for_exec, col, params) ms = int((time.perf_counter() - t0) * 1000) param_str = _format_params_for_summary(kind, params) @@ -468,16 +464,14 @@ def test( engine: EngineOpt = None, vars: VarsOpt = None, select: SelectOpt = None, - skip_build: SkipBuildOpt = False, + skip_build: SkipBuildOpt = True, ) -> None: ctx = _prepare_context(project, env_name, engine, vars) tokens, pred = _compile_selector(select) has_model_matches = any(pred(node) for node in REGISTRY.nodes.values()) legacy_tag_only = _is_legacy_test_token(tokens) and not has_model_matches execu, run_sql, run_py = ctx.make_executor() - - con = _get_test_con(execu) - _maybe_print_marker(con) + configure_executor_contracts(ctx.project, execu) model_pred = (lambda _n: True) if legacy_tag_only else pred # Run models; if a model fails, show friendly error then exit(1). 
@@ -492,13 +486,15 @@ def test( tests: list[Any] = _load_tests(ctx.project) # 2) schema YAML tests tests.extend(load_schema_tests(ctx.project)) + # 2b) contracts tests (contracts/*.contracts.yml) + tests.extend(load_contract_tests(ctx.project)) # 3) optional legacy tagfilter (e.g., "batch") tests = _apply_legacy_tag_filter(tests, tokens, legacy_token=legacy_tag_only) if not tests: typer.secho("No tests configured.", fg="bright_black") raise typer.Exit(code=0) - results = _run_dq_tests(con, tests, execu) + results = _run_dq_tests(execu, tests) _print_summary(results) # Exit code: count only ERROR fails diff --git a/src/fastflowtransform/config/contracts.py b/src/fastflowtransform/config/contracts.py new file mode 100644 index 0000000..e6084e8 --- /dev/null +++ b/src/fastflowtransform/config/contracts.py @@ -0,0 +1,420 @@ +# fastflowtransform/config/contracts.py +from __future__ import annotations + +from pathlib import Path +from typing import Any, Literal + +import yaml +from pydantic import BaseModel, ConfigDict, Field, field_validator, model_validator + +from fastflowtransform.config.loaders import NoDupLoader +from fastflowtransform.errors import ContractsConfigError + +SchemaEnforcementMode = Literal["off", "verify", "cast"] + + +class PhysicalTypeConfig(BaseModel): + """ + Engine-specific physical type configuration for a column. + + All fields are optional; you can set: + - default: applies to all engines if no engine-specific override is set + - duckdb, postgres, bigquery, snowflake_snowpark, databricks_spark: + engine-specific physical types (e.g. 
"integer", "NUMERIC", "TIMESTAMP") + + Example YAML: + physical: "integer" + + physical: + default: numeric + postgres: numeric + bigquery: NUMERIC + """ + + model_config = ConfigDict(extra="forbid") + + default: str | None = None + duckdb: str | None = None + postgres: str | None = None + bigquery: str | None = None + snowflake_snowpark: str | None = None + databricks_spark: str | None = None + + +class ColumnContractModel(BaseModel): + """ + Column-level contract definition. + + Example YAML fragment: + + columns: + id: + type: integer + nullable: false + status: + type: string + enum: ["active", "inactive"] + amount: + type: double + nullable: false + min: 0 + max: 10000 + email: + type: string + regex: "^[^@]+@[^@]+$" + """ + + model_config = ConfigDict(extra="forbid") + + # Optional semantic / physical type hint ("integer", "string", "timestamp", ...) + type: str | None = None + + # Engine-specific physical DB types; see PhysicalTypeConfig. + physical: PhysicalTypeConfig | None = None + + # Nullability: nullable=False → not_null check + nullable: bool | None = None + + # Uniqueness: unique=True → unique test + unique: bool | None = None + + # Enumerated allowed values (accepted_values test) + enum: list[Any] | None = None + + # Regex constraint; currently used via a generic regex_match test + regex: str | None = None + + # Numeric range (inclusive) for numeric-like columns + min: float | int | None = None + max: float | int | None = None + + # Optional free-form description (handy for docs later) + description: str | None = None + + @field_validator("enum", mode="before") + @classmethod + def _normalize_enum(cls, v: Any) -> list[Any] | None: + """ + Allow: + enum: "A" -> ["A"] + enum: [1, 2, 3] -> [1, 2, 3] + """ + if v is None: + return None + if isinstance(v, (list, tuple)): + return list(v) + return [v] + + @field_validator("physical", mode="before") + @classmethod + def _coerce_physical(cls, v: Any) -> Any: + """ + Accept either: + physical: "integer" + 
physical: + default: numeric + postgres: numeric + bigquery: NUMERIC + and normalize to a PhysicalTypeConfig-compatible dict. + """ + if v is None: + return None + if isinstance(v, PhysicalTypeConfig): + return v + if isinstance(v, str): + # Shorthand: same type for all engines → default + return {"default": v} + if isinstance(v, dict): + # Let Pydantic validate keys; we just pass through. + return v + raise TypeError( + "physical must be either a string or a mapping of engine keys to types " + "(e.g. {default: numeric, postgres: numeric})" + ) + + +class TableSchemaEnforcementModel(BaseModel): + """ + Per-table runtime schema enforcement configuration. + + Example in *.contracts.yml: + + enforce_schema: + mode: cast # off | verify | cast + allow_extra_columns: false + """ + + model_config = ConfigDict(extra="forbid") + + mode: SchemaEnforcementMode = "off" + allow_extra_columns: bool = True + + @field_validator("mode", mode="before") + @classmethod + def _coerce_mode(cls, v: Any) -> Any: + # Allow bare `off` from YAML → False + if v is False: + return "off" + if isinstance(v, str): + return v.strip().lower() + return v + + +class ContractsFileModel(BaseModel): + """ + One contracts file. + + Convention: + - One file describes contracts for exactly one table/relation. + - The table name is what will be used in DQ tests (SELECT ... FROM
). + + Example `*.contracts.yml`: + + version: 1 + table: users_enriched + columns: + id: + type: integer + nullable: false + status: + type: string + enum: ["active", "inactive"] + email: + type: string + nullable: false + regex: "^[^@]+@[^@]+$" + """ + + model_config = ConfigDict(extra="forbid") + + version: int = 1 + table: str = Field(..., description="Logical/physical table name the contract applies to") + columns: dict[str, ColumnContractModel] = Field(default_factory=dict) + + enforce_schema: TableSchemaEnforcementModel | None = Field( + default=None, + description="Optional runtime schema enforcement config for this table", + ) + + +# --------------------------------------------------------------------------- +# Project-level contracts (contracts.yml at project root) +# --------------------------------------------------------------------------- + + +class ColumnMatchModel(BaseModel): + """ + Column match expression for project-level defaults. + + Currently supports: + - name: regex on column name (required) + - table: optional regex on table name (future-proof; optional) + """ + + model_config = ConfigDict(extra="forbid") + + name: str = Field(..., description="Regex to match column name") + table: str | None = Field( + default=None, description="Optional regex to restrict to specific tables" + ) + + @model_validator(mode="after") + def _strip(self) -> ColumnMatchModel: + object.__setattr__(self, "name", self.name.strip()) + if self.table is not None: + object.__setattr__(self, "table", self.table.strip()) + return self + + +class ColumnDefaultsRuleModel(BaseModel): + """ + One rule under defaults.columns in contracts.yml. 
+ + Example: + + defaults: + columns: + - match: + name: ".*_id$" + type: integer + nullable: false + - match: + name: "created_at" + type: timestamp + nullable: false + """ + + model_config = ConfigDict(extra="forbid") + + match: ColumnMatchModel + # Payload is the same shape as ColumnContractModel but optional: + type: str | None = None + physical: PhysicalTypeConfig | None = None + nullable: bool | None = None + unique: bool | None = None + enum: list[Any] | None = None + regex: str | None = None + min: float | None = None + max: float | None = None + description: str | None = None + + @field_validator("enum", mode="before") + @classmethod + def _normalize_enum(cls, v: Any) -> list[Any] | None: + if v is None: + return None + if isinstance(v, (list, tuple)): + return list(v) + return [v] + + @field_validator("physical", mode="before") + @classmethod + def _coerce_physical(cls, v: Any) -> Any: + if v is None: + return None + if isinstance(v, PhysicalTypeConfig): + return v + if isinstance(v, str): + return {"default": v} + if isinstance(v, dict): + return v + raise TypeError( + "defaults.columns[*].physical must be either a string or a mapping of engine " + "keys to types (e.g. {default: numeric, postgres: numeric})" + ) + + +class ContractsDefaultsModel(BaseModel): + """ + Root defaults block for project-level contracts.yml. + + Example: + + version: 1 + + defaults: + models: + - match: + name: "staging.*" + materialized: table + + columns: + - match: + name: ".*_id$" + type: integer + nullable: false + - match: + name: "created_at" + type: timestamp + nullable: false + """ + + model_config = ConfigDict(extra="forbid") + + # Future global defaults (e.g. a default severity for contract tests) could live here. 
+ columns: list[ColumnDefaultsRuleModel] = Field(default_factory=list) + + +class TableSchemaEnforcementOverrideModel(BaseModel): + """ + Per-table override in project-level contracts.yml + + Example: + + enforcement: + tables: + customers: + mode: cast + allow_extra_columns: false + """ + + model_config = ConfigDict(extra="forbid") + + mode: SchemaEnforcementMode | None = None + allow_extra_columns: bool | None = None + + +class ProjectSchemaEnforcementModel(BaseModel): + """ + Project-level schema enforcement defaults (contracts.yml). + + Example: + + version: 1 + + enforcement: + default_mode: verify # off | verify | cast + allow_extra_columns: true + tables: + customers: + mode: cast + allow_extra_columns: false + """ + + model_config = ConfigDict(extra="forbid") + + default_mode: SchemaEnforcementMode = "off" + allow_extra_columns: bool = True + tables: dict[str, TableSchemaEnforcementOverrideModel] = Field(default_factory=dict) + + @field_validator("default_mode", mode="before") + @classmethod + def _coerce_default_mode(cls, v: Any) -> Any: + if v is False: + return "off" + # Same comment as above if you ever want to accept `true`. + if isinstance(v, str): + return v.strip().lower() + return v + + +class ProjectContractsModel(BaseModel): + """ + Top-level model for project-level contracts.yml. + + Only defines defaults, no table-specific contracts (those live in + per-table *.contracts.yml files). + """ + + model_config = ConfigDict(extra="forbid") + + version: int = 1 + defaults: ContractsDefaultsModel = Field(default_factory=ContractsDefaultsModel) + + enforcement: ProjectSchemaEnforcementModel | None = Field( + default=None, + description="Runtime schema enforcement defaults and per-table overrides", + ) + + +# ---- Parsers ----------------------------------------------------------------- + + +def parse_contracts_file(path: Path) -> ContractsFileModel: + """ + Load and validate a single *.contracts.yml file. 
+ Raises a Pydantic validation error or yaml.YAMLError on malformed input. + """ + try: + raw = yaml.load(path.read_text(encoding="utf-8"), Loader=NoDupLoader) or {} + return ContractsFileModel.model_validate(raw) + except Exception as exc: + hint = "Check the contracts YAML for duplicate keys or invalid structure." + raise ContractsConfigError( + f"Failed to parse contracts file: {exc}", path=str(path), hint=hint + ) from exc + + +def parse_project_contracts_file(path: Path) -> ProjectContractsModel: + """ + Load and validate the project-level contracts.yml file. + Returns ProjectContractsModel, raising on malformed input. + """ + try: + raw = yaml.load(path.read_text(encoding="utf-8"), Loader=NoDupLoader) or {} + return ProjectContractsModel.model_validate(raw) + except Exception as exc: + hint = "Check the project-level contracts.yml for duplicate keys or invalid structure." + raise ContractsConfigError( + f"Failed to parse project contracts file: {exc}", path=str(path), hint=hint + ) from exc diff --git a/src/fastflowtransform/config/loaders.py b/src/fastflowtransform/config/loaders.py new file mode 100644 index 0000000..7032b88 --- /dev/null +++ b/src/fastflowtransform/config/loaders.py @@ -0,0 +1,19 @@ +import yaml +from yaml.loader import SafeLoader + + +class NoDupLoader(SafeLoader): + pass + + +def _construct_mapping(loader, node, deep=False): + mapping = {} + for key_node, value_node in node.value: + key = loader.construct_object(key_node, deep=deep) + if key in mapping: + raise ValueError(f"Duplicate key {key!r} in {node.start_mark}") + mapping[key] = loader.construct_object(value_node, deep=deep) + return mapping + + +NoDupLoader.add_constructor(yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG, _construct_mapping) diff --git a/src/fastflowtransform/config/sources.py b/src/fastflowtransform/config/sources.py index dce68da..ee02936 100644 --- a/src/fastflowtransform/config/sources.py +++ b/src/fastflowtransform/config/sources.py @@ -305,7 +305,7 @@ class 
SourcesFileConfig(BaseModel): model_config = ConfigDict(extra="forbid") - version: Literal[2] + version: Literal[1] sources: list[SourceGroupConfig] = Field(default_factory=list) diff --git a/src/fastflowtransform/contracts/__init__.py b/src/fastflowtransform/contracts/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/fastflowtransform/contracts/core.py b/src/fastflowtransform/contracts/core.py new file mode 100644 index 0000000..088dad8 --- /dev/null +++ b/src/fastflowtransform/contracts/core.py @@ -0,0 +1,300 @@ +# fastflowtransform/contracts/core.py +from __future__ import annotations + +import re +from pathlib import Path +from typing import Any + +from fastflowtransform.config.contracts import ( + ColumnContractModel, + ContractsDefaultsModel, + ContractsFileModel, + ProjectContractsModel, + parse_contracts_file, + parse_project_contracts_file, +) +from fastflowtransform.logging import get_logger +from fastflowtransform.schema_loader import Severity, TestSpec + +logger = get_logger("contracts") + + +# --------------------------------------------------------------------------- +# Discovery helpers +# --------------------------------------------------------------------------- + + +def _discover_contract_paths(project_dir: Path) -> list[Path]: + """ + Discover *.contracts.yml files under models/. + + Convention: + - You can place contracts anywhere under models/, as long as the file + name ends with ".contracts.yml". + - Each file describes contracts for one logical table (ContractsFileModel.table). + """ + models_dir = project_dir / "models" + if not models_dir.exists(): + return [] + + paths: list[Path] = [] + for p in models_dir.rglob("*.contracts.yml"): + if p.is_file(): + paths.append(p) + return sorted(paths) + + +def load_contracts(project_dir: Path) -> dict[str, ContractsFileModel]: + """ + Load all per-table contracts from *.contracts.yml under models/. 
+ + Returns: + dict[table_name, ContractsFileModel] + If multiple files define the same `table`, the last one wins (with a warning). + """ + contracts: dict[str, ContractsFileModel] = {} + for path in _discover_contract_paths(project_dir): + cfg = parse_contracts_file(path) + + table = cfg.table + if table in contracts: + logger.warning( + "Multiple contracts for table %r: overriding previous definition with %s", + table, + path, + ) + contracts[table] = cfg + + return contracts + + +def _load_project_contracts(project_dir: Path) -> ProjectContractsModel | None: + """ + Load project-level contracts.yml (if present). + + The file is optional; if it does not exist, None is returned. + """ + path = project_dir / "contracts.yml" + if not path.exists(): + return None + + cfg = parse_project_contracts_file(path) + + return cfg + + +# --------------------------------------------------------------------------- +# Column defaults application +# --------------------------------------------------------------------------- + + +def _apply_column_defaults( + col_name: str, + table: str, + col: ColumnContractModel, + defaults: ContractsDefaultsModel | None, +) -> ColumnContractModel: + """ + Merge project-level column defaults into a column contract. + + Rules: + - We only consider defaults.columns rules where the regex on + name matches `col_name` *and* optional table regex matches `table`. + - Rules are applied in file order; later rules override earlier ones. + - Per-table contracts take precedence: we only fill attributes that are + still None on the ColumnContractModel. 
+ """ + if defaults is None or not defaults.columns: + return col + + # Start from the explicit per-table column config (already validated) + data: dict[str, Any] = col.model_dump() + + for rule in defaults.columns: + m = rule.match + # name regex is required + if not re.search(m.name, col_name): + continue + # optional table regex + if m.table and not re.search(m.table, table): + continue + + # For each field, only apply if current value is None and rule defines a value + if data.get("type") is None and rule.type is not None: + data["type"] = rule.type + if data.get("physical") is None and rule.physical is not None: + data["physical"] = rule.physical + if data.get("nullable") is None and rule.nullable is not None: + data["nullable"] = rule.nullable + if data.get("unique") is None and rule.unique is not None: + data["unique"] = rule.unique + if data.get("enum") is None and rule.enum is not None: + data["enum"] = list(rule.enum) + if data.get("regex") is None and rule.regex is not None: + data["regex"] = rule.regex + if data.get("min") is None and rule.min is not None: + data["min"] = rule.min + if data.get("max") is None and rule.max is not None: + data["max"] = rule.max + if data.get("description") is None and rule.description is not None: + data["description"] = rule.description + + # Re-validate into a ColumnContractModel (cheap; keeps invariants) + return ColumnContractModel.model_validate(data) + + +# --------------------------------------------------------------------------- +# DQ test expansion from contracts +# --------------------------------------------------------------------------- + + +def _contract_tests_for_table( + table: str, + contract: ContractsFileModel, + *, + defaults: ContractsDefaultsModel | None, + default_severity: Severity = "error", +) -> list[TestSpec]: + """ + Convert column contracts for a single table into TestSpec instances, taking + project-level column defaults into account. 
+ """ + specs: list[TestSpec] = [] + + # Base tags shared by all contract-derived tests. You can always add more + # tags at the project level later if needed. + base_tags: list[str] = ["contract"] + + for col_name, col in contract.columns.items(): + effective_col = _apply_column_defaults(col_name, table, col, defaults) + + # 0) Physical type assertion → column_physical_type test + if effective_col.physical is not None: + specs.append( + TestSpec( + type="column_physical_type", + table=table, + column=col_name, + params={"physical": effective_col.physical}, + severity=default_severity, + tags=list(base_tags), + ) + ) + + # 1) Nullability: nullable=False → not_null test + if effective_col.nullable is False: + specs.append( + TestSpec( + type="not_null", + table=table, + column=col_name, + params={}, # no extra params + severity=default_severity, + tags=list(base_tags), + ) + ) + + # 1b) Uniqueness → unique test + if effective_col.unique: + specs.append( + TestSpec( + type="unique", + table=table, + column=col_name, + params={}, + severity=default_severity, + tags=list(base_tags), + ) + ) + + # 2) Enumerated values → accepted_values test (if any values declared) + if effective_col.enum: + specs.append( + TestSpec( + type="accepted_values", + table=table, + column=col_name, + params={"values": list(effective_col.enum)}, + severity=default_severity, + tags=list(base_tags), + ) + ) + + # 3) Numeric range (inclusive) → between test + if effective_col.min is not None or effective_col.max is not None: + params: dict[str, Any] = {} + if effective_col.min is not None: + params["min"] = effective_col.min + if effective_col.max is not None: + params["max"] = effective_col.max + + specs.append( + TestSpec( + type="between", + table=table, + column=col_name, + params=params, + severity=default_severity, + tags=list(base_tags), + ) + ) + + # 4) Regex constraint → regex_match test (Python-side evaluation) + if effective_col.regex: + specs.append( + TestSpec( + type="regex_match", 
+ table=table, + column=col_name, + params={"pattern": effective_col.regex}, + severity=default_severity, + tags=list(base_tags), + ) + ) + + return specs + + +def build_contract_tests( + contracts: dict[str, ContractsFileModel], + *, + defaults: ContractsDefaultsModel | None = None, + default_severity: Severity = "error", +) -> list[TestSpec]: + """ + Convert a set of ContractsFileModel objects into a flat list of TestSpec. + + `defaults` is the (optional) project-level defaults section from contracts.yml. + """ + if not contracts: + return [] + + all_specs: list[TestSpec] = [] + for table, cfg in contracts.items(): + all_specs.extend( + _contract_tests_for_table( + table, + cfg, + defaults=defaults, + default_severity=default_severity, + ) + ) + return all_specs + + +def load_contract_tests(project_dir: Path) -> list[TestSpec]: + """ + High-level helper used by the CLI: + + project_dir -> [TestSpec, ...] + + This is what we plug into `fft test` so contracts become "first-class" tests. 
+ """ + contracts = load_contracts(project_dir) + if not contracts: + return [] + + project_cfg = _load_project_contracts(project_dir) + defaults = project_cfg.defaults if project_cfg is not None else None + + return build_contract_tests(contracts, defaults=defaults) diff --git a/src/fastflowtransform/contracts/runtime/__init__.py b/src/fastflowtransform/contracts/runtime/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/fastflowtransform/contracts/runtime/base.py b/src/fastflowtransform/contracts/runtime/base.py new file mode 100644 index 0000000..a1094dd --- /dev/null +++ b/src/fastflowtransform/contracts/runtime/base.py @@ -0,0 +1,271 @@ +# fastflowtransform/contracts/runtime/base.by +from __future__ import annotations + +import re +from dataclasses import dataclass +from typing import Any, Protocol, TypeVar + +from fastflowtransform.config.contracts import ( + ContractsFileModel, + PhysicalTypeConfig, + ProjectContractsModel, + SchemaEnforcementMode, +) +from fastflowtransform.core import Node + + +class ContractExecutor(Protocol): + """ + Minimal surface that runtime contracts are allowed to use on an executor. + + Every engine that wants runtime contract support should conform to this. + """ + + ENGINE_NAME: str + + def _execute_sql(self, sql: str, *args: Any, **kwargs: Any) -> Any: ... + def introspect_column_physical_type(self, table: str, column: str) -> str | None: ... + def introspect_table_physical_schema(self, table: str) -> dict[str, str]: ... + + +E = TypeVar("E", bound=ContractExecutor) + + +@dataclass +class RuntimeContractConfig: + mode: SchemaEnforcementMode + allow_extra_columns: bool + + +@dataclass +class RuntimeContractContext: + node: Node + relation: str # logical relation name, e.g. "customers" + physical_table: str # engine-specific identifier used in SQL (e.g. 
qualified) + contract: ContractsFileModel | None + project_contracts: ProjectContractsModel | None + config: RuntimeContractConfig + is_incremental: bool = False # future: incremental support + + +def _resolve_physical_type_for_engine( + cfg: PhysicalTypeConfig | None, + engine_name: str, +) -> str | None: + if cfg is None: + return None + engine = (engine_name or "").lower() + # exact engine key + if hasattr(cfg, engine): + v = getattr(cfg, engine) + if v: + return v + # engine base prefix before underscore; e.g. "snowflake" from "snowflake_snowpark" + if "_" in engine: + base = engine.split("_", 1)[0] + if hasattr(cfg, base): + v = getattr(cfg, base) + if v: + return v + # fallback to default + if cfg.default: + return cfg.default + return None + + +def _canonicalize_physical_type(engine_name: str, typ: str | None) -> str | None: + """ + Apply minimal, engine-specific normalization so expected vs. actual types + compare predictably. Keep this small and focused on real metadata quirks. + """ + if typ is None: + return None + engine = (engine_name or "").lower() + t = typ.strip() + if not t: + return None + + # Snowflake: information_schema reports all string-family types as TEXT with a + # length column; normalize common aliases to VARCHAR and drop the huge default. + if engine.startswith("snowflake"): + upper = t.upper() + if upper in {"TEXT", "STRING", "CHAR", "CHARACTER"}: + return "VARCHAR" + if re.fullmatch(r"VARCHAR\s*\(\s*16777216\s*\)", upper): + return "VARCHAR" + if upper in {"DECIMAL", "NUMERIC"}: + return "NUMBER" + return upper + + # Default: case-insensitive comparison only. + return t.upper() + + +def expected_physical_schema( + *, + executor: ContractExecutor, + contract: ContractsFileModel | None, +) -> dict[str, str]: + """ + Build {column_name: expected_physical_type} for the given executor, + using the per-table ContractsFileModel. 
+ """ + if contract is None: + return {} + + engine = getattr(executor, "ENGINE_NAME", "") or "" + result: dict[str, str] = {} + + for col_name, col_model in (contract.columns or {}).items(): + phys = col_model.physical + typ = _resolve_physical_type_for_engine(phys, engine) + if typ: + canon = _canonicalize_physical_type(engine, typ) + if canon: + result[col_name] = canon + + return result + + +def resolve_runtime_contract_config( + *, + table_name: str, + contract: ContractsFileModel | None, + project_contracts: ProjectContractsModel | None, +) -> RuntimeContractConfig: + # 1) table-level override + if contract and contract.enforce_schema is not None: + cfg = contract.enforce_schema + return RuntimeContractConfig( + mode=cfg.mode, + allow_extra_columns=cfg.allow_extra_columns, + ) + + # 2) project-level enforcement + proj = project_contracts.enforcement if project_contracts is not None else None + if proj is None: + return RuntimeContractConfig(mode="off", allow_extra_columns=True) + + table_override = (proj.tables or {}).get(table_name) + + mode: SchemaEnforcementMode = proj.default_mode + allow_extra = proj.allow_extra_columns + + if table_override is not None: + if table_override.mode is not None: + mode = table_override.mode + if table_override.allow_extra_columns is not None: + allow_extra = table_override.allow_extra_columns + + return RuntimeContractConfig(mode=mode, allow_extra_columns=allow_extra) + + +class BaseRuntimeContracts[E: ContractExecutor]: + """ + Base class for engine-specific runtime contract implementations. + + Executors use this via composition: `self.runtime_contracts = ...`. 
+ """ + + executor: E + + def __init__(self, executor: E): + self.executor = executor + + # ------------------------------------------------------------------ # + # Context builder used by the run-engine # + # ------------------------------------------------------------------ # + + def build_context( + self, + *, + node: Node, + relation: str, + physical_table: str, + contract: ContractsFileModel | None, + project_contracts: ProjectContractsModel | None, + is_incremental: bool = False, + ) -> RuntimeContractContext: + """ + Build a RuntimeContractContext with the correct RuntimeContractConfig. + + The caller (run-engine) decides which contract applies and passes: + - node: the fft Node being built + - relation: logical name (typically node.name) + - physical_table: fully-qualified identifier used in SQL + - contract: per-table ContractsFileModel, or None + - project_contracts: parsed project-level contracts.yml, or None + """ + # Use the contract's declared table name if present, otherwise fall + # back to the logical relation name for project-level overrides. + table_key = contract.table if contract is not None else relation + + cfg = resolve_runtime_contract_config( + table_name=table_key, + contract=contract, + project_contracts=project_contracts, + ) + + return RuntimeContractContext( + node=node, + relation=relation, + physical_table=physical_table, + contract=contract, + project_contracts=project_contracts, + config=cfg, + is_incremental=is_incremental, + ) + + # --- Hooks used by the run-engine ---------------------------- + + def apply_sql_contracts( + self, + *, + ctx: RuntimeContractContext, + select_body: str, + ) -> None: + """ + Entry point for SQL models. + + Engines override this to implement verify/cast mode. The default + implementation just does a plain CTAS (no enforcement). 
+ """ + # Default = "off" / do nothing special: + self.executor._execute_sql(f"create or replace table {ctx.physical_table} as {select_body}") + + def verify_after_materialization(self, *, ctx: RuntimeContractContext) -> None: + """ + Optional second step (e.g. verify mode). + + Called after the model has been materialized. Default is no-op. + """ + return + + def coerce_frame_schema(self, df: Any, ctx: RuntimeContractContext) -> Any: + """ + Optional hook for Python models: given a DataFrame-like object and the + RuntimeContractContext, return a new frame whose column types have been + coerced to match the expected physical schema (where reasonable). + + Default implementation is a no-op. Engine-specific subclasses may + override this (e.g. DuckDB + pandas). + """ + return df + + def materialize_python( + self, + *, + ctx: RuntimeContractContext, + df: Any, + ) -> bool: + """ + Optional hook for Python models. + + Engines override this to take over materialization for Python + models (e.g. to enforce contracts via explicit CASTs). + + Return True if you fully materialized ctx.physical_table yourself. + Return False to let the executor use its normal path + (_materialize_relation / _materialize_incremental). + """ + return False diff --git a/src/fastflowtransform/contracts/runtime/bigquery.py b/src/fastflowtransform/contracts/runtime/bigquery.py new file mode 100644 index 0000000..d0b213c --- /dev/null +++ b/src/fastflowtransform/contracts/runtime/bigquery.py @@ -0,0 +1,152 @@ +from __future__ import annotations + +from collections.abc import Mapping +from typing import Any + +from fastflowtransform.contracts.runtime.base import ( + BaseRuntimeContracts, + ContractExecutor, + RuntimeContractConfig, + RuntimeContractContext, + expected_physical_schema, +) + + +class BigQueryRuntimeContracts(BaseRuntimeContracts): + """ + Runtime schema contracts for BigQuery. 
+ + Notes: + - executor._execute_sql returns a job-like object; we force execution via .result() + when present. + - CAST mode uses BigQuery's `src.* EXCEPT(col1, col2, ...)` to retain extra columns. + """ + + def __init__(self, executor: ContractExecutor): + super().__init__(executor) + + # --- helpers --------------------------------------------------------- + + def _exec(self, sql: str) -> Any: + res = self.executor._execute_sql(sql) + # BigQuery QueryJob / our _TrackedQueryJob: execute via .result() + result_fn = getattr(res, "result", None) + if callable(result_fn): + return result_fn() + # Spark-like fallbacks (harmless here, but keeps this helper generic) + collect_fn = getattr(res, "collect", None) + if callable(collect_fn): + return collect_fn() + return res + + def _verify( + self, + *, + table: str, + expected: Mapping[str, str], + cfg: RuntimeContractConfig, + ) -> None: + if not expected: + return + + actual = self.executor.introspect_table_physical_schema(table) # {lower_name: TYPE} + exp_lower = {k.lower(): v for k, v in expected.items()} + + problems: list[str] = [] + + for col, expected_type in expected.items(): + key = col.lower() + if key not in actual: + problems.append(f"- missing column {col!r}") + continue + got = actual[key] + if str(got).lower() != str(expected_type).lower(): + problems.append(f"- column {col!r}: expected type {expected_type!r}, got {got!r}") + + if not cfg.allow_extra_columns: + extras = [c for c in actual if c not in exp_lower] + if extras: + problems.append(f"- extra columns present: {sorted(extras)}") + + if problems: + raise RuntimeError( + f"[contracts] BigQuery schema enforcement failed for {table}:\n" + + "\n".join(problems) + ) + + def _ctas_raw(self, target: str, select_body: str) -> None: + # BigQuery supports CREATE OR REPLACE TABLE ... AS