Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 1 addition & 19 deletions agent_fox/engine/review_persistence.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from agent_fox.core.json_extraction import extract_json_array
from agent_fox.engine.audit_helpers import emit_audit_event
from agent_fox.knowledge.audit import AuditEventType, AuditSeverity
from agent_fox.knowledge.sink import SessionOutcome, SessionSink, SinkDispatcher
from agent_fox.knowledge.sink import SessionSink, SinkDispatcher

if TYPE_CHECKING:
from agent_fox.knowledge.review_store import ReviewFinding
Expand Down Expand Up @@ -132,24 +132,6 @@ def _emit_persistence_event(
)


def record_session_to_sink(
sink: SinkDispatcher | SessionSink | None,
outcome: SessionOutcome,
node_id: str,
) -> None:
"""Record a session outcome to the sink dispatcher (best-effort)."""
if sink is None:
return
try:
sink.record_session_outcome(outcome)
except Exception:
logger.warning(
"Failed to record session outcome to sink for %s",
node_id,
exc_info=True,
)


def persist_review_findings(
transcript: str,
node_id: str,
Expand Down
31 changes: 1 addition & 30 deletions agent_fox/engine/session_lifecycle.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

from __future__ import annotations

import dataclasses
import json
import logging
from datetime import UTC, datetime
Expand All @@ -24,10 +23,7 @@
from agent_fox.core.prompt_safety import sanitize_prompt_content
from agent_fox.engine.audit_helpers import emit_audit_event
from agent_fox.engine.knowledge_harvest import extract_and_store_knowledge
from agent_fox.engine.review_persistence import (
persist_review_findings,
record_session_to_sink,
)
from agent_fox.engine.review_persistence import persist_review_findings
from agent_fox.engine.sdk_params import (
clamp_instances,
resolve_fallback_model,
Expand Down Expand Up @@ -635,23 +631,6 @@ async def _run_and_harvest(
if touched_files and status == "completed":
commit_sha = await _capture_develop_head(repo_root)

# Record and emit audit events
sink_outcome = outcome
if status != outcome.status or error_message != outcome.error_message:
sink_outcome = dataclasses.replace(
sink_outcome,
status=status,
error_message=error_message,
)
if touched_files:
sink_outcome = dataclasses.replace(
sink_outcome,
touched_paths=touched_files,
)

# 11-REQ-4.2: Record session outcome to sinks (always, best-effort)
self._record_session_to_sink(sink_outcome, node_id)

# 40-REQ-7.2, 40-REQ-7.3: Emit session.complete or session.fail
if status == "completed":
emit_audit_event(
Expand Down Expand Up @@ -731,14 +710,6 @@ async def _run_and_harvest(
is_transport_error=getattr(outcome, "is_transport_error", False),
)

def _record_session_to_sink(
self,
outcome: SessionOutcome,
node_id: str,
) -> None:
"""Record a session outcome to the sink dispatcher (best-effort)."""
record_session_to_sink(self._sink, outcome, node_id)

def _persist_review_findings(
self,
transcript: str,
Expand Down
60 changes: 0 additions & 60 deletions tests/unit/cli/test_code.py
Original file line number Diff line number Diff line change
Expand Up @@ -515,66 +515,6 @@ async def test_harvest_error_returns_failed_record_with_context(
assert record.input_tokens == 100 # Session metrics preserved
assert record.output_tokens == 200

@pytest.mark.asyncio
async def test_harvest_error_records_failed_status_to_sink(
self,
) -> None:
"""Sink receives failed status when harvest fails after completed session."""
from agent_fox.core.errors import IntegrationError
from agent_fox.engine.session_lifecycle import NodeSessionRunner
from agent_fox.knowledge.sink import SessionOutcome

config = AgentFoxConfig()
sink = MagicMock()
runner = NodeSessionRunner("test_spec:1", config, sink_dispatcher=sink, knowledge_db=_MOCK_KB)

mock_outcome = SessionOutcome(
spec_name="test_spec",
task_group="1",
node_id="test_spec:1",
status="completed",
input_tokens=100,
output_tokens=200,
duration_ms=5000,
)

with (
patch(
"agent_fox.engine.session_lifecycle.run_session",
new_callable=AsyncMock,
return_value=mock_outcome,
),
patch(
"agent_fox.engine.session_lifecycle.harvest",
new_callable=AsyncMock,
side_effect=IntegrationError(
"Merge conflict in foo.py",
),
),
):
from agent_fox.workspace import WorkspaceInfo

workspace = WorkspaceInfo(
path=Path("/tmp/fake-worktree"),
spec_name="test_spec",
task_group=1,
branch="feature/test_spec/1",
)
await runner._run_and_harvest(
"test_spec:1",
1,
workspace,
"system prompt",
"task prompt",
Path("/tmp/fake-repo"),
)

sink.record_session_outcome.assert_called_once()
recorded = sink.record_session_outcome.call_args.args[0]
assert recorded.status == "failed"
assert recorded.error_message is not None
assert "harvest failed" in recorded.error_message.lower()

@pytest.mark.asyncio
async def test_session_summary_read_before_cleanup(
self,
Expand Down
81 changes: 81 additions & 0 deletions tests/unit/engine/test_session_lifecycle.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
from agent_fox.knowledge.db import KnowledgeDB
from agent_fox.workspace import WorkspaceInfo

from agent_fox.knowledge.sink import SessionOutcome

_MOCK_KB = MagicMock(spec=KnowledgeDB)

# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -238,3 +240,82 @@ async def _fake_run_and_harvest(node_id, attempt, workspace, system_prompt, task

assert "type error in foo" in captured_prompts["task"]
assert "retry attempt 2" in captured_prompts["task"].lower()


# ---------------------------------------------------------------------------
# Regression: no duplicate session_outcomes rows (fixes #473)
# ---------------------------------------------------------------------------


class TestNoDuplicateSessionOutcomeWrite:
"""Verify _run_and_harvest does not write session outcomes to the sink.

Session outcomes are written exclusively by SessionResultHandler.process()
via state.record_session(). The old sink-based path caused duplicate rows
in the session_outcomes table (issue #473).
"""

@pytest.mark.asyncio
async def test_run_and_harvest_does_not_call_sink_record_session_outcome(self) -> None:
"""_run_and_harvest must not dispatch record_session_outcome to sinks."""
config = AgentFoxConfig()
sink = MagicMock()
sink.record_session_outcome = MagicMock()

runner = NodeSessionRunner(
"spec:1",
config,
knowledge_db=_MOCK_KB,
sink_dispatcher=sink,
)

workspace = WorkspaceInfo(
path=Path("/tmp/ws"),
spec_name="spec",
task_group=1,
branch="feature/spec/1",
)

fake_outcome = SessionOutcome(
spec_name="spec",
task_group="1",
node_id="spec:1",
status="completed",
input_tokens=100,
output_tokens=200,
duration_ms=5000,
)

with (
patch.object(
runner,
"_execute_session",
new_callable=AsyncMock,
return_value=fake_outcome,
),
patch.object(
runner,
"_harvest_and_integrate",
new_callable=AsyncMock,
return_value=("completed", None, []),
),
patch(
"agent_fox.engine.session_lifecycle._capture_develop_head",
new_callable=AsyncMock,
return_value="abc123",
),
patch(
"agent_fox.engine.session_lifecycle.emit_audit_event",
),
patch.object(
runner,
"_extract_knowledge_and_findings",
new_callable=AsyncMock,
),
):
record = await runner._run_and_harvest(
"spec:1", 1, workspace, "sys", "task", Path("/tmp"),
)

assert record.status == "completed"
sink.record_session_outcome.assert_not_called()