Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions conformance/harness.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ def conformance_root() -> Path:
"23-choice",
"24-choice-chain",
"25-choice-invalid",
"26-unreachable",
"27-dead-branch",
"28-reachable-ok",
}
)

Expand Down
2 changes: 1 addition & 1 deletion conformance/test_conformance.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ def test_bundled_schema_matches_spec() -> None:
def test_suite_present() -> None:
if not CONFORMANCE_DIR.exists():
pytest.skip("conformance suite not fetched (offline; set HAREL_CONFORMANCE_DIR)")
assert len(engine_cases()) == 25, "expected 25 engine cases"
assert len(engine_cases()) == 28, "expected 28 engine cases"
assert len(cli_cases()) == 3, "expected 3 CLI cases"


Expand Down
2 changes: 1 addition & 1 deletion src/harel/__about__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@
is exactly one place to bump.
"""

__version__ = "0.0.2"
__version__ = "0.0.3"
112 changes: 111 additions & 1 deletion src/harel/validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,11 @@ def validate(doc: dict[str, Any]) -> None:


def collect_errors(doc: dict[str, Any]) -> list[ErrorRecord]:
"""Return all validation errors (structural + reserved names)."""
"""Return all validation errors (structural + reserved names + static analysis)."""
errors: list[ErrorRecord] = list(_structural_errors(doc))
errors.extend(_reserved_name_errors(doc))
if isinstance(doc, dict) and isinstance(doc.get("top"), dict):
errors.extend(_reachability_errors(doc["top"]))
return errors


Expand Down Expand Up @@ -112,6 +114,109 @@ def _check_choice(path: str, branches: list[Any], errors: list[ErrorRecord]) ->
)


def _check_transition_list(path: str, branches: list[Any], errors: list[ErrorRecord]) -> None:
"""In a guarded transition list, an unguarded branch shadows all later ones, so it
MUST be last — otherwise the later branches are dead (SPEC §2)."""
for br in branches[:-1]:
if isinstance(br, dict) and "guard" not in br:
errors.append(
ErrorRecord(
path=path,
message="an unguarded transition must be last; later branches are dead",
)
)
return


def _reachability_errors(top: dict[str, Any]) -> list[ErrorRecord]:
"""Flag declared states unreachable from ``top`` (SPEC §2). Conservative and
guard-agnostic: reachability follows every ``initial``/region-initial/``on_events``/
``after``/``choice`` target regardless of guards, and entering a state implies its
ancestors (whose own edges are then also followed)."""
raws: dict[str, dict[str, Any]] = {}
parent: dict[str, str | None] = {}
children: dict[str, dict[str, str]] = {}

def _build(path: str, node: dict[str, Any], par: str | None) -> None:
raws[path] = node
parent[path] = par
children[path] = {}
for cn, cd in (node.get("states") or {}).items():
if isinstance(cd, dict):
children[path][cn] = f"{path}.{cn}"
_build(f"{path}.{cn}", cd, path)
for region in node.get("regions") or []:
for cn, cd in (region.get("states") or {}).items() if isinstance(region, dict) else []:
if isinstance(cd, dict):
children[path][cn] = f"{path}.{cn}"
_build(f"{path}.{cn}", cd, path)

_build("top", top, None)

def _resolve(src: str, ref: object) -> str | None:
if not isinstance(ref, str):
return None
parts = ref.split(".")
cur: str | None = src
anchor: str | None = None
while cur is not None:
if parts[0] in children.get(cur, {}):
anchor = children[cur][parts[0]]
break
cur = parent.get(cur)
if anchor is None:
return None
node = anchor
for p in parts[1:]:
node = children.get(node, {}).get(p, "")
if not node:
return None
return node

def _targets(node: dict[str, Any]) -> list[object]:
out: list[object] = []
initials = [node.get("initial")]
initials += [r.get("initial") for r in node.get("regions") or [] if isinstance(r, dict)]
for t in initials:
if isinstance(t, dict):
out.append(t.get("transition_to"))
for spec in (node.get("on_events") or {}).values():
for tr in spec if isinstance(spec, list) else [spec]:
if isinstance(tr, dict):
out.append(tr.get("transition_to"))
for group in (node.get("after") or [], node.get("choice") or []):
for tr in group:
if isinstance(tr, dict):
out.append(tr.get("transition_to"))
return out

reachable: set[str] = set()
stack = ["top"]
while stack:
path = stack.pop()
if path in reachable:
continue
reachable.add(path)
par = parent.get(path)
if par is not None and par not in reachable:
stack.append(par) # entering a state implies its ancestors; follow their edges too
for ref in _targets(raws[path]):
tgt = _resolve(path, ref)
if tgt is not None and tgt not in reachable:
stack.append(tgt)

errors: list[ErrorRecord] = []
for path in raws:
if path != "top" and path not in reachable:
errors.append(
ErrorRecord(
path="/" + path.replace(".", "/"),
message=f"unreachable state '{path.rsplit('.', 1)[-1]}'",
)
)
return sorted(errors, key=lambda e: e["path"])


def _forbid(
name: object, reserved: frozenset[str], path: str, errors: list[ErrorRecord]
) -> None:
Expand All @@ -133,6 +238,11 @@ def _walk_state(path: str, state: Any, errors: list[ErrorRecord]) -> None:
choice = state.get("choice")
if isinstance(choice, list):
_check_choice(path, choice, errors)
on_events = state.get("on_events")
if isinstance(on_events, dict):
for ev, spec in on_events.items():
if isinstance(spec, list):
_check_transition_list(f"{path}/on_events/{ev}", spec, errors)
esvs = state.get("esvs")
if isinstance(esvs, dict):
for name in esvs:
Expand Down
2 changes: 1 addition & 1 deletion tests/test_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
initial: { transition_to: a }
states:
a: { on_events: { go: { transition_to: b } } }
b: {}
b: { on_events: { go: { transition_to: d } } }
d: {}
"""

