From 1c8c87260abe0314debc8407935aa8222406dfc6 Mon Sep 17 00:00:00 2001 From: Marko Lekic Date: Mon, 9 Feb 2026 18:25:26 +0100 Subject: [PATCH 1/4] Small fixes --- pyproject.toml | 2 +- src/fastflowtransform/cli/bootstrap.py | 2 +- src/fastflowtransform/contracts/runtime/duckdb.py | 1 - uv.lock | 2 +- 4 files changed, 3 insertions(+), 4 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index ab5d2e1..d153cd0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "fastflowtransform" -version = "0.6.18" +version = "0.6.19" description = "Python framework for SQL & Python data transformation, ETL pipelines, and dbt-style data modeling" readme = "README.md" license = { text = "Apache-2.0" } diff --git a/src/fastflowtransform/cli/bootstrap.py b/src/fastflowtransform/cli/bootstrap.py index d0e276f..02ca729 100644 --- a/src/fastflowtransform/cli/bootstrap.py +++ b/src/fastflowtransform/cli/bootstrap.py @@ -74,7 +74,7 @@ def _resolve_project_path(project_arg: str) -> Path: if not models.exists() or not models.is_dir(): raise typer.BadParameter( f"Invalid project at {p}\n" - "Expected eian subfolder 'models/'.\n" + "Expected a subfolder 'models/'.\n" "Tip: change directory to the root and use '.'." ) return p diff --git a/src/fastflowtransform/contracts/runtime/duckdb.py b/src/fastflowtransform/contracts/runtime/duckdb.py index 6715e31..de608e0 100644 --- a/src/fastflowtransform/contracts/runtime/duckdb.py +++ b/src/fastflowtransform/contracts/runtime/duckdb.py @@ -120,7 +120,6 @@ def materialize_python( - Only for pandas.DataFrame - Uses expected_physical_schema to build CAST expressions """ - print(ctx) mode = ctx.config.mode if mode == "off": return False diff --git a/uv.lock b/uv.lock index 43cfecf..f41b83e 100644 --- a/uv.lock +++ b/uv.lock @@ -733,7 +733,7 @@ wheels = [ [[package]] name = "fastflowtransform" -version = "0.6.18" +version = "0.6.19" source = { editable = "." 
} dependencies = [ { name = "duckdb" }, From 6a601a84d0bb0863b30812a93b50a871b17a8563 Mon Sep 17 00:00:00 2001 From: Marko Lekic Date: Wed, 11 Feb 2026 16:03:52 +0100 Subject: [PATCH 2/4] Backend implementation for artifacts postgres db upsert --- Makefile.dev | 7 +- docs/Auto_Docs.md | 29 ++ examples/basic_demo/.env.dev_postgres | 4 + examples/basic_demo/profiles.yml | 7 + src/fastflowtransform/artifacts/__init__.py | 27 ++ src/fastflowtransform/artifacts/config.py | 80 +++++ src/fastflowtransform/artifacts/emitter.py | 242 ++++++++++++++ .../{artifacts.py => artifacts/files.py} | 153 ++++++--- .../artifacts/postgres_store.py | 307 ++++++++++++++++++ src/fastflowtransform/cli/bootstrap.py | 33 ++ src/fastflowtransform/cli/dag_cmd.py | 4 + src/fastflowtransform/cli/docgen_cmd.py | 4 + src/fastflowtransform/cli/docs_cmd.py | 4 + src/fastflowtransform/cli/run.py | 39 ++- src/fastflowtransform/cli/selectors.py | 37 ++- src/fastflowtransform/cli/test_cmd.py | 50 +-- src/fastflowtransform/cli/utest_cmd.py | 56 ++-- src/fastflowtransform/config/loaders.py | 1 + src/fastflowtransform/docs.py | 3 + src/fastflowtransform/settings.py | 59 +++- 20 files changed, 1037 insertions(+), 109 deletions(-) create mode 100644 src/fastflowtransform/artifacts/__init__.py create mode 100644 src/fastflowtransform/artifacts/config.py create mode 100644 src/fastflowtransform/artifacts/emitter.py rename src/fastflowtransform/{artifacts.py => artifacts/files.py} (82%) create mode 100644 src/fastflowtransform/artifacts/postgres_store.py diff --git a/Makefile.dev b/Makefile.dev index 89f9cda..19a277c 100644 --- a/Makefile.dev +++ b/Makefile.dev @@ -58,7 +58,10 @@ act-commit: --env VIRTUAL_ENV= concat-docs: - $(UV) run python _scripts/concat_docs.py -o Combined.md + $(UV) run python _scripts/concat_docs.py -o _exports/Combined_Docs.md --exclude "examples/**" export-demo: - $(UV) python _scripts/export_subdir_md.py examples/incremental_demo -o _exports/incremental_demo_export.md --exclude-ext 
html css \ No newline at end of file + $(UV) python _scripts/export_subdir_md.py examples/incremental_demo -o _exports/incremental_demo_export.md --exclude-ext html css + +show-structure: + tree -a -I '.git|.venv|__pycache__|.pytest_cache|.mypy_cache|.ruff_cache|node_modules|dist|build|htmlcov|site|.fastflowtransform|.DS_Store|.idea|.vscode|.local|metastore_db|.uv-cache|_exports|_scripts|articles|examples_article|tickets' diff --git a/docs/Auto_Docs.md b/docs/Auto_Docs.md index e425e2a..03f8365 100644 --- a/docs/Auto_Docs.md +++ b/docs/Auto_Docs.md @@ -30,6 +30,34 @@ fft docgen . --env dev --out site/docs --emit-json site/docs/docs_manifest.json This generates the SPA and also writes a manifest you can use for CI checks or custom tooling. +Note: If `artifacts.mode` is set to `db`, docs generation is disabled because it would create local asset files (SPA JS/CSS and JSON). Switch to `files` or `both` to generate docs. + +Artifacts configuration lives in your `profiles.yml` under the selected environment. Example: + +```yaml +dev_postgres: + engine: postgres + postgres: + dsn: "{{ env('FF_PG_DSN') }}" + db_schema: "{{ env('FF_PG_SCHEMA', 'public') }}" + + artifacts: + mode: both # files | db | both + engine: postgres # currently only postgres supported + postgres: + dsn: "{{ env('FF_ARTIFACTS_PG_DSN') }}" + db_schema: "{{ env('FF_ARTIFACTS_PG_SCHEMA', 'public') }}" # or `schema: ...` +``` + +Env overrides: +- `FF_ARTIFACTS_MODE` overrides `artifacts.mode`. +- `FF_ARTIFACTS_PG_DSN` overrides `artifacts.postgres.dsn`. +- `FF_ARTIFACTS_PG_SCHEMA` overrides `artifacts.postgres.db_schema` (or `schema`). + +Validation rules: +- `mode: files` does not require any Postgres settings. +- `mode: db` or `both` requires `artifacts.engine=postgres` and a valid `dsn` + `db_schema`. 
+ ### Classic (DAG-only) ```bash @@ -353,6 +381,7 @@ If your build outputs runtime artifacts (run results, tests), the portal can dis - optional unit test results This is intended to be lightweight and opt-in: docs remain usable without it. +In `artifacts.mode=db`, runtime artifacts are stored in Postgres and the docs site is not generated, so the health strip is unavailable. --- diff --git a/examples/basic_demo/.env.dev_postgres b/examples/basic_demo/.env.dev_postgres index 2000d7c..1a2b28b 100644 --- a/examples/basic_demo/.env.dev_postgres +++ b/examples/basic_demo/.env.dev_postgres @@ -1,3 +1,7 @@ # Postgres profile for the basic demo (replace with your own connection string) FF_PG_DSN=postgresql+psycopg://postgres:postgres@localhost:5432 FF_PG_SCHEMA=basic_demo + +FF_ARTIFACTS_MODE=both +FF_ARTIFACTS_PG_DSN=postgresql+psycopg://postgres:postgres@localhost:5432 +FF_ARTIFACTS_PG_SCHEMA=basic_demo diff --git a/examples/basic_demo/profiles.yml b/examples/basic_demo/profiles.yml index a24fa9b..d532e10 100644 --- a/examples/basic_demo/profiles.yml +++ b/examples/basic_demo/profiles.yml @@ -18,6 +18,13 @@ dev_postgres: dsn: "{{ env('FF_PG_DSN') }}" db_schema: "{{ env('FF_PG_SCHEMA', 'public') }}" + artifacts: + mode: both # files | db | both (default: files) + engine: postgres # currently only postgres supported + postgres: + dsn: "{{ env('FF_PG_DSN') }}" + db_schema: "{{ env('FF_PG_SCHEMA', 'public') }}" + dev_postgres_utest: engine: postgres # same engine postgres: diff --git a/src/fastflowtransform/artifacts/__init__.py b/src/fastflowtransform/artifacts/__init__.py new file mode 100644 index 0000000..2d3d5b7 --- /dev/null +++ b/src/fastflowtransform/artifacts/__init__.py @@ -0,0 +1,27 @@ +"""Artifacts helpers (file-based + builders).""" + +from __future__ import annotations + +from .files import ( + RunNodeResult, + TestResult, + UTestResult, + load_last_run_durations, + write_catalog, + write_manifest, + write_run_results, + write_test_results, + 
write_utest_results, +) + +__all__ = [ + "RunNodeResult", + "TestResult", + "UTestResult", + "load_last_run_durations", + "write_catalog", + "write_manifest", + "write_run_results", + "write_test_results", + "write_utest_results", +] diff --git a/src/fastflowtransform/artifacts/config.py b/src/fastflowtransform/artifacts/config.py new file mode 100644 index 0000000..826ced9 --- /dev/null +++ b/src/fastflowtransform/artifacts/config.py @@ -0,0 +1,80 @@ +from __future__ import annotations + +from dataclasses import dataclass +from pathlib import Path +from typing import Any + +import yaml + +from fastflowtransform.logging import warn +from fastflowtransform.settings import _render_profiles_template + + +@dataclass(frozen=True) +class ResolvedArtifactsDb: + mode: str # files|db|both + dsn: str + db_schema: str + + +def _read_profiles_yaml(project_dir: Path) -> dict[str, Any]: + for name in ("profiles.yml", "profiles.yaml"): + p = project_dir / name + if p.exists(): + raw_text = p.read_text(encoding="utf-8") + rendered = _render_profiles_template(raw_text, project_dir) + raw = yaml.safe_load(rendered) or {} + return raw if isinstance(raw, dict) else {} + return {} + + +def resolve_artifacts_db(project_dir: Path, env_name: str) -> ResolvedArtifactsDb | None: + """ + Reads project_dir/profiles.yml and looks for: + + .artifacts.mode + .artifacts.postgres.dsn + .artifacts.postgres.db_schema + + Also supports an optional top-level 'profiles:' wrapper: + + profiles: + dev: {...} + """ + raw = _read_profiles_yaml(project_dir) + if not raw: + return None + + profiles = raw.get("profiles") + root = profiles if isinstance(profiles, dict) else raw + + env = root.get(env_name) + if not isinstance(env, dict): + return None + + artifacts = env.get("artifacts") + if not isinstance(artifacts, dict): + return None + + mode = str(artifacts.get("mode") or "files").lower().strip() + if mode not in ("files", "db", "both"): + warn(f"[artifacts] invalid artifacts.mode={mode!r}; falling back 
to 'files'") + mode = "files" + + engine = str(artifacts.get("engine") or "postgres").lower().strip() + if engine != "postgres": + warn(f"[artifacts] artifacts.engine={engine!r} not supported yet; ignoring") + return None + + pg = artifacts.get("postgres") + if not isinstance(pg, dict): + return None + + dsn = pg.get("dsn") + schema = pg.get("db_schema") or pg.get("schema") + if not isinstance(dsn, str) or not dsn.strip(): + return None + if not isinstance(schema, str) or not schema.strip(): + return None + + return ResolvedArtifactsDb(mode=mode, dsn=dsn.strip(), db_schema=schema.strip()) diff --git a/src/fastflowtransform/artifacts/emitter.py b/src/fastflowtransform/artifacts/emitter.py new file mode 100644 index 0000000..2068a61 --- /dev/null +++ b/src/fastflowtransform/artifacts/emitter.py @@ -0,0 +1,242 @@ +from __future__ import annotations + +import uuid +from pathlib import Path +from typing import Any + +from fastflowtransform.artifacts.postgres_store import PostgresArtifactsStore + +from .files import ( + _json_dump, + _target_dir, + build_catalog, + build_manifest, + build_run_results, + build_test_results, + build_utest_results, +) + + +def _normalize_mode(mode: str | None) -> str: + raw = (mode or "files").strip().lower() + return raw if raw in {"files", "db", "both"} else "files" + + +def _ensure_run_id(store: PostgresArtifactsStore | None) -> str: + if store is not None: + return store.new_run_id() + return uuid.uuid4().hex + + +def _inject_run_meta( + payload: dict[str, Any], + *, + run_id: str, + env_name: str | None, + model_engine: str | None, +) -> dict[str, Any]: + meta = dict(payload.get("metadata") or {}) + meta["run_id"] = run_id + if env_name: + meta["env_name"] = env_name + if model_engine: + meta["model_engine"] = model_engine + out = dict(payload) + out["metadata"] = meta + return out + + +def _write_payload_file(project_dir: Path, filename: str, payload: dict[str, Any]) -> None: + out_dir = _target_dir(project_dir) + path = out_dir / 
filename + _json_dump(path, payload) + + +def emit_run_artifacts( + project_dir: Path, + *, + artifacts_mode: str | None, + artifacts_store: PostgresArtifactsStore | None, + env_name: str | None, + model_engine: str | None, + started_at: str, + finished_at: str, + node_results: list[Any], + budgets: dict[str, Any] | None = None, + executor: Any | None = None, + include_catalog: bool = True, +) -> str: + """ + Emit manifest/run_results (and optionally catalog) to files, DB, or both. + Returns the run_id used for all emitted artifacts. + """ + mode = _normalize_mode(artifacts_mode) + run_id = _ensure_run_id(artifacts_store) + + manifest = _inject_run_meta( + build_manifest(project_dir), + run_id=run_id, + env_name=env_name, + model_engine=model_engine, + ) + run_results = _inject_run_meta( + build_run_results( + project_dir, + started_at=started_at, + finished_at=finished_at, + node_results=node_results, + budgets=budgets, + ), + run_id=run_id, + env_name=env_name, + model_engine=model_engine, + ) + + catalog: dict[str, Any] | None = None + if include_catalog and executor is not None: + catalog = _inject_run_meta( + build_catalog(project_dir, executor), + run_id=run_id, + env_name=env_name, + model_engine=model_engine, + ) + + if mode in {"files", "both"}: + _write_payload_file(project_dir, "manifest.json", manifest) + _write_payload_file(project_dir, "run_results.json", run_results) + if catalog is not None: + _write_payload_file(project_dir, "catalog.json", catalog) + + if mode in {"db", "both"}: + if artifacts_store is None: + raise RuntimeError("Artifacts DB mode requires a configured PostgresArtifactsStore.") + artifacts_store.upsert_run( + run_id=run_id, + env_name=env_name, + model_engine=model_engine, + meta={"started_at": started_at, "finished_at": finished_at}, + ) + artifacts_store.write_artifact( + run_id=run_id, + artifact_type="manifest", + payload=manifest, + ) + artifacts_store.write_artifact( + run_id=run_id, + artifact_type="run_results", + 
payload=run_results, + ) + if catalog is not None: + artifacts_store.write_artifact( + run_id=run_id, + artifact_type="catalog", + payload=catalog, + ) + + return run_id + + +def emit_test_results( + project_dir: Path, + *, + artifacts_mode: str | None, + artifacts_store: PostgresArtifactsStore | None, + env_name: str | None, + model_engine: str | None, + started_at: str, + finished_at: str, + results: list[Any], +) -> str: + """ + Emit test_results to files, DB, or both. + Returns the run_id used for the emitted artifact. + """ + mode = _normalize_mode(artifacts_mode) + run_id = _ensure_run_id(artifacts_store) + + payload = _inject_run_meta( + build_test_results( + project_dir, + started_at=started_at, + finished_at=finished_at, + results=results, + ), + run_id=run_id, + env_name=env_name, + model_engine=model_engine, + ) + + if mode in {"files", "both"}: + _write_payload_file(project_dir, "test_results.json", payload) + + if mode in {"db", "both"}: + if artifacts_store is None: + raise RuntimeError("Artifacts DB mode requires a configured PostgresArtifactsStore.") + artifacts_store.upsert_run( + run_id=run_id, + env_name=env_name, + model_engine=model_engine, + meta={"started_at": started_at, "finished_at": finished_at, "kind": "test"}, + ) + artifacts_store.write_artifact( + run_id=run_id, + artifact_type="test_results", + payload=payload, + ) + + return run_id + + +def emit_utest_results( + project_dir: Path, + *, + artifacts_mode: str | None, + artifacts_store: PostgresArtifactsStore | None, + env_name: str | None, + model_engine: str | None, + started_at: str, + finished_at: str, + failures: int, + results: list[Any], + engine: str | None = None, +) -> str: + """ + Emit utest_results to files, DB, or both. + Returns the run_id used for the emitted artifact. 
+ """ + mode = _normalize_mode(artifacts_mode) + run_id = _ensure_run_id(artifacts_store) + + payload = _inject_run_meta( + build_utest_results( + project_dir, + started_at=started_at, + finished_at=finished_at, + failures=failures, + results=results, + engine=engine, + ), + run_id=run_id, + env_name=env_name, + model_engine=model_engine, + ) + + if mode in {"files", "both"}: + _write_payload_file(project_dir, "utest_results.json", payload) + + if mode in {"db", "both"}: + if artifacts_store is None: + raise RuntimeError("Artifacts DB mode requires a configured PostgresArtifactsStore.") + artifacts_store.upsert_run( + run_id=run_id, + env_name=env_name, + model_engine=model_engine, + meta={"started_at": started_at, "finished_at": finished_at, "kind": "utest"}, + ) + artifacts_store.write_artifact( + run_id=run_id, + artifact_type="utest_results", + payload=payload, + ) + + return run_id diff --git a/src/fastflowtransform/artifacts.py b/src/fastflowtransform/artifacts/files.py similarity index 82% rename from src/fastflowtransform/artifacts.py rename to src/fastflowtransform/artifacts/files.py index 513111e..65cffc7 100644 --- a/src/fastflowtransform/artifacts.py +++ b/src/fastflowtransform/artifacts/files.py @@ -69,17 +69,15 @@ def _json_dump(path: Path, obj: Any) -> None: # ---------- MANIFEST ---------- -def write_manifest(project_dir: Path) -> Path: +def build_manifest(project_dir: Path) -> dict[str, Any]: """ - Write manifest.json with minimal compatibility: + Build manifest.json payload with minimal compatibility: - nodes: {name, path, deps, materialized, relation, kind} - macros: {name -> path} - sources: verbatim REGISTRY.sources - generated_at """ project_dir = Path(project_dir) - out_dir = _target_dir(project_dir) - manifest_path = out_dir / "manifest.json" nodes = {} for name, node in sorted(REGISTRY.nodes.items(), key=lambda x: x[0]): @@ -106,6 +104,14 @@ def write_manifest(project_dir: Path) -> Path: "macros": macros, "sources": REGISTRY.sources or {}, 
} + return data + + +def write_manifest(project_dir: Path) -> Path: + project_dir = Path(project_dir) + out_dir = _target_dir(project_dir) + manifest_path = out_dir / "manifest.json" + data = build_manifest(project_dir) _json_dump(manifest_path, data) return manifest_path @@ -129,22 +135,18 @@ class RunNodeResult: query_duration_ms: int | None = None -def write_run_results( +def build_run_results( project_dir: Path, *, started_at: str, finished_at: str, node_results: list[RunNodeResult], budgets: dict[str, Any] | None = None, -) -> Path: +) -> dict[str, Any]: """ - Write run_results.json containing run envelope and per-node results. + Build run_results.json payload containing run envelope and per-node results. Optionally includes a 'budgets' summary block. """ - project_dir = Path(project_dir) - out_dir = _target_dir(project_dir) - results_path = out_dir / "run_results.json" - data = { "metadata": {"tool": "fastflowtransform", "generated_at": _iso_now()}, "run_started_at": started_at, @@ -155,6 +157,27 @@ def write_run_results( if budgets is not None: data["budgets"] = budgets + return data + + +def write_run_results( + project_dir: Path, + *, + started_at: str, + finished_at: str, + node_results: list[RunNodeResult], + budgets: dict[str, Any] | None = None, +) -> Path: + project_dir = Path(project_dir) + out_dir = _target_dir(project_dir) + results_path = out_dir / "run_results.json" + data = build_run_results( + project_dir, + started_at=started_at, + finished_at=finished_at, + node_results=node_results, + budgets=budgets, + ) _json_dump(results_path, data) return results_path @@ -288,14 +311,12 @@ def _try_columns_for(executor: Any, table: str) -> list[dict[str, Any]]: return [] -def write_catalog(project_dir: Path, executor: Any) -> Path: +def build_catalog(project_dir: Path, executor: Any) -> dict[str, Any]: """ - Write catalog.json: + Build catalog.json payload: - relations: map of relation -> {columns:[{name,dtype,nullable}]} """ project_dir = 
Path(project_dir) - out_dir = _target_dir(project_dir) - catalog_path = out_dir / "catalog.json" relations: dict[str, Any] = {} rel_names = sorted([relation_for(n) for n in REGISTRY.nodes]) @@ -311,6 +332,14 @@ def write_catalog(project_dir: Path, executor: Any) -> Path: "metadata": {"tool": "fastflowtransform", "generated_at": _iso_now()}, "relations": relations, } + return data + + +def write_catalog(project_dir: Path, executor: Any) -> Path: + project_dir = Path(project_dir) + out_dir = _target_dir(project_dir) + catalog_path = out_dir / "catalog.json" + data = build_catalog(project_dir, executor) _json_dump(catalog_path, data) return catalog_path @@ -318,21 +347,37 @@ def write_catalog(project_dir: Path, executor: Any) -> Path: # ---------- READ DURATIONS ---------- -def load_last_run_durations(project_dir: Path) -> dict[str, float]: +def load_last_run_durations( + project_dir: Path, + *, + artifacts_mode: str | None = None, + artifacts_store: Any | None = None, + env_name: str | None = None, + model_engine: str | None = None, +) -> dict[str, float]: """ Best-effort reader for the last run_results.json. Returns: { model_name: duration_in_seconds }. On any error or missing file: {}. 
""" - path = _target_dir(project_dir) - if not path.exists(): - return {} + mode = (artifacts_mode or "files").strip().lower() + raw: dict[str, Any] | None = None - try: - raw = json.loads(path.read_text(encoding="utf-8")) - except Exception: - return {} + if mode == "db" and artifacts_store is not None: + try: + raw = artifacts_store.get_latest_artifact("run_results", env_name, model_engine) + except Exception: + return {} + + if raw is None: + path = _target_dir(project_dir) + if not path.exists(): + return {} + try: + raw = json.loads(path.read_text(encoding="utf-8")) + except Exception: + return {} # tolerate a few possible shapes items: list[dict[str, Any]] = ( @@ -367,26 +412,41 @@ class TestResult: example_sql: str | None = None -def write_test_results( +def build_test_results( project_dir: Path, *, started_at: str, finished_at: str, results: list[TestResult], -) -> Path: +) -> dict[str, Any]: """ - Write test_results.json containing a run envelope + individual test outcomes. + Build test_results.json payload containing a run envelope + individual test outcomes. 
""" - project_dir = Path(project_dir) - out_dir = _target_dir(project_dir) - path = out_dir / "test_results.json" - data = { "metadata": {"tool": "fastflowtransform", "generated_at": _iso_now()}, "test_started_at": started_at, "test_finished_at": finished_at, "results": [asdict(r) for r in results], } + return data + + +def write_test_results( + project_dir: Path, + *, + started_at: str, + finished_at: str, + results: list[TestResult], +) -> Path: + project_dir = Path(project_dir) + out_dir = _target_dir(project_dir) + path = out_dir / "test_results.json" + data = build_test_results( + project_dir, + started_at=started_at, + finished_at=finished_at, + results=results, + ) _json_dump(path, data) return path @@ -407,7 +467,7 @@ class UTestResult: spec_path: str = "" -def write_utest_results( +def build_utest_results( project_dir: Path, *, started_at: str, @@ -415,14 +475,10 @@ def write_utest_results( failures: int, results: list[UTestResult], engine: str | None = None, -) -> Path: +) -> dict[str, Any]: """ - Write utest_results.json containing a run envelope + per-case results. + Build utest_results.json payload containing a run envelope + per-case results. 
""" - project_dir = Path(project_dir) - out_dir = _target_dir(project_dir) - path = out_dir / "utest_results.json" - data = { "metadata": {"tool": "fastflowtransform", "generated_at": _iso_now()}, "utest_started_at": started_at, @@ -431,5 +487,28 @@ def write_utest_results( "failures": int(failures or 0), "results": [asdict(r) for r in results], } + return data + + +def write_utest_results( + project_dir: Path, + *, + started_at: str, + finished_at: str, + failures: int, + results: list[UTestResult], + engine: str | None = None, +) -> Path: + project_dir = Path(project_dir) + out_dir = _target_dir(project_dir) + path = out_dir / "utest_results.json" + data = build_utest_results( + project_dir, + started_at=started_at, + finished_at=finished_at, + failures=failures, + results=results, + engine=engine, + ) _json_dump(path, data) return path diff --git a/src/fastflowtransform/artifacts/postgres_store.py b/src/fastflowtransform/artifacts/postgres_store.py new file mode 100644 index 0000000..4c5a9a2 --- /dev/null +++ b/src/fastflowtransform/artifacts/postgres_store.py @@ -0,0 +1,307 @@ +from __future__ import annotations + +import json +import uuid +from collections.abc import Callable, Iterable +from dataclasses import dataclass +from datetime import UTC, datetime +from typing import Any + +from fastflowtransform.executors.postgres import PostgresExecutor +from fastflowtransform.logging import warn + + +def _json_dumps(v: Any) -> str: + # Be tolerant: artifacts should be JSON-serializable, but keep it robust. + return json.dumps(v, ensure_ascii=False, default=str) + + +@dataclass(frozen=True) +class PostgresArtifactsConfig: + dsn: str + db_schema: str + mode: str = "files" # files|db|both + + +class PostgresArtifactsStore: + """ + Stores artifacts in Postgres in a schema isolated from model execution. 
+ + - raw JSONB: ff_artifacts_raw + - exploded "records": ff_artifacts_records (still JSONB, but row-wise) + """ + + def __init__(self, *, dsn: str, schema: str) -> None: + self._ex = PostgresExecutor(dsn=dsn, schema=schema) + self._schema = schema + self._tables_ready = False + + @property + def schema(self) -> str: + return self._schema + + def ensure_tables(self) -> None: + if self._tables_ready: + return + + # DDL is "maintenance": no budget/stat recording. + self._ex._execute_sql_maintenance( + """ + CREATE TABLE IF NOT EXISTS ff_artifacts_runs ( + run_id text PRIMARY KEY, + inserted_at timestamptz NOT NULL DEFAULT now(), + env_name text NULL, + model_engine text NULL, + meta jsonb NOT NULL DEFAULT '{}'::jsonb + ); + """ + ) + + self._ex._execute_sql_maintenance( + """ + CREATE TABLE IF NOT EXISTS ff_artifacts_raw ( + run_id text NOT NULL, + artifact_type text NOT NULL, + inserted_at timestamptz NOT NULL DEFAULT now(), + payload jsonb NOT NULL, + PRIMARY KEY (run_id, artifact_type) + ); + """ + ) + + self._ex._execute_sql_maintenance( + """ + CREATE TABLE IF NOT EXISTS ff_artifacts_records ( + run_id text NOT NULL, + artifact_type text NOT NULL, + record_type text NOT NULL, + record_id text NOT NULL, + inserted_at timestamptz NOT NULL DEFAULT now(), + payload jsonb NOT NULL, + PRIMARY KEY (run_id, artifact_type, record_type, record_id) + ); + """ + ) + + self._ex._execute_sql_maintenance( + "CREATE INDEX IF NOT EXISTS idx_ff_artifacts_records_run " + + "ON ff_artifacts_records (run_id);" + ) + self._ex._execute_sql_maintenance( + "CREATE INDEX IF NOT EXISTS idx_ff_artifacts_records_type " + + "ON ff_artifacts_records (artifact_type, record_type);" + ) + self._ex._execute_sql_maintenance( + "CREATE INDEX IF NOT EXISTS idx_ff_artifacts_records_id " + + "ON ff_artifacts_records (record_id);" + ) + + self._tables_ready = True + + def new_run_id(self) -> str: + return uuid.uuid4().hex + + def upsert_run( + self, + *, + run_id: str, + env_name: str | None, + 
model_engine: str | None, + meta: dict[str, Any] | None = None, + ) -> None: + self.ensure_tables() + payload = meta or {} + sql = """ + INSERT INTO ff_artifacts_runs (run_id, env_name, model_engine, meta) + VALUES (:run_id, :env_name, :model_engine, :meta::jsonb) + ON CONFLICT (run_id) DO UPDATE + SET env_name = EXCLUDED.env_name, + model_engine = EXCLUDED.model_engine, + meta = EXCLUDED.meta + """ + self._ex._execute_sql_maintenance( + sql, + { + "run_id": run_id, + "env_name": env_name, + "model_engine": model_engine, + "meta": _json_dumps(payload), + }, + ) + + def write_artifact( + self, + *, + run_id: str, + artifact_type: str, + payload: Any, + explode_records: bool = True, + ) -> None: + self.ensure_tables() + + # 1) raw + raw_sql = """ + INSERT INTO ff_artifacts_raw (run_id, artifact_type, payload) + VALUES (:run_id, :artifact_type, :payload::jsonb) + ON CONFLICT (run_id, artifact_type) DO UPDATE + SET payload = EXCLUDED.payload, + inserted_at = now() + """ + self._ex._execute_sql_maintenance( + raw_sql, + { + "run_id": run_id, + "artifact_type": artifact_type, + "payload": _json_dumps(payload), + }, + ) + + # 2) exploded records + if not explode_records: + return + + rows = list(self._explode_to_records(artifact_type, payload)) + if not rows: + return + + rec_sql = """ + INSERT INTO ff_artifacts_records (run_id, artifact_type, record_type, record_id, payload) + VALUES (:run_id, :artifact_type, :record_type, :record_id, :payload::jsonb) + ON CONFLICT (run_id, artifact_type, record_type, record_id) DO UPDATE + SET payload = EXCLUDED.payload, + inserted_at = now() + """ + for record_type, record_id, rec_payload in rows: + self._ex._execute_sql_maintenance( + rec_sql, + { + "run_id": run_id, + "artifact_type": artifact_type, + "record_type": record_type, + "record_id": record_id, + "payload": _json_dumps(rec_payload), + }, + ) + + def _explode_to_records( + self, artifact_type: str, payload: Any + ) -> Iterable[tuple[str, str, Any]]: + """ + Convert "big 
JSON" artifacts into row-wise JSONB records. + This is intentionally schema-light (no guessing your artifact schema), + but still gives you proper relational tables for querying. + + Heuristics: + - dict with 'results': list -> ('result', , item) + - dict with common dict blocks (nodes/sources/macros/...) -> (singular, key, value) + - list -> ('item', str(idx), item) + """ + if isinstance(payload, dict): + # 1) results list (common pattern: run/test results) + results = payload.get("results") + if isinstance(results, list): + for i, item in enumerate(results): + rid = None + if isinstance(item, dict): + rid = ( + item.get("unique_id") + or item.get("node_id") + or item.get("name") + or item.get("node") + ) + yield ("result", str(rid) if rid else str(i), item) + + # 2) common dict blocks (manifest-like) + for key in ( + "nodes", + "sources", + "macros", + "metrics", + "exposures", + "selectors", + "parent_map", + "child_map", + ): + block = payload.get(key) + if isinstance(block, dict): + rtype = key[:-1] if key.endswith("s") else key + for k, v in block.items(): + yield (rtype, str(k), v) + + return + + if isinstance(payload, list): + for i, item in enumerate(payload): + yield ("item", str(i), item) + + # -------- safe wrapper (warn + continue policy) -------- + + def safe_call(self, fn_name: str, fn: Callable, *args: Any, **kwargs: Any) -> None: + try: + fn(*args, **kwargs) + except Exception as exc: + warn(f"[artifacts-db] {fn_name} failed (schema={self.schema}): {exc}") + + # -------- read helpers -------- + + def get_latest_run_id(self, env_name: str | None, model_engine: str | None) -> str | None: + """ + Return the latest run_id for a given env_name + model_engine. 
+ """ + self.ensure_tables() + clauses: list[str] = [] + params: dict[str, Any] = {} + + if env_name is None: + clauses.append("env_name IS NULL") + else: + clauses.append("env_name = :env_name") + params["env_name"] = env_name + + if model_engine is None: + clauses.append("model_engine IS NULL") + else: + clauses.append("model_engine = :model_engine") + params["model_engine"] = model_engine + + where_sql = " AND ".join(clauses) if clauses else "1=1" + sql = f""" + SELECT run_id + FROM ff_artifacts_runs + WHERE {where_sql} + ORDER BY inserted_at DESC + LIMIT 1 + """ + row = self._ex._execute_sql_maintenance(sql, params).fetchone() + return str(row[0]) if row and row[0] else None + + def get_artifact(self, run_id: str, artifact_type: str) -> Any | None: + """ + Return the raw JSON payload for a run_id + artifact_type, or None. + """ + self.ensure_tables() + sql = """ + SELECT payload + FROM ff_artifacts_raw + WHERE run_id = :run_id AND artifact_type = :artifact_type + LIMIT 1 + """ + row = self._ex._execute_sql_maintenance( + sql, {"run_id": run_id, "artifact_type": artifact_type} + ).fetchone() + return row[0] if row else None + + def get_latest_artifact( + self, artifact_type: str, env_name: str | None, model_engine: str | None + ) -> Any | None: + """ + Return the raw JSON payload for the latest run (env_name + engine scoped). 
+ """ + run_id = self.get_latest_run_id(env_name, model_engine) + if not run_id: + return None + return self.get_artifact(run_id, artifact_type) + + +def now_utc_iso() -> str: + return datetime.now(UTC).isoformat() diff --git a/src/fastflowtransform/cli/bootstrap.py b/src/fastflowtransform/cli/bootstrap.py index 02ca729..58aba28 100644 --- a/src/fastflowtransform/cli/bootstrap.py +++ b/src/fastflowtransform/cli/bootstrap.py @@ -14,6 +14,8 @@ from dotenv import dotenv_values from jinja2 import Environment +from fastflowtransform.artifacts.config import resolve_artifacts_db +from fastflowtransform.artifacts.postgres_store import PostgresArtifactsStore from fastflowtransform.config.budgets import BudgetsConfig, load_budgets_config from fastflowtransform.contracts.core import _load_project_contracts, load_contracts from fastflowtransform.core import REGISTRY @@ -32,16 +34,32 @@ @dataclass class CLIContext: project: Path + env_name: str jinja_env: Environment env_settings: EnvSettings profile: Profile budgets_cfg: BudgetsConfig | None = None + artifacts_mode: str = "files" + artifacts_pg_dsn: str | None = None + artifacts_pg_schema: str | None = None def make_executor(self) -> tuple[BaseExecutor, Callable, Callable]: executor, run_sql, run_py = _make_executor(self.profile, self.jinja_env) self._configure_budget_limit(executor) return executor, run_sql, run_py + def make_artifacts_store(self) -> PostgresArtifactsStore | None: + """ + Create a PostgresArtifactsStore if artifacts.mode is 'db' or 'both'. + Warn+continue behavior is applied at call sites (or via store.safe_call). 
+ """ + mode = (self.artifacts_mode or "files").lower().strip() + if mode not in ("db", "both"): + return None + if not self.artifacts_pg_dsn or not self.artifacts_pg_schema: + return None + return PostgresArtifactsStore(dsn=self.artifacts_pg_dsn, schema=self.artifacts_pg_schema) + def _configure_budget_limit(self, executor: Any) -> None: if executor is None or not hasattr(executor, "configure_query_budget_limit"): return @@ -300,12 +318,27 @@ def _prepare_context( except Exception as exc: raise typer.BadParameter(f"Failed to parse budgets.yml: {exc}") from exc + # ---- artifacts DB config (profiles.yml: .artifacts.*) ---- + artifacts_mode = os.getenv("FF_ARTIFACTS_MODE", "").strip().lower() or "files" + artifacts_pg_dsn = os.getenv("FF_ARTIFACTS_PG_DSN") + artifacts_pg_schema = os.getenv("FF_ARTIFACTS_PG_SCHEMA") + + resolved = resolve_artifacts_db(proj, env_name) + if resolved: + artifacts_mode = resolved.mode or artifacts_mode + artifacts_pg_dsn = artifacts_pg_dsn or resolved.dsn + artifacts_pg_schema = artifacts_pg_schema or resolved.db_schema + return CLIContext( project=proj, + env_name=env_name, jinja_env=jenv, env_settings=env_settings, profile=prof, budgets_cfg=budgets_cfg, + artifacts_mode=artifacts_mode, + artifacts_pg_dsn=artifacts_pg_dsn, + artifacts_pg_schema=artifacts_pg_schema, ) diff --git a/src/fastflowtransform/cli/dag_cmd.py b/src/fastflowtransform/cli/dag_cmd.py index c0af63e..808a3d3 100644 --- a/src/fastflowtransform/cli/dag_cmd.py +++ b/src/fastflowtransform/cli/dag_cmd.py @@ -43,6 +43,10 @@ def dag( filtered_nodes = {k: v for k, v in REGISTRY.nodes.items() if pred(v)} if html: + if (ctx.artifacts_mode or "files").strip().lower() == "db": + raise typer.BadParameter( + "HTML DAG generation is disabled when artifacts.mode=db (no asset files allowed)." 
+ ) ex, *_ = ctx.make_executor() try: render_site(dag_out, filtered_nodes, executor=ex, with_schema=with_schema) diff --git a/src/fastflowtransform/cli/docgen_cmd.py b/src/fastflowtransform/cli/docgen_cmd.py index 5e24319..0f459d5 100644 --- a/src/fastflowtransform/cli/docgen_cmd.py +++ b/src/fastflowtransform/cli/docgen_cmd.py @@ -41,6 +41,10 @@ def docgen( out.mkdir(parents=True, exist_ok=True) ctx = _prepare_context(project, env_name, engine, vars) + if (ctx.artifacts_mode or "files").strip().lower() == "db": + raise typer.BadParameter( + "Docs generation is disabled when artifacts.mode=db (no asset files allowed)." + ) ex, *_ = ctx.make_executor() dag_out = _resolve_dag_out_dir(ctx.project, out) dag_out.mkdir(parents=True, exist_ok=True) diff --git a/src/fastflowtransform/cli/docs_cmd.py b/src/fastflowtransform/cli/docs_cmd.py index 24c9e07..b2cf9c6 100644 --- a/src/fastflowtransform/cli/docs_cmd.py +++ b/src/fastflowtransform/cli/docs_cmd.py @@ -321,6 +321,10 @@ def _build_docs_once( out.mkdir(parents=True, exist_ok=True) ctx = _prepare_context(project, env_name, engine, vars) + if (ctx.artifacts_mode or "files").strip().lower() == "db": + raise typer.BadParameter( + "Docs generation is disabled when artifacts.mode=db (no asset files allowed)." 
+ ) ex, *_ = ctx.make_executor() out_dir = _resolve_dag_out_dir(ctx.project, out) diff --git a/src/fastflowtransform/cli/run.py b/src/fastflowtransform/cli/run.py index 412f170..39ad719 100644 --- a/src/fastflowtransform/cli/run.py +++ b/src/fastflowtransform/cli/run.py @@ -15,13 +15,8 @@ import typer -from fastflowtransform.artifacts import ( - RunNodeResult, - load_last_run_durations, - write_catalog, - write_manifest, - write_run_results, -) +from fastflowtransform.artifacts import RunNodeResult, load_last_run_durations +from fastflowtransform.artifacts.emitter import emit_run_artifacts from fastflowtransform.cache import FingerprintCache, can_skip_node from fastflowtransform.ci.changed_since import ( compute_affected_models, @@ -1652,7 +1647,13 @@ def _run_schedule( # Best-effort: use previous run timings to batch small models per worker. try: - prev_durations_s = load_last_run_durations(ctx.project) + prev_durations_s = load_last_run_durations( + ctx.project, + artifacts_mode=ctx.artifacts_mode, + artifacts_store=ctx.make_artifacts_store(), + env_name=ctx.env_name, + model_engine=getattr(ctx.profile, "engine", None), + ) except Exception: prev_durations_s = {} @@ -1686,8 +1687,6 @@ def _write_artifacts( engine_: _RunEngine, budgets: dict[str, Any] | None, ) -> None: - write_manifest(ctx.project) - node_results: list[RunNodeResult] = [] failed = result.failed or {} all_names = set(result.per_node_s.keys()) | set(failed.keys()) @@ -1720,21 +1719,29 @@ def _write_artifacts( ) ) - write_run_results( + execu = None + try: + execu, _, _ = ctx.make_executor() + except Exception: + execu = None + + emit_run_artifacts( ctx.project, + artifacts_mode=ctx.artifacts_mode, + artifacts_store=ctx.make_artifacts_store(), + env_name=ctx.env_name, + model_engine=getattr(ctx.profile, "engine", None), started_at=started_at, finished_at=finished_at, node_results=node_results, budgets=budgets, + executor=execu, + include_catalog=execu is not None, ) def _attempt_catalog(ctx: 
CLIContext) -> None: - try: - execu, _, _ = ctx.make_executor() - write_catalog(ctx.project, execu) - except Exception: - pass + return def _emit_logs_and_errors(logq: LogQueue, result: ScheduleResult, engine_: _RunEngine) -> None: diff --git a/src/fastflowtransform/cli/selectors.py b/src/fastflowtransform/cli/selectors.py index 65b6776..9c9a98a 100644 --- a/src/fastflowtransform/cli/selectors.py +++ b/src/fastflowtransform/cli/selectors.py @@ -2,9 +2,13 @@ import fnmatch import json +import os from collections.abc import Callable, Iterable +from pathlib import Path from typing import Any, cast +from fastflowtransform.artifacts.config import resolve_artifacts_db +from fastflowtransform.artifacts.postgres_store import PostgresArtifactsStore from fastflowtransform.core import REGISTRY, relation_for @@ -54,8 +58,7 @@ def _load_result_sets() -> dict[str, set[str]]: wrn: set[str] = set() try: proj = REGISTRY.get_project_dir() - path = proj / ".fastflowtransform" / "target" / "run_results.json" - data = json.loads(path.read_text(encoding="utf-8")) + data = _load_run_results_from_files_or_db(proj) for r in data.get("results") or []: name = r.get("name") status = (r.get("status") or "").lower() @@ -137,6 +140,36 @@ def _p_tag(n, w=want): return preds +def _load_run_results_from_files_or_db(project_dir: Any) -> dict[str, Any]: + project_dir = Path(project_dir) + # 1) Try local file first + path = project_dir / ".fastflowtransform" / "target" / "run_results.json" + if path.exists(): + return json.loads(path.read_text(encoding="utf-8")) + + # 2) Fallback to DB if configured + env_name = os.getenv("FF_ENV") or "dev" + engine = REGISTRY._current_engine() + mode = os.getenv("FF_ARTIFACTS_MODE", "").strip().lower() + + dsn = os.getenv("FF_ARTIFACTS_PG_DSN") + schema = os.getenv("FF_ARTIFACTS_PG_SCHEMA") + + if not dsn or not schema: + resolved = resolve_artifacts_db(project_dir, env_name) + if resolved: + mode = mode or resolved.mode + dsn = dsn or resolved.dsn + schema = schema 
or resolved.db_schema + + if mode != "db" or not dsn or not schema: + return {} + + store = PostgresArtifactsStore(dsn=dsn, schema=schema) + payload = store.get_latest_artifact("run_results", env_name, engine) + return payload if isinstance(payload, dict) else {} + + def _downstream_closure(names: set[str]) -> set[str]: """Return names plus all downstream nodes (transitively).""" # Build reverse edges: dep -> [dependents] diff --git a/src/fastflowtransform/cli/test_cmd.py b/src/fastflowtransform/cli/test_cmd.py index 0f6deb4..1da21cf 100644 --- a/src/fastflowtransform/cli/test_cmd.py +++ b/src/fastflowtransform/cli/test_cmd.py @@ -4,7 +4,6 @@ import re import time from collections.abc import Callable, Iterable, Mapping -from contextlib import suppress from dataclasses import dataclass from datetime import UTC, datetime from pathlib import Path @@ -12,7 +11,8 @@ import typer -from fastflowtransform.artifacts import TestResult, write_test_results +from fastflowtransform.artifacts import TestResult +from fastflowtransform.artifacts.emitter import emit_test_results from fastflowtransform.cli.bootstrap import _prepare_context, configure_executor_contracts from fastflowtransform.cli.options import ( EngineOpt, @@ -511,28 +511,30 @@ def test( finished_at = datetime.now(UTC).isoformat(timespec="seconds") - # Persist for docs (best-effort; never fail the command because of artifact IO) - with suppress(Exception): - write_test_results( - ctx.project, - started_at=started_at, - finished_at=finished_at, - results=[ - TestResult( - kind=r.kind, - table=r.table, - relation=r.relation, - column=r.column, - ok=bool(r.ok), - severity=str(r.severity), - duration_ms=int(r.ms), - msg=r.msg, - param_str=r.param_str, - example_sql=r.example_sql, - ) - for r in results - ], - ) + emit_test_results( + ctx.project, + artifacts_mode=ctx.artifacts_mode, + artifacts_store=ctx.make_artifacts_store(), + env_name=ctx.env_name, + model_engine=getattr(ctx.profile, "engine", None), + 
started_at=started_at, + finished_at=finished_at, + results=[ + TestResult( + kind=r.kind, + table=r.table, + relation=r.relation, + column=r.column, + ok=bool(r.ok), + severity=str(r.severity), + duration_ms=int(r.ms), + msg=r.msg, + param_str=r.param_str, + example_sql=r.example_sql, + ) + for r in results + ], + ) # Exit code: count only ERROR fails failed = sum((not r.ok) and (r.severity != "warn") for r in results) diff --git a/src/fastflowtransform/cli/utest_cmd.py b/src/fastflowtransform/cli/utest_cmd.py index 61992e1..ba53393 100644 --- a/src/fastflowtransform/cli/utest_cmd.py +++ b/src/fastflowtransform/cli/utest_cmd.py @@ -1,12 +1,12 @@ # fastflowtransform/cli/utest_cmd.py from __future__ import annotations -from contextlib import suppress from datetime import UTC, datetime import typer -from fastflowtransform.artifacts import UTestResult, write_utest_results +from fastflowtransform.artifacts import UTestResult +from fastflowtransform.artifacts.emitter import emit_utest_results from fastflowtransform.cli.bootstrap import _prepare_context from fastflowtransform.cli.options import ( CaseOpt, @@ -57,31 +57,33 @@ def utest( ) finished_at = datetime.now(UTC).isoformat(timespec="seconds") - # Write artifact for docs (best-effort; never block exit) - with suppress(Exception): - write_utest_results( - ctx.project, - started_at=started_at, - finished_at=finished_at, - failures=failures, - engine=getattr(ex, "engine_name", None), - results=[ - UTestResult( - model=str(r.get("model") or ""), - case=str(r.get("case") or ""), - status=str(r.get("status") or ""), - duration_ms=int(r.get("duration_ms") or 0), - cache_hit=bool(r.get("cache_hit")), - message=(str(r.get("message")) if r.get("message") else None), - target_relation=( - str(r.get("target_relation")) if r.get("target_relation") else None - ), - spec_path=str(r.get("spec_path") or ""), - ) - for r in collected - if (r.get("model") and r.get("case")) - ], - ) + emit_utest_results( + ctx.project, + 
artifacts_mode=ctx.artifacts_mode, + artifacts_store=ctx.make_artifacts_store(), + env_name=ctx.env_name, + model_engine=getattr(ctx.profile, "engine", None), + started_at=started_at, + finished_at=finished_at, + failures=failures, + engine=getattr(ex, "engine_name", None), + results=[ + UTestResult( + model=str(r.get("model") or ""), + case=str(r.get("case") or ""), + status=str(r.get("status") or ""), + duration_ms=int(r.get("duration_ms") or 0), + cache_hit=bool(r.get("cache_hit")), + message=(str(r.get("message")) if r.get("message") else None), + target_relation=( + str(r.get("target_relation")) if r.get("target_relation") else None + ), + spec_path=str(r.get("spec_path") or ""), + ) + for r in collected + if (r.get("model") and r.get("case")) + ], + ) raise typer.Exit(code=2 if failures > 0 else 0) diff --git a/src/fastflowtransform/config/loaders.py b/src/fastflowtransform/config/loaders.py index 7032b88..5d9c5cb 100644 --- a/src/fastflowtransform/config/loaders.py +++ b/src/fastflowtransform/config/loaders.py @@ -1,3 +1,4 @@ +# src/fastflowtransform/config/loaders.py import yaml from yaml.loader import SafeLoader diff --git a/src/fastflowtransform/docs.py b/src/fastflowtransform/docs.py index 02cb1ab..65bfb1b 100644 --- a/src/fastflowtransform/docs.py +++ b/src/fastflowtransform/docs.py @@ -1077,7 +1077,10 @@ def render_site( spa: bool = True, legacy_pages: bool = False, include_rendered_sql: bool | None = None, + allow_assets: bool = True, ) -> None: + if not allow_assets: + raise RuntimeError("Docs generation disabled when assets are not allowed.") out_dir.mkdir(parents=True, exist_ok=True) _copy_template_assets(out_dir) env = _init_jinja() diff --git a/src/fastflowtransform/settings.py b/src/fastflowtransform/settings.py index 99cdd36..0227113 100644 --- a/src/fastflowtransform/settings.py +++ b/src/fastflowtransform/settings.py @@ -9,7 +9,7 @@ import yaml from jinja2 import Environment, StrictUndefined -from pydantic import BaseModel, ConfigDict, Field, 
TypeAdapter +from pydantic import BaseModel, ConfigDict, Field, TypeAdapter, model_validator from pydantic_settings import BaseSettings, SettingsConfigDict from fastflowtransform.errors import ProfileConfigError @@ -33,6 +33,30 @@ class PostgresConfig(BaseConfig): db_schema: str = "public" +class ArtifactsPostgresConfig(BaseConfig): + dsn: str | None = None + db_schema: str | None = Field(default=None, alias="schema") + + +class ArtifactsConfig(BaseConfig): + mode: Literal["files", "db", "both"] = "files" + engine: Literal["postgres"] = "postgres" + postgres: ArtifactsPostgresConfig | None = None + + @model_validator(mode="after") + def _validate_postgres_required(self) -> ArtifactsConfig: + if self.mode in {"db", "both"}: + if self.engine != "postgres": + raise ValueError("artifacts.engine must be 'postgres' for db/both mode.") + if self.postgres is None: + raise ValueError("artifacts.postgres must be set for db/both mode.") + if not self.postgres.dsn or not str(self.postgres.dsn).strip(): + raise ValueError("artifacts.postgres.dsn must be set for db/both mode.") + if not self.postgres.db_schema or not str(self.postgres.db_schema).strip(): + raise ValueError("artifacts.postgres.db_schema must be set for db/both mode.") + return self + + class BigQueryConfig(BaseConfig): project: str | None = None dataset: str | None = None @@ -67,26 +91,31 @@ class SnowflakeSnowparkConfig(BaseConfig): class DuckDBProfile(BaseConfig): engine: Literal["duckdb"] duckdb: DuckDBConfig + artifacts: ArtifactsConfig | None = None class PostgresProfile(BaseConfig): engine: Literal["postgres"] postgres: PostgresConfig + artifacts: ArtifactsConfig | None = None class BigQueryProfile(BaseConfig): engine: Literal["bigquery"] bigquery: BigQueryConfig + artifacts: ArtifactsConfig | None = None class DatabricksSparkProfile(BaseConfig): engine: Literal["databricks_spark"] databricks_spark: DatabricksSparkConfig + artifacts: ArtifactsConfig | None = None class SnowflakeSnowparkProfile(BaseConfig): 
engine: Literal["snowflake_snowpark"] snowflake_snowpark: SnowflakeSnowparkConfig + artifacts: ArtifactsConfig | None = None Profile = Annotated[ @@ -118,6 +147,11 @@ class EnvSettings(BaseSettings): PG_DSN: str | None = None PG_SCHEMA: str | None = None + # Artifacts + ARTIFACTS_MODE: str | None = None + ARTIFACTS_PG_DSN: str | None = None + ARTIFACTS_PG_SCHEMA: str | None = None + # bigquery BQ_PROJECT: str | None = None BQ_DATASET: str | None = None @@ -341,6 +375,7 @@ def _apply_env_overrides(raw: dict[str, Any], env: EnvSettings) -> None: handler = handlers.get(eng) if handler: handler(raw, env) + _ov_artifacts(raw, env) def _set_if(d: dict[str, Any], key: str, value: Any | None) -> None: @@ -415,6 +450,28 @@ def _ov_snowflake_snowpark(raw: dict[str, Any], env: EnvSettings) -> None: sf["allow_create_schema"] = bool(acs) +def _ov_artifacts(raw: dict[str, Any], env: EnvSettings) -> None: + mode = getattr(env, "ARTIFACTS_MODE", None) + pg_dsn = getattr(env, "ARTIFACTS_PG_DSN", None) + pg_schema = getattr(env, "ARTIFACTS_PG_SCHEMA", None) + if mode is None and pg_dsn is None and pg_schema is None: + return + + artifacts = raw.setdefault("artifacts", {}) + if mode is not None: + artifacts["mode"] = mode + + engine = artifacts.get("engine") or "postgres" + artifacts["engine"] = engine + + if engine == "postgres": + pg = artifacts.setdefault("postgres", {}) + if pg_dsn is not None: + pg["dsn"] = pg_dsn + if pg_schema is not None: + pg["db_schema"] = pg_schema + + # ---------- Sanity Checks ---------- CheckFn = Callable[[Profile], None] From 5844bbce93b14881c3820b8afc707a4a752be1fe Mon Sep 17 00:00:00 2001 From: Marko Lekic Date: Wed, 11 Feb 2026 18:25:19 +0100 Subject: [PATCH 3/4] Adjusted auto docs to support data from api endpoint --- docs/Auto_Docs.md | 28 +++++++ src/fastflowtransform/templates/assets/spa.js | 78 +++++++++++++++++-- src/fastflowtransform/templates/index.html.j2 | 21 +++++ 3 files changed, 119 insertions(+), 8 deletions(-) diff --git 
a/docs/Auto_Docs.md b/docs/Auto_Docs.md index 03f8365..d177bb4 100644 --- a/docs/Auto_Docs.md +++ b/docs/Auto_Docs.md @@ -58,6 +58,34 @@ Validation rules: - `mode: files` does not require any Postgres settings. - `mode: db` or `both` requires `artifacts.engine=postgres` and a valid `dsn` + `db_schema`. +Frontend contract (SPA data sources): +- The SPA reads globals (if present) to resolve artifact URLs: + - `__FFT_MANIFEST_PATH__` + - `__FFT_RUN_RESULTS_PATH__` + - `__FFT_TEST_RESULTS_PATH__` + - `__FFT_UTEST_RESULTS_PATH__` + - `__FFT_ARTIFACTS_API_BASE__` + - `__FFT_ENV__` + - `__FFT_ENGINE__` + - `__FFT_RUN_ID__` +- URL resolution order: + 1. Explicit `__FFT_*_PATH__` globals + 2. API base + params (`__FFT_ARTIFACTS_API_BASE__` with env/engine/run_id) + 3. Query params (`?manifest=...&run=...&test=...&utest=...`) + 4. Local assets (`assets/*.json`) +- Expected endpoint shapes (examples): + - Latest by env/engine: + - `/artifacts/latest/docs_manifest?env=<env>&engine=<engine>` + - `/artifacts/latest/run_results?env=<env>&engine=<engine>` + - `/artifacts/latest/test_results?env=<env>&engine=<engine>` + - `/artifacts/latest/utest_results?env=<env>&engine=<engine>` + - Specific run: + - `/artifacts/run/<run_id>/docs_manifest` + - `/artifacts/run/<run_id>/run_results` + - `/artifacts/run/<run_id>/test_results` + - `/artifacts/run/<run_id>/utest_results` +- Response payload can be either raw JSON or wrapped as `{ payload: {...} }` or `{ data: {...} }`.
+ ### Classic (DAG-only) ```bash diff --git a/src/fastflowtransform/templates/assets/spa.js b/src/fastflowtransform/templates/assets/spa.js index d450757..cb49aec 100644 --- a/src/fastflowtransform/templates/assets/spa.js +++ b/src/fastflowtransform/templates/assets/spa.js @@ -1,7 +1,59 @@ -const MANIFEST_URL = window.__FFT_MANIFEST_PATH__ || "assets/docs_manifest.json"; -const RUN_RESULTS_URL = window.__FFT_RUN_RESULTS_PATH__ || "assets/run_results.json"; -const TEST_RESULTS_URL = window.__FFT_TEST_RESULTS_PATH__ || "assets/test_results.json"; -const UTEST_RESULTS_URL = window.__FFT_UTEST_RESULTS_PATH__ || "assets/utest_results.json"; +function resolveArtifactUrls() { + const qs = new URLSearchParams(window.location.search || ""); + const qsManifest = qs.get("manifest"); + const qsRun = qs.get("run"); + const qsTest = qs.get("test"); + const qsUTest = qs.get("utest"); + + const globalManifest = window.__FFT_MANIFEST_PATH__; + const globalRun = window.__FFT_RUN_RESULTS_PATH__; + const globalTest = window.__FFT_TEST_RESULTS_PATH__; + const globalUTest = window.__FFT_UTEST_RESULTS_PATH__; + + const apiBase = window.__FFT_ARTIFACTS_API_BASE__; + const envName = window.__FFT_ENV__; + const engineName = window.__FFT_ENGINE__; + const runId = window.__FFT_RUN_ID__; + + function apiUrl(kind) { + if (!apiBase) return null; + if (runId) return `${apiBase}/artifacts/run/${encodeURIComponent(runId)}/${kind}`; + const params = new URLSearchParams(); + if (envName) params.set("env", envName); + if (engineName) params.set("engine", engineName); + const suffix = params.toString(); + return `${apiBase}/artifacts/latest/${kind}${suffix ? "?" 
+ suffix : ""}`; + } + + return { + manifest: + globalManifest || + apiUrl("docs_manifest") || + qsManifest || + "assets/docs_manifest.json", + run_results: + globalRun || + apiUrl("run_results") || + qsRun || + "assets/run_results.json", + test_results: + globalTest || + apiUrl("test_results") || + qsTest || + "assets/test_results.json", + utest_results: + globalUTest || + apiUrl("utest_results") || + qsUTest || + "assets/utest_results.json", + }; +} + +const ARTIFACT_URLS = resolveArtifactUrls(); +const MANIFEST_URL = ARTIFACT_URLS.manifest; +const RUN_RESULTS_URL = ARTIFACT_URLS.run_results; +const TEST_RESULTS_URL = ARTIFACT_URLS.test_results; +const UTEST_RESULTS_URL = ARTIFACT_URLS.utest_results; function el(tag, attrs = {}, ...children) { const n = document.createElement(tag); @@ -4312,7 +4364,7 @@ function renderHealthCardForModel(state, m) { const runBlock = (() => { if (!hasRuns) return el("p", { class: "empty" }, "No run results were loaded."); - if (!run) return el("p", { class: "empty" }, "No run info found for this model in run_results.json."); + if (!run) return el("p", { class: "empty" }, "No run info found for this model."); return el("div", { class: "kv" }, el("div", { class: "k" }, "Status"), @@ -4453,7 +4505,12 @@ async function loadOptionalJson(url) { try { const res = await fetch(url, { cache: "no-store" }); if (!res.ok) return null; - return await res.json(); + const body = await res.json(); + if (body && typeof body === "object") { + if (body.payload && typeof body.payload === "object") return body.payload; + if (body.data && typeof body.data === "object") return body.data; + } + return body; } catch { return null; } @@ -4467,7 +4524,12 @@ async function copyText(text) { async function loadManifest() { const res = await fetch(MANIFEST_URL, { cache: "no-store" }); if (!res.ok) throw new Error(`Failed to load manifest: ${res.status}`); - return await res.json(); + const body = await res.json(); + if (body && typeof body === "object") { + if 
(body.payload && typeof body.payload === "object") return body.payload; + if (body.data && typeof body.data === "object") return body.data; + } + return body; } async function main() { @@ -5579,4 +5641,4 @@ main().catch((e) => { ) ) ); -}); \ No newline at end of file +}); diff --git a/src/fastflowtransform/templates/index.html.j2 b/src/fastflowtransform/templates/index.html.j2 index 30216e8..01def77 100644 --- a/src/fastflowtransform/templates/index.html.j2 +++ b/src/fastflowtransform/templates/index.html.j2 @@ -8,6 +8,27 @@ From cdee368a3d07b51b675060627ccb56966110a957 Mon Sep 17 00:00:00 2001 From: Marko Lekic Date: Wed, 11 Feb 2026 19:36:02 +0100 Subject: [PATCH 4/4] Fixed some bugs --- Makefile.dev | 13 +++++++++++++ src/fastflowtransform/artifacts/postgres_store.py | 6 +++--- src/fastflowtransform/settings.py | 1 + 3 files changed, 17 insertions(+), 3 deletions(-) diff --git a/Makefile.dev b/Makefile.dev index 19a277c..9e26ddd 100644 --- a/Makefile.dev +++ b/Makefile.dev @@ -65,3 +65,16 @@ export-demo: show-structure: tree -a -I '.git|.venv|__pycache__|.pytest_cache|.mypy_cache|.ruff_cache|node_modules|dist|build|htmlcov|site|.fastflowtransform|.DS_Store|.idea|.vscode|.local|metastore_db|.uv-cache|_exports|_scripts|articles|examples_article|tickets' + +spinup-pg: + docker rm -f ff_pg + docker volume rm pgdata + docker volume create pgdata + + docker run -d --name ff_pg \ + -e POSTGRES_USER=postgres \ + -e POSTGRES_PASSWORD=postgres \ + -e POSTGRES_DB=ffdb \ + -p 5432:5432 \ + -v pgdata:/var/lib/postgresql/data \ + postgres:17 diff --git a/src/fastflowtransform/artifacts/postgres_store.py b/src/fastflowtransform/artifacts/postgres_store.py index 4c5a9a2..24ae2e3 100644 --- a/src/fastflowtransform/artifacts/postgres_store.py +++ b/src/fastflowtransform/artifacts/postgres_store.py @@ -113,7 +113,7 @@ def upsert_run( payload = meta or {} sql = """ INSERT INTO ff_artifacts_runs (run_id, env_name, model_engine, meta) - VALUES (:run_id, :env_name, :model_engine, 
:meta::jsonb) + VALUES (:run_id, :env_name, :model_engine, CAST(:meta AS jsonb)) ON CONFLICT (run_id) DO UPDATE SET env_name = EXCLUDED.env_name, model_engine = EXCLUDED.model_engine, @@ -142,7 +142,7 @@ def write_artifact( # 1) raw raw_sql = """ INSERT INTO ff_artifacts_raw (run_id, artifact_type, payload) - VALUES (:run_id, :artifact_type, :payload::jsonb) + VALUES (:run_id, :artifact_type, CAST(:payload AS jsonb)) ON CONFLICT (run_id, artifact_type) DO UPDATE SET payload = EXCLUDED.payload, inserted_at = now() @@ -166,7 +166,7 @@ def write_artifact( rec_sql = """ INSERT INTO ff_artifacts_records (run_id, artifact_type, record_type, record_id, payload) - VALUES (:run_id, :artifact_type, :record_type, :record_id, :payload::jsonb) + VALUES (:run_id, :artifact_type, :record_type, :record_id, CAST(:payload AS jsonb)) ON CONFLICT (run_id, artifact_type, record_type, record_id) DO UPDATE SET payload = EXCLUDED.payload, inserted_at = now() diff --git a/src/fastflowtransform/settings.py b/src/fastflowtransform/settings.py index 0227113..7fbdcc2 100644 --- a/src/fastflowtransform/settings.py +++ b/src/fastflowtransform/settings.py @@ -34,6 +34,7 @@ class PostgresConfig(BaseConfig): class ArtifactsPostgresConfig(BaseConfig): + model_config = ConfigDict(populate_by_name=True, extra="forbid") dsn: str | None = None db_schema: str | None = Field(default=None, alias="schema")