From 61c25646946445deba5e13c711c4066ab347fd15 Mon Sep 17 00:00:00 2001
From: Michael Kuehl
Date: Mon, 20 Apr 2026 09:39:56 +0200
Subject: [PATCH] =?UTF-8?q?feat(engine):=20deferred=20review=20injection?=
 =?UTF-8?q?=20=E2=80=94=20lazy=20promotion=20when=20slots=20idle=20(fixes?=
 =?UTF-8?q?=20#491)?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 agent_fox/engine/engine.py                 |  57 +++++
 agent_fox/engine/graph_sync.py             |  24 ++
 agent_fox/graph/types.py                   |   1 +
 tests/unit/engine/test_deferred_reviews.py | 284 +++++++++++++++++++++
 tests/unit/engine/test_sync.py             | 117 +++++++++
 5 files changed, 483 insertions(+)
 create mode 100644 tests/unit/engine/test_deferred_reviews.py

diff --git a/agent_fox/engine/engine.py b/agent_fox/engine/engine.py
index a3ec465f..e8cfba01 100644
--- a/agent_fox/engine/engine.py
+++ b/agent_fox/engine/engine.py
@@ -465,6 +465,7 @@ def _init_run(
             logger.debug("Failed to create DB run record", exc_info=True)
 
         self._graph_sync = GraphSync(state.node_states, edges_dict)
+        _defer_ready_reviews(graph, self._graph_sync, self._knowledge_db_conn)
         self._result_handler = SessionResultHandler(
             graph_sync=self._graph_sync,
             routing_ladders=self._routing.ladders,
@@ -571,6 +572,16 @@ async def run(self) -> ExecutionState:
             if self._planning_config.file_conflict_detection and self._is_parallel and len(ready) > 1:
                 ready = self._filter_file_conflicts(ready)
 
+            if not ready:
+                max_slots = self._parallel_runner.max_parallelism if self._parallel_runner else 1
+                promoted = self._graph_sync.promote_deferred(limit=max_slots)
+                if promoted:
+                    logger.info(
+                        "Promoted %d deferred review node(s)",
+                        len(promoted),
+                    )
+                    ready = self._graph_sync.ready_tasks()
+
             if not ready:
                 if self._graph_sync.is_stalled():
                     state.run_status = RunStatus.STALLED
@@ -1077,6 +1088,16 @@ async def _dispatch_parallel(
             # Re-evaluate ready tasks and fill empty pool slots
             if not self._signal.interrupted:
                 new_ready = graph_sync.ready_tasks()
+                if not new_ready and len(pool) < parallel_runner.max_parallelism:
+                    promoted = graph_sync.promote_deferred(
+                        parallel_runner.max_parallelism - len(pool),
+                    )
+                    if promoted:
+                        logger.info(
+                            "Promoted %d deferred review node(s)",
+                            len(promoted),
+                        )
+                        new_ready = graph_sync.ready_tasks()
                 await self._fill_parallel_pool(
                     pool,
                     new_ready,
@@ -1403,6 +1424,7 @@ async def _hot_load_new_specs(self, state: ExecutionState) -> None:
         # Rebuild GraphSync with updated graph
         edges_dict = _build_edges_dict_from_graph(self._graph)
         self._graph_sync = GraphSync(state.node_states, edges_dict)
+        _defer_ready_reviews(self._graph, self._graph_sync, self._knowledge_db_conn)
 
     def _block_task(
         self,
@@ -1710,6 +1732,41 @@ def _reset_blocked_tasks(
             persist_node_status(conn, node_id, "pending")
 
 
+def _defer_ready_reviews(
+    graph: TaskGraph,
+    graph_sync: GraphSync,
+    conn: Any = None,
+) -> list[str]:
+    """Mark non-auto_pre review nodes as deferred when deps are already completed.
+
+    Returns list of node IDs that were deferred.
+    """
+    deferred: list[str] = []
+    for nid, node in graph.nodes.items():
+        if graph_sync.node_states.get(nid) != "pending":
+            continue
+        if _is_auto_pre(nid):
+            continue
+        if node.archetype not in _REVIEW_ARCHETYPES:
+            continue
+        preds = graph_sync.predecessors(nid)
+        if preds and all(graph_sync.node_states.get(p) == "completed" for p in preds):
+            graph_sync.node_states[nid] = "deferred"
+            deferred.append(nid)
+    if deferred and conn is not None:
+        from agent_fox.engine.state import persist_node_status
+
+        for nid in deferred:
+            persist_node_status(conn, nid, "deferred")
+    if deferred:
+        logger.info(
+            "Deferred %d review node(s) with already-completed deps: %s",
+            len(deferred),
+            ", ".join(deferred),
+        )
+    return deferred
+
+
 def _init_attempt_tracker(state: ExecutionState) -> dict[str, int]:
     """Initialize attempt counter from session history.
 
diff --git a/agent_fox/engine/graph_sync.py b/agent_fox/engine/graph_sync.py
index 78a79e42..b5960c25 100644
--- a/agent_fox/engine/graph_sync.py
+++ b/agent_fox/engine/graph_sync.py
@@ -289,6 +289,23 @@ def mark_in_progress(self, node_id: str) -> None:
         """Mark a task as in_progress (being executed)."""
         self.node_states[node_id] = "in_progress"
 
+    def promote_deferred(self, limit: int = 1) -> list[str]:
+        """Promote up to *limit* deferred nodes to pending.
+
+        Only nodes whose dependencies are all completed are promoted.
+        """
+        promoted: list[str] = []
+        for node_id, status in list(self.node_states.items()):
+            if status != "deferred":
+                continue
+            deps = self._edges.get(node_id, [])
+            if all(self.node_states.get(d) == "completed" for d in deps):
+                self.node_states[node_id] = "pending"
+                promoted.append(node_id)
+                if len(promoted) >= limit:
+                    break
+        return promoted
+
     def is_stalled(self) -> bool:
         """Check if no progress is possible.
 
@@ -303,6 +320,13 @@
         if has_ready or has_in_progress or all_completed:
             return False
 
+        has_promotable_deferred = any(
+            status == "deferred" and all(self.node_states.get(d) == "completed" for d in self._edges.get(nid, []))
+            for nid, status in self.node_states.items()
+        )
+        if has_promotable_deferred:
+            return False
+
         return True
 
     def completed_spec_names(self) -> set[str]:
diff --git a/agent_fox/graph/types.py b/agent_fox/graph/types.py
index 342cbf7a..c98b79dd 100644
--- a/agent_fox/graph/types.py
+++ b/agent_fox/graph/types.py
@@ -19,6 +19,7 @@ class NodeStatus(StrEnum):
     SKIPPED = "skipped"
     COST_BLOCKED = "cost_blocked"
     MERGE_BLOCKED = "merge_blocked"
+    DEFERRED = "deferred"
 
 
 @dataclass
diff --git a/tests/unit/engine/test_deferred_reviews.py b/tests/unit/engine/test_deferred_reviews.py
new file mode 100644
index 00000000..82a257b1
--- /dev/null
+++ b/tests/unit/engine/test_deferred_reviews.py
@@ -0,0 +1,284 @@
+"""Tests for deferred review injection (issue #491).
+
+Verifies that review nodes for already-completed specs are injected as
+'deferred' instead of 'pending', and that promotion only occurs when no
+coder candidates are available.
+"""
+
+from __future__ import annotations
+
+from agent_fox.engine.graph_sync import GraphSync
+
+
+class TestDeferReadyReviews:
+    """Tests for _defer_ready_reviews helper."""
+
+    def test_review_node_deferred_when_deps_completed(self) -> None:
+        """Auto_post review node with completed deps is set to deferred."""
+        from agent_fox.graph.types import Edge, Node, PlanMetadata, TaskGraph
+
+        from agent_fox.engine.engine import _defer_ready_reviews
+
+        coder = Node(
+            id="myspec:1",
+            spec_name="myspec",
+            group_number=1,
+            title="Coder",
+            optional=False,
+            archetype="coder",
+        )
+        verifier = Node(
+            id="myspec:2",
+            spec_name="myspec",
+            group_number=2,
+            title="Verifier Check",
+            optional=False,
+            archetype="verifier",
+        )
+        graph = TaskGraph(
+            nodes={coder.id: coder, verifier.id: verifier},
+            edges=[Edge(source="myspec:1", target="myspec:2", kind="intra_spec")],
+            order=["myspec:1", "myspec:2"],
+            metadata=PlanMetadata(created_at="2026-01-01"),
+        )
+
+        node_states = {"myspec:1": "completed", "myspec:2": "pending"}
+        edges_dict = {"myspec:1": [], "myspec:2": ["myspec:1"]}
+        gs = GraphSync(node_states, edges_dict)
+
+        deferred = _defer_ready_reviews(graph, gs)
+
+        assert deferred == ["myspec:2"]
+        assert gs.node_states["myspec:2"] == "deferred"
+
+    def test_auto_pre_node_never_deferred(self) -> None:
+        """Auto_pre (group 0) review nodes are exempt from deferral."""
+        from agent_fox.graph.types import Node, PlanMetadata, TaskGraph
+
+        from agent_fox.engine.engine import _defer_ready_reviews
+
+        pre_review = Node(
+            id="myspec:0:reviewer:pre-review",
+            spec_name="myspec",
+            group_number=0,
+            title="Reviewer (pre-review)",
+            optional=False,
+            archetype="reviewer",
+            mode="pre-review",
+        )
+        graph = TaskGraph(
+            nodes={pre_review.id: pre_review},
+            edges=[],
+            order=[pre_review.id],
+            metadata=PlanMetadata(created_at="2026-01-01"),
+        )
+
+        node_states = {"myspec:0:reviewer:pre-review": "pending"}
+        gs = GraphSync(node_states, {})
+
+        deferred = _defer_ready_reviews(graph, gs)
+
+        assert deferred == []
+        assert gs.node_states["myspec:0:reviewer:pre-review"] == "pending"
+
+    def test_coder_node_never_deferred(self) -> None:
+        """Coder nodes are never deferred, even with all deps completed."""
+        from agent_fox.graph.types import Edge, Node, PlanMetadata, TaskGraph
+
+        from agent_fox.engine.engine import _defer_ready_reviews
+
+        setup = Node(
+            id="myspec:1",
+            spec_name="myspec",
+            group_number=1,
+            title="Setup",
+            optional=False,
+            archetype="coder",
+        )
+        impl = Node(
+            id="myspec:2",
+            spec_name="myspec",
+            group_number=2,
+            title="Implement",
+            optional=False,
+            archetype="coder",
+        )
+        graph = TaskGraph(
+            nodes={setup.id: setup, impl.id: impl},
+            edges=[Edge(source="myspec:1", target="myspec:2", kind="intra_spec")],
+            order=["myspec:1", "myspec:2"],
+            metadata=PlanMetadata(created_at="2026-01-01"),
+        )
+
+        node_states = {"myspec:1": "completed", "myspec:2": "pending"}
+        edges_dict = {"myspec:1": [], "myspec:2": ["myspec:1"]}
+        gs = GraphSync(node_states, edges_dict)
+
+        deferred = _defer_ready_reviews(graph, gs)
+
+        assert deferred == []
+        assert gs.node_states["myspec:2"] == "pending"
+
+    def test_review_node_not_deferred_when_deps_incomplete(self) -> None:
+        """Review node stays pending when deps are not yet completed."""
+        from agent_fox.graph.types import Edge, Node, PlanMetadata, TaskGraph
+
+        from agent_fox.engine.engine import _defer_ready_reviews
+
+        coder = Node(
+            id="myspec:1",
+            spec_name="myspec",
+            group_number=1,
+            title="Coder",
+            optional=False,
+            archetype="coder",
+        )
+        verifier = Node(
+            id="myspec:2",
+            spec_name="myspec",
+            group_number=2,
+            title="Verifier Check",
+            optional=False,
+            archetype="verifier",
+        )
+        graph = TaskGraph(
+            nodes={coder.id: coder, verifier.id: verifier},
+            edges=[Edge(source="myspec:1", target="myspec:2", kind="intra_spec")],
+            order=["myspec:1", "myspec:2"],
+            metadata=PlanMetadata(created_at="2026-01-01"),
+        )
+
+        node_states = {"myspec:1": "in_progress", "myspec:2": "pending"}
+        edges_dict = {"myspec:1": [], "myspec:2": ["myspec:1"]}
+        gs = GraphSync(node_states, edges_dict)
+
+        deferred = _defer_ready_reviews(graph, gs)
+
+        assert deferred == []
+        assert gs.node_states["myspec:2"] == "pending"
+
+    def test_audit_review_deferred_when_deps_completed(self) -> None:
+        """Auto_mid reviewer:audit-review node is deferred when deps completed."""
+        from agent_fox.graph.types import Edge, Node, PlanMetadata, TaskGraph
+
+        from agent_fox.engine.engine import _defer_ready_reviews
+
+        coder = Node(
+            id="myspec:2",
+            spec_name="myspec",
+            group_number=2,
+            title="Write tests",
+            optional=False,
+            archetype="coder",
+        )
+        audit = Node(
+            id="myspec:2:reviewer:audit-review",
+            spec_name="myspec",
+            group_number=2,
+            title="Reviewer (audit-review)",
+            optional=False,
+            archetype="reviewer",
+            mode="audit-review",
+        )
+        graph = TaskGraph(
+            nodes={coder.id: coder, audit.id: audit},
+            edges=[Edge(source="myspec:2", target="myspec:2:reviewer:audit-review", kind="intra_spec")],
+            order=["myspec:2", "myspec:2:reviewer:audit-review"],
+            metadata=PlanMetadata(created_at="2026-01-01"),
+        )
+
+        node_states = {
+            "myspec:2": "completed",
+            "myspec:2:reviewer:audit-review": "pending",
+        }
+        edges_dict = {
+            "myspec:2": [],
+            "myspec:2:reviewer:audit-review": ["myspec:2"],
+        }
+        gs = GraphSync(node_states, edges_dict)
+
+        deferred = _defer_ready_reviews(graph, gs)
+
+        assert deferred == ["myspec:2:reviewer:audit-review"]
+        assert gs.node_states["myspec:2:reviewer:audit-review"] == "deferred"
+
+
+class TestDeferredPromotionIntegration:
+    """Integration tests for deferred → promotion → ready flow."""
+
+    def test_deferred_then_promoted_then_ready(self) -> None:
+        """Full lifecycle: defer at init, promote when idle, appear in ready."""
+        from agent_fox.graph.types import Edge, Node, PlanMetadata, TaskGraph
+
+        from agent_fox.engine.engine import _defer_ready_reviews
+
+        coder = Node(
+            id="s:1", spec_name="s", group_number=1,
+            title="Coder", optional=False, archetype="coder",
+        )
+        review = Node(
+            id="s:2", spec_name="s", group_number=2,
+            title="Verifier", optional=False, archetype="verifier",
+        )
+        graph = TaskGraph(
+            nodes={coder.id: coder, review.id: review},
+            edges=[Edge(source="s:1", target="s:2", kind="intra_spec")],
+            order=["s:1", "s:2"],
+            metadata=PlanMetadata(created_at="2026-01-01"),
+        )
+
+        node_states = {"s:1": "completed", "s:2": "pending"}
+        edges_dict = {"s:1": [], "s:2": ["s:1"]}
+        gs = GraphSync(node_states, edges_dict)
+
+        _defer_ready_reviews(graph, gs)
+        assert gs.node_states["s:2"] == "deferred"
+        assert gs.ready_tasks() == []
+
+        promoted = gs.promote_deferred(limit=1)
+        assert promoted == ["s:2"]
+        assert gs.ready_tasks() == ["s:2"]
+
+    def test_promotion_respects_review_cap_interaction(self) -> None:
+        """Promoted deferred nodes still go through normal ready_tasks ordering."""
+        from agent_fox.graph.types import Edge, Node, PlanMetadata, TaskGraph
+
+        from agent_fox.engine.engine import _defer_ready_reviews
+
+        nodes_dict = {}
+        edges_list = []
+        for i in range(1, 4):
+            spec = f"spec{i:02d}"
+            coder = Node(
+                id=f"{spec}:1", spec_name=spec, group_number=1,
+                title="Coder", optional=False, archetype="coder",
+            )
+            verifier = Node(
+                id=f"{spec}:2", spec_name=spec, group_number=2,
+                title="Verifier", optional=False, archetype="verifier",
+            )
+            nodes_dict[coder.id] = coder
+            nodes_dict[verifier.id] = verifier
+            edges_list.append(Edge(source=f"{spec}:1", target=f"{spec}:2", kind="intra_spec"))
+
+        graph = TaskGraph(
+            nodes=nodes_dict,
+            edges=edges_list,
+            order=list(nodes_dict.keys()),
+            metadata=PlanMetadata(created_at="2026-01-01"),
+        )
+
+        node_states = {nid: "completed" if ":1" in nid else "pending" for nid in nodes_dict}
+        edges_dict_map = {nid: [] for nid in nodes_dict}
+        for e in edges_list:
+            edges_dict_map[e.target] = [e.source]
+        gs = GraphSync(node_states, edges_dict_map)
+
+        deferred = _defer_ready_reviews(graph, gs)
+        assert len(deferred) == 3
+
+        promoted = gs.promote_deferred(limit=2)
+        assert len(promoted) == 2
+
+        ready = gs.ready_tasks()
+        assert len(ready) == 2
diff --git a/tests/unit/engine/test_sync.py b/tests/unit/engine/test_sync.py
index d348132e..debd6fe0 100644
--- a/tests/unit/engine/test_sync.py
+++ b/tests/unit/engine/test_sync.py
@@ -303,6 +303,30 @@ def test_stalled_mix_of_blocked_and_completed(self) -> None:
 
         assert sync.is_stalled() is True
 
+    def test_not_stalled_when_promotable_deferred(self) -> None:
+        """Not stalled when deferred nodes can be promoted."""
+        node_states = {
+            "A": "completed",
+            "B": "deferred",
+        }
+        edges = {"B": ["A"]}
+
+        sync = GraphSync(node_states, edges)
+
+        assert sync.is_stalled() is False
+
+    def test_stalled_when_deferred_deps_not_met(self) -> None:
+        """Stalled when deferred nodes exist but deps are not completed."""
+        node_states = {
+            "A": "blocked",
+            "B": "deferred",
+        }
+        edges = {"B": ["A"]}
+
+        sync = GraphSync(node_states, edges)
+
+        assert sync.is_stalled() is True
+
     def test_summary_returns_status_counts(self) -> None:
         """Verify summary() returns correct counts per status."""
         node_states = {
@@ -320,3 +344,96 @@ def test_summary_returns_status_counts(self) -> None:
         assert summary["blocked"] == 1
         assert summary["pending"] == 1
         assert summary["in_progress"] == 1
+
+
+class TestPromoteDeferred:
+    """Tests for GraphSync.promote_deferred()."""
+
+    def test_promote_deferred_node_to_pending(self) -> None:
+        """Deferred node with completed deps is promoted to pending."""
+        node_states = {
+            "A": "completed",
+            "B": "deferred",
+        }
+        edges = {"B": ["A"]}
+
+        sync = GraphSync(node_states, edges)
+        promoted = sync.promote_deferred(limit=5)
+
+        assert promoted == ["B"]
+        assert sync.node_states["B"] == "pending"
+
+    def test_promote_respects_limit(self) -> None:
+        """Only promote up to the specified limit."""
+        node_states = {
+            "A": "completed",
+            "B": "deferred",
+            "C": "deferred",
+            "D": "deferred",
+        }
+        edges = {"B": ["A"], "C": ["A"], "D": ["A"]}
+
+        sync = GraphSync(node_states, edges)
+        promoted = sync.promote_deferred(limit=2)
+
+        assert len(promoted) == 2
+        assert all(sync.node_states[p] == "pending" for p in promoted)
+        deferred_remaining = [
+            nid for nid, s in sync.node_states.items() if s == "deferred"
+        ]
+        assert len(deferred_remaining) == 1
+
+    def test_promote_skips_unmet_deps(self) -> None:
+        """Deferred nodes with incomplete deps are not promoted."""
+        node_states = {
+            "A": "pending",
+            "B": "deferred",
+        }
+        edges = {"B": ["A"]}
+
+        sync = GraphSync(node_states, edges)
+        promoted = sync.promote_deferred(limit=5)
+
+        assert promoted == []
+        assert sync.node_states["B"] == "deferred"
+
+    def test_promote_no_deferred_nodes(self) -> None:
+        """Returns empty list when no deferred nodes exist."""
+        node_states = {"A": "pending", "B": "completed"}
+        edges = {"A": ["B"]}
+
+        sync = GraphSync(node_states, edges)
+        promoted = sync.promote_deferred(limit=5)
+
+        assert promoted == []
+
+    def test_promoted_nodes_become_ready(self) -> None:
+        """Promoted nodes appear in ready_tasks() after promotion."""
+        node_states = {
+            "A": "completed",
+            "B": "deferred",
+        }
+        edges = {"B": ["A"]}
+
+        sync = GraphSync(node_states, edges)
+
+        assert sync.ready_tasks() == []
+
+        sync.promote_deferred(limit=1)
+
+        assert sync.ready_tasks() == ["B"]
+
+    def test_deferred_nodes_excluded_from_ready(self) -> None:
+        """Deferred nodes do not appear in ready_tasks()."""
+        node_states = {
+            "A": "completed",
+            "B": "deferred",
+            "C": "pending",
+        }
+        edges = {"B": ["A"], "C": ["A"]}
+
+        sync = GraphSync(node_states, edges)
+        ready = sync.ready_tasks()
+
+        assert "C" in ready
+        assert "B" not in ready