From 4c90b2f6612d670988a816126e4043ec567f0358 Mon Sep 17 00:00:00 2001 From: Michael Kuehl Date: Fri, 17 Apr 2026 17:06:55 +0200 Subject: [PATCH] fix(engine): remove duplicate session_outcomes insertion (fixes #473) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two independent code paths were inserting rows into session_outcomes for every session: the sink-based path in session_lifecycle._record_session_to_sink (basic fields, cost=0, model=NULL) and the DB-based path in result_handler.SessionResultHandler.process (complete fields). Remove the redundant sink path — the result_handler path is the authoritative single source of truth. --- agent_fox/engine/review_persistence.py | 20 +---- agent_fox/engine/session_lifecycle.py | 31 +------- tests/unit/cli/test_code.py | 60 --------------- tests/unit/engine/test_session_lifecycle.py | 81 +++++++++++++++++++++ 4 files changed, 83 insertions(+), 109 deletions(-) diff --git a/agent_fox/engine/review_persistence.py b/agent_fox/engine/review_persistence.py index 8a25efaa..95a88cf0 100644 --- a/agent_fox/engine/review_persistence.py +++ b/agent_fox/engine/review_persistence.py @@ -18,7 +18,7 @@ from agent_fox.core.json_extraction import extract_json_array from agent_fox.engine.audit_helpers import emit_audit_event from agent_fox.knowledge.audit import AuditEventType, AuditSeverity -from agent_fox.knowledge.sink import SessionOutcome, SessionSink, SinkDispatcher +from agent_fox.knowledge.sink import SessionSink, SinkDispatcher if TYPE_CHECKING: from agent_fox.knowledge.review_store import ReviewFinding @@ -132,24 +132,6 @@ def _emit_persistence_event( ) -def record_session_to_sink( - sink: SinkDispatcher | SessionSink | None, - outcome: SessionOutcome, - node_id: str, -) -> None: - """Record a session outcome to the sink dispatcher (best-effort).""" - if sink is None: - return - try: - sink.record_session_outcome(outcome) - except Exception: - logger.warning( - "Failed to record session outcome to sink for %s", - node_id, - exc_info=True, - ) - - def persist_review_findings( transcript: str, node_id: str, diff --git a/agent_fox/engine/session_lifecycle.py b/agent_fox/engine/session_lifecycle.py index 7e5fd0c9..296e62a4 100644 --- a/agent_fox/engine/session_lifecycle.py +++ b/agent_fox/engine/session_lifecycle.py @@ -11,7 +11,6 @@ from __future__ import annotations -import dataclasses import json import logging from datetime import UTC, datetime @@ -24,10 +23,7 @@ from agent_fox.core.prompt_safety import sanitize_prompt_content from agent_fox.engine.audit_helpers import emit_audit_event from agent_fox.engine.knowledge_harvest import extract_and_store_knowledge -from agent_fox.engine.review_persistence import ( - persist_review_findings, - record_session_to_sink, -) +from agent_fox.engine.review_persistence import persist_review_findings from agent_fox.engine.sdk_params import ( clamp_instances, resolve_fallback_model, @@ -635,23 +631,6 @@ async def _run_and_harvest( if touched_files and status == "completed": commit_sha = await _capture_develop_head(repo_root) - # Record and emit audit events - sink_outcome = outcome - if status != outcome.status or error_message != outcome.error_message: - sink_outcome = dataclasses.replace( - sink_outcome, - status=status, - error_message=error_message, - ) - if touched_files: - sink_outcome = dataclasses.replace( - sink_outcome, - touched_paths=touched_files, - ) - - # 11-REQ-4.2: Record session outcome to sinks (always, best-effort) - self._record_session_to_sink(sink_outcome, node_id) - # 40-REQ-7.2, 40-REQ-7.3: Emit session.complete or session.fail if status == "completed": emit_audit_event( @@ -731,14 +710,6 @@ async def _run_and_harvest( is_transport_error=getattr(outcome, "is_transport_error", False), ) - def _record_session_to_sink( - self, - outcome: SessionOutcome, - node_id: str, - ) -> None: - """Record a session outcome to the sink dispatcher (best-effort).""" - record_session_to_sink(self._sink, outcome, node_id) - def _persist_review_findings( self, transcript: str, diff --git a/tests/unit/cli/test_code.py b/tests/unit/cli/test_code.py index 5fc176ed..bb881e7d 100644 --- a/tests/unit/cli/test_code.py +++ b/tests/unit/cli/test_code.py @@ -515,66 +515,6 @@ async def test_harvest_error_returns_failed_record_with_context( assert record.input_tokens == 100 # Session metrics preserved assert record.output_tokens == 200 - @pytest.mark.asyncio - async def test_harvest_error_records_failed_status_to_sink( - self, - ) -> None: - """Sink receives failed status when harvest fails after completed session.""" - from agent_fox.core.errors import IntegrationError - from agent_fox.engine.session_lifecycle import NodeSessionRunner - from agent_fox.knowledge.sink import SessionOutcome - - config = AgentFoxConfig() - sink = MagicMock() - runner = NodeSessionRunner("test_spec:1", config, sink_dispatcher=sink, knowledge_db=_MOCK_KB) - - mock_outcome = SessionOutcome( - spec_name="test_spec", - task_group="1", - node_id="test_spec:1", - status="completed", - input_tokens=100, - output_tokens=200, - duration_ms=5000, - ) - - with ( - patch( - "agent_fox.engine.session_lifecycle.run_session", - new_callable=AsyncMock, - return_value=mock_outcome, - ), - patch( - "agent_fox.engine.session_lifecycle.harvest", - new_callable=AsyncMock, - side_effect=IntegrationError( - "Merge conflict in foo.py", - ), - ), - ): - from agent_fox.workspace import WorkspaceInfo - - workspace = WorkspaceInfo( - path=Path("/tmp/fake-worktree"), - spec_name="test_spec", - task_group=1, - branch="feature/test_spec/1", - ) - await runner._run_and_harvest( - "test_spec:1", - 1, - workspace, - "system prompt", - "task prompt", - Path("/tmp/fake-repo"), - ) - - sink.record_session_outcome.assert_called_once() - recorded = sink.record_session_outcome.call_args.args[0] - assert recorded.status == "failed" - assert recorded.error_message is not None - assert "harvest failed" in recorded.error_message.lower() - @pytest.mark.asyncio async def test_session_summary_read_before_cleanup( self, diff --git a/tests/unit/engine/test_session_lifecycle.py b/tests/unit/engine/test_session_lifecycle.py index a64b1e22..5b84b7ce 100644 --- a/tests/unit/engine/test_session_lifecycle.py +++ b/tests/unit/engine/test_session_lifecycle.py @@ -20,6 +20,8 @@ from agent_fox.knowledge.db import KnowledgeDB from agent_fox.workspace import WorkspaceInfo +from agent_fox.knowledge.sink import SessionOutcome + _MOCK_KB = MagicMock(spec=KnowledgeDB) # --------------------------------------------------------------------------- @@ -238,3 +240,82 @@ async def _fake_run_and_harvest(node_id, attempt, workspace, system_prompt, task assert "type error in foo" in captured_prompts["task"] assert "retry attempt 2" in captured_prompts["task"].lower() + + +# --------------------------------------------------------------------------- +# Regression: no duplicate session_outcomes rows (fixes #473) +# --------------------------------------------------------------------------- + + +class TestNoDuplicateSessionOutcomeWrite: + """Verify _run_and_harvest does not write session outcomes to the sink. + + Session outcomes are written exclusively by SessionResultHandler.process() + via state.record_session(). The old sink-based path caused duplicate rows + in the session_outcomes table (issue #473). + """ + + @pytest.mark.asyncio + async def test_run_and_harvest_does_not_call_sink_record_session_outcome(self) -> None: + """_run_and_harvest must not dispatch record_session_outcome to sinks.""" + config = AgentFoxConfig() + sink = MagicMock() + sink.record_session_outcome = MagicMock() + + runner = NodeSessionRunner( + "spec:1", + config, + knowledge_db=_MOCK_KB, + sink_dispatcher=sink, + ) + + workspace = WorkspaceInfo( + path=Path("/tmp/ws"), + spec_name="spec", + task_group=1, + branch="feature/spec/1", + ) + + fake_outcome = SessionOutcome( + spec_name="spec", + task_group="1", + node_id="spec:1", + status="completed", + input_tokens=100, + output_tokens=200, + duration_ms=5000, + ) + + with ( + patch.object( + runner, + "_execute_session", + new_callable=AsyncMock, + return_value=fake_outcome, + ), + patch.object( + runner, + "_harvest_and_integrate", + new_callable=AsyncMock, + return_value=("completed", None, []), + ), + patch( + "agent_fox.engine.session_lifecycle._capture_develop_head", + new_callable=AsyncMock, + return_value="abc123", + ), + patch( + "agent_fox.engine.session_lifecycle.emit_audit_event", + ), + patch.object( + runner, + "_extract_knowledge_and_findings", + new_callable=AsyncMock, + ), + ): + record = await runner._run_and_harvest( + "spec:1", 1, workspace, "sys", "task", Path("/tmp"), + ) + + assert record.status == "completed" + sink.record_session_outcome.assert_not_called()