Expand Down
126 changes: 126 additions & 0 deletions tests/test_static_validation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
"""Static validation: unreachable states & dead transition branches (SPEC §2)."""

from __future__ import annotations

import pytest
import yaml

from harel import collect_errors, load_definitions
from harel.errors import ValidationError


def _errors(src: str) -> list[str]:
return [e["message"] for e in collect_errors(yaml.safe_load(src))]


# --- unreachable states -----------------------------------------------------
def test_unreachable_state_flagged() -> None:
src = """\
id: m
events: { go: {} }
top:
initial: { transition_to: a }
states:
a: { on_events: { go: { transition_to: b } } }
b: {}
orphan: {}
"""
msgs = _errors(src)
assert any("unreachable state 'orphan'" in m for m in msgs)
with pytest.raises(ValidationError):
load_definitions(src)


def test_state_reached_only_via_composite_initial_is_ok() -> None:
src = """\
id: m
events: { go: {} }
top:
initial: { transition_to: outer }
states:
outer:
initial: { transition_to: inner }
states:
inner: { on_events: { go: { transition_to: fin } } }
fin: { type: final }
"""
assert _errors(src) == []


def test_deeply_targeted_state_marks_ancestors_reachable() -> None:
# Transitioning straight to outer.inner must not flag outer as unreachable.
src = """\
id: m
events: { go: {} }
top:
initial: { transition_to: start }
states:
start: { on_events: { go: { transition_to: outer.inner } } }
outer:
initial: { transition_to: inner }
states:
inner: {}
"""
assert _errors(src) == []


def test_orthogonal_region_reachability() -> None:
src = """\
id: m
events: { go: {} }
top:
initial: { transition_to: par }
states:
par:
type: orthogonal
regions:
- initial: { transition_to: r1a }
states:
r1a: { on_events: { go: { transition_to: r1b } } }
r1b: {}
- initial: { transition_to: r2a }
states: { r2a: {}, r2orphan: {} }
"""
# r1a/r1b/r2a reachable via region initials + a transition; r2orphan is not.
msgs = _errors(src)
assert any("unreachable state 'r2orphan'" in m for m in msgs)
assert not any("'r1a'" in m or "'r1b'" in m or "'r2a'" in m for m in msgs)


# --- dead branches ----------------------------------------------------------
def test_unguarded_branch_not_last_is_dead() -> None:
src = """\
id: m
events: { check: { payload: { n: { type: int, required: true } } } }
top:
initial: { transition_to: s }
states:
s:
on_events:
check:
- { transition_to: a }
- { guard: "event.payload.n > 0", transition_to: b }
a: {}
b: {}
"""
assert any("must be last" in m for m in _errors(src))
with pytest.raises(ValidationError):
load_definitions(src)


def test_guarded_list_with_unguarded_last_is_ok() -> None:
src = """\
id: m
events: { check: { payload: { n: { type: int, required: true } } } }
top:
initial: { transition_to: s }
states:
s:
on_events:
check:
- { guard: "event.payload.n > 0", transition_to: a }
- { transition_to: b }
a: {}
b: {}
"""
assert _errors(src) == []
3 changes: 1 addition & 2 deletions tests/test_validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,9 @@
SKELETON_STATES = """
id: m
top:
initial: {{ transition_to: s }}
initial: {{ transition_to: {name} }}
states:
{name}: {{}}
s: {{}}
"""

SKELETON_ESVS = """
Expand Down
Loading