diff --git a/agent_fox/nightshift/engine.py b/agent_fox/nightshift/engine.py
index 20468647..fba5a7d1 100644
--- a/agent_fox/nightshift/engine.py
+++ b/agent_fox/nightshift/engine.py
@@ -528,6 +528,7 @@ async def _process_fix(self, issue: object, issue_body: str = "") -> None:
             task_callback=self._task_callback,
             sink_dispatcher=self._sink,
             spinner_callback=self._spinner_callback,
+            conn=self._conn,
         )
 
         effective_body = issue_body if issue_body else getattr(issue, "body", "")
diff --git a/agent_fox/nightshift/fix_pipeline.py b/agent_fox/nightshift/fix_pipeline.py
index 798bd191..866a087c 100644
--- a/agent_fox/nightshift/fix_pipeline.py
+++ b/agent_fox/nightshift/fix_pipeline.py
@@ -34,6 +34,8 @@
 from agent_fox.workspace import WorkspaceInfo
 
 if TYPE_CHECKING:
+    import duckdb
+
     from agent_fox.knowledge.sink import SinkDispatcher
 
 logger = logging.getLogger(__name__)
@@ -80,6 +82,7 @@ def __init__(
         task_callback: TaskCallback | None = None,
         sink_dispatcher: SinkDispatcher | None = None,
         spinner_callback: SpinnerCallback | None = None,
+        conn: duckdb.DuckDBPyConnection | None = None,
     ) -> None:
         self._config = config
         self._platform = platform
