From 1231d426399ae88ba9b5108a78ca3a062247e447 Mon Sep 17 00:00:00 2001 From: Christian-Manuel Butzke Date: Thu, 2 Jul 2026 01:21:59 +0900 Subject: [PATCH] =?UTF-8?q?engine:=20static=20validation=20=E2=80=94=20unr?= =?UTF-8?q?eachable=20states=20&=20dead=20branches=20(SPEC=20=C2=A72)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit collect_errors now rejects (a) any declared state unreachable from the root's initial (conservative, guard-agnostic reachability over initial/region-initial/ on_events/after/choice targets; entering a state implies its ancestors), and (b) a guarded transition list or choice whose unguarded default branch is not last (later branches dead). Wired into CLI validate + the conformance static mode. Verified zero false positives across all 28 conformance definitions + examples. Two unit fixtures had genuinely unreachable states — made them reachable. Bump 0.0.3. Closes #42. --- conformance/harness.py | 3 + conformance/test_conformance.py | 2 +- src/harel/__about__.py | 2 +- src/harel/validator.py | 112 +++++++++++++++++++++++++++- tests/test_model.py | 2 +- tests/test_static_validation.py | 126 ++++++++++++++++++++++++++++++++ tests/test_validator.py | 3 +- 7 files changed, 244 insertions(+), 6 deletions(-) create mode 100644 tests/test_static_validation.py diff --git a/conformance/harness.py b/conformance/harness.py index 6c2b3cb..7129b3e 100644 --- a/conformance/harness.py +++ b/conformance/harness.py @@ -63,6 +63,9 @@ def conformance_root() -> Path: "23-choice", "24-choice-chain", "25-choice-invalid", + "26-unreachable", + "27-dead-branch", + "28-reachable-ok", } ) diff --git a/conformance/test_conformance.py b/conformance/test_conformance.py index 34dad67..cc7e7f4 100644 --- a/conformance/test_conformance.py +++ b/conformance/test_conformance.py @@ -92,7 +92,7 @@ def test_bundled_schema_matches_spec() -> None: def test_suite_present() -> None: if not CONFORMANCE_DIR.exists(): pytest.skip("conformance suite not fetched (offline; set HAREL_CONFORMANCE_DIR)") - assert len(engine_cases()) == 25, "expected 25 engine cases" + assert len(engine_cases()) == 28, "expected 28 engine cases" assert len(cli_cases()) == 3, "expected 3 CLI cases" diff --git a/src/harel/__about__.py b/src/harel/__about__.py index 058c3ab..c8d00a4 100644 --- a/src/harel/__about__.py +++ b/src/harel/__about__.py @@ -5,4 +5,4 @@ is exactly one place to bump. """ -__version__ = "0.0.2" +__version__ = "0.0.3" diff --git a/src/harel/validator.py b/src/harel/validator.py index 92ca5f3..3668e5d 100644 --- a/src/harel/validator.py +++ b/src/harel/validator.py @@ -58,9 +58,11 @@ def validate(doc: dict[str, Any]) -> None: def collect_errors(doc: dict[str, Any]) -> list[ErrorRecord]: - """Return all validation errors (structural + reserved names).""" + """Return all validation errors (structural + reserved names + static analysis).""" errors: list[ErrorRecord] = list(_structural_errors(doc)) errors.extend(_reserved_name_errors(doc)) + if isinstance(doc, dict) and isinstance(doc.get("top"), dict): + errors.extend(_reachability_errors(doc["top"])) return errors @@ -112,6 +114,109 @@ def _check_choice(path: str, branches: list[Any], errors: list[ErrorRecord]) -> ) +def _check_transition_list(path: str, branches: list[Any], errors: list[ErrorRecord]) -> None: + """In a guarded transition list, an unguarded branch shadows all later ones, so it + MUST be last — otherwise the later branches are dead (SPEC §2).""" + for br in branches[:-1]: + if isinstance(br, dict) and "guard" not in br: + errors.append( + ErrorRecord( + path=path, + message="an unguarded transition must be last; later branches are dead", + ) + ) + return + + +def _reachability_errors(top: dict[str, Any]) -> list[ErrorRecord]: + """Flag declared states unreachable from ``top`` (SPEC §2). Conservative and + guard-agnostic: reachability follows every ``initial``/region-initial/``on_events``/ + ``after``/``choice`` target regardless of guards, and entering a state implies its + ancestors (whose own edges are then also followed).""" + raws: dict[str, dict[str, Any]] = {} + parent: dict[str, str | None] = {} + children: dict[str, dict[str, str]] = {} + + def _build(path: str, node: dict[str, Any], par: str | None) -> None: + raws[path] = node + parent[path] = par + children[path] = {} + for cn, cd in (node.get("states") or {}).items(): + if isinstance(cd, dict): + children[path][cn] = f"{path}.{cn}" + _build(f"{path}.{cn}", cd, path) + for region in node.get("regions") or []: + for cn, cd in (region.get("states") or {}).items() if isinstance(region, dict) else []: + if isinstance(cd, dict): + children[path][cn] = f"{path}.{cn}" + _build(f"{path}.{cn}", cd, path) + + _build("top", top, None) + + def _resolve(src: str, ref: object) -> str | None: + if not isinstance(ref, str): + return None + parts = ref.split(".") + cur: str | None = src + anchor: str | None = None + while cur is not None: + if parts[0] in children.get(cur, {}): + anchor = children[cur][parts[0]] + break + cur = parent.get(cur) + if anchor is None: + return None + node = anchor + for p in parts[1:]: + node = children.get(node, {}).get(p, "") + if not node: + return None + return node + + def _targets(node: dict[str, Any]) -> list[object]: + out: list[object] = [] + initials = [node.get("initial")] + initials += [r.get("initial") for r in node.get("regions") or [] if isinstance(r, dict)] + for t in initials: + if isinstance(t, dict): + out.append(t.get("transition_to")) + for spec in (node.get("on_events") or {}).values(): + for tr in spec if isinstance(spec, list) else [spec]: + if isinstance(tr, dict): + out.append(tr.get("transition_to")) + for group in (node.get("after") or [], node.get("choice") or []): + for tr in group: + if isinstance(tr, dict): + out.append(tr.get("transition_to")) + return out + + reachable: set[str] = set() + stack = ["top"] + while stack: + path = stack.pop() + if path in reachable: + continue + reachable.add(path) + par = parent.get(path) + if par is not None and par not in reachable: + stack.append(par) # entering a state implies its ancestors; follow their edges too + for ref in _targets(raws[path]): + tgt = _resolve(path, ref) + if tgt is not None and tgt not in reachable: + stack.append(tgt) + + errors: list[ErrorRecord] = [] + for path in raws: + if path != "top" and path not in reachable: + errors.append( + ErrorRecord( + path="/" + path.replace(".", "/"), + message=f"unreachable state '{path.rsplit('.', 1)[-1]}'", + ) + ) + return sorted(errors, key=lambda e: e["path"]) + + def _forbid( name: object, reserved: frozenset[str], path: str, errors: list[ErrorRecord] ) -> None: @@ -133,6 +238,11 @@ def _walk_state(path: str, state: Any, errors: list[ErrorRecord]) -> None: choice = state.get("choice") if isinstance(choice, list): _check_choice(path, choice, errors) + on_events = state.get("on_events") + if isinstance(on_events, dict): + for ev, spec in on_events.items(): + if isinstance(spec, list): + _check_transition_list(f"{path}/on_events/{ev}", spec, errors) esvs = state.get("esvs") if isinstance(esvs, dict): for name in esvs: diff --git a/tests/test_model.py b/tests/test_model.py index 3d6a781..f0978b3 100644 --- a/tests/test_model.py +++ b/tests/test_model.py @@ -19,7 +19,7 @@ initial: { transition_to: a } states: a: { on_events: { go: { transition_to: b } } } - b: {} + b: { on_events: { go: { transition_to: d } } } d: {} """ diff --git a/tests/test_static_validation.py b/tests/test_static_validation.py new file mode 100644 index 0000000..cbd446c --- /dev/null +++ b/tests/test_static_validation.py @@ -0,0 +1,126 @@ +"""Static validation: unreachable states & dead transition branches (SPEC §2).""" + +from __future__ import annotations + +import pytest +import yaml + +from harel import collect_errors, load_definitions +from harel.errors import ValidationError + + +def _errors(src: str) -> list[str]: + return [e["message"] for e in collect_errors(yaml.safe_load(src))] + + +# --- unreachable states ----------------------------------------------------- +def test_unreachable_state_flagged() -> None: + src = """\ +id: m +events: { go: {} } +top: + initial: { transition_to: a } + states: + a: { on_events: { go: { transition_to: b } } } + b: {} + orphan: {} +""" + msgs = _errors(src) + assert any("unreachable state 'orphan'" in m for m in msgs) + with pytest.raises(ValidationError): + load_definitions(src) + + +def test_state_reached_only_via_composite_initial_is_ok() -> None: + src = """\ +id: m +events: { go: {} } +top: + initial: { transition_to: outer } + states: + outer: + initial: { transition_to: inner } + states: + inner: { on_events: { go: { transition_to: fin } } } + fin: { type: final } +""" + assert _errors(src) == [] + + +def test_deeply_targeted_state_marks_ancestors_reachable() -> None: + # Transitioning straight to outer.inner must not flag outer as unreachable. + src = """\ +id: m +events: { go: {} } +top: + initial: { transition_to: start } + states: + start: { on_events: { go: { transition_to: outer.inner } } } + outer: + initial: { transition_to: inner } + states: + inner: {} +""" + assert _errors(src) == [] + + +def test_orthogonal_region_reachability() -> None: + src = """\ +id: m +events: { go: {} } +top: + initial: { transition_to: par } + states: + par: + type: orthogonal + regions: + - initial: { transition_to: r1a } + states: + r1a: { on_events: { go: { transition_to: r1b } } } + r1b: {} + - initial: { transition_to: r2a } + states: { r2a: {}, r2orphan: {} } +""" + # r1a/r1b/r2a reachable via region initials + a transition; r2orphan is not. + msgs = _errors(src) + assert any("unreachable state 'r2orphan'" in m for m in msgs) + assert not any("'r1a'" in m or "'r1b'" in m or "'r2a'" in m for m in msgs) + + +# --- dead branches ---------------------------------------------------------- +def test_unguarded_branch_not_last_is_dead() -> None: + src = """\ +id: m +events: { check: { payload: { n: { type: int, required: true } } } } +top: + initial: { transition_to: s } + states: + s: + on_events: + check: + - { transition_to: a } + - { guard: "event.payload.n > 0", transition_to: b } + a: {} + b: {} +""" + assert any("must be last" in m for m in _errors(src)) + with pytest.raises(ValidationError): + load_definitions(src) + + +def test_guarded_list_with_unguarded_last_is_ok() -> None: + src = """\ +id: m +events: { check: { payload: { n: { type: int, required: true } } } } +top: + initial: { transition_to: s } + states: + s: + on_events: + check: + - { guard: "event.payload.n > 0", transition_to: a } + - { transition_to: b } + a: {} + b: {} +""" + assert _errors(src) == [] diff --git a/tests/test_validator.py b/tests/test_validator.py index e346b5a..9f8625a 100644 --- a/tests/test_validator.py +++ b/tests/test_validator.py @@ -26,10 +26,9 @@ SKELETON_STATES = """ id: m top: - initial: {{ transition_to: s }} + initial: {{ transition_to: {name} }} states: {name}: {{}} - s: {{}} """ SKELETON_ESVS = """