Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions examples_article/local_incremental_pipeline/.env.dev_duckdb
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
FF_DUCKDB_PATH=.local/incremental_demo.duckdb
FF_DUCKDB_SCHEMA=inc_demo_schema
112 changes: 112 additions & 0 deletions examples_article/local_incremental_pipeline/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
# Makefile — DuckDB-only incremental demo (article version)
#
# Assumes:
# - you are in the project root (where profiles.yml / project.yml live)
# - seeds/ contains seed_events_v1.csv and seed_events_v2.csv
# - fft CLI is installed and on PATH
#
# Usage examples:
# make init
# make run-v1
# make noop
# make run-v2
# make dag
# make test

SHELL := /usr/bin/env bash
# Strict shell: fail on errors and unset vars, propagate pipe failures.
.SHELLFLAGS := -eu -o pipefail -c
# Remove half-written targets when a recipe fails, so a failed build never
# leaves a stale file that looks up to date on the next run.
.DELETE_ON_ERROR:
# Bare `make` prints usage instead of silently running the first rule.
.DEFAULT_GOAL := help

# ---- Config (tweak as needed) ---------------------------------------------

ENV ?= dev_duckdb
DB_PATH ?= .local/incremental_demo.duckdb
SEEDS_DIR ?= .local/seeds
ACTIVE_SEED ?= $(SEEDS_DIR)/seed_events.csv

# Exported env for fft / profiles.yml templating
export FFT_ACTIVE_ENV := $(ENV)
export FF_ENGINE := duckdb
export FF_DUCKDB_PATH := $(DB_PATH)

# ---- Convenience ----------------------------------------------------------

# Print the available targets and the current (overridable) config values.
.PHONY: help
help:
	@echo ""
	@echo "Targets:"
	@echo " make init - create local folders"
	@echo " make clean - remove local DB + generated files"
	@echo " make seed-v1 - copy v1 seed into place"
	@echo " make seed-v2 - copy v2 seed into place"
	@echo " make seed - fft seed using FFT_SEEDS_DIR"
	@echo " make run - fft run (cached, rw)"
	@echo " make run-v1 - seed v1 + seed + run"
	@echo " make noop - run again unchanged (should mostly skip)"
	@echo " make run-v2 - seed v2 + seed + run (incremental update+insert)"
	@echo " make dag - generate HTML DAG"
	@echo " make test - run tests (adjust selector as you like)"
	@echo ""
	@echo "Config (override like: make run DB_PATH=.local/foo.duckdb):"
	@echo " ENV=$(ENV)"
	@echo " DB_PATH=$(DB_PATH)"
	@echo " SEEDS_DIR=$(SEEDS_DIR)"
	@echo ""

# init: create the local scratch directories the demo writes into.
.PHONY: init
init:
	@mkdir -p .local $(SEEDS_DIR)
	@echo "Initialized .local/ and $(SEEDS_DIR)/"

# clean: remove everything this Makefile creates (DB file, DAG site,
# caches, active seed). `rm -rf`/`rm -f` already ignore missing paths,
# so no error suppression is needed. Paths are quoted in case a user
# overrides DB_PATH/ACTIVE_SEED with a value containing spaces.
.PHONY: clean
clean:
	@rm -f "$(DB_PATH)" "$(ACTIVE_SEED)"
	@rm -rf site .cache .ff_cache
	@echo "Cleaned local artifacts"

# ---- Seed switching -------------------------------------------------------

# seed-v1 / seed-v2: activate a seed version by copying it into place.
# Generalized into one static pattern rule: `$*` is the version number,
# so adding seed-v3 later only means appending it to the target list.
.PHONY: seed-v1 seed-v2
seed-v1 seed-v2: seed-v%: init
	@cp -f seeds/seed_events_v$*.csv $(ACTIVE_SEED)
	@echo "Activated seed v$* -> $(ACTIVE_SEED)"

# ---- fft commands ---------------------------------------------------------

# seed: load the active CSV seed(s) into DuckDB via the fft CLI.
# FFT_SEEDS_DIR is set per-invocation so the CLI reads from our local copy.
.PHONY: seed
seed: init
	@echo "Seeding from FFT_SEEDS_DIR=$(SEEDS_DIR) into DuckDB=$(DB_PATH)"
	@FFT_SEEDS_DIR=$(SEEDS_DIR) fft seed . --env $(ENV)

# run: execute the model DAG with the read-write cache enabled, so
# unchanged models can be skipped on subsequent runs.
.PHONY: run
run: init
	@echo "Running DAG (env=$(ENV), cache=rw, db=$(DB_PATH))"
	@fft run . --env $(ENV) --cache=rw

# run-v1 / run-v2 must execute their steps strictly in order: swap the seed
# file, load it, then run the DAG. A prerequisite list (`run-v1: seed-v1
# seed run`) does NOT guarantee ordering under `make -j`, so each step is
# sequenced explicitly via recursive $(MAKE) (which also propagates flags
# and the jobserver correctly).
.PHONY: run-v1
run-v1:
	@$(MAKE) seed-v1
	@$(MAKE) seed
	@$(MAKE) run

