Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions agent_fox/engine/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,8 @@ def _init_run(
except Exception:
logger.debug("Failed to create DB run record", exc_info=True)

self._graph_sync = GraphSync(state.node_states, edges_dict)
node_archetypes = {nid: n.archetype for nid, n in graph.nodes.items()}
self._graph_sync = GraphSync(state.node_states, edges_dict, node_archetypes)
_defer_ready_reviews(graph, self._graph_sync, self._knowledge_db_conn)
self._result_handler = SessionResultHandler(
graph_sync=self._graph_sync,
Expand Down Expand Up @@ -1423,7 +1424,8 @@ async def _hot_load_new_specs(self, state: ExecutionState) -> None:

# Rebuild GraphSync with updated graph
edges_dict = _build_edges_dict_from_graph(self._graph)
self._graph_sync = GraphSync(state.node_states, edges_dict)
node_archetypes = {nid: n.archetype for nid, n in self._graph.nodes.items()}
self._graph_sync = GraphSync(state.node_states, edges_dict, node_archetypes)
_defer_ready_reviews(self._graph, self._graph_sync, self._knowledge_db_conn)

def _block_task(
Expand Down
36 changes: 29 additions & 7 deletions agent_fox/engine/graph_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,15 +102,21 @@ def _interleave_by_spec(
ready: list[str],
duration_hints: dict[str, int] | None = None,
fan_out_weights: dict[str, int] | None = None,
node_archetypes: dict[str, str] | None = None,
) -> list[str]:
"""Order ready tasks with pre-review priority and spec-fair interleaving.
"""Order ready tasks with three-tier priority and spec-fair interleaving.

Partitions ready tasks into two tiers:
Partitions ready tasks into three tiers:

1. **Pre-review tier** (auto_pre nodes at group 0): sorted by spec
fan-out descending so critical-path specs surface blockers first.
2. **Regular tier** (coder and post-review nodes): sorted by spec
number ascending with spec-fair round-robin interleaving.
2. **Coder tier** (implementation nodes): sorted by spec number
ascending with spec-fair round-robin interleaving.
3. **Review tier** (non-auto_pre review/verifier nodes): sorted by
spec number ascending with spec-fair round-robin interleaving.

When ``node_archetypes`` is not provided, tiers 2 and 3 are merged
(backward-compatible two-tier behavior).

Within each tier, tasks are interleaved round-robin across spec
groups.
Expand All @@ -120,9 +126,12 @@ def _interleave_by_spec(
duration_hints: Optional mapping of node_id -> predicted duration ms.
fan_out_weights: Optional mapping of spec_name -> fan-out weight
(count of distinct downstream specs).
node_archetypes: Optional mapping of node_id -> archetype string.
When provided, non-auto_pre nodes are partitioned into coder
(archetype == "coder") and review (all others) tiers.

Returns:
Pre-review-prioritized, spec-fair-ordered list of node IDs.
Priority-ordered, spec-fair list of node IDs.

Requirements: 69-REQ-1.1, 69-REQ-1.3, 69-REQ-2.1, 69-REQ-2.2, 69-REQ-2.3
"""
Expand All @@ -135,7 +144,15 @@ def _interleave_by_spec(
result: list[str] = []
if pre:
result.extend(_spec_round_robin(pre, duration_hints, fan_out_weights))
if regular:

if node_archetypes:
coders = [n for n in regular if node_archetypes.get(n, "coder") == "coder"]
reviews = [n for n in regular if node_archetypes.get(n, "coder") != "coder"]
if coders:
result.extend(_spec_round_robin(coders, duration_hints))
if reviews:
result.extend(_spec_round_robin(reviews, duration_hints))
elif regular:
result.extend(_spec_round_robin(regular, duration_hints))

return result
Expand All @@ -153,6 +170,7 @@ def __init__(
self,
node_states: dict[str, str],
edges: dict[str, list[str]],
node_archetypes: dict[str, str] | None = None,
) -> None:
"""Initialise graph sync with node states and dependency edges.

Expand All @@ -165,9 +183,13 @@ def __init__(
edges: Adjacency list where each key is a node_id and its
value is a list of dependency node_ids (predecessors
that must complete before this node can execute).
node_archetypes: Optional mapping of node_id -> archetype
string. When provided, ``ready_tasks()`` uses three-tier
priority ordering (auto_pre > coders > reviews).
"""
self.node_states = node_states
self._edges = edges
self._node_archetypes = node_archetypes

# Build reverse adjacency: node -> list of nodes that depend on it.
# Used for cascade blocking (BFS forward through dependents).
Expand Down Expand Up @@ -213,7 +235,7 @@ def ready_tasks(
ready.append(node_id)

fan_out = self._compute_spec_fan_out()
return _interleave_by_spec(ready, duration_hints, fan_out)
return _interleave_by_spec(ready, duration_hints, fan_out, self._node_archetypes)

def _compute_spec_fan_out(self) -> dict[str, int]:
"""Count distinct cross-spec dependent specs.
Expand Down
200 changes: 200 additions & 0 deletions tests/unit/engine/test_spec_fair_scheduling.py
Original file line number Diff line number Diff line change
Expand Up @@ -449,3 +449,203 @@ def test_no_colon_node_id(self) -> None:
from agent_fox.engine.graph_sync import _spec_name

assert _spec_name("orphan_node") == "orphan_node"


# ---------------------------------------------------------------------------
# Three-tier priority ordering: auto_pre > coders > reviews (fixes #490)
# ---------------------------------------------------------------------------


class TestThreeTierPriority:
"""Tests for three-tier archetype-aware scheduling.

When node_archetypes is provided, _interleave_by_spec uses:
Tier 1 = auto_pre reviews (group 0)
Tier 2 = coder nodes
Tier 3 = non-auto_pre review nodes
"""

def test_coders_before_reviews(self) -> None:
"""Coder nodes appear before non-auto_pre review nodes."""
from agent_fox.engine.graph_sync import _interleave_by_spec

ready = ["01_a:1", "02_b:3:reviewer:audit-review", "03_c:1"]
archetypes = {
"01_a:1": "coder",
"02_b:3:reviewer:audit-review": "reviewer",
"03_c:1": "coder",
}
result = _interleave_by_spec(ready, node_archetypes=archetypes)
assert result == ["01_a:1", "03_c:1", "02_b:3:reviewer:audit-review"]

def test_three_tier_full_ordering(self) -> None:
"""Auto_pre first, then coders, then reviews."""
from agent_fox.engine.graph_sync import _interleave_by_spec

ready = [
"01_a:1",
"02_b:0:reviewer:pre-review",
"03_c:3:reviewer:audit-review",
"04_d:1",
]
archetypes = {
"01_a:1": "coder",
"02_b:0:reviewer:pre-review": "reviewer",
"03_c:3:reviewer:audit-review": "reviewer",
"04_d:1": "coder",
}
result = _interleave_by_spec(ready, node_archetypes=archetypes)
# Tier 1: pre-review
assert result[0] == "02_b:0:reviewer:pre-review"
# Tier 2: coders (round-robin: 01, 04)
assert result[1] == "01_a:1"
assert result[2] == "04_d:1"
# Tier 3: review
assert result[3] == "03_c:3:reviewer:audit-review"

def test_verifier_after_coders(self) -> None:
"""Verifier (non-coder archetype) is placed after coders."""
from agent_fox.engine.graph_sync import _interleave_by_spec

ready = ["01_a:5", "01_a:1", "02_b:1"]
archetypes = {
"01_a:5": "verifier",
"01_a:1": "coder",
"02_b:1": "coder",
}
result = _interleave_by_spec(ready, node_archetypes=archetypes)
# Coders first, then verifier
assert result == ["01_a:1", "02_b:1", "01_a:5"]

def test_multiple_review_archetypes_in_tier_3(self) -> None:
"""Multiple review archetypes are all placed in tier 3."""
from agent_fox.engine.graph_sync import _interleave_by_spec

ready = [
"01_a:2:reviewer:audit-review",
"02_b:1",
"03_c:4",
]
archetypes = {
"01_a:2:reviewer:audit-review": "reviewer",
"02_b:1": "coder",
"03_c:4": "verifier",
}
result = _interleave_by_spec(ready, node_archetypes=archetypes)
assert result[0] == "02_b:1" # coder first
# Reviews after
review_nodes = result[1:]
assert set(review_nodes) == {"01_a:2:reviewer:audit-review", "03_c:4"}

def test_without_archetypes_backward_compat(self) -> None:
"""Without node_archetypes, behavior is two-tier (backward compatible)."""
from agent_fox.engine.graph_sync import _interleave_by_spec

ready = ["01_a:1", "02_b:3:reviewer:audit-review", "03_c:1"]
result_without = _interleave_by_spec(ready)
# All non-auto_pre nodes in one tier, round-robin by spec
assert result_without == ["01_a:1", "02_b:3:reviewer:audit-review", "03_c:1"]

def test_duration_hints_within_tiers(self) -> None:
"""Duration hints apply within each tier independently."""
from agent_fox.engine.graph_sync import _interleave_by_spec

ready = [
"01_a:1",
"01_a:2",
"01_a:3:reviewer:audit-review",
"01_a:4:reviewer:audit-review",
]
archetypes = {
"01_a:1": "coder",
"01_a:2": "coder",
"01_a:3:reviewer:audit-review": "reviewer",
"01_a:4:reviewer:audit-review": "reviewer",
}
hints = {
"01_a:1": 100,
"01_a:2": 500,
"01_a:3:reviewer:audit-review": 800,
"01_a:4:reviewer:audit-review": 200,
}
result = _interleave_by_spec(
ready, duration_hints=hints, node_archetypes=archetypes
)
# Tier 2 (coders): 01_a:2 (500) before 01_a:1 (100) — duration desc
assert result[0] == "01_a:2"
assert result[1] == "01_a:1"
# Tier 3 (reviews): 01_a:3 (800) before 01_a:4 (200) — duration desc
assert result[2] == "01_a:3:reviewer:audit-review"
assert result[3] == "01_a:4:reviewer:audit-review"

def test_cross_spec_round_robin_within_coder_tier(self) -> None:
"""Cross-spec round-robin is preserved within the coder tier."""
from agent_fox.engine.graph_sync import _interleave_by_spec

ready = ["01_a:1", "01_a:2", "02_b:1", "02_b:2"]
archetypes = {
"01_a:1": "coder",
"01_a:2": "coder",
"02_b:1": "coder",
"02_b:2": "coder",
}
result = _interleave_by_spec(ready, node_archetypes=archetypes)
assert result == ["01_a:1", "02_b:1", "01_a:2", "02_b:2"]

def test_unknown_archetype_defaults_to_coder_tier(self) -> None:
"""Nodes not in archetypes dict default to coder tier."""
from agent_fox.engine.graph_sync import _interleave_by_spec

ready = ["01_a:1", "02_b:3:reviewer:audit-review"]
archetypes = {"02_b:3:reviewer:audit-review": "reviewer"}
result = _interleave_by_spec(ready, node_archetypes=archetypes)
# 01_a:1 defaults to coder, so it's in tier 2
assert result[0] == "01_a:1"
assert result[1] == "02_b:3:reviewer:audit-review"


class TestThreeTierGraphSyncIntegration:
"""Integration tests for ready_tasks() with three-tier ordering."""

def test_ready_tasks_with_archetypes(self) -> None:
"""ready_tasks() uses three-tier ordering when archetypes are set."""
from agent_fox.engine.graph_sync import GraphSync

states = {
"01_a:1": "pending",
"02_b:3:reviewer:audit-review": "pending",
"03_c:1": "pending",
}
edges = {
"01_a:1": [],
"02_b:3:reviewer:audit-review": [],
"03_c:1": [],
}
archetypes = {
"01_a:1": "coder",
"02_b:3:reviewer:audit-review": "reviewer",
"03_c:1": "coder",
}
gs = GraphSync(states, edges, node_archetypes=archetypes)
result = gs.ready_tasks()
# Coders first, then reviews
assert result == ["01_a:1", "03_c:1", "02_b:3:reviewer:audit-review"]

def test_ready_tasks_without_archetypes_unchanged(self) -> None:
"""ready_tasks() without archetypes preserves two-tier behavior."""
from agent_fox.engine.graph_sync import GraphSync

states = {
"01_a:1": "pending",
"02_b:3:reviewer:audit-review": "pending",
"03_c:1": "pending",
}
edges = {
"01_a:1": [],
"02_b:3:reviewer:audit-review": [],
"03_c:1": [],
}
gs = GraphSync(states, edges)
result = gs.ready_tasks()
# Two-tier: all non-auto_pre in one group, round-robin by spec
assert result == ["01_a:1", "02_b:3:reviewer:audit-review", "03_c:1"]