diff --git a/examples_article/local_incremental_pipeline/.env.dev_duckdb b/examples_article/local_incremental_pipeline/.env.dev_duckdb new file mode 100644 index 0000000..ed6f363 --- /dev/null +++ b/examples_article/local_incremental_pipeline/.env.dev_duckdb @@ -0,0 +1,2 @@ +FF_DUCKDB_PATH=.local/incremental_demo.duckdb +FF_DUCKDB_SCHEMA=inc_demo_schema \ No newline at end of file diff --git a/examples_article/local_incremental_pipeline/Makefile b/examples_article/local_incremental_pipeline/Makefile new file mode 100644 index 0000000..256d69d --- /dev/null +++ b/examples_article/local_incremental_pipeline/Makefile @@ -0,0 +1,112 @@ +# Makefile — DuckDB-only incremental demo (article version) +# +# Assumes: +# - you are in the project root (where profiles.yml / project.yml live) +# - seeds/ contains seed_events_v1.csv and seed_events_v2.csv +# - fft CLI is installed and on PATH +# +# Usage examples: +# make init +# make run-v1 +# make noop +# make run-v2 +# make dag +# make test + +SHELL := /usr/bin/env bash + +# ---- Config (tweak as needed) --------------------------------------------- + +ENV ?= dev_duckdb +DB_PATH ?= .local/incremental_demo.duckdb +SEEDS_DIR ?= .local/seeds +ACTIVE_SEED ?= $(SEEDS_DIR)/seed_events.csv + +# Exported env for fft / profiles.yml templating +export FFT_ACTIVE_ENV := $(ENV) +export FF_ENGINE := duckdb +export FF_DUCKDB_PATH := $(DB_PATH) + +# ---- Convenience ---------------------------------------------------------- + +.PHONY: help +help: + @echo "" + @echo "Targets:" + @echo " make init - create local folders" + @echo " make clean - remove local DB + generated files" + @echo " make seed-v1 - copy v1 seed into place" + @echo " make seed-v2 - copy v2 seed into place" + @echo " make seed - fft seed using FFT_SEEDS_DIR" + @echo " make run - fft run (cached, rw)" + @echo " make run-v1 - seed v1 + seed + run" + @echo " make noop - run again unchanged (should mostly skip)" + @echo " make run-v2 - seed v2 + seed + run (incremental 
update+insert)" + @echo " make dag - generate HTML DAG" + @echo " make test - run tests (adjust selector as you like)" + @echo "" + @echo "Config (override like: make run DB_PATH=.local/foo.duckdb):" + @echo " ENV=$(ENV)" + @echo " DB_PATH=$(DB_PATH)" + @echo " SEEDS_DIR=$(SEEDS_DIR)" + @echo "" + +.PHONY: init +init: + @mkdir -p .local + @mkdir -p $(SEEDS_DIR) + @echo "Initialized .local/ and $(SEEDS_DIR)/" + +.PHONY: clean +clean: + @rm -f $(DB_PATH) + @rm -rf site + @rm -rf .cache .ff_cache 2>/dev/null || true + @rm -f $(ACTIVE_SEED) + @echo "Cleaned local artifacts" + +# ---- Seed switching ------------------------------------------------------- + +.PHONY: seed-v1 +seed-v1: init + @cp -f seeds/seed_events_v1.csv $(ACTIVE_SEED) + @echo "Activated seed v1 -> $(ACTIVE_SEED)" + +.PHONY: seed-v2 +seed-v2: init + @cp -f seeds/seed_events_v2.csv $(ACTIVE_SEED) + @echo "Activated seed v2 -> $(ACTIVE_SEED)" + +# ---- fft commands --------------------------------------------------------- + +.PHONY: seed +seed: init + @echo "Seeding from FFT_SEEDS_DIR=$(SEEDS_DIR) into DuckDB=$(DB_PATH)" + @FFT_SEEDS_DIR=$(SEEDS_DIR) fft seed . --env $(ENV) + +.PHONY: run +run: init + @echo "Running DAG (env=$(ENV), cache=rw, db=$(DB_PATH))" + @fft run . --env $(ENV) --cache=rw + +.PHONY: run-v1 +run-v1: seed-v1 seed run + +.PHONY: noop +noop: + @echo "No-op run (should mostly skip if nothing changed)" + @fft run . --env $(ENV) --cache=rw + +.PHONY: run-v2 +run-v2: seed-v2 seed run + +.PHONY: dag +dag: init + @echo "Generating HTML DAG..." + @fft dag . --env $(ENV) --html + @echo "Open: site/dag/index.html" + +.PHONY: test +test: + @echo "Running tests (adjust selector to your project)..." + @fft test . 
--env $(ENV) diff --git a/examples_article/local_incremental_pipeline/README.md b/examples_article/local_incremental_pipeline/README.md new file mode 100644 index 0000000..87959ef --- /dev/null +++ b/examples_article/local_incremental_pipeline/README.md @@ -0,0 +1,22 @@ +# FastFlowTransform project scaffold + +This project was created with `fft init`. + +What lives here: +- models/: SQL (`*.ff.sql`) and Python (`*.ff.py`) models. + - models/macros/: Jinja SQL macros loaded automatically. + - models/macros_py/: Python helpers exposed as Jinja globals/filters. +- seeds/: CSV/Parquet inputs for reproducible seeds (see docs/Quickstart.md). +- sources.yml: External tables for source('group','table'). +- profiles.yml: Engine connections; defaults come from docs/Profiles.md. +- packages.yml: Optional shared models/macros (docs/Packages.md). +- tests/unit/: YAML specs for `fft utest` (docs/Unit_Tests.md). +- tests/dq/: Custom data-quality tests for `fft test` (docs/Data_Quality_Tests.md). +- hooks/: SQL or Python hooks referenced from project.yml (docs/Hooks.md). +- docs/: Notes plus generated DAG site when using `fft dag --html`. + +Next steps: +1. Update `profiles.yml` with real connection details (docs/Profiles.md). +2. Add sources in `sources.yml` and author models under `models/` (docs/Config_and_Macros.md). +3. Wire packages (optional) in `packages.yml` if you reuse shared models/macros (docs/Packages.md). +4. Seed sample data with `fft seed` and execute models with `fft run` (docs/Quickstart.md). 
diff --git a/examples_article/local_incremental_pipeline/models/events_base.ff.sql b/examples_article/local_incremental_pipeline/models/events_base.ff.sql new file mode 100644 index 0000000..a12effe --- /dev/null +++ b/examples_article/local_incremental_pipeline/models/events_base.ff.sql @@ -0,0 +1,7 @@ +{{ config(materialized='table') }} + +select + event_id, + cast(updated_at as timestamp) as updated_at, + value +from {{ source('raw', 'events') }}; \ No newline at end of file diff --git a/examples_article/local_incremental_pipeline/models/fct_events_sql_inline.ff.sql b/examples_article/local_incremental_pipeline/models/fct_events_sql_inline.ff.sql new file mode 100644 index 0000000..07e0a0a --- /dev/null +++ b/examples_article/local_incremental_pipeline/models/fct_events_sql_inline.ff.sql @@ -0,0 +1,22 @@ +{{ config( + materialized='incremental', + unique_key='event_id', + incremental={ 'updated_at_column': 'updated_at' }, +) }} + +with base as ( + select * + from {{ ref('events_base.ff') }} +) +select + event_id, + updated_at, + now() as written_at, + value +from base +{% if is_incremental() %} +where updated_at > ( + select coalesce(max(updated_at), timestamp '1970-01-01 00:00:00') + from {{ this }} +) +{% endif %}; \ No newline at end of file diff --git a/examples_article/local_incremental_pipeline/models/fct_events_sql_yaml.ff.sql b/examples_article/local_incremental_pipeline/models/fct_events_sql_yaml.ff.sql new file mode 100644 index 0000000..8f9c156 --- /dev/null +++ b/examples_article/local_incremental_pipeline/models/fct_events_sql_yaml.ff.sql @@ -0,0 +1,11 @@ +{{ config(materialized='incremental') }} + +with base as ( + select * + from {{ ref('events_base.ff') }} +) +select + event_id, + updated_at, + value +from base; \ No newline at end of file diff --git a/examples_article/local_incremental_pipeline/models/fct_eventy_py_incremental.ff.py b/examples_article/local_incremental_pipeline/models/fct_eventy_py_incremental.ff.py new file mode 100644 index 
0000000..ce04291 --- /dev/null +++ b/examples_article/local_incremental_pipeline/models/fct_eventy_py_incremental.ff.py @@ -0,0 +1,13 @@ +from fastflowtransform import engine_model +import pandas as pd + + +@engine_model( + only="duckdb", + name="fct_events_py_incremental", + deps=["events_base.ff"], +) +def build(events_df: pd.DataFrame) -> pd.DataFrame: + df = events_df.copy() + df["value_x10"] = df["value"] * 10 + return df[["event_id", "updated_at", "value", "value_x10"]] diff --git a/examples_article/local_incremental_pipeline/packages.yml b/examples_article/local_incremental_pipeline/packages.yml new file mode 100644 index 0000000..3046424 --- /dev/null +++ b/examples_article/local_incremental_pipeline/packages.yml @@ -0,0 +1,5 @@ +# Packages bring in external models and macros. See docs/Packages.md. +packages: + # - name: shared_macros + # path: "../shared_macros" + # models_dir: "models" diff --git a/examples_article/local_incremental_pipeline/profiles.yml b/examples_article/local_incremental_pipeline/profiles.yml new file mode 100644 index 0000000..b2bf65c --- /dev/null +++ b/examples_article/local_incremental_pipeline/profiles.yml @@ -0,0 +1,4 @@ +dev_duckdb: + engine: duckdb + duckdb: + path: "{{ env('FF_DUCKDB_PATH', '.local/incremental_demo.duckdb') }}" \ No newline at end of file diff --git a/examples_article/local_incremental_pipeline/project.yml b/examples_article/local_incremental_pipeline/project.yml new file mode 100644 index 0000000..b52c58b --- /dev/null +++ b/examples_article/local_incremental_pipeline/project.yml @@ -0,0 +1,42 @@ +# Project configuration generated by `fft init`. +# Read docs/Project_Config.md for the complete reference. +name: local_incremental_pipeline +version: "0.1" +models_dir: models + +docs: + # Adjust `dag_dir` to change where `fft dag --html` writes documentation (docs/Technical_Overview.md#auto-docs-and-lineage). + dag_dir: site/dag + +# Project-level variables accessible via {{ var('key') }} inside models. 
+# Example: +# vars: +# run_date: "2024-01-01" +vars: {} + +# Optional project-wide hooks that run before/after the pipeline or per model. +# See docs/Hooks.md for examples (SQL + Python) and selector usage. +hooks: + on_run_start: [] + on_run_end: [] + before_model: [] + after_model: [] + +# Optional storage & incremental defaults applied per model name. +# See docs/Project_Config.md#models for field meanings. +models: + storage: {} + incremental: + fct_events_sql_yaml.ff: + unique_key: "event_id" + incremental: + enabled: true + updated_at_column: "updated_at" + +# Optional seed storage overrides (e.g., external locations per seed). +# See docs/Project_Config.md#seeds for supported keys. +seeds: + storage: {} + +# Declare project-wide data quality checks under `tests`. See docs/Data_Quality_Tests.md. +tests: [] diff --git a/examples_article/local_incremental_pipeline/seeds/seed_events_v1.csv b/examples_article/local_incremental_pipeline/seeds/seed_events_v1.csv new file mode 100644 index 0000000..3d7ac58 --- /dev/null +++ b/examples_article/local_incremental_pipeline/seeds/seed_events_v1.csv @@ -0,0 +1,4 @@ +event_id,updated_at,value +1,2024-01-01 00:00:00,10 +2,2024-01-02 00:00:00,20 +3,2024-01-03 00:00:00,30 diff --git a/examples_article/local_incremental_pipeline/seeds/seed_events_v2.csv b/examples_article/local_incremental_pipeline/seeds/seed_events_v2.csv new file mode 100644 index 0000000..3bc3b4f --- /dev/null +++ b/examples_article/local_incremental_pipeline/seeds/seed_events_v2.csv @@ -0,0 +1,5 @@ +event_id,updated_at,value +1,2024-01-01 00:00:00,10 +2,2024-01-05 00:00:00,999 +3,2024-01-03 00:00:00,30 +4,2024-01-06 00:00:00,40 diff --git a/examples_article/local_incremental_pipeline/sources.yml b/examples_article/local_incremental_pipeline/sources.yml new file mode 100644 index 0000000..fc75b3e --- /dev/null +++ b/examples_article/local_incremental_pipeline/sources.yml @@ -0,0 +1,8 @@ +# Source declarations describe external tables. 
+version: 1 + +sources: + - name: raw + tables: + - name: events + identifier: seed_events diff --git a/pyproject.toml b/pyproject.toml index 276da9c..ab5d2e1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "fastflowtransform" -version = "0.6.14" +version = "0.6.18" description = "Python framework for SQL & Python data transformation, ETL pipelines, and dbt-style data modeling" readme = "README.md" license = { text = "Apache-2.0" } diff --git a/src/fastflowtransform/executors/duckdb.py b/src/fastflowtransform/executors/duckdb.py index 9b35b1c..7588621 100644 --- a/src/fastflowtransform/executors/duckdb.py +++ b/src/fastflowtransform/executors/duckdb.py @@ -41,12 +41,19 @@ def __init__( self.con = duckdb.connect(db_path) self.schema = schema.strip() if isinstance(schema, str) and schema.strip() else None catalog_override = catalog.strip() if isinstance(catalog, str) and catalog.strip() else None - self.catalog = self._detect_catalog() - if catalog_override: - if self._apply_catalog_override(catalog_override): + + # If a catalog override is provided, connect in-memory and attach the file once + # under the requested alias to avoid DuckDB's auto-attached filename catalog. 
+ self.catalog: str | None = None + if catalog_override and db_path != ":memory:" and "://" not in db_path: + connected = self._connect_with_catalog_override(catalog_override) + if connected: + self.catalog = catalog_override + else: + self.catalog = self._detect_catalog() + else: + self.catalog = self._detect_catalog() + + self.runtime_query_stats = DuckQueryStatsRuntime(self) + self.runtime_budget = DuckBudgetRuntime(self) + self.runtime_contracts = DuckRuntimeContracts(self) @@ -128,6 +135,27 @@ def _rows(result: Any) -> int | None: + rowcount_extractor=_rows, + ) + + def _connect_with_catalog_override(self, alias: str) -> bool: + """ + Recreate the connection in-memory and attach the target DB under the + requested catalog alias, so DuckDB does not auto-name it after the file. + """ + try: + resolved = str(Path(self.db_path).resolve()) + # New in-memory connection, then attach the file once under the alias. + self.con = duckdb.connect() + self._execute_basic( + f"attach database '{resolved}' as {_q_ident(alias)} (READ_ONLY FALSE)" + ) + self._execute_basic(f"set catalog '{alias}'") + return True + except Exception: + # Restore a plain file-backed connection; caller will fall back. + with suppress(Exception): + self.con.close() + self.con = duckdb.connect(self.db_path) + return False + + def _detect_catalog(self) -> str | None: rows = self._execute_basic("PRAGMA database_list").fetchall() if rows: diff --git a/uv.lock index b2c7c77..43cfecf 100644 --- a/uv.lock +++ b/uv.lock @@ -733,7 +733,7 @@ wheels = [ [[package]] name = "fastflowtransform" -version = "0.6.14" +version = "0.6.18" source = { editable = "." } dependencies = [ { name = "duckdb" },