diff --git a/agent_fox/engine/engine.py b/agent_fox/engine/engine.py index e8cfba01..313162e5 100644 --- a/agent_fox/engine/engine.py +++ b/agent_fox/engine/engine.py @@ -464,7 +464,8 @@ def _init_run( except Exception: logger.debug("Failed to create DB run record", exc_info=True) - self._graph_sync = GraphSync(state.node_states, edges_dict) + node_archetypes = {nid: n.archetype for nid, n in graph.nodes.items()} + self._graph_sync = GraphSync(state.node_states, edges_dict, node_archetypes) _defer_ready_reviews(graph, self._graph_sync, self._knowledge_db_conn) self._result_handler = SessionResultHandler( graph_sync=self._graph_sync, @@ -1423,7 +1424,8 @@ async def _hot_load_new_specs(self, state: ExecutionState) -> None: # Rebuild GraphSync with updated graph edges_dict = _build_edges_dict_from_graph(self._graph) - self._graph_sync = GraphSync(state.node_states, edges_dict) + node_archetypes = {nid: n.archetype for nid, n in self._graph.nodes.items()} + self._graph_sync = GraphSync(state.node_states, edges_dict, node_archetypes) _defer_ready_reviews(self._graph, self._graph_sync, self._knowledge_db_conn) def _block_task( diff --git a/agent_fox/engine/graph_sync.py b/agent_fox/engine/graph_sync.py index b5960c25..b90ae0a1 100644 --- a/agent_fox/engine/graph_sync.py +++ b/agent_fox/engine/graph_sync.py @@ -102,15 +102,21 @@ def _interleave_by_spec( ready: list[str], duration_hints: dict[str, int] | None = None, fan_out_weights: dict[str, int] | None = None, + node_archetypes: dict[str, str] | None = None, ) -> list[str]: - """Order ready tasks with pre-review priority and spec-fair interleaving. + """Order ready tasks with three-tier priority and spec-fair interleaving. - Partitions ready tasks into two tiers: + Partitions ready tasks into three tiers: 1. **Pre-review tier** (auto_pre nodes at group 0): sorted by spec fan-out descending so critical-path specs surface blockers first. - 2. **Regular tier** (coder and post-review nodes): sorted by spec - number ascending with spec-fair round-robin interleaving. + 2. **Coder tier** (implementation nodes): sorted by spec number + ascending with spec-fair round-robin interleaving. + 3. **Review tier** (non-auto_pre review/verifier nodes): sorted by + spec number ascending with spec-fair round-robin interleaving. + + When ``node_archetypes`` is not provided, tiers 2 and 3 are merged + (backward-compatible two-tier behavior). Within each tier, tasks are interleaved round-robin across spec groups. @@ -120,9 +126,12 @@ def _interleave_by_spec( duration_hints: Optional mapping of node_id -> predicted duration ms. fan_out_weights: Optional mapping of spec_name -> fan-out weight (count of distinct downstream specs). + node_archetypes: Optional mapping of node_id -> archetype string. + When provided, non-auto_pre nodes are partitioned into coder + (archetype == "coder") and review (all others) tiers. Returns: - Pre-review-prioritized, spec-fair-ordered list of node IDs. + Priority-ordered, spec-fair list of node IDs. Requirements: 69-REQ-1.1, 69-REQ-1.3, 69-REQ-2.1, 69-REQ-2.2, 69-REQ-2.3 """ @@ -135,7 +144,15 @@ def _interleave_by_spec( result: list[str] = [] if pre: result.extend(_spec_round_robin(pre, duration_hints, fan_out_weights)) - if regular: + + if node_archetypes: + coders = [n for n in regular if node_archetypes.get(n, "coder") == "coder"] + reviews = [n for n in regular if node_archetypes.get(n, "coder") != "coder"] + if coders: + result.extend(_spec_round_robin(coders, duration_hints)) + if reviews: + result.extend(_spec_round_robin(reviews, duration_hints)) + elif regular: result.extend(_spec_round_robin(regular, duration_hints)) return result @@ -153,6 +170,7 @@ def __init__( self, node_states: dict[str, str], edges: dict[str, list[str]], + node_archetypes: dict[str, str] | None = None, ) -> None: """Initialise graph sync with node states and dependency edges. @@ -165,9 +183,13 @@ def __init__( edges: Adjacency list where each key is a node_id and its value is a list of dependency node_ids (predecessors that must complete before this node can execute). + node_archetypes: Optional mapping of node_id -> archetype + string. When provided, ``ready_tasks()`` uses three-tier + priority ordering (auto_pre > coders > reviews). """ self.node_states = node_states self._edges = edges + self._node_archetypes = node_archetypes # Build reverse adjacency: node -> list of nodes that depend on it. # Used for cascade blocking (BFS forward through dependents). @@ -213,7 +235,7 @@ def ready_tasks( ready.append(node_id) fan_out = self._compute_spec_fan_out() - return _interleave_by_spec(ready, duration_hints, fan_out) + return _interleave_by_spec(ready, duration_hints, fan_out, self._node_archetypes) def _compute_spec_fan_out(self) -> dict[str, int]: """Count distinct cross-spec dependent specs. diff --git a/tests/unit/engine/test_spec_fair_scheduling.py b/tests/unit/engine/test_spec_fair_scheduling.py index 52537d9d..342cde83 100644 --- a/tests/unit/engine/test_spec_fair_scheduling.py +++ b/tests/unit/engine/test_spec_fair_scheduling.py @@ -449,3 +449,203 @@ def test_no_colon_node_id(self) -> None: from agent_fox.engine.graph_sync import _spec_name assert _spec_name("orphan_node") == "orphan_node" + + +# --------------------------------------------------------------------------- +# Three-tier priority ordering: auto_pre > coders > reviews (fixes #490) +# --------------------------------------------------------------------------- + + +class TestThreeTierPriority: + """Tests for three-tier archetype-aware scheduling. + + When node_archetypes is provided, _interleave_by_spec uses: + Tier 1 = auto_pre reviews (group 0) + Tier 2 = coder nodes + Tier 3 = non-auto_pre review nodes + """ + + def test_coders_before_reviews(self) -> None: + """Coder nodes appear before non-auto_pre review nodes.""" + from agent_fox.engine.graph_sync import _interleave_by_spec + + ready = ["01_a:1", "02_b:3:reviewer:audit-review", "03_c:1"] + archetypes = { + "01_a:1": "coder", + "02_b:3:reviewer:audit-review": "reviewer", + "03_c:1": "coder", + } + result = _interleave_by_spec(ready, node_archetypes=archetypes) + assert result == ["01_a:1", "03_c:1", "02_b:3:reviewer:audit-review"] + + def test_three_tier_full_ordering(self) -> None: + """Auto_pre first, then coders, then reviews.""" + from agent_fox.engine.graph_sync import _interleave_by_spec + + ready = [ + "01_a:1", + "02_b:0:reviewer:pre-review", + "03_c:3:reviewer:audit-review", + "04_d:1", + ] + archetypes = { + "01_a:1": "coder", + "02_b:0:reviewer:pre-review": "reviewer", + "03_c:3:reviewer:audit-review": "reviewer", + "04_d:1": "coder", + } + result = _interleave_by_spec(ready, node_archetypes=archetypes) + # Tier 1: pre-review + assert result[0] == "02_b:0:reviewer:pre-review" + # Tier 2: coders (round-robin: 01, 04) + assert result[1] == "01_a:1" + assert result[2] == "04_d:1" + # Tier 3: review + assert result[3] == "03_c:3:reviewer:audit-review" + + def test_verifier_after_coders(self) -> None: + """Verifier (non-coder archetype) is placed after coders.""" + from agent_fox.engine.graph_sync import _interleave_by_spec + + ready = ["01_a:5", "01_a:1", "02_b:1"] + archetypes = { + "01_a:5": "verifier", + "01_a:1": "coder", + "02_b:1": "coder", + } + result = _interleave_by_spec(ready, node_archetypes=archetypes) + # Coders first, then verifier + assert result == ["01_a:1", "02_b:1", "01_a:5"] + + def test_multiple_review_archetypes_in_tier_3(self) -> None: + """Multiple review archetypes are all placed in tier 3.""" + from agent_fox.engine.graph_sync import _interleave_by_spec + + ready = [ + "01_a:2:reviewer:audit-review", + "02_b:1", + "03_c:4", + ] + archetypes = { + "01_a:2:reviewer:audit-review": "reviewer", + "02_b:1": "coder", + "03_c:4": "verifier", + } + result = _interleave_by_spec(ready, node_archetypes=archetypes) + assert result[0] == "02_b:1" # coder first + # Reviews after + review_nodes = result[1:] + assert set(review_nodes) == {"01_a:2:reviewer:audit-review", "03_c:4"} + + def test_without_archetypes_backward_compat(self) -> None: + """Without node_archetypes, behavior is two-tier (backward compatible).""" + from agent_fox.engine.graph_sync import _interleave_by_spec + + ready = ["01_a:1", "02_b:3:reviewer:audit-review", "03_c:1"] + result_without = _interleave_by_spec(ready) + # All non-auto_pre nodes in one tier, round-robin by spec + assert result_without == ["01_a:1", "02_b:3:reviewer:audit-review", "03_c:1"] + + def test_duration_hints_within_tiers(self) -> None: + """Duration hints apply within each tier independently.""" + from agent_fox.engine.graph_sync import _interleave_by_spec + + ready = [ + "01_a:1", + "01_a:2", + "01_a:3:reviewer:audit-review", + "01_a:4:reviewer:audit-review", + ] + archetypes = { + "01_a:1": "coder", + "01_a:2": "coder", + "01_a:3:reviewer:audit-review": "reviewer", + "01_a:4:reviewer:audit-review": "reviewer", + } + hints = { + "01_a:1": 100, + "01_a:2": 500, + "01_a:3:reviewer:audit-review": 800, + "01_a:4:reviewer:audit-review": 200, + } + result = _interleave_by_spec( + ready, duration_hints=hints, node_archetypes=archetypes + ) + # Tier 2 (coders): 01_a:2 (500) before 01_a:1 (100) — duration desc + assert result[0] == "01_a:2" + assert result[1] == "01_a:1" + # Tier 3 (reviews): 01_a:3 (800) before 01_a:4 (200) — duration desc + assert result[2] == "01_a:3:reviewer:audit-review" + assert result[3] == "01_a:4:reviewer:audit-review" + + def test_cross_spec_round_robin_within_coder_tier(self) -> None: + """Cross-spec round-robin is preserved within the coder tier.""" + from agent_fox.engine.graph_sync import _interleave_by_spec + + ready = ["01_a:1", "01_a:2", "02_b:1", "02_b:2"] + archetypes = { + "01_a:1": "coder", + "01_a:2": "coder", + "02_b:1": "coder", + "02_b:2": "coder", + } + result = _interleave_by_spec(ready, node_archetypes=archetypes) + assert result == ["01_a:1", "02_b:1", "01_a:2", "02_b:2"] + + def test_unknown_archetype_defaults_to_coder_tier(self) -> None: + """Nodes not in archetypes dict default to coder tier.""" + from agent_fox.engine.graph_sync import _interleave_by_spec + + ready = ["01_a:1", "02_b:3:reviewer:audit-review"] + archetypes = {"02_b:3:reviewer:audit-review": "reviewer"} + result = _interleave_by_spec(ready, node_archetypes=archetypes) + # 01_a:1 defaults to coder, so it's in tier 2 + assert result[0] == "01_a:1" + assert result[1] == "02_b:3:reviewer:audit-review" + + +class TestThreeTierGraphSyncIntegration: + """Integration tests for ready_tasks() with three-tier ordering.""" + + def test_ready_tasks_with_archetypes(self) -> None: + """ready_tasks() uses three-tier ordering when archetypes are set.""" + from agent_fox.engine.graph_sync import GraphSync + + states = { + "01_a:1": "pending", + "02_b:3:reviewer:audit-review": "pending", + "03_c:1": "pending", + } + edges = { + "01_a:1": [], + "02_b:3:reviewer:audit-review": [], + "03_c:1": [], + } + archetypes = { + "01_a:1": "coder", + "02_b:3:reviewer:audit-review": "reviewer", + "03_c:1": "coder", + } + gs = GraphSync(states, edges, node_archetypes=archetypes) + result = gs.ready_tasks() + # Coders first, then reviews + assert result == ["01_a:1", "03_c:1", "02_b:3:reviewer:audit-review"] + + def test_ready_tasks_without_archetypes_unchanged(self) -> None: + """ready_tasks() without archetypes preserves two-tier behavior.""" + from agent_fox.engine.graph_sync import GraphSync + + states = { + "01_a:1": "pending", + "02_b:3:reviewer:audit-review": "pending", + "03_c:1": "pending", + } + edges = { + "01_a:1": [], + "02_b:3:reviewer:audit-review": [], + "03_c:1": [], + } + gs = GraphSync(states, edges) + result = gs.ready_tasks() + # Two-tier: all non-auto_pre in one group, round-robin by spec + assert result == ["01_a:1", "02_b:3:reviewer:audit-review", "03_c:1"]