diff --git a/src/apps/api/routers/agents.py b/src/apps/api/routers/agents.py index 559bfa1..dda663e 100644 --- a/src/apps/api/routers/agents.py +++ b/src/apps/api/routers/agents.py @@ -3,11 +3,12 @@ from typing import Any from fastapi import APIRouter, Depends, HTTPException, status -from sqlalchemy import case, func, select +from sqlalchemy import select from sqlalchemy.orm import Session from src.apps.api.deps import get_db from src.packages.core.db.models import AgentRoleORM, ExecutionRunORM +from src.packages.core.costs import estimate_cost from src.packages.core.schemas import ( AgentCapabilityDeclaration, AgentRoleDetailRead, @@ -50,9 +51,10 @@ def _to_agent_detail(agent_role: AgentRoleORM) -> AgentRoleDetailRead: def _to_agent_registry_item( agent_role: AgentRoleORM, *, - total_runs: int, - success_runs: int, + stats: dict[str, Any], ) -> AgentRegistryListItemRead: + total_runs = int(stats.get("total_runs", 0)) + success_runs = int(stats.get("success_runs", 0)) success_rate = None if total_runs > 0: success_rate = round((success_runs / total_runs) * 100, 2) @@ -70,6 +72,16 @@ def _to_agent_registry_item( total_runs=total_runs, success_runs=success_runs, success_rate=success_rate, + average_latency_ms=stats.get("average_latency_ms"), + retry_rate=stats.get("retry_rate"), + average_prompt_tokens=stats.get("average_prompt_tokens", 0), + average_completion_tokens=stats.get("average_completion_tokens", 0), + average_total_tokens=stats.get("average_total_tokens", 0), + total_prompt_tokens=stats.get("total_prompt_tokens", 0), + total_completion_tokens=stats.get("total_completion_tokens", 0), + total_tokens=stats.get("total_tokens", 0), + average_cost_estimate=stats.get("average_cost_estimate", 0), + total_cost_estimate=stats.get("total_cost_estimate", 0), ) @@ -182,31 +194,64 @@ def get_agent_registry( ) -> AgentRegistryResponse: agent_roles = db.scalars(select(AgentRoleORM).order_by(AgentRoleORM.role_name)).all() - run_stats_rows = db.execute( - select( - 
ExecutionRunORM.agent_role_id, - func.count(ExecutionRunORM.id).label("total_runs"), - func.coalesce( - func.sum(case((ExecutionRunORM.run_status == "success", 1), else_=0)), - 0, - ).label("success_runs"), + runs = db.scalars(select(ExecutionRunORM).order_by(ExecutionRunORM.started_at.asc(), ExecutionRunORM.id.asc())).all() + run_stats_by_role_id: dict[str, dict[str, Any]] = {} + task_run_counts: dict[str, int] = {} + for run in runs: + task_run_counts[run.task_id] = task_run_counts.get(run.task_id, 0) + 1 + + for run in runs: + stats = run_stats_by_role_id.setdefault( + run.agent_role_id, + { + "total_runs": 0, + "success_runs": 0, + "latency_sum": 0, + "latency_count": 0, + "total_prompt_tokens": 0, + "total_completion_tokens": 0, + "total_tokens": 0, + "total_cost_estimate": 0.0, + "retry_runs": 0, + }, ) - .group_by(ExecutionRunORM.agent_role_id) - ).all() - - run_stats_by_role_id: dict[str, dict[str, Any]] = { - row.agent_role_id: { - "total_runs": int(row.total_runs or 0), - "success_runs": int(row.success_runs or 0), - } - for row in run_stats_rows - } + stats["total_runs"] += 1 + if run.run_status == "success": + stats["success_runs"] += 1 + if run.latency_ms is not None: + stats["latency_sum"] += int(run.latency_ms) + stats["latency_count"] += 1 + prompt_tokens = int(run.token_usage.get("prompt_tokens", 0) or 0) + completion_tokens = int(run.token_usage.get("completion_tokens", 0) or 0) + total_tokens = int(run.token_usage.get("total_tokens", 0) or 0) + if total_tokens == 0: + total_tokens = prompt_tokens + completion_tokens + stats["total_prompt_tokens"] += prompt_tokens + stats["total_completion_tokens"] += completion_tokens + stats["total_tokens"] += total_tokens + stats["total_cost_estimate"] += estimate_cost(run.token_usage) + if task_run_counts.get(run.task_id, 0) > 1: + stats["retry_runs"] += 1 + + for stats in run_stats_by_role_id.values(): + total_runs = stats["total_runs"] + latency_count = stats["latency_count"] + stats["average_latency_ms"] = 
( + round(stats["latency_sum"] / latency_count, 2) if latency_count else None + ) + stats["retry_rate"] = round((stats["retry_runs"] / total_runs) * 100, 2) if total_runs else None + stats["average_prompt_tokens"] = round(stats["total_prompt_tokens"] / total_runs, 2) if total_runs else 0 + stats["average_completion_tokens"] = ( + round(stats["total_completion_tokens"] / total_runs, 2) if total_runs else 0 + ) + stats["average_total_tokens"] = round(stats["total_tokens"] / total_runs, 2) if total_runs else 0 + stats["total_cost_estimate"] = round(stats["total_cost_estimate"], 6) + stats["average_cost_estimate"] = round(stats["total_cost_estimate"] / total_runs, 6) if total_runs else 0 items = [ _to_agent_registry_item( agent_role, - total_runs=run_stats_by_role_id.get(agent_role.id, {}).get("total_runs", 0), - success_runs=run_stats_by_role_id.get(agent_role.id, {}).get("success_runs", 0), + stats=run_stats_by_role_id.get(agent_role.id, {}), ) for agent_role in agent_roles ] diff --git a/src/apps/api/routers/runs.py b/src/apps/api/routers/runs.py index 98902e0..ecb77b6 100644 --- a/src/apps/api/routers/runs.py +++ b/src/apps/api/routers/runs.py @@ -5,7 +5,9 @@ from sqlalchemy.orm import Session from src.apps.api.deps import get_db +from src.packages.core.costs import estimate_cost from src.packages.core.db.models import AgentRoleORM, AssignmentORM, EventLogORM, ExecutionRunORM, TaskORM +from src.packages.core.error_classification import classify_run_error from src.packages.core.schemas import ( ExecutionRunRead, RunDetailRead, @@ -89,6 +91,13 @@ def get_run_detail(run_id: str, db: Session = Depends(get_db)) -> RunDetailRead: for item in runs ], events=[TaskEventRead.model_validate(event) for event in events], + cost_estimate=estimate_cost(run.token_usage), + error_category=classify_run_error( + run_status=run.run_status, + error_message=run.error_message, + logs=run.logs, + routing_reason=assignment.routing_reason if assignment is not None else None, + ), ) diff 
--git a/src/apps/api/routers/task_batches.py b/src/apps/api/routers/task_batches.py index c1a5e87..63e1ccd 100644 --- a/src/apps/api/routers/task_batches.py +++ b/src/apps/api/routers/task_batches.py @@ -19,10 +19,12 @@ TaskBatchORM, TaskORM, ) +from src.packages.core.error_classification import classify_task_error, summarize_failure_categories from src.packages.core.schemas import ( BatchTimelineRead, BatchArtifactRead, BatchCountsRead, + FailureCategorySummaryRead, BatchProgressRead, BatchTaskResultRead, TaskBatchListItemRead, @@ -142,6 +144,36 @@ def _load_batch_artifacts(task_ids: list[str], db: Session) -> tuple[list[Artifa return artifacts, artifact_counts +def _load_active_assignments(task_ids: list[str], db: Session) -> dict[str, AssignmentORM]: + if not task_ids: + return {} + + assignments = db.scalars( + select(AssignmentORM) + .where(AssignmentORM.task_id.in_(task_ids)) + .order_by(AssignmentORM.assigned_at.desc(), AssignmentORM.id.desc()) + ).all() + latest_assignments: dict[str, AssignmentORM] = {} + for assignment in assignments: + latest_assignments.setdefault(assignment.task_id, assignment) + return latest_assignments + + +def _load_latest_reviews(task_ids: list[str], db: Session) -> dict[str, ReviewCheckpointORM]: + if not task_ids: + return {} + + reviews = db.scalars( + select(ReviewCheckpointORM) + .where(ReviewCheckpointORM.task_id.in_(task_ids)) + .order_by(ReviewCheckpointORM.created_at.desc(), ReviewCheckpointORM.id.desc()) + ).all() + latest_reviews: dict[str, ReviewCheckpointORM] = {} + for review in reviews: + latest_reviews.setdefault(review.task_id, review) + return latest_reviews + + def _batch_updated_at(task_batch: TaskBatchORM, tasks: list[TaskORM]) -> datetime: if not tasks: return task_batch.created_at @@ -454,6 +486,8 @@ def get_task_batch_summary(batch_id: str, db: Session = Depends(get_db)) -> Task task_ids = [task.id for task in tasks] latest_runs = _load_latest_runs(task_ids, db) artifacts, artifact_counts = 
_load_batch_artifacts(task_ids, db) + latest_assignments = _load_active_assignments(task_ids, db) + latest_reviews = _load_latest_reviews(task_ids, db) counts = _build_batch_counts(tasks) progress = _build_batch_progress(tasks) derived_status = _derive_batch_status(tasks) @@ -471,6 +505,15 @@ def get_task_batch_summary(batch_id: str, db: Session = Depends(get_db)) -> Task output_snapshot=latest_runs.get(task.id).output_snapshot if latest_runs.get(task.id) is not None else {}, error_message=latest_runs.get(task.id).error_message if latest_runs.get(task.id) is not None else None, cancel_reason=latest_runs.get(task.id).cancel_reason if latest_runs.get(task.id) is not None else None, + error_category=classify_task_error( + task_status=task.status, + dependency_ids=task.dependency_ids, + run_status=latest_runs.get(task.id).run_status if latest_runs.get(task.id) is not None else None, + error_message=latest_runs.get(task.id).error_message if latest_runs.get(task.id) is not None else None, + logs=latest_runs.get(task.id).logs if latest_runs.get(task.id) is not None else None, + routing_reason=latest_assignments.get(task.id).routing_reason if latest_assignments.get(task.id) is not None else None, + review_reason=latest_reviews.get(task.id).reason if latest_reviews.get(task.id) is not None else None, + ), artifact_count=artifact_counts.get(task.id, 0), ) for task in tasks @@ -496,4 +539,23 @@ def get_task_batch_summary(batch_id: str, db: Session = Depends(get_db)) -> Task progress=progress, tasks=task_results, artifacts=artifact_reads, + failure_categories=[ + FailureCategorySummaryRead.model_validate(item) + for item in summarize_failure_categories( + [ + { + "task_id": task.task_id, + "error_category": task.error_category, + "error_message": task.error_message, + "cancel_reason": task.cancel_reason, + "routing_reason": ( + latest_assignments.get(task.task_id).routing_reason + if latest_assignments.get(task.task_id) is not None + else None + ), + } + for task in 
task_results + ] + ) + ], ) diff --git a/src/apps/web/agents.js b/src/apps/web/agents.js index e91144f..8fd30c7 100644 --- a/src/apps/web/agents.js +++ b/src/apps/web/agents.js @@ -22,6 +22,10 @@ function formatPercent(value) { return `${value}%`; } +function formatCurrency(value) { + return `$${Number(value ?? 0).toFixed(6)}`; +} + function formatJson(value) { const entries = value && typeof value === "object" ? Object.keys(value) : []; if (!entries.length) { @@ -127,6 +131,18 @@ function renderAgents(items) {
Success rate
${escapeHtml(formatPercent(item.success_rate))}
+
+
Avg latency
${escapeHtml(item.average_latency_ms ?? "n/a")} ms
+
Retry rate
${escapeHtml(formatPercent(item.retry_rate))}
+
Total tokens
${escapeHtml(item.total_tokens)}
+
+ +
+
Avg total tokens
${escapeHtml(item.average_total_tokens)}
+
Avg cost estimate
${escapeHtml(formatCurrency(item.average_cost_estimate))}
+
Total cost estimate
${escapeHtml(formatCurrency(item.total_cost_estimate))}
+
+
Input schema
${formatJson(item.input_schema)}
diff --git a/src/apps/web/batch-detail.js b/src/apps/web/batch-detail.js index be34bd5..85f7119 100644 --- a/src/apps/web/batch-detail.js +++ b/src/apps/web/batch-detail.js @@ -55,7 +55,8 @@ function renderOverview(summary) { .join(""); } -function renderRiskGroups(tasks) { +function renderRiskGroups(summary) { + const tasks = summary.tasks; const groups = [ { key: "failed", @@ -77,7 +78,14 @@ function renderRiskGroups(tasks) { }, ]; - const activeGroups = groups.filter((group) => group.items.length > 0); + const categoryGroups = (summary.failure_categories ?? []).map((item) => ({ + key: item.category, + label: `Error category · ${item.category}`, + items: tasks.filter((task) => task.error_category === item.category), + description: `${item.count} task${item.count === 1 ? "" : "s"} currently grouped here.`, + })); + + const activeGroups = [...groups.filter((group) => group.items.length > 0), ...categoryGroups]; if (!activeGroups.length) { riskGroups.innerHTML = `
No failed, blocked, or review-pending tasks in this batch.
`; return; @@ -201,6 +209,9 @@ function taskFlags(task) { if (task.status === "failed" && task.error_message) { flags.push("Latest run returned an error"); } + if (task.error_category) { + flags.push(`Error category: ${task.error_category}`); + } return flags; } @@ -235,6 +246,11 @@ function renderTasks(tasks) { agent ${escapeHtml(task.assigned_agent_role ?? "unassigned")} latest run ${escapeHtml(task.latest_run_status ?? "not started")} ${escapeHtml(task.artifact_count)} artifacts + ${ + task.error_category + ? `error ${escapeHtml(task.error_category)}` + : "" + } ${ task.latest_run_id @@ -317,7 +333,7 @@ async function loadBatchDetail() { ]); statusText.textContent = `Batch ${summary.batch.id} is currently ${summary.derived_status}.`; renderOverview(summary); - renderRiskGroups(summary.tasks); + renderRiskGroups(summary); renderDependencyMap(summary.tasks); renderArtifacts(summary.artifacts); renderTimeline(timeline.items); diff --git a/src/apps/web/run-detail.js b/src/apps/web/run-detail.js index 86896df..27e6e30 100644 --- a/src/apps/web/run-detail.js +++ b/src/apps/web/run-detail.js @@ -64,6 +64,8 @@ function renderOverview(detail) { ["Prompt tokens", detail.run.token_usage?.prompt_tokens ?? 0], ["Completion tokens", detail.run.token_usage?.completion_tokens ?? 0], ["Total tokens", detail.run.token_usage?.total_tokens ?? 0], + ["Cost estimate", `$${Number(detail.cost_estimate ?? 0).toFixed(6)}`], + ["Error category", detail.error_category ?? "n/a"], ]; overviewMetrics.innerHTML = metrics .map(([label, value]) => `
${label}
${escapeHtml(value)}
`) @@ -101,6 +103,11 @@ function renderErrorAndLogs(detail) {
${detail.run.run_status === "cancelled" ? "Cancel context" : "Error message"}

${escapeHtml(detail.run.error_message ?? detail.run.cancel_reason)}

+ ${ + detail.error_category + ? `

Error category: ${escapeHtml(detail.error_category)}

` + : "" + }
`; } else { diff --git a/src/packages/core/costs.py b/src/packages/core/costs.py new file mode 100644 index 0000000..119bd48 --- /dev/null +++ b/src/packages/core/costs.py @@ -0,0 +1,13 @@ +from __future__ import annotations + +PROMPT_COST_PER_1K = 0.001 +COMPLETION_COST_PER_1K = 0.002 + + +def estimate_cost(token_usage: dict | None) -> float: + usage = token_usage or {} + prompt_tokens = int(usage.get("prompt_tokens", 0) or 0) + completion_tokens = int(usage.get("completion_tokens", 0) or 0) + cost = (prompt_tokens / 1000) * PROMPT_COST_PER_1K + cost += (completion_tokens / 1000) * COMPLETION_COST_PER_1K + return round(cost, 6) diff --git a/src/packages/core/error_classification.py b/src/packages/core/error_classification.py new file mode 100644 index 0000000..0721079 --- /dev/null +++ b/src/packages/core/error_classification.py @@ -0,0 +1,131 @@ +from __future__ import annotations + + +ERROR_CATEGORIES = { + "timeout", + "validation_error", + "routing_error", + "dependency_blocked", + "execution_error", + "external_tool_error", +} + + +def _normalize_text_parts(*parts: str | None, logs: list[str] | None = None) -> str: + chunks = [part.strip().lower() for part in parts if part and part.strip()] + if logs: + chunks.extend(line.strip().lower() for line in logs if line and line.strip()) + return "\n".join(chunks) + + +def _is_timeout(text: str) -> bool: + return "timed out" in text or "timeout" in text or "time out" in text + + +def _is_validation_error(text: str) -> bool: + return any( + marker in text + for marker in ( + "validation", + "must be a non-empty string", + "output must be a dict", + "invalid input", + "invalid output", + ) + ) + + +def _is_external_tool_error(text: str) -> bool: + return any( + marker in text + for marker in ( + "subprocess", + "tool call", + "external tool", + "command failed", + "process exited", + "exit code", + ) + ) + + +def _is_routing_error(text: str) -> bool: + return "no eligible agent role found" in text or "routing" in text 
and "no eligible" in text + + +def classify_run_error( + *, + run_status: str, + error_message: str | None, + logs: list[str] | None = None, + routing_reason: str | None = None, +) -> str | None: + if run_status != "failed": + return None + + normalized = _normalize_text_parts(error_message, routing_reason, logs=logs) + if _is_timeout(normalized): + return "timeout" + if _is_validation_error(normalized): + return "validation_error" + if _is_external_tool_error(normalized): + return "external_tool_error" + if _is_routing_error(normalized): + return "routing_error" + return "execution_error" + + +def classify_task_error( + *, + task_status: str, + dependency_ids: list[str] | None = None, + run_status: str | None = None, + error_message: str | None = None, + logs: list[str] | None = None, + routing_reason: str | None = None, + review_reason: str | None = None, +) -> str | None: + dependency_ids = dependency_ids or [] + + if task_status == "blocked" and dependency_ids: + return "dependency_blocked" + + normalized_context = _normalize_text_parts(routing_reason, review_reason) + if task_status in {"needs_review", "failed"} and _is_routing_error(normalized_context): + return "routing_error" + + if run_status == "failed": + return classify_run_error( + run_status=run_status, + error_message=error_message, + logs=logs, + routing_reason=routing_reason, + ) + + return None + + +def summarize_failure_categories( + task_items: list[dict[str, object]], +) -> list[dict[str, object]]: + grouped: dict[str, dict[str, object]] = {} + for item in task_items: + category = item.get("error_category") + if not category: + continue + if category not in grouped: + grouped[category] = { + "category": category, + "count": 0, + "task_ids": [], + "sample_messages": [], + } + grouped_item = grouped[category] + grouped_item["count"] = int(grouped_item["count"]) + 1 + grouped_item["task_ids"].append(item["task_id"]) + message = item.get("error_message") or item.get("cancel_reason") or 
item.get("routing_reason") + if message and message not in grouped_item["sample_messages"] and len(grouped_item["sample_messages"]) < 3: + grouped_item["sample_messages"].append(message) + + ordered_categories = sorted(grouped.keys(), key=lambda value: (-int(grouped[value]["count"]), value)) + return [grouped[category] for category in ordered_categories] diff --git a/src/packages/core/schemas.py b/src/packages/core/schemas.py index 9fd0cbc..c683a07 100644 --- a/src/packages/core/schemas.py +++ b/src/packages/core/schemas.py @@ -102,6 +102,7 @@ class BatchTaskResultRead(SchemaModel): output_snapshot: dict[str, Any] = Field(default_factory=dict) error_message: str | None = None cancel_reason: str | None = None + error_category: str | None = None artifact_count: int = 0 @@ -115,6 +116,13 @@ class BatchArtifactRead(SchemaModel): created_at: datetime +class FailureCategorySummaryRead(SchemaModel): + category: str + count: int + task_ids: list[str] = Field(default_factory=list) + sample_messages: list[str] = Field(default_factory=list) + + class TaskBatchSummaryRead(SchemaModel): batch: TaskBatchRead derived_status: str @@ -122,6 +130,7 @@ class TaskBatchSummaryRead(SchemaModel): progress: BatchProgressRead tasks: list[BatchTaskResultRead] artifacts: list[BatchArtifactRead] + failure_categories: list[FailureCategorySummaryRead] = Field(default_factory=list) class TaskBatchListItemRead(SchemaModel): @@ -283,6 +292,16 @@ class AgentRegistryListItemRead(SchemaModel): total_runs: int = 0 success_runs: int = 0 success_rate: float | None = None + average_latency_ms: float | None = None + retry_rate: float | None = None + average_prompt_tokens: float = 0 + average_completion_tokens: float = 0 + average_total_tokens: float = 0 + total_prompt_tokens: int = 0 + total_completion_tokens: int = 0 + total_tokens: int = 0 + average_cost_estimate: float = 0 + total_cost_estimate: float = 0 class AgentRegistryDiagnosisRead(SchemaModel): @@ -362,6 +381,8 @@ class 
RunDetailRead(SchemaModel): routing: RunRoutingRead retry_history: list[RunRetryHistoryItemRead] = Field(default_factory=list) events: list[TaskEventRead] = Field(default_factory=list) + cost_estimate: float = 0 + error_category: str | None = None class ReviewCheckpointCreate(SchemaModel): diff --git a/src/tests/test_agent_registry_page.py b/src/tests/test_agent_registry_page.py index 610c244..e741bec 100644 --- a/src/tests/test_agent_registry_page.py +++ b/src/tests/test_agent_registry_page.py @@ -9,6 +9,8 @@ from sqlalchemy import create_engine, text from sqlalchemy.orm import Session +from src.packages.core.db.models import ExecutionRunORM + ROOT = Path(__file__).resolve().parents[2] TEST_ROLE_PREFIX = "registry-test-role-" @@ -124,49 +126,23 @@ def _create_batch_with_task(client: TestClient, *, task_type: str, suffix: str) def _insert_runs(task_id: str, agent_role_id: str, run_statuses: list[str]) -> None: engine = create_engine(_database_url()) with Session(engine) as session: - for run_status in run_statuses: - session.execute( - text( - """ - INSERT INTO execution_runs ( - id, - task_id, - agent_role_id, - run_status, - started_at, - finished_at, - logs, - input_snapshot, - output_snapshot, - error_message, - cancelled_at, - cancel_reason, - token_usage, - latency_ms - ) VALUES ( - :id, - :task_id, - :agent_role_id, - :run_status, - NOW(), - NOW(), - ARRAY[]::text[], - '{}'::jsonb, - '{}'::jsonb, - NULL, - NULL, - NULL, - '{}'::jsonb, - 100 - ) - """ - ), - { - "id": f"run_{uuid.uuid4().hex}", - "task_id": task_id, - "agent_role_id": agent_role_id, - "run_status": run_status, - }, + for index, run_status in enumerate(run_statuses, start=1): + session.add( + ExecutionRunORM( + id=f"run_{uuid.uuid4().hex}", + task_id=task_id, + agent_role_id=agent_role_id, + run_status=run_status, + logs=[], + input_snapshot={}, + output_snapshot={}, + token_usage=( + {"prompt_tokens": 10, "completion_tokens": 5, "total_tokens": 15} + if index == 1 + else {"prompt_tokens": 20, 
"completion_tokens": 10, "total_tokens": 30} + ), + latency_ms=100 * index, + ) ) session.commit() @@ -264,52 +240,10 @@ def _seed_registry_history(*, suffix: str, run_statuses: list[str]) -> dict: "assigned_agent_role": role_name, }, ) - for run_status in run_statuses: - session.execute( - text( - """ - INSERT INTO execution_runs ( - id, - task_id, - agent_role_id, - run_status, - started_at, - finished_at, - logs, - input_snapshot, - output_snapshot, - error_message, - cancelled_at, - cancel_reason, - token_usage, - latency_ms - ) VALUES ( - :id, - :task_id, - :agent_role_id, - :run_status, - NOW(), - NOW(), - ARRAY[]::text[], - '{}'::jsonb, - '{}'::jsonb, - NULL, - NULL, - NULL, - '{}'::jsonb, - 100 - ) - """ - ), - { - "id": f"run_{uuid.uuid4().hex}", - "task_id": task_id, - "agent_role_id": role_id, - "run_status": run_status, - }, - ) session.commit() + _insert_runs(task_id, role_id, run_statuses) + return role @@ -342,6 +276,12 @@ def test_agent_registry_aggregates_run_history_and_success_rate() -> None: assert registry_item["total_runs"] == 3 assert registry_item["success_runs"] == 1 assert registry_item["success_rate"] == 33.33 + assert registry_item["average_latency_ms"] == 200.0 + assert registry_item["retry_rate"] == 100.0 + assert registry_item["total_tokens"] == 75 + assert registry_item["average_total_tokens"] == 25.0 + assert registry_item["total_cost_estimate"] == 0.0001 + assert registry_item["average_cost_estimate"] == 0.000033 def test_agent_registry_diagnosis_distinguishes_enabled_disabled_and_missing_matches() -> None: @@ -395,6 +335,9 @@ def test_agent_registry_reports_no_run_history_when_role_has_no_runs() -> None: assert registry_item["total_runs"] == 0 assert registry_item["success_runs"] == 0 assert registry_item["success_rate"] is None + assert registry_item["average_latency_ms"] is None + assert registry_item["retry_rate"] is None + assert registry_item["total_cost_estimate"] == 0 def test_console_agent_registry_page_is_accessible() -> 
None: @@ -408,5 +351,8 @@ def test_agent_registry_assets_include_success_rate_and_diagnosis_copy() -> None response = client.get("/console/assets/agents.js") assert response.status_code == 200 assert "Success rate" in response.text + assert "Avg latency" in response.text + assert "Retry rate" in response.text + assert "Total cost estimate" in response.text assert "Why no suitable role?" in client.get("/console/agents").text assert "No run history" in response.text diff --git a/src/tests/test_batch_detail_page.py b/src/tests/test_batch_detail_page.py index 253a1b1..0cd057e 100644 --- a/src/tests/test_batch_detail_page.py +++ b/src/tests/test_batch_detail_page.py @@ -101,7 +101,7 @@ def _batch_payload(task_type: str, suffix: str) -> dict: _cleanup_database() from src.apps.api.app import app # noqa: E402 -from src.packages.core.db.models import AssignmentORM, ExecutionRunORM, TaskORM # noqa: E402 +from src.packages.core.db.models import AssignmentORM, ExecutionRunORM, ReviewCheckpointORM, TaskORM # noqa: E402 client = TestClient(app) @@ -152,6 +152,7 @@ def test_batch_detail_page_assets_include_batch_timeline() -> None: asset_response = client.get("/console/assets/batch-detail.js") assert asset_response.status_code == 200 assert "/task-batches/${batchId}/timeline" in asset_response.text + assert "Error category" in asset_response.text def test_batch_detail_summary_supports_mixed_risk_sections() -> None: @@ -188,6 +189,13 @@ def test_batch_detail_summary_supports_mixed_risk_sections() -> None: output_snapshot={"step": "compile"}, ) ) + session.add( + ReviewCheckpointORM( + task_id=third_task.id, + reason="No eligible agent role found for task_type=unmatched_type", + review_status="pending", + ) + ) session.commit() summary_response = client.get(f"/task-batches/{batch_id}/summary") @@ -203,5 +211,13 @@ def test_batch_detail_summary_supports_mixed_risk_sections() -> None: review_task = next(item for item in payload["tasks"] if item["task_id"] == task_ids[2]) assert 
failed_task["error_message"] == "detail page should highlight this failure" + assert failed_task["error_category"] == "execution_error" assert blocked_task["dependency_ids"] == [task_ids[0]] + assert blocked_task["error_category"] == "dependency_blocked" assert review_task["status"] == "needs_review" + assert review_task["error_category"] == "routing_error" + assert {item["category"] for item in payload["failure_categories"]} == { + "execution_error", + "dependency_blocked", + "routing_error", + } diff --git a/src/tests/test_error_classification.py b/src/tests/test_error_classification.py new file mode 100644 index 0000000..e8c279a --- /dev/null +++ b/src/tests/test_error_classification.py @@ -0,0 +1,62 @@ +from src.packages.core.error_classification import classify_run_error, classify_task_error + + +def test_classify_run_error_detects_timeout() -> None: + assert ( + classify_run_error( + run_status="failed", + error_message="subprocess timed out after 30 seconds", + ) + == "timeout" + ) + + +def test_classify_run_error_detects_validation_error() -> None: + assert ( + classify_run_error( + run_status="failed", + error_message="input_payload.text must be a non-empty string", + ) + == "validation_error" + ) + + +def test_classify_task_error_detects_routing_error_from_review_context() -> None: + assert ( + classify_task_error( + task_status="needs_review", + review_reason="No eligible agent role found for task_type=no_match", + ) + == "routing_error" + ) + + +def test_classify_task_error_detects_dependency_blocked() -> None: + assert ( + classify_task_error( + task_status="blocked", + dependency_ids=["task_upstream"], + ) + == "dependency_blocked" + ) + + +def test_classify_run_error_detects_external_tool_error() -> None: + assert ( + classify_run_error( + run_status="failed", + error_message="command failed with exit code 1", + logs=["subprocess execution failed"], + ) + == "external_tool_error" + ) + + +def test_classify_run_error_falls_back_to_execution_error() -> 
None: + assert ( + classify_run_error( + run_status="failed", + error_message="unexpected boom", + ) + == "execution_error" + ) diff --git a/src/tests/test_run_detail_page.py b/src/tests/test_run_detail_page.py index fdad5e5..ff66049 100644 --- a/src/tests/test_run_detail_page.py +++ b/src/tests/test_run_detail_page.py @@ -183,9 +183,45 @@ def test_run_detail_endpoint_returns_routing_and_retry_history() -> None: assert [item["run_status"] for item in payload["retry_history"]] == ["success", "failed"] assert payload["retry_history"][0]["is_current"] is True assert payload["run"]["token_usage"]["total_tokens"] == 8 + assert payload["cost_estimate"] == 0.000011 + assert payload["error_category"] is None assert payload["events"][-1]["event_type"] == "run_completed" +def test_run_detail_endpoint_returns_error_category_for_failed_run() -> None: + suffix = uuid.uuid4().hex[:8] + role_name = f"{TEST_PREFIX}timeout-{suffix}" + _register_agent(client, role_name=role_name, supported_task_types=["generate"]) + response = client.post("/task-batches", json=_batch_payload("generate", suffix)) + assert response.status_code == 201 + task_id = response.json()["tasks"][0]["task_id"] + + engine = create_engine(_database_url()) + with Session(engine) as session: + task = session.get(TaskORM, task_id) + assert task is not None + assignment = session.query(AssignmentORM).filter(AssignmentORM.task_id == task.id).first() + assert assignment is not None + run = ExecutionRunORM( + task_id=task.id, + agent_role_id=assignment.agent_role_id, + run_status="failed", + error_message="tool timed out while waiting for response", + logs=["command timed out"], + input_snapshot={"attempt": 1}, + output_snapshot={}, + ) + session.add(run) + session.commit() + run_id = run.id + + detail_response = client.get(f"/runs/{run_id}/detail") + assert detail_response.status_code == 200 + payload = detail_response.json() + assert payload["run"]["run_status"] == "failed" + assert payload["error_category"] == 
"timeout" + + def test_run_detail_endpoint_handles_cancelled_run_without_logs() -> None: suffix = uuid.uuid4().hex[:8] _register_agent(client, role_name="default_worker", supported_task_types=[]) @@ -219,6 +255,7 @@ def test_run_detail_endpoint_handles_cancelled_run_without_logs() -> None: assert payload["run"]["run_status"] == "cancelled" assert payload["run"]["cancel_reason"] == "user requested cancellation" assert payload["run"]["logs"] == [] + assert payload["error_category"] is None def test_console_run_detail_page_is_accessible() -> None: @@ -236,6 +273,8 @@ def test_run_detail_page_assets_include_task_lifecycle_timeline() -> None: asset_response = client.get("/console/assets/run-detail.js") assert asset_response.status_code == 200 assert "/tasks/${detail.task.task_id}/timeline" in asset_response.text + assert "Cost estimate" in asset_response.text + assert "Error category" in asset_response.text def test_batch_detail_assets_link_to_run_detail_page() -> None: diff --git a/src/tests/test_task_batch_summary.py b/src/tests/test_task_batch_summary.py index 70f337a..b1a3786 100644 --- a/src/tests/test_task_batch_summary.py +++ b/src/tests/test_task_batch_summary.py @@ -173,6 +173,9 @@ def test_summary_returns_needs_review_status_for_unrouted_batch() -> None: assert payload["derived_status"] == "needs_review" assert payload["counts"]["needs_review_count"] == 3 assert payload["progress"]["completed_count"] == 0 + assert all(task["error_category"] == "routing_error" for task in payload["tasks"]) + assert payload["failure_categories"][0]["category"] == "routing_error" + assert payload["failure_categories"][0]["count"] == 3 def test_summary_returns_running_status_when_task_is_in_progress() -> None: @@ -275,4 +278,66 @@ def test_summary_aggregates_latest_run_and_artifacts() -> None: assert task_summary["latest_run_status"] == "success" assert task_summary["output_snapshot"] == {"version": 2} assert task_summary["artifact_count"] == 1 + assert 
task_summary["error_category"] is None assert len(payload["artifacts"]) == 1 + + +def test_summary_groups_failure_categories_from_latest_task_context() -> None: + suffix = uuid.uuid4().hex[:8] + _register_agent(client, role_name="default_worker", supported_task_types=[]) + response = client.post("/task-batches", json=_batch_payload("unmatched_type", suffix)) + assert response.status_code == 201 + batch_id = response.json()["batch_id"] + task_ids = [task["task_id"] for task in response.json()["tasks"]] + + engine = create_engine(_database_url()) + with Session(engine) as session: + tasks = [session.get(TaskORM, task_id) for task_id in task_ids] + assert all(task is not None for task in tasks) + first_task, second_task, third_task = tasks + first_assignment = session.query(AssignmentORM).filter(AssignmentORM.task_id == first_task.id).first() + third_assignment = session.query(AssignmentORM).filter(AssignmentORM.task_id == third_task.id).first() + assert first_assignment is not None + assert third_assignment is not None + + first_task.status = "failed" + second_task.status = "blocked" + third_task.status = "failed" + + session.add( + ExecutionRunORM( + task_id=first_task.id, + agent_role_id=first_assignment.agent_role_id, + run_status="failed", + error_message="input_payload.text must be a non-empty string", + logs=["validation failed"], + output_snapshot={}, + ) + ) + session.add( + ExecutionRunORM( + task_id=third_task.id, + agent_role_id=third_assignment.agent_role_id, + run_status="failed", + error_message="command failed with exit code 1", + logs=["subprocess call failed"], + output_snapshot={}, + ) + ) + session.commit() + + summary_response = client.get(f"/task-batches/{batch_id}/summary") + assert summary_response.status_code == 200 + payload = summary_response.json() + + validation_task = next(item for item in payload["tasks"] if item["task_id"] == task_ids[0]) + blocked_task = next(item for item in payload["tasks"] if item["task_id"] == task_ids[1]) + 
tool_task = next(item for item in payload["tasks"] if item["task_id"] == task_ids[2]) + assert validation_task["error_category"] == "validation_error" + assert blocked_task["error_category"] == "dependency_blocked" + assert tool_task["error_category"] == "external_tool_error" + + category_counts = {item["category"]: item["count"] for item in payload["failure_categories"]} + assert category_counts["validation_error"] == 1 + assert category_counts["dependency_blocked"] == 1 + assert category_counts["external_tool_error"] == 1