Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
363 changes: 361 additions & 2 deletions src/fastflowtransform/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@
import heapq
import re
from collections import defaultdict
from typing import Any

from .core import Node, relation_for
from .errors import DependencyNotFoundError, ModelCycleError
from fastflowtransform.core import Node, relation_for
from fastflowtransform.errors import DependencyNotFoundError, ModelCycleError


def topo_sort(nodes: dict[str, Node]) -> list[str]:
Expand Down Expand Up @@ -154,3 +155,361 @@ def mermaid(

lines.append("")
return "\n".join(lines)


# --- SPA Graph (layout computed server-side) -----------------------------


def _clamp(n: float, lo: float, hi: float) -> float:
return lo if n < lo else hi if n > hi else n


def _approx_node_size(title: str, subtitle: str | None) -> tuple[int, int]:
    """
    Estimate an SVG box size for a node label without measuring fonts.

    The width is derived from the longer of the two text lines (7.2px per
    character plus 56px of padding) and is kept between 170 and 380 pixels.
    The height is 52 when a non-empty subtitle is given and 44 otherwise.

    Returns:
        A ``(width, height)`` tuple of integers.
    """
    # Treat None (or any falsy value) as an empty line of text.
    main_text = title if title else ""
    sub_text = subtitle if subtitle else ""

    char_count = max(len(main_text), len(sub_text))
    raw_width = char_count * 7.2 + 56
    width = int(_clamp(raw_width, 170, 380))

    height = 52 if sub_text else 44
    return width, height


def _barycenter_order(
level: list[str],
parents: dict[str, list[str]],
prev_pos: dict[str, int],
) -> list[str]:
"""
Order nodes in a level by the average position of their parents in the previous level.
Stable fallback to name.
"""

def key(nm: str) -> tuple[float, str]:
ps = [p for p in parents.get(nm, []) if p in prev_pos]
if not ps:
return (1e12, nm)
avg = sum(prev_pos[p] for p in ps) / max(1, len(ps))
return (avg, nm)

return sorted(level, key=key)


# Marker prepended to "<source>.<table>" labels so that source entries can be
# told apart from model names inside the ordered rank lists.
_SRC_PREFIX = "__src__:"


def _collect_source_keys(
*,
sources_by_key: dict[tuple[str, str], Any],
model_source_refs: dict[str, list[tuple[str, str]]],
) -> list[tuple[str, str]]:
seen: set[tuple[str, str]] = set()
out: list[tuple[str, str]] = []

for refs in model_source_refs.values():
for key in refs or []:
if key in sources_by_key and key not in seen:
seen.add(key)
out.append(key)

out.sort(key=lambda k: (k[0], k[1]))
return out


def _build_parents(nodes: dict[str, "Node"]) -> dict[str, list[str]]:
return {nm: [d for d in (n.deps or []) if d in nodes] for nm, n in nodes.items()}


def _build_ordered_levels(
    *,
    lvls: list[list[str]],
    parents: dict[str, list[str]],
    model_rank_offset: int,
    source_keys: list[tuple[str, str]],
) -> dict[int, list[str]]:
    """
    Assign every level a rank number and fix the order of nodes inside it.

    When ``source_keys`` is non-empty, rank 0 holds one prefixed
    ``"<source>.<table>"`` label per key. Model level ``i`` is stored under
    rank ``i + model_rank_offset``. The first model level is sorted by name;
    each later level is ordered relative to the positions chosen for the
    level directly before it.
    """
    ranks: dict[int, list[str]] = {}

    if source_keys:
        ranks[0] = [f"{_SRC_PREFIX}{src}.{tbl}" for src, tbl in source_keys]

    # Positions of the most recently placed model level (name -> index).
    previous: dict[str, int] = {}
    for depth, members in enumerate(lvls):
        if depth == 0:
            arranged = sorted(members)
        else:
            arranged = _barycenter_order(members, parents, previous)
        ranks[depth + model_rank_offset] = arranged
        previous = {name: pos for pos, name in enumerate(arranged)}

    return ranks


def _rank_xy(
*,
r: int,
idx: int,
rank_len: int,
max_count: int,
rank_spacing: int,
node_spacing: int,
padding: int,
) -> tuple[int, int]:
y0 = padding + int((max_count - rank_len) * node_spacing * 0.5)
x = padding + r * rank_spacing
y = y0 + idx * node_spacing
return x, y


def _model_payload(
    *,
    nm: str,
    nodes: dict[str, "Node"],
    r: int,
    idx: int,
    rank_len: int,
    max_count: int,
    rank_spacing: int,
    node_spacing: int,
    padding: int,
) -> dict[str, Any]:
    """
    Build the graph payload dict for the model named ``nm``.

    The box is sized from the model name and its relation name, and placed
    according to its rank ``r`` and index ``idx`` within that rank.
    ``materialized`` comes from the node's ``meta`` mapping (default
    ``"table"``) and ``type`` from the node's ``kind`` attribute (default
    ``"sql"``).
    """
    node = nodes[nm]
    relation = relation_for(nm)

    # A missing or None ``meta`` is treated as an empty mapping.
    meta = getattr(node, "meta", {}) or {}
    materialized = meta.get("materialized", "table")

    width, height = _approx_node_size(nm, relation)
    x_pos, y_pos = _rank_xy(
        r=r,
        idx=idx,
        rank_len=rank_len,
        max_count=max_count,
        rank_spacing=rank_spacing,
        node_spacing=node_spacing,
        padding=padding,
    )

    payload: dict[str, Any] = {
        "id": f"m:{nm}",
        "kind": "model",
        "name": nm,
        "route": f"#/model/{nm}",
        "type": getattr(node, "kind", "sql"),
        "materialized": materialized,
        "relation": relation,
        "rank": r,
        "x": x_pos,
        "y": y_pos,
        "w": width,
        "h": height,
    }
    return payload


def _source_payload(
    *,
    src: str,
    tbl: str,
    sources_by_key: dict[tuple[str, str], Any],
    r: int,
    idx: int,
    rank_len: int,
    max_count: int,
    rank_spacing: int,
    node_spacing: int,
    padding: int,
) -> dict[str, Any]:
    """
    Build the graph payload dict for the source table ``src.tbl``.

    The relation name is taken from the matching ``sources_by_key`` entry:
    first its ``relation`` attribute, then (for dict entries) its
    ``"relation"`` key, and finally the ``"<src>.<tbl>"`` label when neither
    yields a truthy value. The box is sized from the label and relation and
    placed by rank ``r`` and index ``idx``.
    """
    label = f"{src}.{tbl}"
    doc = sources_by_key.get((src, tbl))

    relation = getattr(doc, "relation", None)
    if not relation and isinstance(doc, dict):
        relation = doc.get("relation")
    if not relation:
        relation = label

    width, height = _approx_node_size(label, relation)
    x_pos, y_pos = _rank_xy(
        r=r,
        idx=idx,
        rank_len=rank_len,
        max_count=max_count,
        rank_spacing=rank_spacing,
        node_spacing=node_spacing,
        padding=padding,
    )

    payload: dict[str, Any] = {
        "id": f"s:{src}.{tbl}",
        "kind": "source",
        "source_name": src,
        "table_name": tbl,
        "route": f"#/source/{src}/{tbl}",
        "relation": relation,
        "rank": r,
        "x": x_pos,
        "y": y_pos,
        "w": width,
        "h": height,
    }
    return payload


def _emit_nodes(
    *,
    nodes: dict[str, "Node"],
    sources_by_key: dict[tuple[str, str], Any],
    ordered_levels: dict[int, list[str]],
    max_count: int,
    rank_spacing: int,
    node_spacing: int,
    padding: int,
) -> list[dict[str, Any]]:
    """
    Turn the ordered ranks into a flat list of positioned node payloads.

    Entries starting with the source prefix are emitted as source payloads;
    every other entry is treated as a model name and emitted as a model
    payload. Payloads appear in rank iteration order, then in-rank order.

    Args:
        nodes: Models by name.
        sources_by_key: Source documents keyed by ``(source, table)``.
        ordered_levels: Rank number -> ordered entries of that rank.
        max_count: Size of the largest rank (used for centring).
        rank_spacing: Distance between consecutive ranks.
        node_spacing: Distance between consecutive nodes of one rank.
        padding: Offset applied to both axes.

    Returns:
        One payload dict per entry in ``ordered_levels``.
    """
    # A source label is "<source>.<table>". Splitting it at the first dot
    # recovers the wrong pair when the source name itself contains a dot, so
    # resolve labels against the real keys first. The first key producing a
    # given label wins if two keys collide on the same label.
    label_to_key: dict[str, tuple[str, str]] = {}
    for key in sources_by_key:
        if isinstance(key, tuple) and len(key) == 2:
            label_to_key.setdefault(f"{key[0]}.{key[1]}", key)

    out: list[dict[str, Any]] = []

    for r, names in ordered_levels.items():
        rank_len = len(names)
        for idx, item in enumerate(names):
            if item.startswith(_SRC_PREFIX):
                label = item.removeprefix(_SRC_PREFIX)
                resolved = label_to_key.get(label)
                if resolved is None:
                    # Unknown label: keep the previous best-effort split.
                    src, tbl = label.split(".", 1)
                else:
                    src, tbl = resolved
                out.append(
                    _source_payload(
                        src=src,
                        tbl=tbl,
                        sources_by_key=sources_by_key,
                        r=r,
                        idx=idx,
                        rank_len=rank_len,
                        max_count=max_count,
                        rank_spacing=rank_spacing,
                        node_spacing=node_spacing,
                        padding=padding,
                    )
                )
            else:
                out.append(
                    _model_payload(
                        nm=item,
                        nodes=nodes,
                        r=r,
                        idx=idx,
                        rank_len=rank_len,
                        max_count=max_count,
                        rank_spacing=rank_spacing,
                        node_spacing=node_spacing,
                        padding=padding,
                    )
                )

    return out


def _emit_edges(
*,
nodes: dict[str, "Node"],
sources_by_key: dict[tuple[str, str], Any],
model_source_refs: dict[str, list[tuple[str, str]]],
) -> list[dict[str, Any]]:
out: list[dict[str, Any]] = []

# model deps
for nm, n in nodes.items():
for d in n.deps or []:
if d in nodes:
out.append({"from": f"m:{d}", "to": f"m:{nm}", "kind": "dep"})

# sources -> models
for nm, refs in model_source_refs.items():
for src, tbl in refs or []:
if (src, tbl) in sources_by_key:
out.append({"from": f"s:{src}.{tbl}", "to": f"m:{nm}", "kind": "source"})

return out


def _apply_direction(direction: str, out_nodes: list[dict[str, Any]]) -> str:
d = direction.upper()
if d == "TB":
for n in out_nodes:
n["x"], n["y"] = n["y"], n["x"]
return d


def _bounds(out_nodes: list[dict[str, Any]], padding: int) -> dict[str, int]:
if not out_nodes:
return {"minx": 0, "miny": 0, "maxx": 0, "maxy": 0, "width": padding, "height": padding}

minx = min(n["x"] for n in out_nodes)
miny = min(n["y"] for n in out_nodes)
maxx = max(n["x"] + n["w"] for n in out_nodes)
maxy = max(n["y"] + n["h"] for n in out_nodes)

return {
"minx": int(minx),
"miny": int(miny),
"maxx": int(maxx),
"maxy": int(maxy),
"width": int(maxx - minx + padding),
"height": int(maxy - miny + padding),
}


def spa_graph(
    nodes: dict[str, "Node"],
    *,
    sources_by_key: dict[tuple[str, str], Any] | None = None,
    model_source_refs: dict[str, list[tuple[str, str]]] | None = None,
    direction: str = "LR",  # LR or TB
    rank_spacing: int = 280,
    node_spacing: int = 84,
    padding: int = 24,
) -> dict[str, Any]:
    """
    Build a lightweight, pre-laid-out graph payload for the SPA.

    Node positions are computed here, so the browser only has to draw the
    result and needs no graph layout library.

    Args:
        nodes: Models by name.
        sources_by_key: Source documents keyed by ``(source, table)``. Values
            may be objects or dicts; of their contents only ``relation`` is
            read while building the payload.
        model_source_refs: Model name -> ``(source, table)`` keys it reads.
        direction: ``"LR"`` or ``"TB"`` (case-insensitive); for ``"TB"`` the
            x and y coordinates of every node are swapped.
        rank_spacing: Distance between consecutive ranks.
        node_spacing: Distance between consecutive nodes of one rank.
        padding: Offset applied to both axes and added to the bounds size.

    Returns:
        A dict with the keys ``"direction"``, ``"nodes"``, ``"edges"`` and
        ``"bounds"``.

    NOTE(review): with ``"TB"`` the swap turns ``node_spacing`` into the
    horizontal gap, while boxes are at least 170 wide; with the default of 84
    neighbouring boxes may overlap. Confirm whether that is intended.
    """
    known_sources = sources_by_key if sources_by_key else {}
    refs_by_model = model_source_refs if model_source_refs else {}

    # Levels cover models only, since sources are not Nodes. ``levels`` is
    # defined elsewhere in this module; presumably it raises on cycles and
    # missing dependencies.
    model_levels = levels(nodes)

    used_sources = _collect_source_keys(
        sources_by_key=known_sources,
        model_source_refs=refs_by_model,
    )
    # Sources, when present, occupy rank 0 and push the models one rank on.
    rank_offset = 1 if used_sources else 0

    ranks = _build_ordered_levels(
        lvls=model_levels,
        parents=_build_parents(nodes),
        model_rank_offset=rank_offset,
        source_keys=used_sources,
    )
    widest_rank = max(map(len, ranks.values()), default=1)

    graph_nodes = _emit_nodes(
        nodes=nodes,
        sources_by_key=known_sources,
        ordered_levels=ranks,
        max_count=widest_rank,
        rank_spacing=rank_spacing,
        node_spacing=node_spacing,
        padding=padding,
    )
    graph_edges = _emit_edges(
        nodes=nodes,
        sources_by_key=known_sources,
        model_source_refs=refs_by_model,
    )

    # The direction step may swap coordinates in place, so it has to run
    # before the bounding box is measured.
    final_direction = _apply_direction(direction, graph_nodes)
    extents = _bounds(graph_nodes, padding)

    return {
        "direction": final_direction,
        "nodes": graph_nodes,
        "edges": graph_edges,
        "bounds": extents,
    }
Loading