Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions agent_fox/engine/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,7 @@ def _init_run(
logger.debug("Failed to create DB run record", exc_info=True)

self._graph_sync = GraphSync(state.node_states, edges_dict)
_defer_ready_reviews(graph, self._graph_sync, self._knowledge_db_conn)
self._result_handler = SessionResultHandler(
graph_sync=self._graph_sync,
routing_ladders=self._routing.ladders,
Expand Down Expand Up @@ -571,6 +572,16 @@ async def run(self) -> ExecutionState:
if self._planning_config.file_conflict_detection and self._is_parallel and len(ready) > 1:
ready = self._filter_file_conflicts(ready)

if not ready:
max_slots = self._parallel_runner.max_parallelism if self._parallel_runner else 1
promoted = self._graph_sync.promote_deferred(limit=max_slots)
if promoted:
logger.info(
"Promoted %d deferred review node(s)",
len(promoted),
)
ready = self._graph_sync.ready_tasks()

if not ready:
if self._graph_sync.is_stalled():
state.run_status = RunStatus.STALLED
Expand Down Expand Up @@ -1077,6 +1088,16 @@ async def _dispatch_parallel(
# Re-evaluate ready tasks and fill empty pool slots
if not self._signal.interrupted:
new_ready = graph_sync.ready_tasks()
if not new_ready and len(pool) < parallel_runner.max_parallelism:
promoted = graph_sync.promote_deferred(
parallel_runner.max_parallelism - len(pool),
)
if promoted:
logger.info(
"Promoted %d deferred review node(s)",
len(promoted),
)
new_ready = graph_sync.ready_tasks()
await self._fill_parallel_pool(
pool,
new_ready,
Expand Down Expand Up @@ -1403,6 +1424,7 @@ async def _hot_load_new_specs(self, state: ExecutionState) -> None:
# Rebuild GraphSync with updated graph
edges_dict = _build_edges_dict_from_graph(self._graph)
self._graph_sync = GraphSync(state.node_states, edges_dict)
_defer_ready_reviews(self._graph, self._graph_sync, self._knowledge_db_conn)

def _block_task(
self,
Expand Down Expand Up @@ -1710,6 +1732,41 @@ def _reset_blocked_tasks(
persist_node_status(conn, node_id, "pending")


def _defer_ready_reviews(
    graph: TaskGraph,
    graph_sync: GraphSync,
    conn: Any = None,
) -> list[str]:
    """Defer pending review nodes whose dependencies have all completed.

    Review-archetype nodes (excluding auto_pre ones) that are still
    ``pending`` while every predecessor is already ``completed`` are
    switched to the ``deferred`` state, optionally persisted through
    *conn*, and logged.

    Returns the list of node IDs that were deferred.
    """
    states = graph_sync.node_states

    def _should_defer(node_id: str, node: Any) -> bool:
        # Candidates: pending, non-auto_pre nodes with a review archetype.
        if states.get(node_id) != "pending" or _is_auto_pre(node_id):
            return False
        if node.archetype not in _REVIEW_ARCHETYPES:
            return False
        # Defer only when there is at least one predecessor and every
        # predecessor has already completed.
        predecessors = graph_sync.predecessors(node_id)
        return bool(predecessors) and all(
            states.get(p) == "completed" for p in predecessors
        )

    deferred = [nid for nid, node in graph.nodes.items() if _should_defer(nid, node)]
    for node_id in deferred:
        states[node_id] = "deferred"

    if deferred:
        if conn is not None:
            from agent_fox.engine.state import persist_node_status

            for node_id in deferred:
                persist_node_status(conn, node_id, "deferred")
        logger.info(
            "Deferred %d review node(s) with already-completed deps: %s",
            len(deferred),
            ", ".join(deferred),
        )
    return deferred


def _init_attempt_tracker(state: ExecutionState) -> dict[str, int]:
"""Initialize attempt counter from session history.

Expand Down
24 changes: 24 additions & 0 deletions agent_fox/engine/graph_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,23 @@ def mark_in_progress(self, node_id: str) -> None:
"""Mark a task as in_progress (being executed)."""
self.node_states[node_id] = "in_progress"

def promote_deferred(self, limit: int = 1) -> list[str]:
"""Promote up to *limit* deferred nodes to pending.

Only nodes whose dependencies are all completed are promoted.
"""
promoted: list[str] = []
for node_id, status in list(self.node_states.items()):
if status != "deferred":
continue
deps = self._edges.get(node_id, [])
if all(self.node_states.get(d) == "completed" for d in deps):
self.node_states[node_id] = "pending"
promoted.append(node_id)
if len(promoted) >= limit:
break
return promoted

def is_stalled(self) -> bool:
"""Check if no progress is possible.

Expand All @@ -303,6 +320,13 @@ def is_stalled(self) -> bool:
if has_ready or has_in_progress or all_completed:
return False

has_promotable_deferred = any(
status == "deferred" and all(self.node_states.get(d) == "completed" for d in self._edges.get(nid, []))
for nid, status in self.node_states.items()
)
if has_promotable_deferred:
return False

return True

def completed_spec_names(self) -> set[str]:
Expand Down
1 change: 1 addition & 0 deletions agent_fox/graph/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ class NodeStatus(StrEnum):
SKIPPED = "skipped"
COST_BLOCKED = "cost_blocked"
MERGE_BLOCKED = "merge_blocked"
DEFERRED = "deferred"


@dataclass
Expand Down
Loading