diff --git a/agent_fox/engine/blocking.py b/agent_fox/engine/blocking.py new file mode 100644 index 00000000..60004efb --- /dev/null +++ b/agent_fox/engine/blocking.py @@ -0,0 +1,215 @@ +"""Review-blocking evaluation: decides whether review findings block downstream tasks. + +Extracted from result_handler.py to isolate blocking decision logic. + +Requirements: 26-REQ-9.3, 30-REQ-2.3, 84-REQ-3.1, 84-REQ-3.E1 +""" + +from __future__ import annotations + +import logging +from dataclasses import dataclass +from typing import Any + +from agent_fox.core.config import ArchetypesConfig +from agent_fox.core.node_id import parse_node_id +from agent_fox.engine.audit_helpers import emit_audit_event +from agent_fox.engine.state import SessionRecord +from agent_fox.knowledge.audit import AuditEventType + +logger = logging.getLogger(__name__) + + +@dataclass(frozen=True) +class BlockDecision: + """Result of evaluating whether a review session should block a task.""" + + should_block: bool + coder_node_id: str = "" + reason: str = "" + + +def _format_block_reason( + archetype: str, + findings: list[Any], + threshold: int, + spec_name: str, + task_group: str, +) -> str: + """Format an enriched blocking reason string with finding IDs and descriptions. + + Includes the count of critical findings, up to 3 finding IDs as `F-<8hex>` + short prefixes, truncated descriptions (max 60 chars each), and "and N more" + when there are more than 3 critical findings. 
+
+    Requirements: 84-REQ-3.1, 84-REQ-3.E1
+    """
+    critical_findings = [f for f in findings if f.severity.lower() == "critical"]
+    n = len(critical_findings)
+
+    header = (
+        f"{archetype.capitalize()} found {n} critical finding(s) (threshold: {threshold}) for {spec_name}:{task_group}"
+    )
+
+    if n == 0:
+        return header
+
+    shown = critical_findings[:3]
+    parts = []
+    for finding in shown:
+        # Build F-<8hex> short ID from the UUID
+        raw_id = finding.id.replace("-", "")[:8]
+        short_id = f"F-{raw_id}"
+        desc = finding.description[:60]
+        if len(finding.description) > 60:
+            desc += "…"
+        parts.append(f"{short_id}: {desc}")
+
+    detail = ", ".join(parts)
+    if n > 3:
+        detail += f", and {n - 3} more"
+
+    return f"{header} — {detail}"
+
+
+def evaluate_review_blocking(
+    record: SessionRecord,
+    archetypes_config: ArchetypesConfig | None,
+    knowledge_db_conn: Any | None,
+    *,
+    mode: str | None = None,
+    sink: Any | None = None,
+    run_id: str = "",
+) -> BlockDecision:
+    """Evaluate whether a reviewer session should block its downstream task.
+
+    Supports the consolidated reviewer archetype with modes (pre-review,
+    drift-review) as well as legacy archetype names for backward compat.
+
+    Queries persisted review findings from DuckDB, counts critical findings,
+    applies the configured (or learned) block threshold.
+
+    Critical findings with category='security' always trigger blocking,
+    regardless of the numeric threshold, because security vulnerabilities
+    must be remediated before downstream work can proceed.
+
+    Returns a BlockDecision indicating whether blocking should occur and why.
+    """
+    archetype = record.archetype
+
+    # Only reviewer pre-review and drift-review modes can block.
+    # Audit-review and fix-review do not participate in blocking.
+ if archetype == "reviewer": + if mode not in ("pre-review", "drift-review"): + return BlockDecision(should_block=False) + elif archetype not in ("skeptic", "oracle"): + # Legacy names kept for backward compat with old session records + return BlockDecision(should_block=False) + + if knowledge_db_conn is None: + return BlockDecision(should_block=False) + + parsed = parse_node_id(record.node_id) + spec_name = parsed.spec_name + task_group = str(parsed.group_number) if parsed.group_number else "1" + coder_node_id = f"{spec_name}:{task_group}" + + # Display label for log messages + display_name = f"reviewer:{mode}" if archetype == "reviewer" and mode else archetype + + try: + from agent_fox.knowledge.review_store import query_findings_by_session + + session_id = f"{record.node_id}:{record.attempt}" + findings = query_findings_by_session(knowledge_db_conn, session_id) + + critical_count = sum(1 for f in findings if f.severity.lower() == "critical") + + if critical_count == 0: + return BlockDecision(should_block=False) + + # Security bypass: critical findings with category='security' always block, + # regardless of the numeric threshold. 
+ security_critical = [ + f for f in findings if f.severity.lower() == "critical" and getattr(f, "category", None) == "security" + ] + if security_critical: + shown = security_critical[:3] + detail = ", ".join( + f"F-{f.id.replace('-', '')[:8]}: {f.description[:60]}" + ("…" if len(f.description) > 60 else "") + for f in shown + ) + reason = ( + f"[SECURITY] {display_name.capitalize()} found {len(security_critical)} critical " + f"security finding(s) for {spec_name}:{task_group} — {detail}" + ) + logger.warning("SECURITY blocking %s: %s", coder_node_id, reason) + emit_audit_event( + sink, + run_id, + AuditEventType.SECURITY_FINDING_BLOCKED, + node_id=record.node_id, + session_id=session_id, + archetype=archetype, + payload={ + "spec_name": spec_name, + "task_group": task_group, + "security_critical_count": len(security_critical), + "finding_ids": [str(f.id) for f in security_critical], + }, + ) + return BlockDecision( + should_block=True, + coder_node_id=coder_node_id, + reason=reason, + ) + + # Resolve threshold from ReviewerConfig by mode (or legacy archetype name) + configured_threshold = 3 # conservative default + if archetypes_config is not None: + rc = archetypes_config.reviewer_config + if archetype == "reviewer": + if mode == "pre-review": + configured_threshold = rc.pre_review_block_threshold + elif mode == "drift-review": + if rc.drift_review_block_threshold is None: + return BlockDecision(should_block=False) + configured_threshold = rc.drift_review_block_threshold + elif archetype == "skeptic": + configured_threshold = rc.pre_review_block_threshold + elif archetype == "oracle": + if rc.drift_review_block_threshold is None: + return BlockDecision(should_block=False) + configured_threshold = rc.drift_review_block_threshold + + blocked = critical_count > configured_threshold + + if blocked: + reason = _format_block_reason( + display_name, + findings, + configured_threshold, + spec_name, + task_group, + ) + logger.warning( + "%s blocking %s: %s", + 
display_name.capitalize(), + coder_node_id, + reason, + ) + return BlockDecision( + should_block=True, + coder_node_id=coder_node_id, + reason=reason, + ) + + except Exception: + logger.warning( + "Failed to evaluate %s blocking for %s", + display_name, + record.node_id, + exc_info=True, + ) + + return BlockDecision(should_block=False) diff --git a/agent_fox/engine/dispatch.py b/agent_fox/engine/dispatch.py new file mode 100644 index 00000000..7ba044d8 --- /dev/null +++ b/agent_fox/engine/dispatch.py @@ -0,0 +1,296 @@ +"""Dispatch strategies: serial and parallel task execution. + +Extracted from engine.py to isolate dispatch mechanics from orchestration +control flow. Each dispatcher manages the loop of preparing, launching, +and processing sessions for ready tasks. + +Requirements: 04-REQ-1.1, 04-REQ-1.2, 04-REQ-2.1 +""" + +from __future__ import annotations + +import asyncio +import logging +from typing import Any + +from agent_fox.engine.graph_sync import _is_auto_pre +from agent_fox.engine.session_lifecycle import _REVIEW_ARCHETYPES +from agent_fox.engine.state import SessionRecord + +logger = logging.getLogger(__name__) + + +class SerialDispatcher: + """Dispatches one ready task at a time with inter-session delay.""" + + def __init__(self, orch: Any) -> None: + self._orch = orch + + async def dispatch( + self, + ready: list[str], + state: Any, + attempt_tracker: dict[str, int], + error_tracker: dict[str, str | None], + first_dispatch: bool, + ) -> bool: + """Dispatch one ready task serially. 
Returns updated first_dispatch.""" + orch = self._orch + assert orch._graph_sync is not None # noqa: S101 + + for node_id in ready: + if orch._signal.interrupted: + break + + launch = await orch._prepare_launch( + node_id, + state, + attempt_tracker, + error_tracker, + ) + if launch is None: + continue + + ( + _, + attempt, + previous_error, + node_archetype, + node_instances, + assessed_tier, + node_mode, + ) = launch + + if not first_dispatch: + await orch._serial_runner.delay() + first_dispatch = False + + orch._graph_sync.mark_in_progress(node_id) + + timeout_override: int | None = None + max_turns_override: int | None = None + if orch._result_handler is not None: + timeout_override = orch._result_handler._node_timeout.get(node_id) + if node_id in orch._result_handler._node_max_turns: + max_turns_override = orch._result_handler._node_max_turns[node_id] + + record = await orch._serial_runner.execute( + node_id, + attempt, + previous_error, + archetype=node_archetype, + mode=node_mode, + instances=node_instances, + assessed_tier=assessed_tier, + run_id=orch._run_id, + timeout_override=timeout_override, + max_turns_override=max_turns_override, + ) + + assert orch._result_handler is not None # noqa: S101 + orch._result_handler.process( + record, + attempt, + state, + attempt_tracker, + error_tracker, + ) + + if record.status == "completed": + await orch._run_sync_barrier_if_needed(state) + + break + + return first_dispatch + + +class ParallelDispatcher: + """Dispatches ready tasks using a streaming pool of concurrent sessions.""" + + def __init__(self, orch: Any) -> None: + self._orch = orch + + async def dispatch( + self, + ready: list[str], + state: Any, + attempt_tracker: dict[str, int], + error_tracker: dict[str, str | None], + ) -> None: + """Dispatch ready tasks using a streaming pool. + + Maintains a pool of up to ``max_parallelism`` concurrent asyncio + tasks. 
When a task completes, ``ready_tasks()`` is re-evaluated + and empty pool slots are filled with newly-unblocked work. + """ + orch = self._orch + assert orch._graph_sync is not None # noqa: S101 + assert orch._parallel_runner is not None # noqa: S101 + + graph_sync = orch._graph_sync + parallel_runner = orch._parallel_runner + + pool: set[asyncio.Task[SessionRecord]] = set() + + await self.fill_pool(pool, ready, state, attempt_tracker, error_tracker) + + if not pool: + return + + parallel_runner.track_tasks(list(pool)) + + while pool: + if orch._signal.interrupted: + break + + done, pool = await asyncio.wait(pool, return_when=asyncio.FIRST_COMPLETED) + + barrier_needed = self.process_completed( + done, + state, + attempt_tracker, + error_tracker, + ) + + if barrier_needed and pool: + if orch._signal.interrupted: + break + logger.info("Barrier triggered — draining %d in-flight tasks", len(pool)) + try: + drain_done, pool = await asyncio.wait(pool) + except asyncio.CancelledError: + break + self.process_completed(drain_done, state, attempt_tracker, error_tracker) + + if barrier_needed: + await orch._run_sync_barrier_if_needed(state) + + if not orch._signal.interrupted: + new_ready = graph_sync.ready_tasks() + if not new_ready and len(pool) < parallel_runner.max_parallelism: + promoted = graph_sync.promote_deferred( + parallel_runner.max_parallelism - len(pool), + ) + if promoted: + logger.info("Promoted %d deferred review node(s)", len(promoted)) + new_ready = graph_sync.ready_tasks() + await self.fill_pool(pool, new_ready, state, attempt_tracker, error_tracker) + + parallel_runner.track_tasks(list(pool)) + + async def fill_pool( + self, + pool: set[asyncio.Task[SessionRecord]], + candidates: list[str], + state: Any, + attempt_tracker: dict[str, int], + error_tracker: dict[str, str | None], + ) -> None: + """Launch candidates into the parallel pool up to max_parallelism. 
+ + Review archetype sessions (excluding auto_pre group-0 nodes) are + capped at ``max(1, max_pool * max_review_fraction)`` concurrent + slots to prevent slot starvation. + """ + orch = self._orch + assert orch._graph_sync is not None # noqa: S101 + assert orch._parallel_runner is not None # noqa: S101 + + max_pool = orch._parallel_runner.max_parallelism + max_review = max(1, int(max_pool * orch._config.max_review_fraction)) + + review_in_pool = 0 + for t in pool: + name = t.get_name() + if name.startswith("parallel-"): + pool_node_id = name[len("parallel-") :] + if not _is_auto_pre(pool_node_id): + pool_archetype = orch._get_node_archetype(pool_node_id) + if pool_archetype in _REVIEW_ARCHETYPES: + review_in_pool += 1 + + for node_id in candidates: + if len(pool) >= max_pool: + break + if orch._signal.interrupted: + break + + if orch._graph_sync.node_states.get(node_id) == "blocked": + continue + + candidate_archetype = orch._get_node_archetype(node_id) + if candidate_archetype in _REVIEW_ARCHETYPES and not _is_auto_pre(node_id) and review_in_pool >= max_review: + continue + + launch = await orch._prepare_launch( + node_id, + state, + attempt_tracker, + error_tracker, + ) + if launch is None: + continue + + _, attempt, previous_error, archetype, instances, assessed_tier, node_mode = launch + + orch._graph_sync.mark_in_progress(node_id) + + timeout_override_p: int | None = None + max_turns_override_p: int | None = None + if orch._result_handler is not None: + timeout_override_p = orch._result_handler._node_timeout.get(node_id) + if node_id in orch._result_handler._node_max_turns: + max_turns_override_p = orch._result_handler._node_max_turns[node_id] + + task = asyncio.create_task( + orch._parallel_runner.execute_one( + node_id, + attempt, + previous_error, + archetype=archetype, + mode=node_mode, + instances=instances, + assessed_tier=assessed_tier, + run_id=orch._run_id, + timeout_override=timeout_override_p, + max_turns_override=max_turns_override_p, + ), + 
name=f"parallel-{node_id}", + ) + pool.add(task) + + if archetype in _REVIEW_ARCHETYPES and not _is_auto_pre(node_id): + review_in_pool += 1 + + def process_completed( + self, + done: set[asyncio.Task[SessionRecord]], + state: Any, + attempt_tracker: dict[str, int], + error_tracker: dict[str, str | None], + ) -> bool: + """Process completed parallel tasks. Returns True if a barrier is needed.""" + orch = self._orch + assert orch._result_handler is not None # noqa: S101 + + barrier_needed = False + for completed_task in done: + try: + record = completed_task.result() + except Exception as exc: + logger.error("Parallel task raised: %s", exc) + continue + + orch._result_handler.process( + record, + attempt_tracker.get(record.node_id, 1), + state, + attempt_tracker, + error_tracker, + ) + + if record.status == "completed": + if orch._should_trigger_barrier(state): + barrier_needed = True + + return barrier_needed diff --git a/agent_fox/engine/engine.py b/agent_fox/engine/engine.py index 9591e4bc..5dee6883 100644 --- a/agent_fox/engine/engine.py +++ b/agent_fox/engine/engine.py @@ -37,6 +37,7 @@ from agent_fox.engine.audit_helpers import emit_audit_event from agent_fox.engine.barrier import _count_node_status, run_sync_barrier_sequence from agent_fox.engine.circuit import CircuitBreaker +from agent_fox.engine.dispatch import ParallelDispatcher, SerialDispatcher from agent_fox.engine.graph_sync import GraphSync, _is_auto_pre from agent_fox.engine.hot_load import ( _build_nodes_and_edges, @@ -292,6 +293,11 @@ def __init__( inter_session_delay=float(config.inter_session_delay), ) + self._serial_dispatcher = SerialDispatcher(self) + self._parallel_dispatcher: ParallelDispatcher | None = None + if self._is_parallel: + self._parallel_dispatcher = ParallelDispatcher(self) + @property def _repo_root(self) -> Path: """Return the repository root (parent of the .agent-fox directory). 
@@ -942,6 +948,14 @@ def _check_launch( return "blocked" return "limited" + def _get_parallel_dispatcher(self) -> ParallelDispatcher: + """Return the parallel dispatcher, creating one lazily if needed.""" + dispatcher = getattr(self, "_parallel_dispatcher", None) + if dispatcher is None: + dispatcher = ParallelDispatcher(self) + self._parallel_dispatcher = dispatcher + return dispatcher + async def _dispatch_serial( self, ready: list[str], @@ -950,78 +964,17 @@ async def _dispatch_serial( error_tracker: dict[str, str | None], first_dispatch: bool, ) -> bool: - """Dispatch one ready task serially. Returns updated first_dispatch.""" - assert self._graph_sync is not None # noqa: S101 - - for node_id in ready: - if self._signal.interrupted: - break - - launch = await self._prepare_launch( - node_id, - state, - attempt_tracker, - error_tracker, - ) - if launch is None: - # _check_launch returned blocked or limited; the serial - # path can't distinguish, so just skip to re-evaluate. - continue - - ( - _, - attempt, - previous_error, - node_archetype, - node_instances, - assessed_tier, - node_mode, - ) = launch - - if not first_dispatch: - await self._serial_runner.delay() - first_dispatch = False - - self._graph_sync.mark_in_progress(node_id) - - # 75-REQ-3.5: Pass per-node timeout/turns overrides if available - timeout_override: int | None = None - max_turns_override: int | None = None - if self._result_handler is not None: - timeout_override = self._result_handler._node_timeout.get(node_id) - if node_id in self._result_handler._node_max_turns: - max_turns_override = self._result_handler._node_max_turns[node_id] - - record = await self._serial_runner.execute( - node_id, - attempt, - previous_error, - archetype=node_archetype, - mode=node_mode, - instances=node_instances, - assessed_tier=assessed_tier, - run_id=self._run_id, - timeout_override=timeout_override, - max_turns_override=max_turns_override, - ) - - assert self._result_handler is not None # noqa: S101 - 
self._result_handler.process( - record, - attempt, - state, - attempt_tracker, - error_tracker, - ) - - # 06-REQ-6.1: Check sync barrier after task completion - if record.status == "completed": - await self._run_sync_barrier_if_needed(state) - - # Re-evaluate ready tasks after each completion - break - - return first_dispatch + """Dispatch one ready task serially. Delegates to SerialDispatcher.""" + dispatcher = getattr(self, "_serial_dispatcher", None) + if dispatcher is None: + dispatcher = SerialDispatcher(self) + return await dispatcher.dispatch( + ready, + state, + attempt_tracker, + error_tracker, + first_dispatch, + ) async def _dispatch_parallel( self, @@ -1030,105 +983,14 @@ async def _dispatch_parallel( attempt_tracker: dict[str, int], error_tracker: dict[str, str | None], ) -> None: - """Dispatch ready tasks using a streaming pool. - - Maintains a pool of up to ``max_parallelism`` concurrent asyncio - tasks. When a task completes, ``ready_tasks()`` is re-evaluated - and empty pool slots are filled with newly-unblocked work. - - Only tasks that are *actually running* are marked ``in_progress`` - — queued tasks remain ``pending`` until a pool slot opens. - - This replaces the former batch-and-wait model which over-committed - all ready tasks as ``in_progress`` and delayed newly-unblocked - tasks until the entire batch completed. - """ - assert self._graph_sync is not None # noqa: S101 - assert self._parallel_runner is not None # noqa: S101 - - graph_sync = self._graph_sync - parallel_runner = self._parallel_runner - - pool: set[asyncio.Task[SessionRecord]] = set() - - await self._fill_parallel_pool( - pool, + """Dispatch ready tasks in parallel. 
Delegates to ParallelDispatcher.""" + await self._get_parallel_dispatcher().dispatch( ready, state, attempt_tracker, error_tracker, ) - if not pool: - return - - parallel_runner.track_tasks(list(pool)) - - while pool: - if self._signal.interrupted: - break - - # Wait for any task to complete - done, pool = await asyncio.wait( - pool, - return_when=asyncio.FIRST_COMPLETED, - ) - - barrier_needed = self._process_completed_parallel( - done, - state, - attempt_tracker, - error_tracker, - ) - - # 51-REQ-1.1, 51-REQ-1.2, 51-REQ-1.3: Drain remaining pool - # before entering the barrier sequence. - if barrier_needed and pool: - if self._signal.interrupted: - break - logger.info( - "Barrier triggered — draining %d in-flight tasks", - len(pool), - ) - try: - drain_done, pool = await asyncio.wait(pool) - except asyncio.CancelledError: - # 51-REQ-1.E2: SIGINT during drain - break - self._process_completed_parallel( - drain_done, - state, - attempt_tracker, - error_tracker, - ) - - # Run the barrier after draining - if barrier_needed: - await self._run_sync_barrier_if_needed(state) - - # Re-evaluate ready tasks and fill empty pool slots - if not self._signal.interrupted: - new_ready = graph_sync.ready_tasks() - if not new_ready and len(pool) < parallel_runner.max_parallelism: - promoted = graph_sync.promote_deferred( - parallel_runner.max_parallelism - len(pool), - ) - if promoted: - logger.info( - "Promoted %d deferred review node(s)", - len(promoted), - ) - new_ready = graph_sync.ready_tasks() - await self._fill_parallel_pool( - pool, - new_ready, - state, - attempt_tracker, - error_tracker, - ) - - parallel_runner.track_tasks(list(pool)) - async def _fill_parallel_pool( self, pool: set[asyncio.Task[SessionRecord]], @@ -1137,92 +999,14 @@ async def _fill_parallel_pool( attempt_tracker: dict[str, int], error_tracker: dict[str, str | None], ) -> None: - """Launch candidates into the parallel pool up to max_parallelism. 
- - Review archetype sessions (excluding auto_pre group-0 nodes) are - capped at ``max(1, max_pool * max_review_fraction)`` concurrent - slots to prevent slot starvation when many review nodes become - ready simultaneously after a sync barrier. - """ - assert self._graph_sync is not None # noqa: S101 - assert self._parallel_runner is not None # noqa: S101 - - max_pool = self._parallel_runner.max_parallelism - max_review = max(1, int(max_pool * self._config.max_review_fraction)) - - review_in_pool = 0 - for t in pool: - name = t.get_name() - if name.startswith("parallel-"): - pool_node_id = name[len("parallel-") :] - if not _is_auto_pre(pool_node_id): - pool_archetype = self._get_node_archetype(pool_node_id) - if pool_archetype in _REVIEW_ARCHETYPES: - review_in_pool += 1 - - for node_id in candidates: - if len(pool) >= max_pool: - break - if self._signal.interrupted: - break - - # Defense-in-depth: skip any candidate whose status changed to - # "blocked" between ready_tasks() evaluation and this dispatch - # point (issue #481). The primary fix is in graph_sync.mark_blocked - # which now cascades through in-progress nodes to pre-block their - # pending dependents; this guard is a last-resort safety net. - if self._graph_sync.node_states.get(node_id) == "blocked": - continue - - # Review concurrency cap: check BEFORE _prepare_launch to avoid - # incrementing the attempt counter for tasks that won't launch. - # _prepare_launch updates attempt_tracker on "allowed" verdicts, - # so skipping afterward would silently consume retry budget - # (issue #503). 
- candidate_archetype = self._get_node_archetype(node_id) - if candidate_archetype in _REVIEW_ARCHETYPES and not _is_auto_pre(node_id) and review_in_pool >= max_review: - continue - - launch = await self._prepare_launch( - node_id, - state, - attempt_tracker, - error_tracker, - ) - if launch is None: - continue - - _, attempt, previous_error, archetype, instances, assessed_tier, node_mode = launch - - self._graph_sync.mark_in_progress(node_id) - - # 75-REQ-3.5: Pass per-node timeout/turns overrides if available - timeout_override_p: int | None = None - max_turns_override_p: int | None = None - if self._result_handler is not None: - timeout_override_p = self._result_handler._node_timeout.get(node_id) - if node_id in self._result_handler._node_max_turns: - max_turns_override_p = self._result_handler._node_max_turns[node_id] - - task = asyncio.create_task( - self._parallel_runner.execute_one( - node_id, - attempt, - previous_error, - archetype=archetype, - mode=node_mode, - instances=instances, - assessed_tier=assessed_tier, - run_id=self._run_id, - timeout_override=timeout_override_p, - max_turns_override=max_turns_override_p, - ), - name=f"parallel-{node_id}", - ) - pool.add(task) - - if archetype in _REVIEW_ARCHETYPES and not _is_auto_pre(node_id): - review_in_pool += 1 + """Launch candidates into the parallel pool. Delegates to ParallelDispatcher.""" + await self._get_parallel_dispatcher().fill_pool( + pool, + candidates, + state, + attempt_tracker, + error_tracker, + ) def _process_completed_parallel( self, @@ -1231,31 +1015,13 @@ def _process_completed_parallel( attempt_tracker: dict[str, int], error_tracker: dict[str, str | None], ) -> bool: - """Process completed parallel tasks. 
Returns True if a barrier is needed.""" - assert self._result_handler is not None # noqa: S101 - - barrier_needed = False - for completed_task in done: - try: - record = completed_task.result() - except Exception as exc: - logger.error("Parallel task raised: %s", exc) - continue - - self._result_handler.process( - record, - attempt_tracker.get(record.node_id, 1), - state, - attempt_tracker, - error_tracker, - ) - - # 06-REQ-6.1: Check sync barrier after task completion - if record.status == "completed": - if self._should_trigger_barrier(state): - barrier_needed = True - - return barrier_needed + """Process completed parallel tasks. Delegates to ParallelDispatcher.""" + return self._get_parallel_dispatcher().process_completed( + done, + state, + attempt_tracker, + error_tracker, + ) def _run_preflight(self, node_id: str) -> bool: """Run pre-flight check and skip the session if work is done. diff --git a/agent_fox/engine/result_handler.py b/agent_fox/engine/result_handler.py index 6e812345..920883b8 100644 --- a/agent_fox/engine/result_handler.py +++ b/agent_fox/engine/result_handler.py @@ -1,4 +1,4 @@ -"""Session result processing: retry decisions, escalation, blocking. +"""Session result processing: retry decisions, escalation, timeout handling. Extracted from engine.py to reduce the Orchestrator class size. 
Handles the outcome of each completed session: marking success, deciding retries, @@ -15,14 +15,12 @@ import logging import math from collections.abc import Callable -from dataclasses import dataclass from typing import Any from agent_fox.archetypes import get_archetype -from agent_fox.core.config import ArchetypesConfig from agent_fox.core.models import ModelTier -from agent_fox.core.node_id import parse_node_id from agent_fox.engine.audit_helpers import emit_audit_event +from agent_fox.engine.blocking import evaluate_review_blocking from agent_fox.engine.graph_sync import GraphSync from agent_fox.engine.state import ExecutionState, SessionRecord, update_state_with_session from agent_fox.knowledge.audit import AuditEventType @@ -32,207 +30,6 @@ logger = logging.getLogger(__name__) -# --------------------------------------------------------------------------- -# Blocking logic (inlined from former engine/blocking.py) -# Requirements: 26-REQ-9.3, 30-REQ-2.3 -# --------------------------------------------------------------------------- - - -@dataclass(frozen=True) -class BlockDecision: - """Result of evaluating whether a review session should block a task.""" - - should_block: bool - coder_node_id: str = "" - reason: str = "" - - -def _format_block_reason( - archetype: str, - findings: list[Any], - threshold: int, - spec_name: str, - task_group: str, -) -> str: - """Format an enriched blocking reason string with finding IDs and descriptions. - - Includes the count of critical findings, up to 3 finding IDs as `F-<8hex>` - short prefixes, truncated descriptions (max 60 chars each), and "and N more" - when there are more than 3 critical findings. 
- - Requirements: 84-REQ-3.1, 84-REQ-3.E1 - """ - critical_findings = [f for f in findings if f.severity.lower() == "critical"] - n = len(critical_findings) - - header = ( - f"{archetype.capitalize()} found {n} critical finding(s) (threshold: {threshold}) for {spec_name}:{task_group}" - ) - - if n == 0: - return header - - shown = critical_findings[:3] - parts = [] - for finding in shown: - # Build F-<8hex> short ID from the UUID - raw_id = finding.id.replace("-", "")[:8] - short_id = f"F-{raw_id}" - desc = finding.description[:60] - if len(finding.description) > 60: - desc += "…" - parts.append(f"{short_id}: {desc}") - - detail = ", ".join(parts) - if n > 3: - detail += f", and {n - 3} more" - - return f"{header} — {detail}" - - -def evaluate_review_blocking( - record: SessionRecord, - archetypes_config: ArchetypesConfig | None, - knowledge_db_conn: Any | None, - *, - mode: str | None = None, - sink: Any | None = None, - run_id: str = "", -) -> BlockDecision: - """Evaluate whether a reviewer session should block its downstream task. - - Supports the consolidated reviewer archetype with modes (pre-review, - drift-review) as well as legacy archetype names for backward compat. - - Queries persisted review findings from DuckDB, counts critical findings, - applies the configured (or learned) block threshold. - - Critical findings with category='security' always trigger blocking, - regardless of the numeric threshold, because security vulnerabilities - must be remediated before downstream work can proceeded. - - Returns a BlockDecision indicating whether blocking should occur and why. - """ - archetype = record.archetype - - # Only reviewer pre-review and drift-review modes can block. - # Audit-review and fix-review do not participate in blocking. 
- if archetype == "reviewer": - if mode not in ("pre-review", "drift-review"): - return BlockDecision(should_block=False) - elif archetype not in ("skeptic", "oracle"): - # Legacy names kept for backward compat with old session records - return BlockDecision(should_block=False) - - if knowledge_db_conn is None: - return BlockDecision(should_block=False) - - parsed = parse_node_id(record.node_id) - spec_name = parsed.spec_name - task_group = str(parsed.group_number) if parsed.group_number else "1" - coder_node_id = f"{spec_name}:{task_group}" - - # Display label for log messages - display_name = f"reviewer:{mode}" if archetype == "reviewer" and mode else archetype - - try: - from agent_fox.knowledge.review_store import query_findings_by_session - - session_id = f"{record.node_id}:{record.attempt}" - findings = query_findings_by_session(knowledge_db_conn, session_id) - - critical_count = sum(1 for f in findings if f.severity.lower() == "critical") - - if critical_count == 0: - return BlockDecision(should_block=False) - - # Security bypass: critical findings with category='security' always block, - # regardless of the numeric threshold. 
- security_critical = [ - f for f in findings if f.severity.lower() == "critical" and getattr(f, "category", None) == "security" - ] - if security_critical: - shown = security_critical[:3] - detail = ", ".join( - f"F-{f.id.replace('-', '')[:8]}: {f.description[:60]}" + ("…" if len(f.description) > 60 else "") - for f in shown - ) - reason = ( - f"[SECURITY] {display_name.capitalize()} found {len(security_critical)} critical " - f"security finding(s) for {spec_name}:{task_group} — {detail}" - ) - logger.warning("SECURITY blocking %s: %s", coder_node_id, reason) - emit_audit_event( - sink, - run_id, - AuditEventType.SECURITY_FINDING_BLOCKED, - node_id=record.node_id, - session_id=session_id, - archetype=archetype, - payload={ - "spec_name": spec_name, - "task_group": task_group, - "security_critical_count": len(security_critical), - "finding_ids": [str(f.id) for f in security_critical], - }, - ) - return BlockDecision( - should_block=True, - coder_node_id=coder_node_id, - reason=reason, - ) - - # Resolve threshold from ReviewerConfig by mode (or legacy archetype name) - configured_threshold = 3 # conservative default - if archetypes_config is not None: - rc = archetypes_config.reviewer_config - if archetype == "reviewer": - if mode == "pre-review": - configured_threshold = rc.pre_review_block_threshold - elif mode == "drift-review": - if rc.drift_review_block_threshold is None: - return BlockDecision(should_block=False) - configured_threshold = rc.drift_review_block_threshold - elif archetype == "skeptic": - configured_threshold = rc.pre_review_block_threshold - elif archetype == "oracle": - if rc.drift_review_block_threshold is None: - return BlockDecision(should_block=False) - configured_threshold = rc.drift_review_block_threshold - - blocked = critical_count > configured_threshold - - if blocked: - reason = _format_block_reason( - display_name, - findings, - configured_threshold, - spec_name, - task_group, - ) - logger.warning( - "%s blocking %s: %s", - 
display_name.capitalize(), - coder_node_id, - reason, - ) - return BlockDecision( - should_block=True, - coder_node_id=coder_node_id, - reason=reason, - ) - - except Exception: - logger.warning( - "Failed to evaluate %s blocking for %s", - display_name, - record.node_id, - exc_info=True, - ) - - return BlockDecision(should_block=False) - - class SessionResultHandler: """Processes session outcomes: success, retry, escalation, blocking. diff --git a/agent_fox/nightshift/coder_reviewer.py b/agent_fox/nightshift/coder_reviewer.py new file mode 100644 index 00000000..ec63cbc8 --- /dev/null +++ b/agent_fox/nightshift/coder_reviewer.py @@ -0,0 +1,340 @@ +"""Coder-reviewer retry/escalation loop for the fix pipeline. + +Extracted from fix_pipeline.py to isolate the retry state machine. +Manages the escalation ladder, reviewer parse-fail retry, and +verdict checking in a single cohesive class. + +Requirements: 82-REQ-7.1, 82-REQ-8.1, 82-REQ-8.2, 82-REQ-8.3, + 82-REQ-8.4, 82-REQ-8.E1 +""" + +from __future__ import annotations + +import logging +import time +from typing import Any + +from agent_fox.engine.audit_helpers import emit_audit_event +from agent_fox.knowledge.audit import AuditEventType +from agent_fox.nightshift.fix_types import FixReviewResult, TriageResult +from agent_fox.nightshift.spec_builder import InMemorySpec +from agent_fox.ui.progress import TaskEvent +from agent_fox.workspace import WorkspaceInfo + +logger = logging.getLogger(__name__) + + +class CoderReviewerLoop: + """Coder-reviewer retry/escalation state machine. + + Runs coder -> reviewer in a loop with escalation ladder support, + reviewer parse-fail retry, and verdict checking. Delegates I/O + operations (session running, comment posting) to the pipeline. 
+ + Requirements: 82-REQ-7.1, 82-REQ-8.1 through 82-REQ-8.4, 82-REQ-8.E1 + """ + + def __init__(self, pipeline: Any) -> None: + self._pipeline = pipeline + + async def run( + self, + spec: InMemorySpec, + triage: TriageResult, + metrics: Any, + workspace: WorkspaceInfo, + ) -> bool: + """Run the coder-reviewer loop. Returns True on PASS, False on exhaustion.""" + from agent_fox.core.models import ModelTier, resolve_model + from agent_fox.routing.escalation import EscalationLadder + + p = self._pipeline + + retries_before = getattr(p._config.orchestrator, "retries_before_escalation", 1) + max_retries = getattr(p._config.orchestrator, "max_retries", 3) + + ladder = EscalationLadder( + starting_tier=ModelTier.STANDARD, + tier_ceiling=ModelTier.ADVANCED, + retries_before_escalation=retries_before, + ) + + review_feedback: FixReviewResult | None = None + + for attempt in range(max_retries + 1): + tier = ladder.current_tier + model_entry = resolve_model(tier.value) + model_id: str | None = model_entry.model_id + + await self._run_coder_phase( + spec, + triage, + workspace, + metrics, + model_id, + review_feedback, + attempt, + ) + + review_result = await self._run_reviewer_phase( + spec, + triage, + workspace, + metrics, + attempt, + ) + + review_comment = p._format_review_comment(review_result) + f"\n(run: `{p._run_id}`)" + await p._post_comment(spec.issue_number, review_comment) + + if review_result.overall_verdict == "PASS": + return True + + ladder.record_failure() + + if ladder.is_exhausted or attempt >= max_retries: + await p._post_comment( + spec.issue_number, + "Fix pipeline exhausted all retries. " + "The issue could not be resolved automatically. " + f"Manual intervention is required. 
(run: `{p._run_id}`)", + ) + return False + + review_feedback = review_result + + return False # pragma: no cover + + async def _run_coder_phase( + self, + spec: InMemorySpec, + triage: TriageResult, + workspace: WorkspaceInfo, + metrics: Any, + model_id: str | None, + review_feedback: FixReviewResult | None, + attempt: int, + ) -> object: + """Run one coder session, emitting events and tracking metrics.""" + p = self._pipeline + + system_prompt, task_prompt = p._build_coder_prompt(spec, triage, review_feedback=review_feedback) + node_id = f"fix-issue-{spec.issue_number}:0:coder" + attempt_suffix = f" (attempt {attempt + 1})" if attempt > 0 else "" + p._update_spinner(f"Running coder for issue #{spec.issue_number}{attempt_suffix}…") + + t0 = time.monotonic() + try: + coder_outcome = await p._run_coder_session( + workspace, + spec, + system_prompt, + task_prompt, + model_id=model_id, + ) + p._accumulate_metrics(metrics, coder_outcome) + p._emit_session_event( + coder_outcome, + "coder", + p._run_id, + node_id=node_id, + attempt=attempt + 1, + ) + duration = time.monotonic() - t0 + if p._task_callback is not None: + p._task_callback(TaskEvent(node_id=node_id, status="completed", duration_s=duration, archetype="coder")) + return coder_outcome + except Exception as exc: + duration = time.monotonic() - t0 + emit_audit_event( + p._sink, + p._run_id, + AuditEventType.SESSION_FAIL, + node_id=node_id, + archetype="coder", + payload={ + "archetype": "coder", + "model_id": model_id or p._get_model_id("coder"), + "error_message": str(exc), + "attempt": attempt + 1, + }, + ) + if p._task_callback is not None: + p._task_callback(TaskEvent(node_id=node_id, status="failed", duration_s=duration, archetype="coder")) + raise + + async def _run_reviewer_phase( + self, + spec: InMemorySpec, + triage: TriageResult, + workspace: WorkspaceInfo, + metrics: Any, + attempt: int, + ) -> FixReviewResult: + """Run reviewer session with parse-fail retry. 
Returns the review result.""" + from agent_fox.session.review_parser import parse_fix_review_output + + p = self._pipeline + + reviewer_system, reviewer_task = p._build_reviewer_prompt(spec, triage) + reviewer_node_id = f"fix-issue-{spec.issue_number}:0:reviewer" + p._update_spinner(f"Reviewing fix for issue #{spec.issue_number}…") + + reviewer_outcome = await self._run_single_reviewer( + workspace, + spec, + reviewer_system, + reviewer_task, + metrics, + reviewer_node_id, + attempt, + ) + + reviewer_response = getattr(reviewer_outcome, "response", "") or "" + review_result = parse_fix_review_output( + reviewer_response, + f"fix-issue-{spec.issue_number}", + f"fix-issue-{spec.issue_number}:0:reviewer", + ) + + if review_result.is_parse_failure: + review_result = await self._retry_reviewer_on_parse_failure( + spec, + workspace, + metrics, + reviewer_system, + reviewer_task, + review_result, + attempt, + ) + + return review_result + + async def _run_single_reviewer( + self, + workspace: WorkspaceInfo, + spec: InMemorySpec, + system_prompt: str, + task_prompt: str, + metrics: Any, + node_id: str, + attempt: int, + ) -> object: + """Run a single reviewer session, emitting events and tracking metrics.""" + p = self._pipeline + + t0 = time.monotonic() + try: + outcome = await p._run_session( + "reviewer", + workspace, + spec=spec, + system_prompt=system_prompt, + task_prompt=task_prompt, + mode="fix-review", + ) + p._accumulate_metrics(metrics, outcome) + p._emit_session_event( + outcome, + "reviewer", + p._run_id, + node_id=node_id, + attempt=attempt + 1, + ) + duration = time.monotonic() - t0 + if p._task_callback is not None: + p._task_callback( + TaskEvent(node_id=node_id, status="completed", duration_s=duration, archetype="reviewer") + ) + return outcome + except Exception as exc: + duration = time.monotonic() - t0 + emit_audit_event( + p._sink, + p._run_id, + AuditEventType.SESSION_FAIL, + node_id=node_id, + archetype="reviewer", + payload={ + "archetype": 
"reviewer", + "model_id": p._get_model_id("reviewer"), + "error_message": str(exc), + "attempt": attempt + 1, + }, + ) + if p._task_callback is not None: + p._task_callback(TaskEvent(node_id=node_id, status="failed", duration_s=duration, archetype="reviewer")) + raise + + async def _retry_reviewer_on_parse_failure( + self, + spec: InMemorySpec, + workspace: WorkspaceInfo, + metrics: Any, + reviewer_system: str, + reviewer_task: str, + original_result: FixReviewResult, + attempt: int, + ) -> FixReviewResult: + """Retry reviewer once on parse failure. Returns best available result.""" + from agent_fox.session.review_parser import parse_fix_review_output + + p = self._pipeline + + logger.info("Reviewer output unparseable for issue #%d, retrying reviewer", spec.issue_number) + retry_node_id = f"fix-issue-{spec.issue_number}:0:reviewer_retry" + + t0 = time.monotonic() + try: + retry_outcome = await p._run_session( + "reviewer", + workspace, + spec=spec, + system_prompt=reviewer_system, + task_prompt=reviewer_task, + mode="fix-review", + ) + p._accumulate_metrics(metrics, retry_outcome) + p._emit_session_event( + retry_outcome, + "reviewer", + p._run_id, + node_id=retry_node_id, + attempt=attempt + 1, + ) + duration = time.monotonic() - t0 + if p._task_callback is not None: + p._task_callback( + TaskEvent(node_id=retry_node_id, status="completed", duration_s=duration, archetype="reviewer") + ) + retry_response = getattr(retry_outcome, "response", "") or "" + retry_result = parse_fix_review_output( + retry_response, + f"fix-issue-{spec.issue_number}", + f"fix-issue-{spec.issue_number}:0:reviewer_retry", + ) + if not retry_result.is_parse_failure: + return retry_result + logger.warning("Reviewer retry also unparseable for issue #%d, treating as FAIL", spec.issue_number) + except Exception as exc: + duration = time.monotonic() - t0 + emit_audit_event( + p._sink, + p._run_id, + AuditEventType.SESSION_FAIL, + node_id=retry_node_id, + archetype="reviewer", + payload={ + 
"archetype": "reviewer", + "model_id": p._get_model_id("reviewer"), + "error_message": str(exc), + "attempt": attempt + 1, + }, + ) + if p._task_callback is not None: + p._task_callback( + TaskEvent(node_id=retry_node_id, status="failed", duration_s=duration, archetype="reviewer") + ) + logger.warning("Reviewer retry failed for issue #%d, treating as FAIL", spec.issue_number, exc_info=True) + + return original_result diff --git a/agent_fox/nightshift/fix_pipeline.py b/agent_fox/nightshift/fix_pipeline.py index 9461319b..8b99ce48 100644 --- a/agent_fox/nightshift/fix_pipeline.py +++ b/agent_fox/nightshift/fix_pipeline.py @@ -676,10 +676,6 @@ async def _run_triage( return triage - # ------------------------------------------------------------------ - # Coder-reviewer loop (82-REQ-8.1 through 82-REQ-8.4, 82-REQ-8.E1) - # ------------------------------------------------------------------ - async def _coder_review_loop( self, spec: InMemorySpec, @@ -689,268 +685,15 @@ async def _coder_review_loop( ) -> bool: """Coder-reviewer loop with retry and escalation. + Delegates to CoderReviewerLoop collaborator class. Returns True on PASS, False on exhaustion. 
Requirements: 82-REQ-7.1, 82-REQ-8.1, 82-REQ-8.2, 82-REQ-8.3, 82-REQ-8.4, 82-REQ-8.E1 """ - from agent_fox.core.models import ModelTier, resolve_model - from agent_fox.routing.escalation import EscalationLadder - from agent_fox.session.review_parser import parse_fix_review_output - - retries_before = getattr( - self._config.orchestrator, - "retries_before_escalation", - 1, - ) - max_retries = getattr( - self._config.orchestrator, - "max_retries", - 3, - ) - - ladder = EscalationLadder( - starting_tier=ModelTier.STANDARD, - tier_ceiling=ModelTier.ADVANCED, - retries_before_escalation=retries_before, - ) - - review_feedback: FixReviewResult | None = None - - for _attempt in range(max_retries + 1): - # Resolve model from current tier - tier = ladder.current_tier - model_entry = resolve_model(tier.value) - model_id: str | None = model_entry.model_id - - # Build and run coder session - system_prompt, task_prompt = self._build_coder_prompt(spec, triage, review_feedback=review_feedback) - - node_id = f"fix-issue-{spec.issue_number}:0:coder" - attempt_suffix = f" (attempt {_attempt + 1})" if _attempt > 0 else "" - self._update_spinner(f"Running coder for issue #{spec.issue_number}{attempt_suffix}\u2026") - t0 = time.monotonic() - try: - coder_outcome = await self._run_coder_session( - workspace, - spec, - system_prompt, - task_prompt, - model_id=model_id, - ) - self._accumulate_metrics(metrics, coder_outcome) - self._emit_session_event( - coder_outcome, - "coder", - self._run_id, - node_id=node_id, - attempt=_attempt + 1, - ) - duration = time.monotonic() - t0 - if self._task_callback is not None: - self._task_callback( - TaskEvent( - node_id=node_id, - status="completed", - duration_s=duration, - archetype="coder", - ) - ) - except Exception as _coder_exc: - duration = time.monotonic() - t0 - emit_audit_event( - self._sink, - self._run_id, - AuditEventType.SESSION_FAIL, - node_id=node_id, - archetype="coder", - payload={ - "archetype": "coder", - "model_id": model_id or 
self._get_model_id("coder"), - "error_message": str(_coder_exc), - "attempt": _attempt + 1, - }, - ) - if self._task_callback is not None: - self._task_callback( - TaskEvent( - node_id=node_id, - status="failed", - duration_s=duration, - archetype="coder", - ) - ) - raise - - # Build and run reviewer session - reviewer_system, reviewer_task = self._build_reviewer_prompt(spec, triage) - - reviewer_node_id = f"fix-issue-{spec.issue_number}:0:reviewer" - self._update_spinner(f"Reviewing fix for issue #{spec.issue_number}\u2026") - t0 = time.monotonic() - try: - reviewer_outcome = await self._run_session( - "reviewer", - workspace, - spec=spec, - system_prompt=reviewer_system, - task_prompt=reviewer_task, - mode="fix-review", - ) - self._accumulate_metrics(metrics, reviewer_outcome) - self._emit_session_event( - reviewer_outcome, - "reviewer", - self._run_id, - node_id=reviewer_node_id, - attempt=_attempt + 1, - ) - duration = time.monotonic() - t0 - if self._task_callback is not None: - self._task_callback( - TaskEvent( - node_id=reviewer_node_id, - status="completed", - duration_s=duration, - archetype="reviewer", - ) - ) - except Exception as _reviewer_exc: - duration = time.monotonic() - t0 - emit_audit_event( - self._sink, - self._run_id, - AuditEventType.SESSION_FAIL, - node_id=reviewer_node_id, - archetype="reviewer", - payload={ - "archetype": "reviewer", - "model_id": self._get_model_id("reviewer"), - "error_message": str(_reviewer_exc), - "attempt": _attempt + 1, - }, - ) - if self._task_callback is not None: - self._task_callback( - TaskEvent( - node_id=reviewer_node_id, - status="failed", - duration_s=duration, - archetype="reviewer", - ) - ) - raise - - # Parse reviewer output, retrying reviewer once on parse failure - reviewer_response = getattr(reviewer_outcome, "response", "") or "" - review_result = parse_fix_review_output( - reviewer_response, - f"fix-issue-{spec.issue_number}", - f"fix-issue-{spec.issue_number}:0:reviewer", - ) - - if 
review_result.is_parse_failure: - logger.info( - "Reviewer output unparseable for issue #%d, retrying reviewer", - spec.issue_number, - ) - retry_node_id = f"fix-issue-{spec.issue_number}:0:reviewer_retry" - t0 = time.monotonic() - try: - retry_outcome = await self._run_session( - "reviewer", - workspace, - spec=spec, - system_prompt=reviewer_system, - task_prompt=reviewer_task, - mode="fix-review", - ) - self._accumulate_metrics(metrics, retry_outcome) - self._emit_session_event( - retry_outcome, - "reviewer", - self._run_id, - node_id=retry_node_id, - attempt=_attempt + 1, - ) - duration = time.monotonic() - t0 - if self._task_callback is not None: - self._task_callback( - TaskEvent( - node_id=retry_node_id, - status="completed", - duration_s=duration, - archetype="reviewer", - ) - ) - retry_response = getattr(retry_outcome, "response", "") or "" - retry_result = parse_fix_review_output( - retry_response, - f"fix-issue-{spec.issue_number}", - f"fix-issue-{spec.issue_number}:0:reviewer_retry", - ) - if not retry_result.is_parse_failure: - review_result = retry_result - else: - logger.warning( - "Reviewer retry also unparseable for issue #%d, treating as FAIL", - spec.issue_number, - ) - except Exception as _retry_exc: - duration = time.monotonic() - t0 - emit_audit_event( - self._sink, - self._run_id, - AuditEventType.SESSION_FAIL, - node_id=retry_node_id, - archetype="reviewer", - payload={ - "archetype": "reviewer", - "model_id": self._get_model_id("reviewer"), - "error_message": str(_retry_exc), - "attempt": _attempt + 1, - }, - ) - if self._task_callback is not None: - self._task_callback( - TaskEvent( - node_id=retry_node_id, - status="failed", - duration_s=duration, - archetype="reviewer", - ) - ) - logger.warning( - "Reviewer retry failed for issue #%d, treating as FAIL", - spec.issue_number, - exc_info=True, - ) - - # Post review comment - review_comment = self._format_review_comment(review_result) + f"\n(run: `{self._run_id}`)" - await 
self._post_comment(spec.issue_number, review_comment) - - # Check verdict - if review_result.overall_verdict == "PASS": - return True - - # FAIL: record and maybe escalate - ladder.record_failure() - - if ladder.is_exhausted or _attempt >= max_retries: - await self._post_comment( - spec.issue_number, - "Fix pipeline exhausted all retries. " - "The issue could not be resolved automatically. " - f"Manual intervention is required. (run: `{self._run_id}`)", - ) - return False - - # Set up feedback for next coder attempt - review_feedback = review_result + from agent_fox.nightshift.coder_reviewer import CoderReviewerLoop - # Should not reach here, but safety fallback - return False # pragma: no cover + return await CoderReviewerLoop(self).run(spec, triage, metrics, workspace) # ------------------------------------------------------------------ # Main entry point diff --git a/docs/memory.md b/docs/memory.md index c5495a7f..f047f66f 100644 --- a/docs/memory.md +++ b/docs/memory.md @@ -2,6 +2,14 @@ _3175 facts | last updated: 2026-04-23_ +**2026-04-23 strategy extraction (issue #518):** Extracted collaborator classes +from the three largest files: blocking logic from result_handler.py to +engine/blocking.py, dispatch strategies from engine.py to engine/dispatch.py +(SerialDispatcher, ParallelDispatcher), and coder-reviewer loop from +fix_pipeline.py to nightshift/coder_reviewer.py (CoderReviewerLoop). Net: +engine.py 2051→1817, fix_pipeline.py 1237→980, result_handler.py 793→590; ++3 new focused modules (851 LOC total). All 4292 tests pass unchanged. 
+ **2026-04-23 simplification pass:** Deleted dead code (nightshift/extraction.py, nightshift/ignore_ingest.py, unused llm_validation functions) and consolidated small single-consumer files (_text_utils→prompt_safety, assessment→engine, diff --git a/tests/integration/test_review_visibility_smoke.py b/tests/integration/test_review_visibility_smoke.py index 11da72c7..132269ba 100644 --- a/tests/integration/test_review_visibility_smoke.py +++ b/tests/integration/test_review_visibility_smoke.py @@ -134,7 +134,7 @@ class TestEnrichedBlockingReasonE2E: """ def test_enriched_reason_from_real_db(self, knowledge_conn: duckdb.DuckDBPyConnection) -> None: - from agent_fox.engine.result_handler import evaluate_review_blocking + from agent_fox.engine.blocking import evaluate_review_blocking from agent_fox.engine.state import SessionRecord findings = [ diff --git a/tests/property/test_review_visibility_props.py b/tests/property/test_review_visibility_props.py index 175264e4..54e482ea 100644 --- a/tests/property/test_review_visibility_props.py +++ b/tests/property/test_review_visibility_props.py @@ -122,7 +122,7 @@ class TestBlockReasonIdCapProperty: @settings(max_examples=20) def test_finding_id_count_capped(self, n: int) -> None: """For any N critical findings, reason has min(N, 3) F- IDs.""" - from agent_fox.engine.result_handler import _format_block_reason + from agent_fox.engine.blocking import _format_block_reason from agent_fox.knowledge.review_store import ReviewFinding findings = [ diff --git a/tests/unit/engine/test_block_reason_enrichment.py b/tests/unit/engine/test_block_reason_enrichment.py index 03b47fea..0cbb7ce6 100644 --- a/tests/unit/engine/test_block_reason_enrichment.py +++ b/tests/unit/engine/test_block_reason_enrichment.py @@ -15,7 +15,7 @@ import duckdb -from agent_fox.engine.result_handler import evaluate_review_blocking +from agent_fox.engine.blocking import evaluate_review_blocking from agent_fox.engine.state import SessionRecord from 
agent_fox.knowledge.review_store import ReviewFinding, insert_findings diff --git a/tests/unit/knowledge/test_security_blocking.py b/tests/unit/knowledge/test_security_blocking.py index b26db0f4..f865c30a 100644 --- a/tests/unit/knowledge/test_security_blocking.py +++ b/tests/unit/knowledge/test_security_blocking.py @@ -16,7 +16,7 @@ import duckdb -from agent_fox.engine.result_handler import evaluate_review_blocking +from agent_fox.engine.blocking import evaluate_review_blocking from agent_fox.engine.state import SessionRecord from agent_fox.knowledge.review_store import ( ReviewFinding,