# noop: re-run the DAG without changing any inputs; with cache=rw the run
# should mostly skip, demonstrating incremental caching.
.PHONY: noop
noop:
	@echo "No-op run (should mostly skip if nothing changed)"
	@fft run . --env $(ENV) --cache=rw

.PHONY: run-v2
run-v2:
	@$(MAKE) seed-v2
	@$(MAKE) seed
	@$(MAKE) run

# dag: render the project DAG as a static HTML site (output dir comes from
# project.yml `docs.dag_dir`, here site/dag).
.PHONY: dag
dag: init
	@echo "Generating HTML DAG..."
	@fft dag . --env $(ENV) --html
	@echo "Open: site/dag/index.html"

# test: run the project's data-quality tests against the current DB state.
.PHONY: test
test:
	@echo "Running tests (adjust selector to your project)..."
	@fft test . --env $(ENV)
22 changes: 22 additions & 0 deletions examples_article/local_incremental_pipeline/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# FastFlowTransform project scaffold

This project was created with `fft init`.

What lives here:
- models/: SQL (`*.ff.sql`) and Python (`*.ff.py`) models.
- models/macros/: Jinja SQL macros loaded automatically.
- models/macros_py/: Python helpers exposed as Jinja globals/filters.
- seeds/: CSV/Parquet inputs for reproducible seeds (see docs/Quickstart.md).
- sources.yml: External tables for source('group','table').
- profiles.yml: Engine connections; defaults come from docs/Profiles.md.
- packages.yml: Optional shared models/macros (docs/Packages.md).
- tests/unit/: YAML specs for `fft utest` (docs/Unit_Tests.md).
- tests/dq/: Custom data-quality tests for `fft test` (docs/Data_Quality_Tests.md).
- hooks/: SQL or Python hooks referenced from project.yml (docs/Hooks.md).
- docs/: Notes plus generated DAG site when using `fft dag --html`.

Next steps:
1. Update `profiles.yml` with real connection details (docs/Profiles.md).
2. Add sources in `sources.yml` and author models under `models/` (docs/Config_and_Macros.md).
3. Wire packages (optional) in `packages.yml` if you reuse shared models/macros (docs/Packages.md).
4. Seed sample data with `fft seed` and execute models with `fft run` (docs/Quickstart.md).
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- events_base: full-refresh ("table") staging model over the raw events
-- source. Every downstream incremental model reads from this table.
{{ config(materialized='table') }}

select
    event_id,
    -- Normalize the seed's string timestamps into a proper TIMESTAMP so the
    -- incremental watermark comparisons downstream are type-safe.
    cast(updated_at as timestamp) as updated_at,
    value
from {{ source('raw', 'events') }};
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
-- Incremental fact model configured entirely inline:
--   unique_key      -> rows with a matching event_id are updated (upsert)
--   updated_at_column -> drives the high-watermark filter below
{{ config(
    materialized='incremental',
    unique_key='event_id',
    incremental={ 'updated_at_column': 'updated_at' },
) }}

with base as (
    select *
    from {{ ref('events_base.ff') }}
)
select
    event_id,
    updated_at,
    -- Audit column: timestamp of when this row was (re)written by a run.
    now() as written_at,
    value
from base
{% if is_incremental() %}
-- On incremental runs, only pick up rows newer than the current watermark
-- in the target table ({{ this }}). The coalesce supplies an epoch floor so
-- the first incremental pass over an empty target still selects everything.
where updated_at > (
    select coalesce(max(updated_at), timestamp '1970-01-01 00:00:00')
    from {{ this }}
)
{% endif %};
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
-- Incremental fact model with NO inline incremental settings: the
-- unique_key and updated_at_column are supplied externally via project.yml
-- under models.incremental (presumably the fct_events_sql_yaml.ff entry --
-- confirm against project.yml). The SQL itself stays a plain select; the
-- framework applies the watermark/upsert behavior from that config.
{{ config(materialized='incremental') }}

with base as (
    select *
    from {{ ref('events_base.ff') }}
)
select
    event_id,
    updated_at,
    value
from base;
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from fastflowtransform import engine_model
import pandas as pd


@engine_model(
    only="duckdb",
    name="fct_events_py_incremental",
    deps=["events_base.ff"],
)
def build(events_df: pd.DataFrame) -> pd.DataFrame:
    """Enrich the base events frame with a derived value_x10 column.

    Works on a copy so the input frame passed in by the framework is
    never mutated; returns the columns in a fixed, stable order.
    """
    enriched = events_df.copy()
    enriched["value_x10"] = enriched["value"] * 10
    return enriched.loc[:, ["event_id", "updated_at", "value", "value_x10"]]
