Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@
entry_point_module,
)
from .markdown.pipeline import generate_markdown_pages
from .wassirman.ir import ValidationIR
from .wassirman.pipeline import generate_validation_ir

log = logging.getLogger(__name__)

__all__ = ["cli"]

_OUTPUT_FORMATS = ("markdown",)
_OUTPUT_FORMATS = ("markdown", "wassirman")

_FEATURE_FRONTMATTER = "---\nsidebar_position: 1\n---\n\n"

Expand Down Expand Up @@ -120,7 +122,26 @@ def generate(
)
)

_generate_markdown(feature_specs, schema_root, output_dir)
if output_format == "markdown":
_generate_markdown(feature_specs, schema_root, output_dir)
elif output_format == "wassirman":
_generate_wassirman(feature_specs, output_dir)


def _generate_wassirman(
    feature_specs: list[FeatureSpec],
    output_dir: Path | None,
) -> None:
    """Generate validation IR and emit it as YAML.

    Parameters
    ----------
    feature_specs
        Extracted feature specs to convert to validation IR.
    output_dir
        Directory to write one ``<dataset>.yaml`` file per dataset into,
        or None to echo the combined IR to stdout.
    """
    ir = generate_validation_ir(feature_specs)
    if not output_dir:
        # No directory given: print the full multi-dataset document.
        click.echo(ir.to_yaml())
        return
    # The target directory is the same for every dataset, so create it
    # once up front instead of re-checking inside the loop.
    output_dir.mkdir(parents=True, exist_ok=True)
    for dataset in ir.datasets:
        # Each dataset is written as its own standalone IR envelope.
        single_ir = ValidationIR(datasets=[dataset])
        (output_dir / f"{dataset.name}.yaml").write_text(single_ir.to_yaml())


def _generate_markdown(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
"PRIMITIVE_TYPES",
"get_type_mapping",
"is_semantic_newtype",
"is_storage_primitive_source",
"resolve_type_name",
]

Expand Down Expand Up @@ -82,6 +83,27 @@ def get_type_mapping(type_name: str) -> TypeMapping | None:
return PRIMITIVE_TYPES.get(type_name)


def is_storage_primitive_source(source_name: str | None) -> bool:
    """Whether a ConstraintSource name refers to a registered storage primitive.

    Validation renderers use this to drop storage-level constraints
    (e.g., int32 range) so that domain-level constraints take precedence.

    Parameters
    ----------
    source_name
        The NewType or primitive name to check, or None.

    Returns
    -------
    bool
        True if source_name is a key in PRIMITIVE_TYPES.
    """
    # A missing source name can never match a registered primitive.
    return source_name is not None and source_name in PRIMITIVE_TYPES


def resolve_type_name(type_info: TypeInfo, target: str) -> str:
"""Resolve a TypeInfo to the base type string for a given target.

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
"""Validation IR data types for YAML serialization."""

from __future__ import annotations

from dataclasses import dataclass

import yaml

__all__ = ["ConditionIR", "DatasetIR", "RuleIR", "ValidationIR"]


@dataclass(frozen=True, slots=True)
class ConditionIR:
"""Guard predicate for conditional rules."""

column: str
check: str
value: object | None = None

def to_dict(self) -> dict[str, object]:
"""Serialize to dict, omitting None fields."""
d: dict[str, object] = {"column": self.column, "check": self.check}
if self.value is not None:
d["value"] = self.value
return d


@dataclass(frozen=True, slots=True)
class RuleIR:
"""Single validation rule."""

name: str
check: str
severity: str
column: str | None = None
columns: list[str] | None = None
value: object | None = None
list_columns: list[str] | None = None
when: ConditionIR | None = None

def to_dict(self) -> dict[str, object]:
"""Serialize to dict, omitting None fields."""
d: dict[str, object] = {"name": self.name}
if self.column is not None:
d["column"] = self.column
if self.columns is not None:
d["columns"] = self.columns
d["check"] = self.check
if self.value is not None:
d["value"] = self.value
if self.list_columns is not None:
d["list_columns"] = self.list_columns
if self.when is not None:
d["when"] = self.when.to_dict()
d["severity"] = self.severity
return d


@dataclass(frozen=True, slots=True)
class DatasetIR:
"""Validation rules for one feature type."""

name: str
source_model: str
id_column: str
rules: list[RuleIR]

def to_dict(self) -> dict[str, object]:
"""Serialize to dict."""
return {
"name": self.name,
"source_model": self.source_model,
"id_column": self.id_column,
"rules": [r.to_dict() for r in self.rules],
}


@dataclass(frozen=True, slots=True)
class ValidationIR:
"""Full validation IR envelope."""

datasets: list[DatasetIR]
version: str = "1"

def to_yaml(self) -> str:
"""Serialize to YAML string."""
data = {
"version": self.version,
"datasets": [ds.to_dict() for ds in self.datasets],
}
return yaml.dump(data, default_flow_style=False, sort_keys=False)
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
"""Validation IR generation pipeline."""

from __future__ import annotations

from collections.abc import Sequence

from ..extraction.model_extraction import expand_model_tree
from ..extraction.specs import FeatureSpec, ModelSpec
from ..extraction.type_analyzer import TypeKind
from .ir import DatasetIR, ValidationIR
from .walker import walk_feature

__all__ = ["generate_validation_ir"]


def _dataset_name(spec: FeatureSpec) -> str:
    """Derive the dataset name from the model's ``type`` Literal field.

    Falls back to the lowercased spec name when no single-valued
    ``type`` Literal field exists.
    """
    literal_candidates = (
        fs.type_info.literal_values
        for fs in spec.fields
        if fs.name == "type" and fs.type_info.kind == TypeKind.LITERAL
    )
    for values in literal_candidates:
        # Only an unambiguous single-literal discriminator names the dataset.
        if values and len(values) == 1:
            return str(values[0])
    return spec.name.lower()


def _source_model_fqn(spec: FeatureSpec) -> str:
    """Fully qualified dotted name of the spec's source model class."""
    model_cls = spec.source_type
    if model_cls is not None:
        return f"{model_cls.__module__}.{model_cls.__qualname__}"
    # No source class available — fall back to the spec's own name.
    return spec.name


def generate_validation_ir(
    feature_specs: Sequence[FeatureSpec],
) -> ValidationIR:
    """Generate validation IR from feature specs.

    Parameters
    ----------
    feature_specs
        Extracted feature specs to convert to validation IR.

    Returns
    -------
    ValidationIR
        Full validation IR with one dataset per feature spec.
    """
    # First pass: expand every model tree into a shared cache so nested
    # models are resolved before any rules are derived.
    model_cache: dict[type, ModelSpec] = {}
    for feature in feature_specs:
        expand_model_tree(feature, model_cache)

    # Second pass: one dataset per spec; the walker derives its rules.
    datasets = [
        DatasetIR(
            name=(ds_name := _dataset_name(feature)),
            source_model=_source_model_fqn(feature),
            id_column="id",
            rules=walk_feature(feature, ds_name),
        )
        for feature in feature_specs
    ]
    return ValidationIR(datasets=datasets)
Loading
Loading