From 94f0e1d7132df30e8698e6510b94cbf7c73f978a Mon Sep 17 00:00:00 2001 From: Michael Kuehl Date: Thu, 23 Apr 2026 13:04:10 +0200 Subject: [PATCH] feat(engine): add state transition validation to GraphSync (fixes #523) --- agent_fox/engine/graph_sync.py | 61 +++++++++++++++++-- docs/memory.md | 9 ++- tests/unit/engine/test_sync.py | 108 +++++++++++++++++++++++++++++++++ 3 files changed, 172 insertions(+), 6 deletions(-) diff --git a/agent_fox/engine/graph_sync.py b/agent_fox/engine/graph_sync.py index b90ae0a1..94393c75 100644 --- a/agent_fox/engine/graph_sync.py +++ b/agent_fox/engine/graph_sync.py @@ -7,8 +7,11 @@ from __future__ import annotations +import logging from collections import Counter, deque +logger = logging.getLogger(__name__) + def _spec_name(node_id: str) -> str: """Extract spec name from node ID (everything before first colon). @@ -166,6 +169,15 @@ class GraphSync: dependents, and detect stall conditions. """ + VALID_TRANSITIONS: dict[str, set[str]] = { + "pending": {"in_progress", "blocked"}, + "in_progress": {"completed", "failed", "blocked"}, + "deferred": {"pending", "blocked"}, + "failed": {"pending"}, + "blocked": {"pending"}, + "completed": set(), # terminal — no outbound transitions + } + def __init__( self, node_states: dict[str, str], @@ -191,6 +203,8 @@ def __init__( self._edges = edges self._node_archetypes = node_archetypes + self._transition_log: list[dict[str, str]] = [] + # Build reverse adjacency: node -> list of nodes that depend on it. # Used for cascade blocking (BFS forward through dependents). self._dependents: dict[str, list[str]] = {n: [] for n in node_states} @@ -199,6 +213,39 @@ def __init__( if dep in self._dependents: self._dependents[dep].append(node) + def _transition(self, node_id: str, to_status: str, *, reason: str = "") -> None: + """Validate and apply a state transition, logging the change. + + Logs a warning on invalid transitions but always applies the + change — the orchestrator must remain resilient. + """ + from_status = self.node_states.get(node_id, "unknown") + valid_targets = self.VALID_TRANSITIONS.get(from_status) + if valid_targets is not None and to_status not in valid_targets: + logger.warning( + "Invalid state transition for %s: %s -> %s (reason: %s)", + node_id, + from_status, + to_status, + reason or "none", + ) + self.node_states[node_id] = to_status + logger.info( + "State transition: node=%s from=%s to=%s reason=%s", + node_id, + from_status, + to_status, + reason or "none", + ) + self._transition_log.append( + { + "node_id": node_id, + "from_status": from_status, + "to_status": to_status, + "reason": reason, + } + ) + def ready_tasks( self, duration_hints: dict[str, int] | None = None, @@ -258,7 +305,7 @@ def predecessors(self, node_id: str) -> list[str]: def mark_completed(self, node_id: str) -> None: """Mark a task as completed.""" - self.node_states[node_id] = "completed" + self._transition(node_id, "completed", reason="session completed") def mark_blocked(self, node_id: str, reason: str) -> list[str]: """Mark a task as blocked and cascade-block all dependents. @@ -274,7 +321,7 @@ def mark_blocked(self, node_id: str, reason: str) -> list[str]: List of node_ids that were cascade-blocked (does not include the originally blocked node itself). """ - self.node_states[node_id] = "blocked" + self._transition(node_id, "blocked", reason=reason) # BFS through dependents to cascade the block cascade_blocked: list[str] = [] @@ -301,7 +348,11 @@ def mark_blocked(self, node_id: str, reason: str) -> list[str]: if self.node_states.get(dependent) == "in_progress": queue.append(dependent) continue - self.node_states[dependent] = "blocked" + self._transition( + dependent, + "blocked", + reason=f"cascade from {node_id}", + ) cascade_blocked.append(dependent) queue.append(dependent) @@ -309,7 +360,7 @@ def mark_blocked(self, node_id: str, reason: str) -> list[str]: def mark_in_progress(self, node_id: str) -> None: """Mark a task as in_progress (being executed).""" - self.node_states[node_id] = "in_progress" + self._transition(node_id, "in_progress", reason="dispatched") def promote_deferred(self, limit: int = 1) -> list[str]: """Promote up to *limit* deferred nodes to pending. @@ -322,7 +373,7 @@ def promote_deferred(self, limit: int = 1) -> list[str]: continue deps = self._edges.get(node_id, []) if all(self.node_states.get(d) == "completed" for d in deps): - self.node_states[node_id] = "pending" + self._transition(node_id, "pending", reason="promoted from deferred") promoted.append(node_id) if len(promoted) >= limit: break diff --git a/docs/memory.md b/docs/memory.md index b18389a2..d47d608b 100644 --- a/docs/memory.md +++ b/docs/memory.md @@ -1,6 +1,13 @@ # Agent-Fox Memory -_3175 facts | last updated: 2026-04-23_ +_3176 facts | last updated: 2026-04-23_ + +**2026-04-23 state transition validation (issue #523):** Added +`VALID_TRANSITIONS` table and `_transition()` validation method to +`GraphSync` in `engine/graph_sync.py`. All `mark_*()` methods now validate +transitions (warning on invalid, never crashing) and emit structured log +entries for debugging. `_transition_log` list provides an in-memory audit +trail. +12 tests (4342 total pass). **2026-04-23 errata generation (issue #522):** Added lightweight errata auto-generation from reviewer blocking. When a reviewer blocks a coder task diff --git a/tests/unit/engine/test_sync.py b/tests/unit/engine/test_sync.py index debd6fe0..6804be17 100644 --- a/tests/unit/engine/test_sync.py +++ b/tests/unit/engine/test_sync.py @@ -8,6 +8,8 @@ from __future__ import annotations +import logging + from agent_fox.engine.graph_sync import GraphSync @@ -437,3 +439,109 @@ def test_deferred_nodes_excluded_from_ready(self) -> None: assert "C" in ready assert "B" not in ready + + +class TestTransitionValidation: + """Tests for state transition validation (issue #523).""" + + def test_valid_transition_pending_to_in_progress(self) -> None: + node_states = {"A": "pending"} + sync = GraphSync(node_states, {}) + sync.mark_in_progress("A") + assert sync.node_states["A"] == "in_progress" + + def test_valid_transition_in_progress_to_completed(self) -> None: + node_states = {"A": "in_progress"} + sync = GraphSync(node_states, {}) + sync.mark_completed("A") + assert sync.node_states["A"] == "completed" + + def test_valid_transition_pending_to_blocked(self) -> None: + node_states = {"A": "pending"} + sync = GraphSync(node_states, {}) + sync.mark_blocked("A", "test") + assert sync.node_states["A"] == "blocked" + + def test_valid_transition_deferred_to_pending(self) -> None: + node_states = {"A": "deferred"} + sync = GraphSync(node_states, {}) + promoted = sync.promote_deferred(limit=1) + assert promoted == ["A"] + assert sync.node_states["A"] == "pending" + + def test_invalid_transition_completed_to_pending_warns( + self, caplog: logging.LogRecordArgs + ) -> None: + """Completed is terminal — transitioning away logs a warning.""" + node_states = {"A": "completed"} + sync = GraphSync(node_states, {}) + with caplog.at_level(logging.WARNING, logger="agent_fox.engine.graph_sync"): # type: ignore[union-attr] + sync._transition("A", "pending", reason="reset attempt") + assert sync.node_states["A"] == "pending" + assert any("Invalid state transition" in r.message for r in caplog.records) + + def test_invalid_transition_completed_to_in_progress_warns( + self, caplog: logging.LogRecordArgs + ) -> None: + node_states = {"A": "completed"} + sync = GraphSync(node_states, {}) + with caplog.at_level(logging.WARNING, logger="agent_fox.engine.graph_sync"): # type: ignore[union-attr] + sync.mark_in_progress("A") + assert sync.node_states["A"] == "in_progress" + assert any("Invalid state transition" in r.message for r in caplog.records) + + def test_invalid_transition_pending_to_completed_warns( + self, caplog: logging.LogRecordArgs + ) -> None: + """pending -> completed is not valid (must go via in_progress).""" + node_states = {"A": "pending"} + sync = GraphSync(node_states, {}) + with caplog.at_level(logging.WARNING, logger="agent_fox.engine.graph_sync"): # type: ignore[union-attr] + sync.mark_completed("A") + assert sync.node_states["A"] == "completed" + assert any("Invalid state transition" in r.message for r in caplog.records) + + def test_unknown_source_state_no_crash(self) -> None: + """Unknown source states do not crash — no entry in VALID_TRANSITIONS.""" + node_states = {"A": "mystery"} + sync = GraphSync(node_states, {}) + sync._transition("A", "pending", reason="recovery") + assert sync.node_states["A"] == "pending" + + def test_transition_table_completed_is_terminal(self) -> None: + """Completed has an empty set — no outbound transitions.""" + assert GraphSync.VALID_TRANSITIONS["completed"] == set() + + +class TestTransitionLogging: + """Tests for structured transition event logging (issue #523).""" + + def test_transition_log_recorded(self) -> None: + node_states = {"A": "pending"} + sync = GraphSync(node_states, {}) + sync.mark_in_progress("A") + assert len(sync._transition_log) == 1 + entry = sync._transition_log[0] + assert entry["node_id"] == "A" + assert entry["from_status"] == "pending" + assert entry["to_status"] == "in_progress" + assert entry["reason"] == "dispatched" + + def test_cascade_blocking_logs_all_transitions(self) -> None: + node_states = {"A": "pending", "B": "pending", "C": "pending"} + edges = {"B": ["A"], "C": ["B"]} + sync = GraphSync(node_states, edges) + sync.mark_blocked("A", "retries exhausted") + assert len(sync._transition_log) == 3 + node_ids = [e["node_id"] for e in sync._transition_log] + assert node_ids == ["A", "B", "C"] + + def test_structured_log_message_emitted( + self, caplog: logging.LogRecordArgs + ) -> None: + node_states = {"X": "pending"} + sync = GraphSync(node_states, {}) + with caplog.at_level(logging.INFO, logger="agent_fox.engine.graph_sync"): # type: ignore[union-attr] + sync.mark_in_progress("X") + info_messages = [r.message for r in caplog.records if r.levelno == logging.INFO] + assert any("State transition:" in m and "node=X" in m for m in info_messages)