5 changes: 5 additions & 0 deletions examples_article/local_incremental_pipeline/packages.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Packages bring in external models and macros. See docs/Packages.md.
packages:
# - name: shared_macros
# path: "../shared_macros"
# models_dir: "models"
4 changes: 4 additions & 0 deletions examples_article/local_incremental_pipeline/profiles.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
dev_duckdb:
engine: duckdb
duckdb:
path: "{{ env('FF_DUCKDB_PATH', '.local/incremental_demo.duckdb') }}"
42 changes: 42 additions & 0 deletions examples_article/local_incremental_pipeline/project.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# Project configuration generated by `fft init`.
# Read docs/Project_Config.md for the complete reference.
name: local_incremental_pipeline
version: "0.1"
models_dir: models

docs:
# Adjust `dag_dir` to change where `fft dag --html` writes documentation (docs/Technical_Overview.md#auto-docs-and-lineage).
dag_dir: site/dag

# Project-level variables accessible via {{ var('key') }} inside models.
# Example:
# vars:
# run_date: "2024-01-01"
vars: {}

# Optional project-wide hooks that run before/after the pipeline or per model.
# See docs/Hooks.md for examples (SQL + Python) and selector usage.
hooks:
on_run_start: []
on_run_end: []
before_model: []
after_model: []

# Optional storage & incremental defaults applied per model name.
# See docs/Project_Config.md#models for field meanings.
models:
storage: {}
incremental:
fct_events_sql_yaml.ff:
unique_key: "event_id"
incremental:
enabled: true
updated_at_column: "updated_at"

# Optional seed storage overrides (e.g., external locations per seed).
# See docs/Project_Config.md#seeds for supported keys.
seeds:
storage: {}

# Declare project-wide data quality checks under `tests`. See docs/Data_Quality_Tests.md.
tests: []
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
event_id,updated_at,value
1,2024-01-01 00:00:00,10
2,2024-01-02 00:00:00,20
3,2024-01-03 00:00:00,30
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
event_id,updated_at,value
1,2024-01-01 00:00:00,10
2,2024-01-05 00:00:00,999
3,2024-01-03 00:00:00,30
4,2024-01-06 00:00:00,40
8 changes: 8 additions & 0 deletions examples_article/local_incremental_pipeline/sources.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Source declarations describe external tables.
version: 1

sources:
- name: raw
tables:
- name: events
identifier: seed_events
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "hatchling.build"

[project]
name = "fastflowtransform"
version = "0.6.14"
version = "0.6.18"
description = "Python framework for SQL & Python data transformation, ETL pipelines, and dbt-style data modeling"
readme = "README.md"
license = { text = "Apache-2.0" }
Expand Down
34 changes: 31 additions & 3 deletions src/fastflowtransform/executors/duckdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,19 @@ def __init__(
self.con = duckdb.connect(db_path)
self.schema = schema.strip() if isinstance(schema, str) and schema.strip() else None
catalog_override = catalog.strip() if isinstance(catalog, str) and catalog.strip() else None
self.catalog = self._detect_catalog()
if catalog_override:
if self._apply_catalog_override(catalog_override):

# If a catalog override is provided, connect in-memory and attach the file once
# under the requested alias to avoid DuckDB's auto-attached filename catalog.
self.catalog: str | None = None
if catalog_override and db_path != ":memory:" and "://" not in db_path:
connected = self._connect_with_catalog_override(catalog_override)
if connected:
self.catalog = catalog_override
else:
self.catalog = self._detect_catalog()
else:
self.catalog = self._detect_catalog()

self.runtime_query_stats = DuckQueryStatsRuntime(self)
self.runtime_budget = DuckBudgetRuntime(self)
self.runtime_contracts = DuckRuntimeContracts(self)
Expand Down Expand Up @@ -128,6 +135,27 @@ def _rows(result: Any) -> int | None:
rowcount_extractor=_rows,
)

def _connect_with_catalog_override(self, alias: str) -> bool:
    """
    Recreate the connection in-memory and attach the target DB under the
    requested catalog alias, so DuckDB does not auto-name it after the file.

    Returns True when the attach + catalog switch succeeded. Returns False
    on any failure, after restoring a plain file-backed connection so that
    self.con remains usable and the caller can fall back to catalog
    auto-detection.
    """
    try:
        # Resolve to an absolute path: the new in-memory connection has no
        # working-directory affinity to the original connect() call.
        resolved = str(Path(self.db_path).resolve())
        # New in-memory connection, then attach the file once under the alias.
        self.con = duckdb.connect()
        # NOTE(review): the path is embedded in single quotes here — a
        # db_path containing a single quote would break this statement;
        # confirm upstream validation if that is possible.
        self._execute_basic(
            f"attach database '{resolved}' as {_q_ident(alias)} (READ_ONLY FALSE)"
        )
        self._execute_basic(f"set catalog '{alias}'")
        return True
    except Exception:
        # Recovery path: best-effort close of whatever half-initialized
        # connection we hold, then reopen the original file-backed DB so the
        # executor keeps a working self.con. Caller falls back to
        # _detect_catalog() when we return False.
        with suppress(Exception):
            self.con.close()
        self.con = duckdb.connect(self.db_path)
        return False

def _detect_catalog(self) -> str | None:
rows = self._execute_basic("PRAGMA database_list").fetchall()
if rows:
Expand Down
2 changes: 1 addition & 1 deletion uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.