Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions agent_fox/nightshift/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -528,6 +528,7 @@ async def _process_fix(self, issue: object, issue_body: str = "") -> None:
task_callback=self._task_callback,
sink_dispatcher=self._sink,
spinner_callback=self._spinner_callback,
conn=self._conn,
)

effective_body = issue_body if issue_body else getattr(issue, "body", "")
Expand Down
128 changes: 128 additions & 0 deletions agent_fox/nightshift/fix_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
from agent_fox.workspace import WorkspaceInfo

if TYPE_CHECKING:
import duckdb

from agent_fox.knowledge.sink import SinkDispatcher

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -80,13 +82,15 @@ def __init__(
task_callback: TaskCallback | None = None,
sink_dispatcher: SinkDispatcher | None = None,
spinner_callback: SpinnerCallback | None = None,
conn: duckdb.DuckDBPyConnection | None = None,
) -> None:
self._config = config
self._platform = platform
self._activity_callback = activity_callback
self._task_callback = task_callback
self._sink = sink_dispatcher
self._spinner_callback = spinner_callback
self._conn = conn
self._run_id: str = ""

def _update_spinner(self, text: str) -> None:
Expand Down Expand Up @@ -232,6 +236,98 @@ def _get_model_id(self, archetype: str) -> str:
except Exception:
return "claude-sonnet-4-6"

def _try_complete_run(self, status: str) -> None:
"""Mark the runs row as finished (best-effort).

No-op when conn is not set. The *status* value should be a
``RunStatus`` string (e.g. ``"completed"`` or ``"interrupted"``).
"""
if self._conn is None:
return
try:
from agent_fox.engine.state import complete_run

complete_run(self._conn, self._run_id, status)
except Exception:
logger.debug("Failed to complete run record for run %s", self._run_id, exc_info=True)

def _record_session_to_db(
self,
outcome: object,
archetype: str,
run_id: str,
*,
node_id: str = "",
attempt: int = 1,
cost: float = 0.0,
) -> None:
"""Write a session outcome row to session_outcomes and update runs totals.

Best-effort: exceptions are logged and swallowed so the pipeline is
never interrupted by a telemetry failure.
"""
if self._conn is None:
return

import uuid as _uuid
from datetime import UTC, datetime

from agent_fox.engine.state import (
SessionOutcomeRecord,
record_session,
update_run_totals,
)

try:
input_tokens = getattr(outcome, "input_tokens", 0)
output_tokens = getattr(outcome, "output_tokens", 0)
duration_ms = getattr(outcome, "duration_ms", 0)
status = getattr(outcome, "status", "completed")
error_message = getattr(outcome, "error_message", None)
is_transport_error = getattr(outcome, "is_transport_error", False)

# Parse spec_name and task_group from node_id (format: spec:group:archetype)
parts = node_id.split(":", 2)
spec_name = parts[0] if parts else ""
task_group = parts[1] if len(parts) > 1 else "0"

model_id = self._get_model_id(archetype)

record = SessionOutcomeRecord(
id=str(_uuid.uuid4()),
spec_name=spec_name,
task_group=task_group,
node_id=node_id,
touched_path="",
status=status,
input_tokens=input_tokens,
output_tokens=output_tokens,
duration_ms=duration_ms,
created_at=datetime.now(UTC).isoformat(),
run_id=run_id,
attempt=attempt,
cost=cost,
model=model_id,
archetype=archetype,
commit_sha="",
error_message=error_message,
is_transport_error=is_transport_error,
)
record_session(self._conn, record)
update_run_totals(
self._conn,
run_id,
input_tokens=input_tokens,
output_tokens=output_tokens,
cost=cost,
)
except Exception:
logger.warning(
"Failed to record session to DB for %s",
node_id,
exc_info=True,
)

def _emit_session_event(
self,
outcome: object,
Expand All @@ -243,6 +339,9 @@ def _emit_session_event(
) -> None:
"""Emit session.complete or session.fail based on outcome status.

Also writes a row to session_outcomes and updates the runs totals via
_record_session_to_db (best-effort).

Best-effort: exceptions from audit infrastructure are logged and
swallowed so the fix pipeline is never interrupted.

Expand Down Expand Up @@ -289,6 +388,7 @@ def _emit_session_event(
},
)
else:
cost = 0.0
emit_audit_event(
self._sink,
run_id,
Expand All @@ -303,6 +403,16 @@ def _emit_session_event(
},
)

# Write to session_outcomes table and update runs totals (best-effort).
self._record_session_to_db(
outcome,
archetype,
run_id,
node_id=node_id,
attempt=attempt,
cost=cost,
)

# ------------------------------------------------------------------
# Comment formatting (82-REQ-3.1, 82-REQ-6.1)
# ------------------------------------------------------------------
Expand Down Expand Up @@ -871,6 +981,19 @@ async def process_issue(
# (91-REQ-2.1)
self._run_id = generate_run_id()

# Create a run row in the runs table (best-effort).
if self._conn is not None:
try:
from agent_fox.engine.state import create_run

create_run(self._conn, self._run_id, f"fix-issue-{issue.number}")
except Exception:
logger.debug(
"Failed to create run record for issue #%d",
issue.number,
exc_info=True,
)

# 61-REQ-6.E2: reject empty issue body
if not issue_body or not issue_body.strip():
await self._platform.add_issue_comment( # type: ignore[attr-defined]
Expand All @@ -879,6 +1002,7 @@ async def process_issue(
"Please add more detail describing the problem and expected behavior. "
f"(run: `{self._run_id}`)",
)
self._try_complete_run("completed")
return metrics

spec = build_in_memory_spec(issue, issue_body)
Expand Down Expand Up @@ -927,6 +1051,7 @@ async def process_issue(

if not success:
# Ladder exhausted — do NOT close issue
self._try_complete_run("completed")
return metrics

# Optionally push fix branch to upstream remote (93-REQ-3.1).
Expand Down Expand Up @@ -958,6 +1083,7 @@ async def process_issue(
issue.number,
exc,
)
self._try_complete_run("interrupted")
return metrics
finally:
await self._cleanup_workspace(workspace)
Expand All @@ -976,6 +1102,7 @@ async def process_issue(
issue.number,
exc,
)
self._try_complete_run("completed")
return metrics

# Close the originating issue with a comment pointing to the branch.
Expand Down Expand Up @@ -1024,6 +1151,7 @@ async def process_issue(
issue.number,
spec.branch_name,
)
self._try_complete_run("completed")
return metrics

async def _push_fix_branch_upstream(
Expand Down
Loading