@@ -87,6 +90,7 @@ def __init__(
         self._task_callback = task_callback
         self._sink = sink_dispatcher
         self._spinner_callback = spinner_callback
+        self._conn = conn
         self._run_id: str = ""
 
     def _update_spinner(self, text: str) -> None:
@@ -232,6 +236,98 @@ def _get_model_id(self, archetype: str) -> str:
         except Exception:
             return "claude-sonnet-4-6"
 
+    def _try_complete_run(self, status: str) -> None:
+        """Mark the runs row as finished (best-effort).
+
+        No-op when conn is not set. The *status* value should be a
+        ``RunStatus`` string (e.g. ``"completed"`` or ``"interrupted"``).
+        """
+        if self._conn is None:
+            return
+        try:
+            from agent_fox.engine.state import complete_run
+
+            complete_run(self._conn, self._run_id, status)
+        except Exception:
+            logger.debug("Failed to complete run record for run %s", self._run_id, exc_info=True)
+
+    def _record_session_to_db(
+        self,
+        outcome: object,
+        archetype: str,
+        run_id: str,
+        *,
+        node_id: str = "",
+        attempt: int = 1,
+        cost: float = 0.0,
+    ) -> None:
+        """Write a session outcome row to session_outcomes and update runs totals.
+
+        Best-effort: any failure (lazy imports included) is logged and swallowed
+        so the pipeline is never interrupted by a telemetry failure.
+        """
+        if self._conn is None:
+            return
+
+        try:
+            import uuid as _uuid
+            from datetime import UTC, datetime
+
+            from agent_fox.engine.state import (
+                SessionOutcomeRecord,
+                record_session,
+                update_run_totals,
+            )
+
+            input_tokens = getattr(outcome, "input_tokens", 0)
+            output_tokens = getattr(outcome, "output_tokens", 0)
+            duration_ms = getattr(outcome, "duration_ms", 0)
+            status = getattr(outcome, "status", "completed")
+            error_message = getattr(outcome, "error_message", None)
+            is_transport_error = getattr(outcome, "is_transport_error", False)
+
+            # Parse spec_name and task_group from node_id (format: spec:group:archetype)
+            parts = node_id.split(":", 2)
+            spec_name = parts[0] if parts else ""
+            task_group = parts[1] if len(parts) > 1 else "0"
+
+            model_id = self._get_model_id(archetype)
+
+            record = SessionOutcomeRecord(
+                id=str(_uuid.uuid4()),
+                spec_name=spec_name,
+                task_group=task_group,
+                node_id=node_id,
+                touched_path="",
+                status=status,
+                input_tokens=input_tokens,
+                output_tokens=output_tokens,
+                duration_ms=duration_ms,
+                created_at=datetime.now(UTC).isoformat(),
+                run_id=run_id,
+                attempt=attempt,
+                cost=cost,
+                model=model_id,
+                archetype=archetype,
+                commit_sha="",
+                error_message=error_message,
+                is_transport_error=is_transport_error,
+            )
+            record_session(self._conn, record)
+            update_run_totals(
+                self._conn,
+                run_id,
+                input_tokens=input_tokens,
+                output_tokens=output_tokens,
+                cost=cost,
+            )
+        except Exception:
+            logger.warning(
+                "Failed to record session to DB for %s",
+                node_id,
+                exc_info=True,
+            )
+
     def _emit_session_event(
         self,
         outcome: object,
@@ -243,6 +339,9 @@ def _emit_session_event(
     ) -> None:
         """Emit session.complete or session.fail based on outcome status.
 
+        Also writes a row to session_outcomes and updates the runs totals via
+        _record_session_to_db (best-effort).
+
         Best-effort: exceptions from audit infrastructure are logged and
         swallowed so the fix pipeline is never interrupted.
 
@@ -289,6 +388,7 @@ def _emit_session_event(
                 },
             )
         else:
+            cost = 0.0
             emit_audit_event(
                 self._sink,
                 run_id,
@@ -303,6 +403,16 @@ def _emit_session_event(
                 },
             )
 
+        # Write to session_outcomes table and update runs totals (best-effort).
+        self._record_session_to_db(
+            outcome,
+            archetype,
+            run_id,
+            node_id=node_id,
+            attempt=attempt,
+            cost=cost,
+        )
+
     # ------------------------------------------------------------------
     # Comment formatting (82-REQ-3.1, 82-REQ-6.1)
     # ------------------------------------------------------------------
@@ -871,6 +981,19 @@ async def process_issue(
         # (91-REQ-2.1)
         self._run_id = generate_run_id()
 
+        # Create a run row in the runs table (best-effort).
+        if self._conn is not None:
+            try:
+                from agent_fox.engine.state import create_run
+
+                create_run(self._conn, self._run_id, f"fix-issue-{issue.number}")
+            except Exception:
+                logger.debug(
+                    "Failed to create run record for issue #%d",
+                    issue.number,
+                    exc_info=True,
+                )
+
         # 61-REQ-6.E2: reject empty issue body
         if not issue_body or not issue_body.strip():
             await self._platform.add_issue_comment(  # type: ignore[attr-defined]
@@ -879,6 +1002,7 @@ async def process_issue(
                 "Please add more detail describing the problem and expected behavior. "
                 f"(run: `{self._run_id}`)",
             )
+            self._try_complete_run("completed")
             return metrics
 
         spec = build_in_memory_spec(issue, issue_body)
@@ -927,6 +1051,7 @@ async def process_issue(
 
             if not success:
                 # Ladder exhausted — do NOT close issue
+                self._try_complete_run("completed")
                 return metrics
 
             # Optionally push fix branch to upstream remote (93-REQ-3.1).
@@ -958,6 +1083,7 @@ async def process_issue(
                 issue.number,
                 exc,
             )
+            self._try_complete_run("interrupted")
             return metrics
         finally:
             await self._cleanup_workspace(workspace)
@@ -976,6 +1102,7 @@ async def process_issue(
                 issue.number,
                 exc,
             )
+            self._try_complete_run("completed")
             return metrics
 
         # Close the originating issue with a comment pointing to the branch.
@@ -1024,6 +1151,7 @@ async def process_issue(
             issue.number,
             spec.branch_name,
         )
+        self._try_complete_run("completed")
         return metrics
 
     async def _push_fix_branch_upstream(
diff --git a/tests/unit/nightshift/test_fix_pipeline.py b/tests/unit/nightshift/test_fix_pipeline.py
index 78caaeff..fc5b3bee 100644
--- a/tests/unit/nightshift/test_fix_pipeline.py
+++ b/tests/unit/nightshift/test_fix_pipeline.py
@@ -886,3 +886,351 @@ async def mock_run_session(archetype: str, workspace: object = None, **kwargs: o
 
         # Issue should NOT be closed (max_retries=0 exhausted)
         mock_platform.close_issue.assert_not_awaited()
+
+
+# ---------------------------------------------------------------------------
+# Issue #467: fix pipeline must populate session_outcomes and runs tables
+# ---------------------------------------------------------------------------
+
+
+class TestFixPipelineDbTelemetry:
+    """Fix pipeline writes to session_outcomes and runs tables (issue #467).
+
+    The fix pipeline previously only wrote to audit_events. After the fix,
+    every session must produce a row in session_outcomes and the runs table
+    must be created/completed for each pipeline invocation.
+    """
+
+    @pytest.mark.asyncio
+    async def test_session_outcomes_written_for_each_session(self) -> None:
+        """record_session is called for triage, coder, and reviewer sessions."""
+        import json
+        from unittest.mock import AsyncMock, MagicMock, patch
+
+        from agent_fox.nightshift.fix_pipeline import FixPipeline
+        from agent_fox.platform.protocol import IssueResult
+
+        config = MagicMock()
+        config.orchestrator.retries_before_escalation = 1
+        config.orchestrator.max_retries = 3
+        mock_platform = AsyncMock()
+
+        # Provide a mock DuckDB connection
+        mock_conn = MagicMock()
+
+        pipeline = FixPipeline(config=config, platform=mock_platform, conn=mock_conn)
+        pipeline._setup_workspace = AsyncMock(return_value=_mock_workspace())  # type: ignore[method-assign]
+        pipeline._cleanup_workspace = AsyncMock()  # type: ignore[method-assign]
+
+        triage_response = json.dumps(
+            {
+                "summary": "s",
+                "affected_files": [],
+                "acceptance_criteria": [
+                    {"id": "AC-1", "description": "d", "preconditions": "p", "expected": "e", "assertion": "a"},
+                ],
+            }
+        )
+        review_response = json.dumps(
+            {
+                "verdicts": [{"criterion_id": "AC-1", "verdict": "PASS", "evidence": "ok"}],
+                "overall_verdict": "PASS",
+                "summary": "ok",
+            }
+        )
+
+        async def mock_run_session(archetype: str, workspace: object = None, **kwargs: object) -> MagicMock:
+            outcome = MagicMock()
+            outcome.status = "completed"
+            outcome.input_tokens = 10
+            outcome.output_tokens = 5
+            outcome.cache_read_input_tokens = 0
+            outcome.cache_creation_input_tokens = 0
+            outcome.duration_ms = 1000
+            outcome.error_message = None
+            outcome.is_transport_error = False
+            if archetype == "maintainer":
+                outcome.response = triage_response
+            elif archetype == "reviewer":
+                outcome.response = review_response
+            else:
+                outcome.response = ""
+            return outcome
+
+        pipeline._run_session = mock_run_session  # type: ignore[assignment]
+
+        issue = IssueResult(
+            number=467,
+            title="Fix broken telemetry",
+            html_url="https://github.com/test/repo/issues/467",
+        )
+
+        with (
+            patch("agent_fox.engine.state.record_session") as mock_record_session,
+            patch("agent_fox.engine.state.update_run_totals") as mock_update_run_totals,
+            patch("agent_fox.engine.state.create_run") as mock_create_run,
+            patch("agent_fox.engine.state.complete_run") as mock_complete_run,
+            patch.object(pipeline, "_harvest_and_push", AsyncMock(return_value="merged")),
+        ):
+            await pipeline.process_issue(issue, issue_body="The telemetry is broken.")
+
+        # create_run called once at the start
+        mock_create_run.assert_called_once()
+        run_id_arg = mock_create_run.call_args[0][1]
+        assert run_id_arg == pipeline._run_id
+
+        # record_session called for triage (maintainer), coder, reviewer
+        assert mock_record_session.call_count >= 3, (
+            "Expected at least 3 record_session calls (triage+coder+reviewer), "
+            f"got {mock_record_session.call_count}"
+        )
+
+        # update_run_totals called after each session
+        assert mock_update_run_totals.call_count >= 3
+
+        # complete_run called exactly once at the end
+        mock_complete_run.assert_called_once()
+        completed_run_id = mock_complete_run.call_args[0][1]
+        assert completed_run_id == pipeline._run_id
+
+    @pytest.mark.asyncio
+    async def test_runs_row_created_even_on_empty_body(self) -> None:
+        """create_run is called even when the issue body is empty."""
+        from unittest.mock import AsyncMock, MagicMock, patch
+
+        from agent_fox.nightshift.fix_pipeline import FixPipeline
+        from agent_fox.platform.protocol import IssueResult
+
+        config = MagicMock()
+        mock_platform = AsyncMock()
+        mock_conn = MagicMock()
+
+        pipeline = FixPipeline(config=config, platform=mock_platform, conn=mock_conn)
+
+        issue = IssueResult(
+            number=467,
+            title="Fix something",
+            html_url="https://github.com/test/repo/issues/467",
+        )
+
+        with (
+            patch("agent_fox.engine.state.create_run") as mock_create_run,
+            patch("agent_fox.engine.state.complete_run") as mock_complete_run,
+        ):
+            await pipeline.process_issue(issue, issue_body="")
+
+        mock_create_run.assert_called_once()
+        mock_complete_run.assert_called_once()
+
+    @pytest.mark.asyncio
+    async def test_session_outcome_has_run_id_and_archetype(self) -> None:
+        """SessionOutcomeRecord written with correct run_id and archetype."""
+        import json
+        from unittest.mock import AsyncMock, MagicMock, patch
+
+        from agent_fox.engine.state import SessionOutcomeRecord
+        from agent_fox.nightshift.fix_pipeline import FixPipeline
+        from agent_fox.platform.protocol import IssueResult
+
+        config = MagicMock()
+        config.orchestrator.retries_before_escalation = 1
+        config.orchestrator.max_retries = 3
+        mock_platform = AsyncMock()
+        mock_conn = MagicMock()
+
+        pipeline = FixPipeline(config=config, platform=mock_platform, conn=mock_conn)
+        pipeline._setup_workspace = AsyncMock(return_value=_mock_workspace())  # type: ignore[method-assign]
+        pipeline._cleanup_workspace = AsyncMock()  # type: ignore[method-assign]
+
+        review_response = json.dumps(
+            {
+                "verdicts": [{"criterion_id": "AC-1", "verdict": "PASS", "evidence": "ok"}],
+                "overall_verdict": "PASS",
+                "summary": "ok",
+            }
+        )
+        triage_response = json.dumps(
+            {
+                "summary": "s",
+                "affected_files": [],
+                "acceptance_criteria": [
+                    {"id": "AC-1", "description": "d", "preconditions": "p", "expected": "e", "assertion": "a"},
+                ],
+            }
+        )
+
+        async def mock_run_session(archetype: str, workspace: object = None, **kwargs: object) -> MagicMock:
+            outcome = MagicMock()
+            outcome.status = "completed"
+            outcome.input_tokens = 10
+            outcome.output_tokens = 5
+            outcome.cache_read_input_tokens = 0
+            outcome.cache_creation_input_tokens = 0
+            outcome.duration_ms = 500
+            outcome.error_message = None
+            outcome.is_transport_error = False
+            if archetype == "maintainer":
+                outcome.response = triage_response
+            elif archetype == "reviewer":
+                outcome.response = review_response
+            else:
+                outcome.response = ""
+            return outcome
+
+        pipeline._run_session = mock_run_session  # type: ignore[assignment]
+
+        issue = IssueResult(
+            number=467,
+            title="Fix telemetry",
+            html_url="https://github.com/test/repo/issues/467",
+        )
+
+        recorded: list[SessionOutcomeRecord] = []
+
+        def capture_record_session(conn: object, record: SessionOutcomeRecord) -> None:
+            recorded.append(record)
+
+        with (
+            patch("agent_fox.engine.state.record_session", side_effect=capture_record_session),
+            patch("agent_fox.engine.state.update_run_totals"),
+            patch("agent_fox.engine.state.create_run"),
+            patch("agent_fox.engine.state.complete_run"),
+            patch.object(pipeline, "_harvest_and_push", AsyncMock(return_value="merged")),
+        ):
+            await pipeline.process_issue(issue, issue_body="Telemetry is broken.")
+
+        # All records must have the same run_id as the pipeline
+        expected_run_id = pipeline._run_id
+        for rec in recorded:
+            assert rec.run_id == expected_run_id, f"Record run_id mismatch: {rec.run_id!r} != {expected_run_id!r}"
+
+        # Archetypes must include maintainer (triage), coder, reviewer
+        archetypes_recorded = {rec.archetype for rec in recorded}
+        assert "maintainer" in archetypes_recorded, f"triage (maintainer) not in {archetypes_recorded}"
+        assert "coder" in archetypes_recorded, f"coder not in {archetypes_recorded}"
+        assert "reviewer" in archetypes_recorded, f"reviewer not in {archetypes_recorded}"
+
+    @pytest.mark.asyncio
+    async def test_no_db_writes_when_conn_is_none(self) -> None:
+        """When conn=None, no DB functions are called (no-op path)."""
+        import json
+        from unittest.mock import AsyncMock, MagicMock, patch
+
+        from agent_fox.nightshift.fix_pipeline import FixPipeline
+        from agent_fox.platform.protocol import IssueResult
+
+        config = MagicMock()
+        config.orchestrator.retries_before_escalation = 1
+        config.orchestrator.max_retries = 3
+        mock_platform = AsyncMock()
+
+        # conn=None (default)
+        pipeline = FixPipeline(config=config, platform=mock_platform)
+        pipeline._setup_workspace = AsyncMock(return_value=_mock_workspace())  # type: ignore[method-assign]
+        pipeline._cleanup_workspace = AsyncMock()  # type: ignore[method-assign]
+
+        review_response = json.dumps(
+            {
+                "verdicts": [{"criterion_id": "AC-1", "verdict": "PASS", "evidence": "ok"}],
+                "overall_verdict": "PASS",
+                "summary": "ok",
+            }
+        )
+        triage_response = json.dumps(
+            {
+                "summary": "s",
+                "affected_files": [],
+                "acceptance_criteria": [
+                    {"id": "AC-1", "description": "d", "preconditions": "p", "expected": "e", "assertion": "a"},
+                ],
+            }
+        )
+
+        async def mock_run_session(archetype: str, workspace: object = None, **kwargs: object) -> MagicMock:
+            outcome = MagicMock()
+            outcome.status = "completed"
+            outcome.input_tokens = 10
+            outcome.output_tokens = 5
+            outcome.cache_read_input_tokens = 0
+            outcome.cache_creation_input_tokens = 0
+            outcome.duration_ms = 500
+            outcome.error_message = None
+            outcome.is_transport_error = False
+            if archetype == "maintainer":
+                outcome.response = triage_response
+            elif archetype == "reviewer":
+                outcome.response = review_response
+            else:
+                outcome.response = ""
+            return outcome
+
+        pipeline._run_session = mock_run_session  # type: ignore[assignment]
+
+        issue = IssueResult(
+            number=467,
+            title="Fix telemetry",
+            html_url="https://github.com/test/repo/issues/467",
+        )
+
+        with (
+            patch("agent_fox.engine.state.record_session") as mock_record_session,
+            patch("agent_fox.engine.state.create_run") as mock_create_run,
+            patch("agent_fox.engine.state.complete_run") as mock_complete_run,
+            patch.object(pipeline, "_harvest_and_push", AsyncMock(return_value="merged")),
+        ):
+            await pipeline.process_issue(issue, issue_body="Telemetry is broken.")
+
+        # None of the DB functions should have been called
+        mock_record_session.assert_not_called()
+        mock_create_run.assert_not_called()
+        mock_complete_run.assert_not_called()
+
+    @pytest.mark.asyncio
+    async def test_engine_passes_conn_to_fix_pipeline(self) -> None:
+        """NightShiftEngine passes self._conn to FixPipeline."""
+        from unittest.mock import AsyncMock, MagicMock, patch
+
+        from agent_fox.nightshift.engine import NightShiftEngine
+        from agent_fox.platform.protocol import IssueResult
+
+        config = MagicMock()
+        config.orchestrator.max_cost = None
+        config.orchestrator.max_sessions = None
+        config.night_shift.push_fix_branch = False
+
+        mock_platform = AsyncMock()
+        mock_conn = MagicMock()
+
+        engine = NightShiftEngine(config=config, platform=mock_platform, conn=mock_conn)
+
+        issue = IssueResult(
+            number=467,
+            title="Fix something",
+            html_url="https://github.com/test/repo/issues/467",
+            body="The issue body",
+        )
+
+        captured_pipelines: list[object] = []
+
+        original_fix_pipeline = __import__(
+            "agent_fox.nightshift.fix_pipeline", fromlist=["FixPipeline"]
+        ).FixPipeline
+
+        class CapturingFixPipeline(original_fix_pipeline):  # type: ignore[misc]
+            def __init__(self, *args: object, **kwargs: object) -> None:
+                captured_pipelines.append(kwargs.get("conn"))
+                super().__init__(*args, **kwargs)
+
+            async def process_issue(self, *args: object, **kwargs: object) -> object:  # type: ignore[override]
+                return MagicMock(sessions_run=0)
+
+        with patch(
+            "agent_fox.nightshift.engine.FixPipeline",
+            CapturingFixPipeline,
+        ):
+            await engine._process_fix(issue)
+
+        assert len(captured_pipelines) == 1
+        assert captured_pipelines[0] is mock_conn, (
+            f"Expected conn={mock_conn!r} to be passed, got {captured_pipelines[0]!r}"
+        )