Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 56 additions & 5 deletions agent_fox/engine/graph_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,11 @@

from __future__ import annotations

import logging
from collections import Counter, deque

logger = logging.getLogger(__name__)


def _spec_name(node_id: str) -> str:
"""Extract spec name from node ID (everything before first colon).
Expand Down Expand Up @@ -166,6 +169,15 @@ class GraphSync:
dependents, and detect stall conditions.
"""

VALID_TRANSITIONS: dict[str, set[str]] = {
"pending": {"in_progress", "blocked"},
"in_progress": {"completed", "failed", "blocked"},
"deferred": {"pending", "blocked"},
"failed": {"pending"},
"blocked": {"pending"},
"completed": set(), # terminal — no outbound transitions
}

def __init__(
self,
node_states: dict[str, str],
Expand All @@ -191,6 +203,8 @@ def __init__(
self._edges = edges
self._node_archetypes = node_archetypes

self._transition_log: list[dict[str, str]] = []

# Build reverse adjacency: node -> list of nodes that depend on it.
# Used for cascade blocking (BFS forward through dependents).
self._dependents: dict[str, list[str]] = {n: [] for n in node_states}
Expand All @@ -199,6 +213,39 @@ def __init__(
if dep in self._dependents:
self._dependents[dep].append(node)

def _transition(self, node_id: str, to_status: str, *, reason: str = "") -> None:
"""Validate and apply a state transition, logging the change.

Logs a warning on invalid transitions but always applies the
change — the orchestrator must remain resilient.
"""
from_status = self.node_states.get(node_id, "unknown")
valid_targets = self.VALID_TRANSITIONS.get(from_status)
if valid_targets is not None and to_status not in valid_targets:
logger.warning(
"Invalid state transition for %s: %s -> %s (reason: %s)",
node_id,
from_status,
to_status,
reason or "none",
)
self.node_states[node_id] = to_status
logger.info(
"State transition: node=%s from=%s to=%s reason=%s",
node_id,
from_status,
to_status,
reason or "none",
)
self._transition_log.append(
{
"node_id": node_id,
"from_status": from_status,
"to_status": to_status,
"reason": reason,
}
)

def ready_tasks(
self,
duration_hints: dict[str, int] | None = None,
Expand Down Expand Up @@ -258,7 +305,7 @@ def predecessors(self, node_id: str) -> list[str]:

def mark_completed(self, node_id: str) -> None:
"""Mark a task as completed."""
self.node_states[node_id] = "completed"
self._transition(node_id, "completed", reason="session completed")

def mark_blocked(self, node_id: str, reason: str) -> list[str]:
"""Mark a task as blocked and cascade-block all dependents.
Expand All @@ -274,7 +321,7 @@ def mark_blocked(self, node_id: str, reason: str) -> list[str]:
List of node_ids that were cascade-blocked (does not include
the originally blocked node itself).
"""
self.node_states[node_id] = "blocked"
self._transition(node_id, "blocked", reason=reason)

# BFS through dependents to cascade the block
cascade_blocked: list[str] = []
Expand All @@ -301,15 +348,19 @@ def mark_blocked(self, node_id: str, reason: str) -> list[str]:
if self.node_states.get(dependent) == "in_progress":
queue.append(dependent)
continue
self.node_states[dependent] = "blocked"
self._transition(
dependent,
"blocked",
reason=f"cascade from {node_id}",
)
cascade_blocked.append(dependent)
queue.append(dependent)

return cascade_blocked

def mark_in_progress(self, node_id: str) -> None:
"""Mark a task as in_progress (being executed)."""
self.node_states[node_id] = "in_progress"
self._transition(node_id, "in_progress", reason="dispatched")

def promote_deferred(self, limit: int = 1) -> list[str]:
"""Promote up to *limit* deferred nodes to pending.
Expand All @@ -322,7 +373,7 @@ def promote_deferred(self, limit: int = 1) -> list[str]:
continue
deps = self._edges.get(node_id, [])
if all(self.node_states.get(d) == "completed" for d in deps):
self.node_states[node_id] = "pending"
self._transition(node_id, "pending", reason="promoted from deferred")
promoted.append(node_id)
if len(promoted) >= limit:
break
Expand Down
9 changes: 8 additions & 1 deletion docs/memory.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
# Agent-Fox Memory

_3175 facts | last updated: 2026-04-23_
_3176 facts | last updated: 2026-04-23_

**2026-04-23 state transition validation (issue #523):** Added
`VALID_TRANSITIONS` table and `_transition()` validation method to
`GraphSync` in `engine/graph_sync.py`. All `mark_*()` methods now validate
transitions (warning on invalid, never crashing) and emit structured log
entries for debugging. `_transition_log` list provides an in-memory audit
trail. +12 tests (4342 total pass).

**2026-04-23 errata generation (issue #522):** Added lightweight errata
auto-generation from reviewer blocking. When a reviewer blocks a coder task
Expand Down
108 changes: 108 additions & 0 deletions tests/unit/engine/test_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

from __future__ import annotations

import logging

from agent_fox.engine.graph_sync import GraphSync


Expand Down Expand Up @@ -437,3 +439,109 @@ def test_deferred_nodes_excluded_from_ready(self) -> None:

assert "C" in ready
assert "B" not in ready


class TestTransitionValidation:
"""Tests for state transition validation (issue #523)."""

def test_valid_transition_pending_to_in_progress(self) -> None:
node_states = {"A": "pending"}
sync = GraphSync(node_states, {})
sync.mark_in_progress("A")
assert sync.node_states["A"] == "in_progress"

def test_valid_transition_in_progress_to_completed(self) -> None:
node_states = {"A": "in_progress"}
sync = GraphSync(node_states, {})
sync.mark_completed("A")
assert sync.node_states["A"] == "completed"

def test_valid_transition_pending_to_blocked(self) -> None:
node_states = {"A": "pending"}
sync = GraphSync(node_states, {})
sync.mark_blocked("A", "test")
assert sync.node_states["A"] == "blocked"

def test_valid_transition_deferred_to_pending(self) -> None:
node_states = {"A": "deferred"}
sync = GraphSync(node_states, {})
promoted = sync.promote_deferred(limit=1)
assert promoted == ["A"]
assert sync.node_states["A"] == "pending"

def test_invalid_transition_completed_to_pending_warns(
self, caplog: logging.LogRecordArgs
) -> None:
"""Completed is terminal — transitioning away logs a warning."""
node_states = {"A": "completed"}
sync = GraphSync(node_states, {})
with caplog.at_level(logging.WARNING, logger="agent_fox.engine.graph_sync"): # type: ignore[union-attr]
sync._transition("A", "pending", reason="reset attempt")
assert sync.node_states["A"] == "pending"
assert any("Invalid state transition" in r.message for r in caplog.records)

def test_invalid_transition_completed_to_in_progress_warns(
self, caplog: logging.LogRecordArgs
) -> None:
node_states = {"A": "completed"}
sync = GraphSync(node_states, {})
with caplog.at_level(logging.WARNING, logger="agent_fox.engine.graph_sync"): # type: ignore[union-attr]
sync.mark_in_progress("A")
assert sync.node_states["A"] == "in_progress"
assert any("Invalid state transition" in r.message for r in caplog.records)

def test_invalid_transition_pending_to_completed_warns(
self, caplog: logging.LogRecordArgs
) -> None:
"""pending -> completed is not valid (must go via in_progress)."""
node_states = {"A": "pending"}
sync = GraphSync(node_states, {})
with caplog.at_level(logging.WARNING, logger="agent_fox.engine.graph_sync"): # type: ignore[union-attr]
sync.mark_completed("A")
assert sync.node_states["A"] == "completed"
assert any("Invalid state transition" in r.message for r in caplog.records)

def test_unknown_source_state_no_crash(self) -> None:
"""Unknown source states do not crash — no entry in VALID_TRANSITIONS."""
node_states = {"A": "mystery"}
sync = GraphSync(node_states, {})
sync._transition("A", "pending", reason="recovery")
assert sync.node_states["A"] == "pending"

def test_transition_table_completed_is_terminal(self) -> None:
"""Completed has an empty set — no outbound transitions."""
assert GraphSync.VALID_TRANSITIONS["completed"] == set()


class TestTransitionLogging:
"""Tests for structured transition event logging (issue #523)."""

def test_transition_log_recorded(self) -> None:
node_states = {"A": "pending"}
sync = GraphSync(node_states, {})
sync.mark_in_progress("A")
assert len(sync._transition_log) == 1
entry = sync._transition_log[0]
assert entry["node_id"] == "A"
assert entry["from_status"] == "pending"
assert entry["to_status"] == "in_progress"
assert entry["reason"] == "dispatched"

def test_cascade_blocking_logs_all_transitions(self) -> None:
node_states = {"A": "pending", "B": "pending", "C": "pending"}
edges = {"B": ["A"], "C": ["B"]}
sync = GraphSync(node_states, edges)
sync.mark_blocked("A", "retries exhausted")
assert len(sync._transition_log) == 3
node_ids = [e["node_id"] for e in sync._transition_log]
assert node_ids == ["A", "B", "C"]

def test_structured_log_message_emitted(
self, caplog: logging.LogRecordArgs
) -> None:
node_states = {"X": "pending"}
sync = GraphSync(node_states, {})
with caplog.at_level(logging.INFO, logger="agent_fox.engine.graph_sync"): # type: ignore[union-attr]
sync.mark_in_progress("X")
info_messages = [r.message for r in caplog.records if r.levelno == logging.INFO]
assert any("State transition:" in m and "node=X" in m for m in info_messages)